initial version of req/rep sockets

This commit is contained in:
Martin Sustrik 2009-09-21 14:39:59 +02:00
parent 7668b246fc
commit cb1b6fe32c
24 changed files with 1461 additions and 473 deletions

View File

@ -68,7 +68,10 @@ libzmq_la_SOURCES = $(pgm_sources) \
pipe.hpp \ pipe.hpp \
platform.hpp \ platform.hpp \
poll.hpp \ poll.hpp \
p2p.hpp \
pub.hpp \ pub.hpp \
rep.hpp \
req.hpp \
select.hpp \ select.hpp \
session.hpp \ session.hpp \
simple_semaphore.hpp \ simple_semaphore.hpp \
@ -82,6 +85,8 @@ libzmq_la_SOURCES = $(pgm_sources) \
uuid.hpp \ uuid.hpp \
windows.hpp \ windows.hpp \
wire.hpp \ wire.hpp \
yarray.hpp \
yarray_item.hpp \
ypipe.hpp \ ypipe.hpp \
ypollset.hpp \ ypollset.hpp \
yqueue.hpp \ yqueue.hpp \
@ -108,9 +113,12 @@ libzmq_la_SOURCES = $(pgm_sources) \
pgm_receiver.cpp \ pgm_receiver.cpp \
pgm_sender.cpp \ pgm_sender.cpp \
pgm_socket.cpp \ pgm_socket.cpp \
p2p.cpp \
pipe.cpp \ pipe.cpp \
poll.cpp \ poll.cpp \
pub.cpp \ pub.cpp \
rep.cpp \
req.cpp \
select.cpp \ select.cpp \
session.cpp \ session.cpp \
socket_base.cpp \ socket_base.cpp \

View File

@ -39,6 +39,9 @@
#include "socket_base.hpp" #include "socket_base.hpp"
#include "pub.hpp" #include "pub.hpp"
#include "sub.hpp" #include "sub.hpp"
#include "req.hpp"
#include "rep.hpp"
#include "p2p.hpp"
// If the RDTSC is available we use it to prevent excessive // If the RDTSC is available we use it to prevent excessive
// polling for commands. The nice thing here is that it will work on any // polling for commands. The nice thing here is that it will work on any
@ -158,26 +161,27 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_)
case ZMQ_SUB: case ZMQ_SUB:
s = new sub_t (this); s = new sub_t (this);
break; break;
case ZMQ_P2P:
case ZMQ_REQ: case ZMQ_REQ:
s = new req_t (this);
break;
case ZMQ_REP: case ZMQ_REP:
s = new socket_base_t (this, type_); s = new rep_t (this);
break;
case ZMQ_P2P:
s = new p2p_t (this);
break; break;
default: default:
// TODO: This should be EINVAL. // TODO: This should be EINVAL.
zmq_assert (false); zmq_assert (false);
} }
zmq_assert (s); zmq_assert (s);
s->set_index (sockets.size ());
sockets.push_back (s); sockets.push_back (s);
return s; return s;
} }
void zmq::app_thread_t::remove_socket (socket_base_t *socket_) void zmq::app_thread_t::remove_socket (socket_base_t *socket_)
{ {
int i = socket_->get_index (); sockets.erase (socket_);
socket_->set_index (-1);
sockets [i] = sockets.back ();
sockets [i]->set_index (i);
sockets.pop_back ();
} }

View File

@ -24,6 +24,7 @@
#include "stdint.hpp" #include "stdint.hpp"
#include "object.hpp" #include "object.hpp"
#include "yarray.hpp"
#include "thread.hpp" #include "thread.hpp"
namespace zmq namespace zmq
@ -67,7 +68,7 @@ namespace zmq
private: private:
// All the sockets created from this application thread. // All the sockets created from this application thread.
typedef std::vector <class socket_base_t*> sockets_t; typedef yarray_t <socket_base_t> sockets_t;
sockets_t sockets; sockets_t sockets;
// If false, app_thread_t object is not associated with any OS thread. // If false, app_thread_t object is not associated with any OS thread.

View File

@ -25,11 +25,12 @@ namespace zmq
struct i_endpoint struct i_endpoint
{ {
virtual void attach_inpipe (class reader_t *pipe_) = 0; virtual void attach_pipes (class reader_t *inpipe_,
virtual void attach_outpipe (class writer_t *pipe_) = 0; class writer_t *outpipe_) = 0;
virtual void revive (class reader_t *pipe_) = 0;
virtual void detach_inpipe (class reader_t *pipe_) = 0; virtual void detach_inpipe (class reader_t *pipe_) = 0;
virtual void detach_outpipe (class writer_t *pipe_) = 0; virtual void detach_outpipe (class writer_t *pipe_) = 0;
virtual void kill (class reader_t *pipe_) = 0;
virtual void revive (class reader_t *pipe_) = 0;
}; };
} }

View File

@ -17,7 +17,10 @@
along with this program. If not, see <http://www.gnu.org/licenses/>. along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "../bindings/c/zmq.h"
#include "options.hpp" #include "options.hpp"
#include "err.hpp"
zmq::options_t::options_t () : zmq::options_t::options_t () :
hwm (0), hwm (0),
@ -29,3 +32,80 @@ zmq::options_t::options_t () :
use_multicast_loop (false) use_multicast_loop (false)
{ {
} }
int zmq::options_t::setsockopt (int option_, const void *optval_,
size_t optvallen_)
{
switch (option_) {
case ZMQ_HWM:
if (optvallen_ != sizeof (int64_t)) {
errno = EINVAL;
return -1;
}
hwm = *((int64_t*) optval_);
return 0;
case ZMQ_LWM:
if (optvallen_ != sizeof (int64_t)) {
errno = EINVAL;
return -1;
}
lwm = *((int64_t*) optval_);
return 0;
case ZMQ_SWAP:
if (optvallen_ != sizeof (int64_t)) {
errno = EINVAL;
return -1;
}
swap = *((int64_t*) optval_);
return 0;
case ZMQ_AFFINITY:
if (optvallen_ != sizeof (int64_t)) {
errno = EINVAL;
return -1;
}
affinity = (uint64_t) *((int64_t*) optval_);
return 0;
case ZMQ_IDENTITY:
identity.assign ((const char*) optval_, optvallen_);
return 0;
case ZMQ_RATE:
if (optvallen_ != sizeof (int64_t)) {
errno = EINVAL;
return -1;
}
rate = (uint32_t) *((int64_t*) optval_);
return 0;
case ZMQ_RECOVERY_IVL:
if (optvallen_ != sizeof (int64_t)) {
errno = EINVAL;
return -1;
}
recovery_ivl = (uint32_t) *((int64_t*) optval_);
return 0;
case ZMQ_MCAST_LOOP:
if (optvallen_ != sizeof (int64_t)) {
errno = EINVAL;
return -1;
}
if ((int64_t) *((int64_t*) optval_) == 0)
use_multicast_loop = false;
else if ((int64_t) *((int64_t*) optval_) == 1)
use_multicast_loop = true;
else {
errno = EINVAL;
return -1;
}
return 0;
}
errno = EINVAL;
return -1;
}

View File

@ -22,6 +22,7 @@
#include <string> #include <string>
#include "stddef.h"
#include "stdint.hpp" #include "stdint.hpp"
namespace zmq namespace zmq
@ -31,6 +32,8 @@ namespace zmq
{ {
options_t (); options_t ();
int setsockopt (int option_, const void *optval_, size_t optvallen_);
int64_t hwm; int64_t hwm;
int64_t lwm; int64_t lwm;
int64_t swap; int64_t swap;

92
src/p2p.cpp Normal file
View File

@ -0,0 +1,92 @@
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../bindings/c/zmq.h"
#include "p2p.hpp"
#include "err.hpp"
zmq::p2p_t::p2p_t (class app_thread_t *parent_) :
socket_base_t (parent_, ZMQ_P2P)
{
}
zmq::p2p_t::~p2p_t ()
{
}
bool zmq::p2p_t::xrequires_in ()
{
return true;
}
bool zmq::p2p_t::xrequires_out ()
{
return true;
}
void zmq::p2p_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_)
{
zmq_assert (false);
}
void zmq::p2p_t::xdetach_inpipe (class reader_t *pipe_)
{
zmq_assert (false);
}
void zmq::p2p_t::xdetach_outpipe (class writer_t *pipe_)
{
zmq_assert (false);
}
void zmq::p2p_t::xkill (class reader_t *pipe_)
{
zmq_assert (false);
}
void zmq::p2p_t::xrevive (class reader_t *pipe_)
{
zmq_assert (false);
}
int zmq::p2p_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
errno = EINVAL;
return -1;
}
int zmq::p2p_t::xsend (struct zmq_msg_t *msg_, int flags_)
{
zmq_assert (false);
}
int zmq::p2p_t::xflush ()
{
zmq_assert (false);
}
int zmq::p2p_t::xrecv (struct zmq_msg_t *msg_, int flags_)
{
zmq_assert (false);
}

