Merge branch 'master' of git@github.com:sustrik/zeromq2

This commit is contained in:
Martin Sustrik 2009-09-16 10:56:55 +02:00
commit 6e03cb2f3e
12 changed files with 402 additions and 27 deletions

View File

@ -53,6 +53,7 @@ extern "C" {
#define ZMQ_UNSUBSCRIBE 7 // string
#define ZMQ_RATE 8 // int64_t
#define ZMQ_RECOVERY_IVL 9 // int64_t
#define ZMQ_MCAST_LOOP 10 // boolean
// The operation should be performed in non-blocking mode. I.e. if it cannot
// be processed immediately, error should be returned with errno set to EAGAIN.

View File

@ -62,6 +62,7 @@ libzmq_la_SOURCES = $(pgm_sources) \
object.hpp \
options.hpp \
owned.hpp \
pgm_receiver.hpp \
pgm_sender.hpp \
pgm_socket.hpp \
pipe.hpp \
@ -104,6 +105,7 @@ libzmq_la_SOURCES = $(pgm_sources) \
object.cpp \
options.cpp \
owned.cpp \
pgm_receiver.cpp \
pgm_sender.cpp \
pgm_socket.cpp \
pipe.cpp \

View File

@ -25,6 +25,7 @@ zmq::options_t::options_t () :
swap (0),
affinity (0),
rate (100),
recovery_ivl (10)
recovery_ivl (10),
use_multicast_loop (false)
{
}

View File

@ -37,11 +37,14 @@ namespace zmq
uint64_t affinity;
std::string identity;
// Maximum tranfer rate [kb/s].
// Maximum tranfer rate [kb/s]. Default 100kb/s.
uint32_t rate;
// Reliability time interval [s].
// Reliability time interval [s]. Default 10s.
uint32_t recovery_ivl;
// Enable multicast loopback. Default disabled (false).
bool use_multicast_loop;
};
}

207
src/pgm_receiver.cpp Normal file
View File

@ -0,0 +1,207 @@
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "platform.hpp"
#if defined ZMQ_HAVE_OPENPGM
#include <iostream>
#include "pgm_receiver.hpp"
#include "err.hpp"
#include "stdint.hpp"
#include "wire.hpp"
#include "i_inout.hpp"
//#define PGM_RECEIVER_DEBUG
//#define PGM_RECEIVER_DEBUG_LEVEL 1
// level 1 = key behaviour
// level 2 = processing flow
// level 4 = infos
#ifndef PGM_RECEIVER_DEBUG
# define zmq_log(n, ...) while (0)
#else
# define zmq_log(n, ...) do { if ((n) <= PGM_RECEIVER_DEBUG_LEVEL) \
{ printf (__VA_ARGS__);}} while (0)
#endif
zmq::pgm_receiver_t::pgm_receiver_t (class io_thread_t *parent_,
const options_t &options_, const char *session_name_) :
io_object_t (parent_),
decoder (NULL),
pgm_socket (true, options_),
options (options_),
session_name (session_name_),
joined (false),
inout (NULL)
{
}
zmq::pgm_receiver_t::~pgm_receiver_t ()
{
if (decoder)
delete decoder;
}
int zmq::pgm_receiver_t::init (const char *network_)
{
decoder = new zmq_decoder_t;
zmq_assert (decoder);
return pgm_socket.init (network_);
}
void zmq::pgm_receiver_t::plug (i_inout *inout_)
{
// Allocate 2 fds one for socket second for waiting pipe.
int socket_fd;
int waiting_pipe_fd;
decoder->set_inout (inout_);
// Fill socket_fd and waiting_pipe_fd from PGM transport
pgm_socket.get_receiver_fds (&socket_fd, &waiting_pipe_fd);
// Add socket_fd into poller.
socket_handle = add_fd (socket_fd);
// Add waiting_pipe_fd into poller.
pipe_handle = add_fd (waiting_pipe_fd);
// Set POLLIN for both handlers.
set_pollin (pipe_handle);
set_pollin (socket_handle);
inout = inout_;
}
void zmq::pgm_receiver_t::unplug ()
{
rm_fd (socket_handle);
rm_fd (pipe_handle);
decoder->set_inout (NULL);
inout = NULL;
}
void zmq::pgm_receiver_t::revive ()
{
zmq_assert (false);
}
void zmq::pgm_receiver_t::reconnect ()
{
// Save inout ptr.
i_inout *inout_tmp = inout;
// PGM receiver is not joined anymore.
joined = false;
// Unplug - plug PGM transport.
unplug ();
delete decoder;
decoder = new zmq_decoder_t;
zmq_assert (decoder);
plug (inout_tmp);
}
// POLLIN event from socket or waiting_pipe.
void zmq::pgm_receiver_t::in_event ()
{
void *data_with_offset;
ssize_t nbytes = 0;
// Read all data from pgm socket.
while ((nbytes = receive_with_offset (&data_with_offset)) > 0) {
// Push all the data to the decoder.
decoder->write ((unsigned char*)data_with_offset, nbytes);
}
// Flush any messages decoder may have produced to the dispatcher.
inout->flush ();
// Data loss detected.
if (nbytes == -1) {
// Recreate PGM transport.
reconnect ();
}
}
void zmq::pgm_receiver_t::out_event ()
{
zmq_assert (false);
}
ssize_t zmq::pgm_receiver_t::receive_with_offset
(void **data_)
{
// Data from PGM socket.
void *rd = NULL;
unsigned char *raw_data = NULL;
// Read data from underlying pgm_socket.
ssize_t nbytes = pgm_socket.receive ((void**) &rd);
raw_data = (unsigned char*) rd;
// No ODATA or RDATA.
if (!nbytes)
return 0;
// Data loss.
if (nbytes == -1) {
return -1;
}
// Read offset of the fist message in current APDU.
uint16_t apdu_offset = get_uint16 (raw_data);
// Shift raw_data & decrease nbytes by the first message offset
// information (sizeof uint16_t).
*data_ = raw_data + sizeof (uint16_t);
nbytes -= sizeof (uint16_t);
// There is not beginning of the message in current APDU and we
// are not joined jet -> throwing data.
if (apdu_offset == 0xFFFF && !joined) {
*data_ = NULL;
return 0;
}
// Now is the possibility to join the stream.
if (!joined) {
// We have to move data to the begining of the first message.
*data_ = (unsigned char *)*data_ + apdu_offset;
nbytes -= apdu_offset;
// Joined the stream.
joined = true;
zmq_log (2, "joined into the stream, %s(%i)\n",
__FILE__, __LINE__);
}
return nbytes;
}
#endif