56
src/p2p.hpp Normal file
View File

@ -0,0 +1,56 @@
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_P2P_INCLUDED__
#define __ZMQ_P2P_INCLUDED__
#include "socket_base.hpp"
namespace zmq
{
class p2p_t : public socket_base_t
{
public:
p2p_t (class app_thread_t *parent_);
~p2p_t ();
// Overloads of functions from socket_base_t.
bool xrequires_in ();
bool xrequires_out ();
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
void xrevive (class reader_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (struct zmq_msg_t *msg_, int flags_);
int xflush ();
int xrecv (struct zmq_msg_t *msg_, int flags_);
private:
p2p_t (const p2p_t&);
void operator = (const p2p_t&);
};
}
#endif

View File

@ -28,7 +28,6 @@ zmq::reader_t::reader_t (object_t *parent_, pipe_t *pipe_,
peer (&pipe_->writer), peer (&pipe_->writer),
hwm (hwm_), hwm (hwm_),
lwm (lwm_), lwm (lwm_),
index (-1),
endpoint (NULL) endpoint (NULL)
{ {
} }
@ -39,8 +38,10 @@ zmq::reader_t::~reader_t ()
bool zmq::reader_t::read (zmq_msg_t *msg_) bool zmq::reader_t::read (zmq_msg_t *msg_)
{ {
if (!pipe->read (msg_)) if (!pipe->read (msg_)) {
endpoint->kill (this);
return false; return false;
}
// If delimiter was read, start termination process of the pipe. // If delimiter was read, start termination process of the pipe.
unsigned char *offset = 0; unsigned char *offset = 0;
@ -61,17 +62,6 @@ void zmq::reader_t::set_endpoint (i_endpoint *endpoint_)
endpoint = endpoint_; endpoint = endpoint_;
} }
void zmq::reader_t::set_index (int index_)
{
index = index_;
}
int zmq::reader_t::get_index ()
{
zmq_assert (index != -1);
return index;
}
void zmq::reader_t::term () void zmq::reader_t::term ()
{ {
endpoint = NULL; endpoint = NULL;
@ -96,7 +86,6 @@ zmq::writer_t::writer_t (object_t *parent_, pipe_t *pipe_,
peer (&pipe_->reader), peer (&pipe_->reader),
hwm (hwm_), hwm (hwm_),
lwm (lwm_), lwm (lwm_),
index (-1),
endpoint (NULL) endpoint (NULL)
{ {
} }
@ -106,17 +95,6 @@ void zmq::writer_t::set_endpoint (i_endpoint *endpoint_)
endpoint = endpoint_; endpoint = endpoint_;
} }
void zmq::writer_t::set_index (int index_)
{
index = index_;
}
int zmq::writer_t::get_index ()
{
zmq_assert (index != -1);
return index;
}
zmq::writer_t::~writer_t () zmq::writer_t::~writer_t ()
{ {
} }

View File

@ -24,6 +24,7 @@
#include "stdint.hpp" #include "stdint.hpp"
#include "i_endpoint.hpp" #include "i_endpoint.hpp"
#include "yarray_item.hpp"
#include "ypipe.hpp" #include "ypipe.hpp"
#include "config.hpp" #include "config.hpp"
#include "object.hpp" #include "object.hpp"
@ -31,7 +32,7 @@
namespace zmq namespace zmq
{ {
class reader_t : public object_t class reader_t : public object_t, public yarray_item_t
{ {
public: public:
@ -44,10 +45,6 @@ namespace zmq
// Reads a message to the underlying pipe. // Reads a message to the underlying pipe.
bool read (struct zmq_msg_t *msg_); bool read (struct zmq_msg_t *msg_);
// Mnaipulation of index of the pipe.
void set_index (int index_);
int get_index ();
// Ask pipe to terminate. // Ask pipe to terminate.
void term (); void term ();
@ -72,9 +69,6 @@ namespace zmq
uint64_t tail; uint64_t tail;
uint64_t last_sent_head; uint64_t last_sent_head;
// Index of the pipe in the socket's list of inbound pipes.
int index;
// Endpoint (either session or socket) the pipe is attached to. // Endpoint (either session or socket) the pipe is attached to.
i_endpoint *endpoint; i_endpoint *endpoint;
@ -82,7 +76,7 @@ namespace zmq
void operator = (const reader_t&); void operator = (const reader_t&);
}; };
class writer_t : public object_t class writer_t : public object_t, public yarray_item_t
{ {
public: public:
@ -104,10 +98,6 @@ namespace zmq
// Flush the messages downsteam. // Flush the messages downsteam.
void flush (); void flush ();
// Mnaipulation of index of the pipe.
void set_index (int index_);
int get_index ();
// Ask pipe to terminate. // Ask pipe to terminate.
void term (); void term ();
@ -130,9 +120,6 @@ namespace zmq
uint64_t head; uint64_t head;
uint64_t tail; uint64_t tail;
// Index of the pipe in the socket's list of outbound pipes.
int index;
// Endpoint (either session or socket) the pipe is attached to. // Endpoint (either session or socket) the pipe is attached to.
i_endpoint *endpoint; i_endpoint *endpoint;

View File

@ -21,6 +21,8 @@
#include "pub.hpp" #include "pub.hpp"
#include "err.hpp" #include "err.hpp"
#include "msg_content.hpp"
#include "pipe.hpp"
zmq::pub_t::pub_t (class app_thread_t *parent_) : zmq::pub_t::pub_t (class app_thread_t *parent_) :
socket_base_t (parent_, ZMQ_PUB) socket_base_t (parent_, ZMQ_PUB)
@ -29,9 +31,134 @@ zmq::pub_t::pub_t (class app_thread_t *parent_) :
zmq::pub_t::~pub_t () zmq::pub_t::~pub_t ()
{ {
for (out_pipes_t::size_type i = 0; i != out_pipes.size (); i++)
out_pipes [i]->term ();
out_pipes.clear ();
} }
int zmq::pub_t::recv (struct zmq_msg_t *msg_, int flags_) bool zmq::pub_t::xrequires_in ()
{
return false;
}
bool zmq::pub_t::xrequires_out ()
{
return true;
}
void zmq::pub_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_)
{
zmq_assert (!inpipe_);
out_pipes.push_back (outpipe_);
}
void zmq::pub_t::xdetach_inpipe (class reader_t *pipe_)
{
zmq_assert (false);
}
void zmq::pub_t::xdetach_outpipe (class writer_t *pipe_)
{
out_pipes.erase (pipe_);
}
void zmq::pub_t::xkill (class reader_t *pipe_)
{
zmq_assert (false);
}
void zmq::pub_t::xrevive (class reader_t *pipe_)
{
zmq_assert (false);
}
int zmq::pub_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
errno = EINVAL;
return -1;
}
int zmq::pub_t::xsend (struct zmq_msg_t *msg_, int flags_)
{
out_pipes_t::size_type pipes_count = out_pipes.size ();
// If there are no pipes available, simply drop the message.
if (pipes_count == 0) {
int rc = zmq_msg_close (msg_);
zmq_assert (rc == 0);
rc = zmq_msg_init (msg_);
zmq_assert (rc == 0);
return 0;
}
// First check whether all pipes are available for writing.
for (out_pipes_t::size_type i = 0; i != pipes_count; i++)
if (!out_pipes [i]->check_write (zmq_msg_size (msg_))) {
errno = EAGAIN;
return -1;
}
msg_content_t *content = (msg_content_t*) msg_->content;
// For VSMs the copying is straighforward.
if (content == (msg_content_t*) ZMQ_VSM) {
for (out_pipes_t::size_type i = 0; i != pipes_count; i++) {
out_pipes [i]->write (msg_);
if (!(flags_ & ZMQ_NOFLUSH))
out_pipes [i]->flush ();
}
int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0);
return 0;
}
// Optimisation for the case when there's only a single pipe
// to send the message to - no refcount adjustment i.e. no atomic
// operations are needed.
if (pipes_count == 1) {
out_pipes [0]->write (msg_);
if (!(flags_ & ZMQ_NOFLUSH))
out_pipes [0]->flush ();
int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0);
return 0;
}
// There are at least 2 destinations for the message. That means we have
// to deal with reference counting. First add N-1 references to
// the content (we are holding one reference anyway, that's why -1).
if (msg_->shared)
content->refcnt.add (pipes_count - 1);
else {
content->refcnt.set (pipes_count);
msg_->shared = true;
}
// Push the message to all destinations.
for (out_pipes_t::size_type i = 0; i != pipes_count; i++) {
out_pipes [i]->write (msg_);
if (!(flags_ & ZMQ_NOFLUSH))
out_pipes [i]->flush ();
}
// Detach the original message from the data buffer.
int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0);
return 0;
}
int zmq::pub_t::xflush ()
{
out_pipes_t::size_type pipe_count = out_pipes.size ();
for (out_pipes_t::size_type i = 0; i != pipe_count; i++)
out_pipes [i]->flush ();
return 0;
}
int zmq::pub_t::xrecv (struct zmq_msg_t *msg_, int flags_)
{ {
errno = EFAULT; errno = EFAULT;
return -1; return -1;

View File

@ -21,6 +21,7 @@
#define __ZMQ_PUB_INCLUDED__ #define __ZMQ_PUB_INCLUDED__
#include "socket_base.hpp" #include "socket_base.hpp"
#include "yarray.hpp"
namespace zmq namespace zmq
{ {
@ -32,8 +33,27 @@ namespace zmq
pub_t (class app_thread_t *parent_); pub_t (class app_thread_t *parent_);
~pub_t (); ~pub_t ();
// Overloads of API functions from socket_base_t. // Overloads of functions from socket_base_t.
int recv (struct zmq_msg_t *msg_, int flags_); bool xrequires_in ();
bool xrequires_out ();
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
void xrevive (class reader_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (struct zmq_msg_t *msg_, int flags_);
int xflush ();
int xrecv (struct zmq_msg_t *msg_, int flags_);
private:
// Outbound pipes, i.e. those the socket is sending messages to.
typedef yarray_t <class writer_t> out_pipes_t;
out_pipes_t out_pipes;
pub_t (const pub_t&);
void operator = (const pub_t&);
}; };
} }

204
src/rep.cpp Normal file
View File

@ -0,0 +1,204 @@
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../bindings/c/zmq.h"
#include "rep.hpp"
#include "err.hpp"
#include "pipe.hpp"
zmq::rep_t::rep_t (class app_thread_t *parent_) :
socket_base_t (parent_, ZMQ_REP),
active (0),
current (0),
waiting_for_reply (false),
reply_pipe (NULL)
{
}
zmq::rep_t::~rep_t ()
{
}
bool zmq::rep_t::xrequires_in ()
{
return true;
}
bool zmq::rep_t::xrequires_out ()
{
return true;
}
void zmq::rep_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_)
{
zmq_assert (inpipe_ && outpipe_);
zmq_assert (in_pipes.size () == out_pipes.size ());
in_pipes.push_back (inpipe_);
in_pipes.swap (active, in_pipes.size () - 1);
out_pipes.push_back (outpipe_);
out_pipes.swap (active, out_pipes.size () - 1);
active++;
}
void zmq::rep_t::xdetach_inpipe (class reader_t *pipe_)
{
zmq_assert (pipe_);
zmq_assert (in_pipes.size () == out_pipes.size ());
in_pipes_t::size_type index = in_pipes.index (pipe_);
// If corresponding outpipe is still in place simply nullify the pointer
// to the inpipe and move it to the passive state.
if (out_pipes [index]) {
in_pipes [index] = NULL;
if (in_pipes.index (pipe_) < active) {
active--;
in_pipes.swap (index, active);
out_pipes.swap (index, active);
}
return;
}
// Now both inpipe and outpipe are detached. Remove them from the lists.
if (in_pipes.index (pipe_) < active)
active--;
in_pipes.erase (index);
out_pipes.erase (index);
}
void zmq::rep_t::xdetach_outpipe (class writer_t *pipe_)
{
zmq_assert (pipe_);
zmq_assert (in_pipes.size () == out_pipes.size ());
out_pipes_t::size_type index = out_pipes.index (pipe_);
// TODO: If the connection we've got the request from disconnects,
// there's nowhere to send the reply. DLQ?
if (waiting_for_reply && pipe_ == reply_pipe) {
zmq_assert (false);
}
// If corresponding inpipe is still in place simply nullify the pointer
// to the outpipe.
if (in_pipes [index]) {
out_pipes [index] = NULL;
if (out_pipes.index (pipe_) < active) {
active--;
in_pipes.swap (index, active);
out_pipes.swap (index, active);
}
return;
}
// Now both inpipe and outpipe are detached. Remove them from the lists.
if (out_pipes.index (pipe_) < active)
active--;
in_pipes.erase (index);
out_pipes.erase (index);
}
void zmq::rep_t::xkill (class reader_t *pipe_)
{
// Move the pipe to the list of inactive pipes.
in_pipes_t::size_type index = in_pipes.index (pipe_);
active--;
in_pipes.swap (index, active);
out_pipes.swap (index, active);
}
void zmq::rep_t::xrevive (class reader_t *pipe_)
{
// Move the pipe to the list of active pipes.
in_pipes_t::size_type index = in_pipes.index (pipe_);
in_pipes.swap (index, active);
out_pipes.swap (index, active);
active++;
}
int zmq::rep_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
errno = EINVAL;
return -1;
}
int zmq::rep_t::xsend (struct zmq_msg_t *msg_, int flags_)
{
if (!waiting_for_reply) {
errno = EFAULT;
return -1;
}
// TODO: Implement this once queue limits are in-place. If the reply
// overloads the buffer, connection should be torn down.
zmq_assert (reply_pipe->check_write (zmq_msg_size (msg_)));
// Push message to the selected pipe.
reply_pipe->write (msg_);
reply_pipe->flush ();
waiting_for_reply = false;
reply_pipe = NULL;
// Detach the message from the data buffer.
int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0);
}
int zmq::rep_t::xflush ()
{
errno = EFAULT;
return -1;
}
int zmq::rep_t::xrecv (struct zmq_msg_t *msg_, int flags_)
{
// Deallocate old content of the message.
zmq_msg_close (msg_);
if (waiting_for_reply) {
errno = EFAULT;
return -1;
}
// Round-robin over the pipes to get next message.
for (int count = active; count != 0; count--) {
bool fetched = in_pipes [current]->read (msg_);
current++;
if (current >= active)
current = 0;
if (fetched) {
reply_pipe = out_pipes [current];
waiting_for_reply = true;
return 0;
}
}
// No message is available. Initialise the output parameter
// to be a 0-byte message.
zmq_msg_init (msg_);
errno = EAGAIN;
return -1;
}

79
src/rep.hpp Normal file
View File

@ -0,0 +1,79 @@
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_REP_INCLUDED__
#define __ZMQ_REP_INCLUDED__
#include "socket_base.hpp"
#include "yarray.hpp"
namespace zmq
{
class rep_t : public socket_base_t
{
public:
rep_t (class app_thread_t *parent_);
~rep_t ();
// Overloads of functions from socket_base_t.
bool xrequires_in ();
bool xrequires_out ();
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
void xrevive (class reader_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (struct zmq_msg_t *msg_, int flags_);
int xflush ();
int xrecv (struct zmq_msg_t *msg_, int flags_);
private:
// List in outbound and inbound pipes. Note that the two lists are
// always in sync. I.e. outpipe with index N communicates with the
// same session as inpipe with index N.
typedef yarray_t <class writer_t> out_pipes_t;
out_pipes_t out_pipes;
typedef yarray_t <class reader_t> in_pipes_t;
in_pipes_t in_pipes;
// Number of active inpipes. All the active inpipes are located at the
// beginning of the in_pipes array.
in_pipes_t::size_type active;
// Index of the next inbound pipe to read a request from.
in_pipes_t::size_type current;
// If true, request was already received and reply wasn't sent yet.
bool waiting_for_reply;
// Pipe we are going to send reply to.
class writer_t *reply_pipe;
rep_t (const rep_t&);
void operator = (const rep_t&);
};
}
#endif

206
src/req.cpp Normal file
View File

@ -0,0 +1,206 @@
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../bindings/c/zmq.h"
#include "req.hpp"
#include "err.hpp"
#include "pipe.hpp"
zmq::req_t::req_t (class app_thread_t *parent_) :
socket_base_t (parent_, ZMQ_REQ),
current (0),
waiting_for_reply (false),
reply_pipe_active (false),
reply_pipe (NULL)
{
}
zmq::req_t::~req_t ()
{
}
bool zmq::req_t::xrequires_in ()
{
return true;
}
bool zmq::req_t::xrequires_out ()
{
return true;
}
void zmq::req_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_)
{
zmq_assert (inpipe_ && outpipe_);
zmq_assert (in_pipes.size () == out_pipes.size ());
in_pipes.push_back (inpipe_);
out_pipes.push_back (outpipe_);
}
void zmq::req_t::xdetach_inpipe (class reader_t *pipe_)
{
zmq_assert (pipe_);
zmq_assert (in_pipes.size () == out_pipes.size ());
// TODO: The pipe we are awaiting the reply from is detached. What now?
// Return ECONNRESET from subsequent recv?
if (waiting_for_reply && pipe_ == reply_pipe) {
zmq_assert (false);
}
in_pipes_t::size_type index = in_pipes.index (pipe_);
// If corresponding outpipe is still in place simply nullify the pointer
// to the inpipe.
if (out_pipes [index]) {
in_pipes [index] = NULL;
return;
}
// Now both inpipe and outpipe are detached. Remove them from the lists.
in_pipes.erase (index);
out_pipes.erase (index);
}
void zmq::req_t::xdetach_outpipe (class writer_t *pipe_)
{
zmq_assert (pipe_);
zmq_assert (in_pipes.size () == out_pipes.size ());
out_pipes_t::size_type index = out_pipes.index (pipe_);
// If corresponding inpipe is still in place simply nullify the pointer
// to the outpipe.
if (in_pipes [index]) {
out_pipes [index] = NULL;
return;
}
// Now both inpipe and outpipe are detached. Remove them from the lists.
in_pipes.erase (index);
out_pipes.erase (index);
}
void zmq::req_t::xkill (class reader_t *pipe_)
{
zmq_assert (pipe_ == reply_pipe);
reply_pipe_active = false;
}
void zmq::req_t::xrevive (class reader_t *pipe_)
{
// TODO: Actually, misbehaving peer can cause this kind of thing.
// Handle it decently, presumably kill the offending connection.
zmq_assert (pipe_ == reply_pipe);
reply_pipe_active = true;
}
int zmq::req_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
errno = EINVAL;
return -1;
}
int zmq::req_t::xsend (struct zmq_msg_t *msg_, int flags_)
{
// If we've sent a request and we still haven't got the reply,
// we can't send another request.
if (waiting_for_reply) {
errno = EFAULT;
return -1;
}
if (out_pipes.empty ()) {
errno = EFAULT;
return -1;
}
current++;
if (current >= out_pipes.size ())
current = 0;
// TODO: Infinite loop can result here. Integrate the algorithm with
// the active pipes list (i.e. pipe pair that has one pipe missing is
// considered to be inactive.
while (!in_pipes [current] || !out_pipes [current]) {
current++;
if (current >= out_pipes.size ())
current = 0;
}
// TODO: Implement this once queue limits are in-place.
zmq_assert (out_pipes [current]->check_write (zmq_msg_size (msg_)));
// Push message to the selected pipe.
out_pipes [current]->write (msg_);
out_pipes [current]->flush ();
waiting_for_reply = true;
reply_pipe = in_pipes [current];
// We can safely assume that the reply pipe is active as the last time
// we've used it we've read the reply and haven't tried to read from it
// anymore.
reply_pipe_active = true;
// Detach the message from the data buffer.
int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0);
return 0;
}
int zmq::req_t::xflush ()
{
errno = EFAULT;
return -1;
}
int zmq::req_t::xrecv (struct zmq_msg_t *msg_, int flags_)
{
// Deallocate old content of the message.
zmq_msg_close (msg_);
// If request wasn't send, we can't wait for reply.
if (!waiting_for_reply) {
zmq_msg_init (msg_);
errno = EFAULT;
return -1;
}
// Get the reply from the reply pipe.
if (!reply_pipe_active || !reply_pipe->read (msg_)) {
zmq_msg_init (msg_);
errno = EAGAIN;
return -1;
}
waiting_for_reply = false;
reply_pipe = NULL;
return 0;
}