98
src/pgm_receiver.hpp Normal file
View File

@ -0,0 +1,98 @@
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_PGM_RECEIVER_HPP_INCLUDED__
#define __ZMQ_PGM_RECEIVER_HPP_INCLUDED__
#include "platform.hpp"
#if defined ZMQ_HAVE_OPENPGM
#include "io_object.hpp"
#include "i_engine.hpp"
#include "options.hpp"
#include "zmq_decoder.hpp"
#include "pgm_socket.hpp"
namespace zmq
{
class pgm_receiver_t : public io_object_t, public i_engine
{
public:
// Creates gm_engine. Underlying PGM connection is initialised
// using network_ parameter.
pgm_receiver_t (class io_thread_t *parent_, const options_t &options_,
const char *session_name_);
~pgm_receiver_t ();
int init (const char *network_);
void reconnect ();
// i_engine interface implementation.
void plug (struct i_inout *inout_);
void unplug ();
void revive ();
// i_poll_events interface implementation.
void in_event ();
void out_event ();
private:
// Read exactly iov_len_ count APDUs, function returns number
// of bytes received. Note that if we did not join message stream
// before and there is not message beginning in the APDUs being
// received iov_len for such a APDUs will be 0.
ssize_t receive_with_offset (void **data_);
// Message decoder.
zmq_decoder_t *decoder;
// PGM socket.
pgm_socket_t pgm_socket;
// Socket options.
options_t options;
// Name of the session associated with the connecter.
std::string session_name;
// If receiver joined the messages stream.
bool joined;
// Parent session.
i_inout *inout;
// Poll handle associated with PGM socket.
handle_t socket_handle;
// Poll handle associated with engine PGM waiting pipe.
handle_t pipe_handle;
pgm_receiver_t (const pgm_receiver_t&);
void operator = (const pgm_receiver_t&);
};
}
#endif
#endif

View File