84
src/req.hpp Normal file
View File

@ -0,0 +1,84 @@
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_REQ_INCLUDED__
#define __ZMQ_REQ_INCLUDED__
#include "socket_base.hpp"
#include "yarray.hpp"
namespace zmq
{
class req_t : public socket_base_t
{
public:
req_t (class app_thread_t *parent_);
~req_t ();
// Overloads of functions from socket_base_t.
bool xrequires_in ();
bool xrequires_out ();
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
void xrevive (class reader_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (struct zmq_msg_t *msg_, int flags_);
int xflush ();
int xrecv (struct zmq_msg_t *msg_, int flags_);
private:
// List in outbound and inbound pipes. Note that the two lists are
// always in sync. I.e. outpipe with index N communicates with the
// same session as inpipe with index N.
//
// TODO: Once we have queue limits in place, list of active outpipes
// is to be held (presumably by stacking active outpipes at
// the beginning of the array). We don't have to do the same thing for
// inpipes, because we know which pipe we want to read the
// reply from.
typedef yarray_t <class writer_t> out_pipes_t;
out_pipes_t out_pipes;
typedef yarray_t <class reader_t> in_pipes_t;
in_pipes_t in_pipes;
// Req_t load-balances the requests - 'current' points to the session
// that's processing the request at the moment.
out_pipes_t::size_type current;
// If true, request was already sent and reply wasn't received yet.
bool waiting_for_reply;
// True, if read can be attempted from the reply pipe.
bool reply_pipe_active;
// Pipe we are awaiting the reply from.
class reader_t *reply_pipe;
req_t (const req_t&);
void operator = (const req_t&);
};
}
#endif

View File

@ -46,11 +46,7 @@ bool zmq::session_t::read (::zmq_msg_t *msg_)
if (!active) if (!active)
return false; return false;
bool fetched = in_pipe->read (msg_); return in_pipe->read (msg_);
if (!fetched)
active = false;
return fetched;
} }
bool zmq::session_t::write (::zmq_msg_t *msg_) bool zmq::session_t::write (::zmq_msg_t *msg_)
@ -84,27 +80,21 @@ void zmq::session_t::detach ()
term (); term ();
} }
void zmq::session_t::attach_inpipe (reader_t *pipe_) void zmq::session_t::attach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_)
{ {
zmq_assert (!in_pipe); if (inpipe_) {
in_pipe = pipe_; zmq_assert (!in_pipe);
active = true; in_pipe = inpipe_;
in_pipe->set_endpoint (this); active = true;
} in_pipe->set_endpoint (this);
}
void zmq::session_t::attach_outpipe (writer_t *pipe_) if (outpipe_) {
{ zmq_assert (!out_pipe);
zmq_assert (!out_pipe); out_pipe = outpipe_;
out_pipe = pipe_; out_pipe->set_endpoint (this);
out_pipe->set_endpoint (this); }
}
void zmq::session_t::revive (reader_t *pipe_)
{
zmq_assert (in_pipe == pipe_);
active = true;
if (engine)
engine->revive ();
} }
void zmq::session_t::detach_inpipe (reader_t *pipe_) void zmq::session_t::detach_inpipe (reader_t *pipe_)
@ -118,6 +108,19 @@ void zmq::session_t::detach_outpipe (writer_t *pipe_)
out_pipe = NULL; out_pipe = NULL;
} }
void zmq::session_t::kill (reader_t *pipe_)
{
active = false;
}
void zmq::session_t::revive (reader_t *pipe_)
{
zmq_assert (in_pipe == pipe_);
active = true;
if (engine)
engine->revive ();
}
void zmq::session_t::process_plug () void zmq::session_t::process_plug ()
{ {
// Register the session with the socket. // Register the session with the socket.

View File

@ -44,11 +44,11 @@ namespace zmq
void detach (); void detach ();
// i_endpoint interface implementation. // i_endpoint interface implementation.
void attach_inpipe (class reader_t *pipe_); void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
void attach_outpipe (class writer_t *pipe_);
void revive (class reader_t *pipe_);
void detach_inpipe (class reader_t *pipe_); void detach_inpipe (class reader_t *pipe_);
void detach_outpipe (class writer_t *pipe_); void detach_outpipe (class writer_t *pipe_);
void kill (class reader_t *pipe_);
void revive (class reader_t *pipe_);
private: private:

View File

@ -27,7 +27,6 @@
#include "dispatcher.hpp" #include "dispatcher.hpp"
#include "zmq_listener.hpp" #include "zmq_listener.hpp"
#include "zmq_connecter.hpp" #include "zmq_connecter.hpp"
#include "msg_content.hpp"
#include "io_thread.hpp" #include "io_thread.hpp"
#include "session.hpp" #include "session.hpp"
#include "config.hpp" #include "config.hpp"
@ -42,145 +41,28 @@
zmq::socket_base_t::socket_base_t (app_thread_t *parent_, int type_) : zmq::socket_base_t::socket_base_t (app_thread_t *parent_, int type_) :
object_t (parent_), object_t (parent_),
type (type_), type (type_),
current (0),
active (0),
pending_term_acks (0), pending_term_acks (0),
ticks (0), ticks (0),
app_thread (parent_), app_thread (parent_),
shutting_down (false), shutting_down (false)
index (-1)
{ {
} }
zmq::socket_base_t::~socket_base_t () zmq::socket_base_t::~socket_base_t ()
{ {
shutting_down = true;
// Ask all pipes to terminate.
for (in_pipes_t::iterator it = in_pipes.begin ();
it != in_pipes.end (); it++)
(*it)->term ();
in_pipes.clear ();
for (out_pipes_t::iterator it = out_pipes.begin ();
it != out_pipes.end (); it++)
(*it)->term ();
out_pipes.clear ();
while (true) {
// On third pass of the loop there should be no more I/O objects
// because all connecters and listerners were destroyed during
// the first pass and all engines delivered by delayed 'own' commands
// are destroyed during the second pass.
if (io_objects.empty () && !pending_term_acks)
break;
// Send termination request to all associated I/O objects.
for (io_objects_t::iterator it = io_objects.begin ();
it != io_objects.end (); it++)
send_term (*it);
// Move the objects to the list of pending term acks.
pending_term_acks += io_objects.size ();
io_objects.clear ();
// Process commands till we get all the termination acknowledgements.
while (pending_term_acks)
app_thread->process_commands (true, false);
}
// Check whether there are no session leaks.
sessions_sync.lock ();
zmq_assert (sessions.empty ());
sessions_sync.unlock ();
} }
int zmq::socket_base_t::setsockopt (int option_, const void *optval_, int zmq::socket_base_t::setsockopt (int option_, const void *optval_,
size_t optvallen_) size_t optvallen_)
{ {
switch (option_) { // First, check whether specific socket type overloads the option.
int rc = xsetsockopt (option_, optval_, optvallen_);
if (rc == 0 || errno != EINVAL)
return rc;
case ZMQ_HWM: // If the socket type doesn't support the option, pass it to
if (optvallen_ != sizeof (int64_t)) { // the generic option parser.
errno = EINVAL; return options.setsockopt (option_, optval_, optvallen_);
return -1;
}
options.hwm = *((int64_t*) optval_);
return 0;
case ZMQ_LWM:
if (optvallen_ != sizeof (int64_t)) {
errno = EINVAL;
return -1;
}
options.lwm = *((int64_t*) optval_);
return 0;
case ZMQ_SWAP:
if (optvallen_ != sizeof (int64_t)) {
errno = EINVAL;
return -1;
}
options.swap = *((int64_t*) optval_);
return 0;
case ZMQ_AFFINITY:
if (optvallen_ != sizeof (int64_t)) {
errno = EINVAL;
return -1;
}
options.affinity = (uint64_t) *((int64_t*) optval_);
return 0;
case ZMQ_IDENTITY:
options.identity.assign ((const char*) optval_, optvallen_);
return 0;
case ZMQ_SUBSCRIBE:
case ZMQ_UNSUBSCRIBE:
errno = EFAULT;
return -1;
case ZMQ_RATE:
if (optvallen_ != sizeof (int64_t)) {
errno = EINVAL;
return -1;
}
options.rate = (uint32_t) *((int64_t*) optval_);
return 0;
case ZMQ_RECOVERY_IVL:
if (optvallen_ != sizeof (int64_t)) {
errno = EINVAL;
return -1;
}
options.recovery_ivl = (uint32_t) *((int64_t*) optval_);
return 0;
case ZMQ_MCAST_LOOP:
if (optvallen_ != sizeof (int64_t)) {
errno = EINVAL;
return -1;
}
if ((int64_t) *((int64_t*) optval_) == 0) {
options.use_multicast_loop = false;
} else if ((int64_t) *((int64_t*) optval_) == 1) {
options.use_multicast_loop = true;
} else {
errno = EINVAL;
return -1;
}
return 0;
default:
errno = EINVAL;
return -1;
}
} }
int zmq::socket_base_t::bind (const char *addr_) int zmq::socket_base_t::bind (const char *addr_)
@ -251,23 +133,29 @@ int zmq::socket_base_t::connect (const char *addr_)
options, true); options, true);
zmq_assert (session); zmq_assert (session);
// Create inbound pipe. pipe_t *in_pipe = NULL;
pipe_t *in_pipe = new pipe_t (this, session, options.hwm, options.lwm); pipe_t *out_pipe = NULL;
zmq_assert (in_pipe);
in_pipe->reader.set_endpoint (this);
session->attach_outpipe (&in_pipe->writer);
in_pipes.push_back (&in_pipe->reader);
in_pipes.back ()->set_index (active);
in_pipes [active]->set_index (in_pipes.size () - 1);
std::swap (in_pipes.back (), in_pipes [active]);
active++;
// Create outbound pipe. // Create inbound pipe, if required.
pipe_t *out_pipe = new pipe_t (session, this, options.hwm, options.lwm); if (xrequires_in ()) {
zmq_assert (out_pipe); in_pipe = new pipe_t (this, session, options.hwm, options.lwm);
out_pipe->writer.set_endpoint (this); zmq_assert (in_pipe);
session->attach_inpipe (&out_pipe->reader);
out_pipes.push_back (&out_pipe->writer); }
// Create outbound pipe, if required.
if (xrequires_out ()) {
out_pipe = new pipe_t (session, this, options.hwm, options.lwm);
zmq_assert (out_pipe);
}
// Attach the pipes to the socket object.
attach_pipes (in_pipe ? &in_pipe->reader : NULL,
out_pipe ? &out_pipe->writer : NULL);
// Attach the pipes to the session object.
session->attach_pipes (out_pipe ? &out_pipe->reader : NULL,
in_pipe ? &in_pipe->writer : NULL);
// Activate the session. // Activate the session.
send_plug (session); send_plug (session);
@ -294,6 +182,13 @@ int zmq::socket_base_t::connect (const char *addr_)
#if defined ZMQ_HAVE_OPENPGM #if defined ZMQ_HAVE_OPENPGM
if (addr_type == "pgm" || addr_type == "udp") { if (addr_type == "pgm" || addr_type == "udp") {
// If the socket type requires bi-directional communication
// multicast is not an option (it is uni-directional).
if (xrequires_in () && xrequires_out ()) {
errno = EFAULT;
return -1;
}
// For udp, pgm transport with udp encapsulation is used. // For udp, pgm transport with udp encapsulation is used.
bool udp_encapsulation = false; bool udp_encapsulation = false;
if (addr_type == "udp") if (addr_type == "udp")
@ -365,56 +260,61 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
app_thread->process_commands (false, true); app_thread->process_commands (false, true);
// Try to send the message. // Try to send the message.
bool sent = distribute (msg_, !(flags_ & ZMQ_NOFLUSH)); int rc = xsend (msg_, flags_);
if (rc == 0)
return 0;
if (!(flags_ & ZMQ_NOBLOCK)) { // In case of non-blocking send we'll simply propagate
// the error - including EAGAIN - upwards.
// Oops, we couldn't send the message. Wait for the next if (flags_ & ZMQ_NOBLOCK)
// command, process it and try to send the message again.
while (!sent) {
app_thread->process_commands (true, false);
sent = distribute (msg_, !(flags_ & ZMQ_NOFLUSH));
}
}
else if (!sent) {
errno = EAGAIN;
return -1; return -1;
}
// Oops, we couldn't send the message. Wait for the next
// command, process it and try to send the message again.
while (rc != 0) {
if (errno != EAGAIN)
return -1;
app_thread->process_commands (true, false);
rc = xsend (msg_, flags_);
}
return 0; return 0;
} }
int zmq::socket_base_t::flush () int zmq::socket_base_t::flush ()
{ {
for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end (); return xflush ();
it++)
(*it)->flush ();
return 0;
} }
int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
{ {
// Get the message and return immediately if successfull.
int rc = xrecv (msg_, flags_);
if (rc == 0)
return 0;
// If the message cannot be fetched immediately, there are two scenarios. // If the message cannot be fetched immediately, there are two scenarios.
// For non-blocking recv, commands are processed in case there's a message // For non-blocking recv, commands are processed in case there's a revive
// already waiting we don't know about. If it's not, return EAGAIN. // command already waiting int a command pipe. If it's not, return EAGAIN.
// In blocking scenario, commands are processed over and over again until // In blocking scenario, commands are processed over and over again until
// we are able to fetch a message. // we are able to fetch a message.
bool fetched = fetch (msg_); if (flags_ & ZMQ_NOBLOCK) {
if (!fetched) { if (errno != EAGAIN)
if (flags_ & ZMQ_NOBLOCK) { return -1;
app_thread->process_commands (false, false); app_thread->process_commands (false, false);
fetched = fetch (msg_); ticks = 0;
} rc = xrecv (msg_, flags_);
else { }
while (!fetched) { else {
app_thread->process_commands (true, false); while (rc != 0) {
ticks = 0; if (errno != EAGAIN)
fetched = fetch (msg_); return -1;
} app_thread->process_commands (true, false);
ticks = 0;
rc = xrecv (msg_, flags_);
} }
} }
// Once every inbound_poll_rate messages check for signals and process // Once every inbound_poll_rate messages check for signals and process
// incoming commands. This happens only if we are not polling altogether // incoming commands. This happens only if we are not polling altogether
// because there are messages available all the time. If poll occurs, // because there are messages available all the time. If poll occurs,
@ -428,12 +328,7 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
ticks = 0; ticks = 0;
} }
if (!fetched) { return rc;
errno = EAGAIN;
return -1;
}
return 0;
} }
int zmq::socket_base_t::close () int zmq::socket_base_t::close ()
@ -443,6 +338,37 @@ int zmq::socket_base_t::close ()
// Pointer to the dispatcher must be retrieved before the socket is // Pointer to the dispatcher must be retrieved before the socket is
// deallocated. Afterwards it is not available. // deallocated. Afterwards it is not available.
dispatcher_t *dispatcher = get_dispatcher (); dispatcher_t *dispatcher = get_dispatcher ();
shutting_down = true;
while (true) {
// On third pass of the loop there should be no more I/O objects
// because all connecters and listerners were destroyed during
// the first pass and all engines delivered by delayed 'own' commands
// are destroyed during the second pass.
if (io_objects.empty () && !pending_term_acks)
break;
// Send termination request to all associated I/O objects.
for (io_objects_t::iterator it = io_objects.begin ();
it != io_objects.end (); it++)
send_term (*it);
// Move the objects to the list of pending term acks.
pending_term_acks += io_objects.size ();
io_objects.clear ();
// Process commands till we get all the termination acknowledgements.
while (pending_term_acks)
app_thread->process_commands (true, false);
}
// Check whether there are no session leaks.
sessions_sync.lock ();
zmq_assert (sessions.empty ());
sessions_sync.unlock ();
delete this; delete this;
// This function must be called after the socket is completely deallocated // This function must be called after the socket is completely deallocated
@ -488,68 +414,36 @@ zmq::session_t *zmq::socket_base_t::find_session (const char *name_)
return it->second; return it->second;
} }
void zmq::socket_base_t::attach_inpipe (class reader_t *pipe_) void zmq::socket_base_t::kill (reader_t *pipe_)
{ {
pipe_->set_endpoint (this); xkill (pipe_);
in_pipes.push_back (pipe_);
in_pipes.back ()->set_index (active);
in_pipes [active]->set_index (in_pipes.size () - 1);
std::swap (in_pipes.back (), in_pipes [active]);
active++;
}
void zmq::socket_base_t::attach_outpipe (class writer_t *pipe_)
{
pipe_->set_endpoint (this);
out_pipes.push_back (pipe_);
pipe_->set_index (out_pipes.size () - 1);
} }
void zmq::socket_base_t::revive (reader_t *pipe_) void zmq::socket_base_t::revive (reader_t *pipe_)
{ {
// Move the pipe to the list of active pipes. xrevive (pipe_);
in_pipes_t::size_type index = (in_pipes_t::size_type) pipe_->get_index (); }
in_pipes [index]->set_index (active);
in_pipes [active]->set_index (index); void zmq::socket_base_t::attach_pipes (class reader_t *inpipe_,
std::swap (in_pipes [index], in_pipes [active]); class writer_t *outpipe_)
active++; {
if (inpipe_)
inpipe_->set_endpoint (this);
if (outpipe_)
outpipe_->set_endpoint (this);
xattach_pipes (inpipe_, outpipe_);
} }
void zmq::socket_base_t::detach_inpipe (class reader_t *pipe_) void zmq::socket_base_t::detach_inpipe (class reader_t *pipe_)
{ {
// Remove the pipe from the list of inbound pipes. xdetach_inpipe (pipe_);
in_pipes_t::size_type index = (in_pipes_t::size_type) pipe_->get_index (); pipe_->set_endpoint (NULL); // ?
if (index < active) {
in_pipes [index]->set_index (active - 1);
in_pipes [active - 1]->set_index (index);
std::swap (in_pipes [index], in_pipes [active - 1]);
active--;
index = active;
}
in_pipes [index]->set_index (in_pipes.size () - 1);
in_pipes [in_pipes.size () - 1]->set_index (index);
std::swap (in_pipes [index], in_pipes [in_pipes.size () - 1]);
in_pipes.pop_back ();
} }
void zmq::socket_base_t::detach_outpipe (class writer_t *pipe_) void zmq::socket_base_t::detach_outpipe (class writer_t *pipe_)
{ {
out_pipes_t::size_type index = (out_pipes_t::size_type) pipe_->get_index (); xdetach_outpipe (pipe_);
out_pipes [index]->set_index (out_pipes.size () - 1); pipe_->set_endpoint (NULL); // ?
out_pipes [out_pipes.size () - 1]->set_index (index);
std::swap (out_pipes [index], out_pipes [out_pipes.size () - 1]);
out_pipes.pop_back ();
}
void zmq::socket_base_t::set_index (int index_)
{
index = index_;
}
int zmq::socket_base_t::get_index ()
{
zmq_assert (index != -1);
return index;
} }
void zmq::socket_base_t::process_own (owned_t *object_) void zmq::socket_base_t::process_own (owned_t *object_)
@ -560,10 +454,7 @@ void zmq::socket_base_t::process_own (owned_t *object_)
void zmq::socket_base_t::process_bind (owned_t *session_, void zmq::socket_base_t::process_bind (owned_t *session_,
reader_t *in_pipe_, writer_t *out_pipe_) reader_t *in_pipe_, writer_t *out_pipe_)
{ {
zmq_assert (in_pipe_); attach_pipes (in_pipe_, out_pipe_);
attach_inpipe (in_pipe_);
zmq_assert (out_pipe_);
attach_outpipe (out_pipe_);
} }
void zmq::socket_base_t::process_term_req (owned_t *object_) void zmq::socket_base_t::process_term_req (owned_t *object_)
@ -593,106 +484,3 @@ void zmq::socket_base_t::process_term_ack ()
pending_term_acks--; pending_term_acks--;
} }
bool zmq::socket_base_t::distribute (zmq_msg_t *msg_, bool flush_)
{
int pipes_count = out_pipes.size ();
// If there are no pipes available, simply drop the message.
if (pipes_count == 0) {
int rc = zmq_msg_close (msg_);
zmq_assert (rc == 0);
rc = zmq_msg_init (msg_);
zmq_assert (rc == 0);
return true;
}
// First check whether all pipes are available for writing.
for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end ();
it++)
if (!(*it)->check_write (zmq_msg_size (msg_)))
return false;
msg_content_t *content = (msg_content_t*) msg_->content;
// For VSMs the copying is straighforward.
if (content == (msg_content_t*) ZMQ_VSM) {
for (out_pipes_t::iterator it = out_pipes.begin ();
it != out_pipes.end (); it++) {
(*it)->write (msg_);
if (flush_)
(*it)->flush ();
}
int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0);
return true;
}
// Optimisation for the case when there's only a single pipe
// to send the message to - no refcount adjustment i.e. no atomic
// operations are needed.
if (pipes_count == 1) {
(*out_pipes.begin ())->write (msg_);
if (flush_)
(*out_pipes.begin ())->flush ();
int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0);
return true;
}
// There are at least 2 destinations for the message. That means we have
// to deal with reference counting. First add N-1 references to
// the content (we are holding one reference anyway, that's why -1).
if (msg_->shared)
content->refcnt.add (pipes_count - 1);
else {
content->refcnt.set (pipes_count);
msg_->shared = true;
}
// Push the message to all destinations.
for (out_pipes_t::iterator it = out_pipes.begin (); it != out_pipes.end ();
it++) {
(*it)->write (msg_);
if (flush_)
(*it)->flush ();
}
// Detach the original message from the data buffer.
int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0);
return true;
}
bool zmq::socket_base_t::fetch (zmq_msg_t *msg_)
{
// Deallocate old content of the message.
zmq_msg_close (msg_);
// Round-robin over the pipes to get next message.
for (int count = active; count != 0; count--) {
bool fetched = in_pipes [current]->read (msg_);
// If there's no message in the pipe, move it to the list of
// non-active pipes.
if (!fetched) {
in_pipes [current]->set_index (active - 1);
in_pipes [active - 1]->set_index (current);
std::swap (in_pipes [current], in_pipes [active - 1]);
active--;
}
current ++;
if (current >= active)
current = 0;
if (fetched)
return true;
}
// No message is available. Initialise the output parameter
// to be a 0-byte message.
zmq_msg_init (msg_);
return false;
}

View File

@ -27,6 +27,7 @@
#include "i_endpoint.hpp" #include "i_endpoint.hpp"
#include "object.hpp" #include "object.hpp"
#include "yarray_item.hpp"
#include "mutex.hpp" #include "mutex.hpp"
#include "options.hpp" #include "options.hpp"
#include "stdint.hpp" #include "stdint.hpp"
@ -34,41 +35,59 @@
namespace zmq namespace zmq
{ {
class socket_base_t : public object_t, public i_endpoint class socket_base_t :
public object_t, public i_endpoint, public yarray_item_t
{ {
public: public:
socket_base_t (class app_thread_t *parent_, int type_); socket_base_t (class app_thread_t *parent_, int type_);
virtual ~socket_base_t ();
// Interface for communication with the API layer. // Interface for communication with the API layer.
virtual int setsockopt (int option_, const void *optval_, int setsockopt (int option_, const void *optval_,
size_t optvallen_); size_t optvallen_);
virtual int bind (const char *addr_); int bind (const char *addr_);
virtual int connect (const char *addr_); int connect (const char *addr_);
virtual int send (struct zmq_msg_t *msg_, int flags_); int send (struct zmq_msg_t *msg_, int flags_);
virtual int flush (); int flush ();
virtual int recv (struct zmq_msg_t *msg_, int flags_); int recv (struct zmq_msg_t *msg_, int flags_);
virtual int close (); int close ();
// The list of sessions cannot be accessed via inter-thread // The list of sessions cannot be accessed via inter-thread
// commands as it is unacceptable to wait for the completion of the // commands as it is unacceptable to wait for the completion of the
// action till user application yields control of the application // action till user application yields control of the application
// thread to 0MQ. // thread to 0MQ. Locking is used instead.
bool register_session (const char *name_, class session_t *session_); bool register_session (const char *name_, class session_t *session_);
bool unregister_session (const char *name_); bool unregister_session (const char *name_);
class session_t *find_session (const char *name_); class session_t *find_session (const char *name_);
// i_endpoint interface implementation. // i_endpoint interface implementation.
void attach_inpipe (class reader_t *pipe_); void attach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
void attach_outpipe (class writer_t *pipe_);
void revive (class reader_t *pipe_);
void detach_inpipe (class reader_t *pipe_); void detach_inpipe (class reader_t *pipe_);
void detach_outpipe (class writer_t *pipe_); void detach_outpipe (class writer_t *pipe_);
void kill (class reader_t *pipe_);
void revive (class reader_t *pipe_);
// Manipulating index in the app_thread's list of sockets. protected:
void set_index (int index);
int get_index (); // Destructor is protected. Socket is closed using 'close' function.
virtual ~socket_base_t ();
// Pipe management is done by individual socket types.
virtual bool xrequires_in () = 0;
virtual bool xrequires_out () = 0;
virtual void xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_) = 0;
virtual void xdetach_inpipe (class reader_t *pipe_) = 0;
virtual void xdetach_outpipe (class writer_t *pipe_) = 0;
virtual void xkill (class reader_t *pipe_) = 0;
virtual void xrevive (class reader_t *pipe_) = 0;
// Actual algorithms are to be defined by individual socket types.
virtual int xsetsockopt (int option_, const void *optval_,
size_t optvallen_) = 0;
virtual int xsend (struct zmq_msg_t *msg_, int options_) = 0;
virtual int xflush () = 0;
virtual int xrecv (struct zmq_msg_t *msg_, int options_) = 0;
private: private:
@ -79,14 +98,6 @@ namespace zmq
void process_term_req (class owned_t *object_); void process_term_req (class owned_t *object_);
void process_term_ack (); void process_term_ack ();
// Attempts to distribute the message to all the outbound pipes.
// Returns false if not possible because of pipe overflow.
bool distribute (struct zmq_msg_t *msg_, bool flush_);
// Gets a message from one of the inbound pipes. Implementation of
// fair queueing.
bool fetch (struct zmq_msg_t *msg_);
// Type of the socket. // Type of the socket.
int type; int type;
@ -95,21 +106,6 @@ namespace zmq
typedef std::set <class owned_t*> io_objects_t; typedef std::set <class owned_t*> io_objects_t;
io_objects_t io_objects; io_objects_t io_objects;
// Inbound pipes, i.e. those the socket is getting messages from.
typedef std::vector <class reader_t*> in_pipes_t;
in_pipes_t in_pipes;
// Index of the next inbound pipe to read messages from.
in_pipes_t::size_type current;
// Number of active inbound pipes. Active pipes are stored in the
// initial section of the in_pipes array.
in_pipes_t::size_type active;
// Outbound pipes, i.e. those the socket is sending messages to.
typedef std::vector <class writer_t*> out_pipes_t;
out_pipes_t out_pipes;
// Number of I/O objects that were already asked to terminate // Number of I/O objects that were already asked to terminate
// but haven't acknowledged it yet. // but haven't acknowledged it yet.
int pending_term_acks; int pending_term_acks;
@ -138,9 +134,6 @@ namespace zmq
sessions_t sessions; sessions_t sessions;
mutex_t sessions_sync; mutex_t sessions_sync;
// Index of the socket in the app_thread's list of sockets.
int index;
socket_base_t (const socket_base_t&); socket_base_t (const socket_base_t&);
void operator = (const socket_base_t&); void operator = (const socket_base_t&);
}; };

View File

@ -21,18 +21,69 @@
#include "sub.hpp" #include "sub.hpp"
#include "err.hpp" #include "err.hpp"
#include "pipe.hpp"
zmq::sub_t::sub_t (class app_thread_t *parent_) : zmq::sub_t::sub_t (class app_thread_t *parent_) :
socket_base_t (parent_, ZMQ_SUB), socket_base_t (parent_, ZMQ_SUB),
active (0),
current (0),
all_count (0) all_count (0)
{ {
} }
zmq::sub_t::~sub_t () zmq::sub_t::~sub_t ()
{ {
for (in_pipes_t::size_type i = 0; i != in_pipes.size (); i++)
in_pipes [i]->term ();
in_pipes.clear ();
} }
int zmq::sub_t::setsockopt (int option_, const void *optval_, bool zmq::sub_t::xrequires_in ()
{
return true;
}
bool zmq::sub_t::xrequires_out ()
{
return false;
}
void zmq::sub_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_)
{
zmq_assert (!outpipe_);
in_pipes.push_back (inpipe_);
in_pipes.swap (active, in_pipes.size () - 1);
active++;
}
void zmq::sub_t::xdetach_inpipe (class reader_t *pipe_)
{
if (in_pipes.index (pipe_) < active)
active--;
in_pipes.erase (pipe_);
}
void zmq::sub_t::xdetach_outpipe (class writer_t *pipe_)
{
zmq_assert (false);
}
void zmq::sub_t::xkill (class reader_t *pipe_)
{
// Move the pipe to the list of inactive pipes.
in_pipes.swap (in_pipes.index (pipe_), active - 1);
active--;
}
void zmq::sub_t::xrevive (class reader_t *pipe_)
{
// Move the pipe to the list of active pipes.
in_pipes.swap (in_pipes.index (pipe_), active);
active++;
}
int zmq::sub_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_) size_t optvallen_)
{ {
if (option_ == ZMQ_SUBSCRIBE) { if (option_ == ZMQ_SUBSCRIBE) {
@ -75,27 +126,28 @@ int zmq::sub_t::setsockopt (int option_, const void *optval_,
return 0; return 0;
} }
return socket_base_t::setsockopt (option_, optval_, optvallen_); errno = EINVAL;
return -1;
} }
int zmq::sub_t::send (struct zmq_msg_t *msg_, int flags_) int zmq::sub_t::xsend (struct zmq_msg_t *msg_, int flags_)
{ {
errno = EFAULT; errno = EFAULT;
return -1; return -1;
} }
int zmq::sub_t::flush () int zmq::sub_t::xflush ()
{ {
errno = EFAULT; errno = EFAULT;
return -1; return -1;
} }
int zmq::sub_t::recv (struct zmq_msg_t *msg_, int flags_) int zmq::sub_t::xrecv (struct zmq_msg_t *msg_, int flags_)
{ {
while (true) { while (true) {
// Get a message. // Get a message using fair queueing algorithm.
int rc = socket_base_t::recv (msg_, flags_); int rc = fq (msg_, flags_);
// If there's no message available, return immediately. // If there's no message available, return immediately.
if (rc != 0 && errno == EAGAIN) if (rc != 0 && errno == EAGAIN)
@ -131,3 +183,25 @@ int zmq::sub_t::recv (struct zmq_msg_t *msg_, int flags_)
return 0; return 0;
} }
} }
int zmq::sub_t::fq (zmq_msg_t *msg_, int flags_)
{
// Deallocate old content of the message.
zmq_msg_close (msg_);
// Round-robin over the pipes to get next message.
for (int count = active; count != 0; count--) {
bool fetched = in_pipes [current]->read (msg_);
current++;
if (current >= active)
current = 0;
if (fetched)
return 0;
}
// No message is available. Initialise the output parameter
// to be a 0-byte message.
zmq_msg_init (msg_);
errno = EAGAIN;
return -1;
}

View File

@ -24,6 +24,7 @@
#include <string> #include <string>
#include "socket_base.hpp" #include "socket_base.hpp"
#include "yarray.hpp"
namespace zmq namespace zmq
{ {
@ -35,14 +36,38 @@ namespace zmq
sub_t (class app_thread_t *parent_); sub_t (class app_thread_t *parent_);
~sub_t (); ~sub_t ();
// Overloads of API functions from socket_base_t. protected:
int setsockopt (int option_, const void *optval_, size_t optvallen_);
int send (struct zmq_msg_t *msg_, int flags_); // Overloads of functions from socket_base_t.
int flush (); bool xrequires_in ();
int recv (struct zmq_msg_t *msg_, int flags_); bool xrequires_out ();
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
void xrevive (class reader_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (struct zmq_msg_t *msg_, int flags_);
int xflush ();
int xrecv (struct zmq_msg_t *msg_, int flags_);
private: private:
// Helper function to return one message choosed using
// fair queueing algorithm.
int fq (struct zmq_msg_t *msg_, int flags_);
// Inbound pipes, i.e. those the socket is getting messages from.
typedef yarray_t <class reader_t> in_pipes_t;
in_pipes_t in_pipes;
// Number of active inbound pipes. Active pipes are stored in the
// initial section of the in_pipes array.
in_pipes_t::size_type active;
// Index of the next inbound pipe to read messages from.
in_pipes_t::size_type current;
// Number of active "*" subscriptions. // Number of active "*" subscriptions.
int all_count; int all_count;
@ -52,6 +77,9 @@ namespace zmq
// List of all exact match subscriptions. // List of all exact match subscriptions.
subscriptions_t topics; subscriptions_t topics;
sub_t (const sub_t&);
void operator = (const sub_t&);
}; };
} }