@ -90,11 +90,6 @@ void zmq::pgm_sender_t::plug (i_inout *inout_)
set_pollout (handle);
inout = inout_;
zmq_log (1, "plug: downlink_socket_fd %i, uplink_socket_fd %i, %s(%i)",
downlink_socket_fd, uplink_socket_fd, __FILE__, __LINE__);
std::cout << std::flush;
}
void zmq::pgm_sender_t::unplug ()
@ -185,7 +180,7 @@ void zmq::pgm_sender_t::out_event ()
size_t zmq::pgm_sender_t::write_one_pkt_with_offset (unsigned char *data_,
size_t size_, uint16_t offset_)
{
zmq_log (1, "data_size %i, first message offset %i, %s(%i)",
zmq_log (1, "data_size %i, first message offset %i, %s(%i)\n",
(int) size_, offset_, __FILE__, __LINE__);
std::cout << std::flush;

View File

@ -77,7 +77,7 @@ namespace zmq
handle_t handle;
handle_t uplink_handle;
// ?
// Parent session.
i_inout *inout;
// Output buffer from pgm_socket.

View File

@ -386,12 +386,14 @@ int zmq::pgm_socket_t::open_transport (void)
return -1;
}
}
// Enable multicast loopback.
rc = pgm_transport_set_multicast_loop (g_transport, true);
if (rc != 0) {
errno = EINVAL;
return -1;
if (options.use_multicast_loop) {
rc = pgm_transport_set_multicast_loop (g_transport, true);
if (rc != 0) {
errno = EINVAL;
return -1;
}
}
// Bind a transport to the specified network devices.
@ -486,6 +488,7 @@ int zmq::pgm_socket_t::get_sender_fds (int *send_fd_, int *receive_fd_)
// Send one APDU, transmit window owned memory.
size_t zmq::pgm_socket_t::send (unsigned char *data_, size_t data_len_)
{
iovec iov = {data_,data_len_};
ssize_t nbytes = pgm_transport_send_packetv (g_transport, &iov, 1,
@ -561,7 +564,6 @@ void zmq::pgm_socket_t::free_buffer (void *data_)
// returned.
ssize_t zmq::pgm_socket_t::receive (void **raw_data_)
{
// We just sent all data from pgm_transport_recvmsgv up
// and have to return 0 that another engine in this thread is scheduled.
if (nbytes_rec == nbytes_processed && nbytes_rec > 0) {
@ -575,7 +577,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_)
}
// If we have are going first time or if we have processed all pgm_msgv_t
// structure previaously read from the pgm socket.
// structure previously read from the pgm socket.
if (nbytes_rec == nbytes_processed) {
// Check program flow.
@ -615,6 +617,7 @@ ssize_t zmq::pgm_socket_t::receive (void **raw_data_)
}
zmq_log (4, "received %i bytes\n", (int)nbytes_rec);
}
zmq_assert (nbytes_rec > 0);

View File

@ -23,7 +23,7 @@
#include "err.hpp"
zmq::pub_t::pub_t (class app_thread_t *parent_) :
socket_base_t (parent_, ZMQ_SUB)
socket_base_t (parent_, ZMQ_PUB)
{
}

View File

@ -37,6 +37,7 @@
#include "err.hpp"
#include "platform.hpp"
#include "pgm_sender.hpp"
#include "pgm_receiver.hpp"
zmq::socket_base_t::socket_base_t (app_thread_t *parent_, int type_) :
object_t (parent_),
@ -156,6 +157,14 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
options.recovery_ivl = (uint32_t) *((int64_t*) optval_);
return 0;
case ZMQ_MCAST_LOOP:
if (optvallen_ != sizeof (bool)) {
errno = EINVAL;
return -1;
}
options.use_multicast_loop = optval_;
return 0;
default:
errno = EINVAL;
return -1;
@ -164,15 +173,43 @@ int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
int zmq::socket_base_t::bind (const char *addr_)
{
zmq_listener_t *listener = new zmq_listener_t (
choose_io_thread (options.affinity), this, options);
int rc = listener->set_address (addr_);
if (rc != 0)
return -1;
// Parse addr_ string.
std::string addr_type;
std::string addr_args;
send_plug (listener);
send_own (this, listener);
return 0;
std::string addr (addr_);
std::string::size_type pos = addr.find ("://");
if (pos == std::string::npos) {
errno = EINVAL;
return -1;
}
addr_type = addr.substr (0, pos);
addr_args = addr.substr (pos + 3);
if (addr_type == "tcp") {
zmq_listener_t *listener = new zmq_listener_t (
choose_io_thread (options.affinity), this, options);
int rc = listener->set_address (addr_args.c_str ());
if (rc != 0)
return -1;
send_plug (listener);
send_own (this, listener);
return 0;
}
#if defined ZMQ_HAVE_OPENPGM
if (addr_type == "pgm") {
// In the case of PGM bind behaves the same like connect.
return connect (addr_);
}
#endif
// Unknown address type.
errno = EFAULT;
return -1;
}
int zmq::socket_base_t::connect (const char *addr_)
@ -246,6 +283,8 @@ int zmq::socket_base_t::connect (const char *addr_)
if (addr_type == "pgm") {
switch (type) {
// PGM sender.
case ZMQ_PUB:
{
pgm_sender_t *pgm_sender =
@ -266,9 +305,29 @@ int zmq::socket_base_t::connect (const char *addr_)
break;
}
// PGM receiver.
case ZMQ_SUB:
zmq_assert (false);
{
pgm_receiver_t *pgm_receiver =
new pgm_receiver_t (choose_io_thread (options.affinity), options,
session_name.c_str ());
int rc = pgm_receiver->init (addr_args.c_str ());
if (rc != 0) {
delete pgm_receiver;
return -1;
}
// Reserve a sequence number for following 'attach' command.
session->inc_seqnum ();
send_attach (session, pgm_receiver);
pgm_receiver = NULL;
break;
}
default:
errno = EINVAL;
return -1;

View File

@ -101,6 +101,12 @@ int zmq::sub_t::recv (struct zmq_msg_t *msg_, int flags_)
if (rc != 0 && errno == EAGAIN)
return -1;
// If there is no subscription return -1/EAGAIN.
if (!all_count && prefixes.empty () && topics.empty ()) {
errno = EAGAIN;
return -1;
}
// If there is at least one "*" subscription, the message matches.
if (all_count)
return 0;