110
src/yarray.hpp Normal file
View File

@ -0,0 +1,110 @@
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_YARRAY_INCLUDED__
#define __ZMQ_YARRAY_INCLUDED__
#include <vector>
#include <algorithm>
namespace zmq
{
// Fast array implementation with O(1) access to item, insertion and
// removal. Yarray stores pointers rather than objects. The objects have
// to be derived from yarray_item_t class.
template <typename T> class yarray_t
{
public:
typedef typename std::vector <T*>::size_type size_type;
inline yarray_t ()
{
}
inline ~yarray_t ()
{
}
inline size_type size ()
{
return items.size ();
}
inline bool empty ()
{
return items.empty ();
}
inline T *&operator [] (size_type index_)
{
return items [index_];
}
inline void push_back (T *item_)
{
if (item_)
item_->set_yarray_index (items.size ());
items.push_back (item_);
}
inline void erase (T *item_) {
erase (item_->get_yarray_index ());
}
inline void erase (size_type index_) {
if (items.back ())
items.back ()->set_yarray_index (index_);
items [index_] = items.back ();
items.pop_back ();
}
inline void swap (size_type index1_, size_type index2_)
{
if (items [index1_])
items [index1_]->set_yarray_index (index2_);
if (items [index2_])
items [index2_]->set_yarray_index (index1_);
std::swap (items [index1_], items [index2_]);
}
inline void clear ()
{
items.clear ();
}
inline size_type index (T *item_)
{
return (size_type) item_->get_yarray_index ();
}
private:
typedef std::vector <T*> items_t;
items_t items;
yarray_t (const yarray_t&);
void operator = (const yarray_t&);
};
}
#endif

62
src/yarray_item.hpp Normal file
View File

@ -0,0 +1,62 @@
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_YARRAY_ITEM_INCLUDED__
#define __ZMQ_YARRAY_ITEM_INCLUDED__
namespace zmq
{
// Base class for objects stored in yarray. Note that each object can
// be stored in at most one yarray.
class yarray_item_t
{
public:
inline yarray_item_t () :
yarray_index (-1)
{
}
inline ~yarray_item_t ()
{
}
inline void set_yarray_index (int index_)
{
yarray_index = index_;
}
inline int get_yarray_index ()
{
return yarray_index;
}
private:
int yarray_index;
yarray_item_t (const yarray_item_t&);
void operator = (const yarray_item_t&);
};
}
#endif