ZMQII-25: Implement streamed request/reply

This commit is contained in:
Martin Sustrik 2009-11-24 11:23:10 +01:00
parent 5cd98bc575
commit c98fd6bc3f
22 changed files with 612 additions and 21 deletions

View File

@ -188,6 +188,12 @@ ZMQ_EXPORT int zmq_term (void *context);
// the peer that issued the last received request.
#define ZMQ_REP 4
// Socket to receive messages from up the stream.
#define ZMQ_UPSTREAM 5
// Socket to send messages downstream.
#define ZMQ_DOWNSTREAM 6
// Open a socket. 'type' is one of the socket types defined above.
//
// Errors: EINVAL - invalid socket type.

View File

@ -34,6 +34,8 @@ public class Socket
public static final int SUB = 2;
public static final int REQ = 3;
public static final int REP = 4;
public static final int UPSTREAM = 4;
public static final int DOWNSTREAM = 4;
public static final int HWM = 1;
public static final int LWM = 2;

View File

@ -498,6 +498,12 @@ PyMODINIT_FUNC initlibpyzmq ()
t = PyInt_FromLong (ZMQ_REP);
PyDict_SetItemString (dict, "REP", t);
Py_DECREF (t);
t = PyInt_FromLong (ZMQ_UPSTREAM);
PyDict_SetItemString (dict, "UPSTREAM", t);
Py_DECREF (t);
t = PyInt_FromLong (ZMQ_DOWNSTREAM);
PyDict_SetItemString (dict, "DOWNSTREAM", t);
Py_DECREF (t);
t = PyInt_FromLong (ZMQ_HWM);
PyDict_SetItemString (dict, "HWM", t);
Py_DECREF (t);

View File

@ -275,6 +275,8 @@ extern "C" void Init_librbzmq ()
rb_define_global_const ("PUB", INT2NUM (ZMQ_PUB));
rb_define_global_const ("REQ", INT2NUM (ZMQ_REQ));
rb_define_global_const ("REP", INT2NUM (ZMQ_REP));
rb_define_global_const ("UPSTREAM", INT2NUM (ZMQ_UPSTREAM));
rb_define_global_const ("DOWNSTREAM", INT2NUM (ZMQ_DOWNSTREAM));
rb_define_global_const ("POLL", INT2NUM (ZMQ_POLL));
}

View File

@ -590,6 +590,14 @@ if test "x$with_forwarder" != "xno"; then
forwarder="yes"
fi
# streamer device
streamer="no"
AC_ARG_WITH([streamer], [AS_HELP_STRING([--with-streamer],
[build streamer device [default=no]])], [with_streamer=yes], [with_streamer=no])
if test "x$with_streamer" != "xno"; then
streamer="yes"
fi
# Perf
perf="no"
@ -619,6 +627,7 @@ AM_CONDITIONAL(BUILD_PGM1, test "x$pgm1_ext" = "xyes")
AM_CONDITIONAL(BUILD_PGM2, test "x$pgm2_ext" = "xyes")
AM_CONDITIONAL(BUILD_NO_PGM, test "x$pgm2_ext" = "xno" -a "x$pgm1_ext" = "xno")
AM_CONDITIONAL(BUILD_FORWARDER, test "x$forwarder" = "xyes")
AM_CONDITIONAL(BUILD_STREAMER, test "x$streamer" = "xyes")
AM_CONDITIONAL(BUILD_PERF, test "x$perf" = "xyes")
AM_CONDITIONAL(ON_MINGW, test "x$on_mingw32" = "xyes")
AM_CONDITIONAL(BUILD_PGM2_EXAMPLES, test "x$with_pgm2_ext" = "xyes")
@ -641,7 +650,8 @@ AC_OUTPUT(Makefile src/Makefile man/Makefile bindings/python/Makefile \
bindings/python/setup.py bindings/ruby/Makefile \
bindings/java/Makefile perf/Makefile perf/c/Makefile perf/cpp/Makefile \
perf/python/Makefile perf/ruby/Makefile perf/java/Makefile src/libzmq.pc \
devices/Makefile devices/zmq_forwarder/Makefile bindings/Makefile)
devices/Makefile devices/zmq_forwarder/Makefile \
devices/zmq_streamer/Makefile bindings/Makefile)
AC_MSG_RESULT([])
AC_MSG_RESULT([ ******************************************************** ])
@ -676,6 +686,7 @@ AC_MSG_RESULT([ PGM: no])
fi
AC_MSG_RESULT([ Devices:])
AC_MSG_RESULT([ forwarder: $forwarder])
AC_MSG_RESULT([ streamer: $streamer])
AC_MSG_RESULT([ Performance tests: $perf])
AC_MSG_RESULT([])
AC_MSG_RESULT([ ******************************************************** ])

View File

@ -2,5 +2,9 @@ if BUILD_FORWARDER
FORWARDER_DIR = zmq_forwarder
endif
SUBDIRS = $(FORWARDER_DIR)
DIST_SUBDIRS = zmq_forwarder
if BUILD_STREAMER
STREAMER_DIR = zmq_streamer
endif
SUBDIRS = $(FORWARDER_DIR) $(STREAMER_DIR)
DIST_SUBDIRS = zmq_forwarder zmq_streamer

View File

@ -23,7 +23,7 @@
int main (int argc, char *argv [])
{
if (argc != 2) {
fprintf (stderr, "usage: forwarder <config-file>\n");
fprintf (stderr, "usage: zmq_forwarder <config-file>\n");
return 1;
}
@ -53,8 +53,9 @@ int main (int argc, char *argv [])
// TODO: make the number of I/O threads configurable.
zmq::context_t ctx (1, 1);
zmq::socket_t in_socket (ctx, ZMQ_P2P);
zmq::socket_t out_socket (ctx, ZMQ_P2P);
zmq::socket_t in_socket (ctx, ZMQ_SUB);
in_socket.setsockopt (ZMQ_SUBSCRIBE, "*", 1);
zmq::socket_t out_socket (ctx, ZMQ_PUB);
int n = 0;
while (true) {

View File

@ -0,0 +1,9 @@
INCLUDES = -I$(top_builddir)/bindings/c
bin_PROGRAMS = zmq_streamer
zmq_streamer_LDADD = $(top_builddir)/src/libzmq.la
zmq_streamer_SOURCES = zmq_streamer.cpp
zmq_streamer_CXXFLAGS = -Wall -pedantic -Werror

View File

@ -0,0 +1,122 @@
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../../bindings/cpp/zmq.hpp"
#include "../../foreign/xmlParser/xmlParser.cpp"
int main (int argc, char *argv [])
{
if (argc != 2) {
fprintf (stderr, "usage: zmq_streamer <config-file>\n");
return 1;
}
XMLNode root = XMLNode::parseFile (argv [1]);
if (root.isEmpty ()) {
fprintf (stderr, "configuration file not found\n");
return 1;
}
if (strcmp (root.getName (), "streamer") != 0) {
fprintf (stderr, "root element in the configuration file should be "
"named 'streamer'\n");
return 1;
}
XMLNode in_node = root.getChildNode ("in");
if (in_node.isEmpty ()) {
fprintf (stderr, "'in' node is missing in the configuration file\n");
return 1;
}
XMLNode out_node = root.getChildNode ("out");
if (out_node.isEmpty ()) {
fprintf (stderr, "'out' node is missing in the configuration file\n");
return 1;
}
// TODO: make the number of I/O threads configurable.
zmq::context_t ctx (1, 1);
zmq::socket_t in_socket (ctx, ZMQ_UPSTREAM);
zmq::socket_t out_socket (ctx, ZMQ_DOWNSTREAM);
int n = 0;
while (true) {
XMLNode bind = in_node.getChildNode ("bind", n);
if (bind.isEmpty ())
break;
const char *addr = bind.getAttribute ("addr");
if (!addr) {
fprintf (stderr, "'bind' node is missing 'addr' attribute\n");
return 1;
}
in_socket.bind (addr);
n++;
}
n = 0;
while (true) {
XMLNode connect = in_node.getChildNode ("connect", n);
if (connect.isEmpty ())
break;
const char *addr = connect.getAttribute ("addr");
if (!addr) {
fprintf (stderr, "'connect' node is missing 'addr' attribute\n");
return 1;
}
in_socket.connect (addr);
n++;
}
n = 0;
while (true) {
XMLNode bind = out_node.getChildNode ("bind", n);
if (bind.isEmpty ())
break;
const char *addr = bind.getAttribute ("addr");
if (!addr) {
fprintf (stderr, "'bind' node is missing 'addr' attribute\n");
return 1;
}
out_socket.bind (addr);
n++;
}
n = 0;
while (true) {
XMLNode connect = out_node.getChildNode ("connect", n);
if (connect.isEmpty ())
break;
const char *addr = connect.getAttribute ("addr");
if (!addr) {
fprintf (stderr, "'connect' node is missing 'addr' attribute\n");
return 1;
}
out_socket.connect (addr);
n++;
}
zmq::message_t msg;
while (true) {
in_socket.recv (&msg);
out_socket.send (msg);
}
return 0;
}

View File

@ -37,6 +37,15 @@ Socket to receive requests and send replies. This socket type allows
only an alternated sequence of recv's and send's. Each send is routed to
the peer that issued the last received request.
.IP "\fBZMQ_UPSTREAM\fP"
Socket to receive messages from up the stream. Messages are fair-queued
from among all the connected peers. Send function is not implemented for
this socket type.
.IP "\fBZMQ_DOWNSTREAM\fP"
Socket to send messages down stream. Messages are load-balanced among all the
connected peers. Send function is not implemented for this socket type.
.SH RETURN VALUE
Function returns socket handle is successful. Otherwise it returns NULL and
sets errno to one of the values below.

View File

@ -77,6 +77,7 @@ libzmq_la_SOURCES = app_thread.hpp \
decoder.hpp \
devpoll.hpp \
dispatcher.hpp \
downstream.hpp \
encoder.hpp \
epoll.hpp \
err.hpp \
@ -117,6 +118,7 @@ libzmq_la_SOURCES = app_thread.hpp \
tcp_listener.hpp \
tcp_socket.hpp \
thread.hpp \
upstream.hpp \
uuid.hpp \
windows.hpp \
wire.hpp \
@ -135,6 +137,7 @@ libzmq_la_SOURCES = app_thread.hpp \
app_thread.cpp \
devpoll.cpp \
dispatcher.cpp \
downstream.cpp \
epoll.cpp \
err.cpp \
fd_signaler.cpp \
@ -162,6 +165,7 @@ libzmq_la_SOURCES = app_thread.hpp \
tcp_listener.cpp \
tcp_socket.cpp \
thread.cpp \
upstream.cpp \
uuid.cpp \
ypollset.cpp \
zmq.cpp \

View File

@ -40,11 +40,13 @@
#include "pipe.hpp"
#include "config.hpp"
#include "socket_base.hpp"
#include "p2p.hpp"
#include "pub.hpp"
#include "sub.hpp"
#include "req.hpp"
#include "rep.hpp"
#include "p2p.hpp"
#include "upstream.hpp"
#include "downstream.hpp"
// If the RDTSC is available we use it to prevent excessive
// polling for commands. The nice thing here is that it will work on any
@ -158,6 +160,9 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_)
{
socket_base_t *s = NULL;
switch (type_) {
case ZMQ_P2P:
s = new p2p_t (this);
break;
case ZMQ_PUB:
s = new pub_t (this);
break;
@ -170,8 +175,11 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_)
case ZMQ_REP:
s = new rep_t (this);
break;
case ZMQ_P2P:
s = new p2p_t (this);
case ZMQ_UPSTREAM:
s = new upstream_t (this);
break;
case ZMQ_DOWNSTREAM:
s = new downstream_t (this);
break;
default:
// TODO: This should be EINVAL.

131
src/downstream.cpp Normal file
View File

@ -0,0 +1,131 @@
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../bindings/c/zmq.h"
#include "downstream.hpp"
#include "err.hpp"
#include "pipe.hpp"
zmq::downstream_t::downstream_t (class app_thread_t *parent_) :
socket_base_t (parent_),
current (0)
{
options.requires_in = false;
options.requires_out = true;
}
zmq::downstream_t::~downstream_t ()
{
}
void zmq::downstream_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_)
{
zmq_assert (!inpipe_ && outpipe_);
pipes.push_back (outpipe_);
}
void zmq::downstream_t::xdetach_inpipe (class reader_t *pipe_)
{
// There are no inpipes, so this function shouldn't be called at all.
zmq_assert (false);
}
void zmq::downstream_t::xdetach_outpipe (class writer_t *pipe_)
{
zmq_assert (pipe_);
pipes.erase (pipes.index (pipe_));
}
void zmq::downstream_t::xkill (class reader_t *pipe_)
{
// There are no inpipes, so this function shouldn't be called at all.
zmq_assert (false);
}
void zmq::downstream_t::xrevive (class reader_t *pipe_)
{
// There are no inpipes, so this function shouldn't be called at all.
zmq_assert (false);
}
int zmq::downstream_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
// No special option for this socket type.
errno = EINVAL;
return -1;
}
int zmq::downstream_t::xsend (zmq_msg_t *msg_, int flags_)
{
// If there are no pipes we cannot send the message.
if (pipes.empty ()) {
errno = EAGAIN;
return -1;
}
// Move to the next pipe (load-balancing).
current++;
if (current >= pipes.size ())
current = 0;
// TODO: Implement this once queue limits are in-place.
zmq_assert (pipes [current]->check_write (zmq_msg_size (msg_)));
// Push message to the selected pipe.
pipes [current]->write (msg_);
pipes [current]->flush ();
// Detach the message from the data buffer.
int rc = zmq_msg_init (msg_);
zmq_assert (rc == 0);
return 0;
}
int zmq::downstream_t::xflush ()
{
// TODO: Maybe there's a point in flushing messages downstream.
// It may be useful in the case where number of messages in a single
// transaction is much greater than the number of attached pipes.
errno = ENOTSUP;
return -1;
}
int zmq::downstream_t::xrecv (zmq_msg_t *msg_, int flags_)
{
errno = ENOTSUP;
return -1;
}
bool zmq::downstream_t::xhas_in ()
{
return false;
}
bool zmq::downstream_t::xhas_out ()
{
// TODO: Modify this code once pipe limits are in place.
return true;
}

64
src/downstream.hpp Normal file
View File

@ -0,0 +1,64 @@
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_DOWNSTREAM_HPP_INCLUDED__
#define __ZMQ_DOWNSTREAM_HPP_INCLUDED__
#include "socket_base.hpp"
#include "yarray.hpp"
namespace zmq
{
class downstream_t : public socket_base_t
{
public:
downstream_t (class app_thread_t *parent_);
~downstream_t ();
// Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
void xrevive (class reader_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();
private:
// List of outbound pipes.
typedef yarray_t <class writer_t> pipes_t;
pipes_t pipes;
// Points to the last pipe that the most recent message was sent to.
pipes_t::size_type current;
downstream_t (const downstream_t&);
void operator = (const downstream_t&);
};
}
#endif

View File

@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_P2P_INCLUDED__
#define __ZMQ_P2P_INCLUDED__
#ifndef __ZMQ_P2P_HPP_INCLUDED__
#define __ZMQ_P2P_HPP_INCLUDED__
#include "socket_base.hpp"

View File

@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_PUB_INCLUDED__
#define __ZMQ_PUB_INCLUDED__
#ifndef __ZMQ_PUB_HPP_INCLUDED__
#define __ZMQ_PUB_HPP_INCLUDED__
#include "socket_base.hpp"
#include "yarray.hpp"

View File

@ -71,7 +71,7 @@ void zmq::rep_t::xdetach_inpipe (class reader_t *pipe_)
}
// Now both inpipe and outpipe are detached. Remove them from the lists.
if (in_pipes.index (pipe_) < active)
if (index < active)
active--;
in_pipes.erase (index);
out_pipes.erase (index);

View File

@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_REP_INCLUDED__
#define __ZMQ_REP_INCLUDED__
#ifndef __ZMQ_REP_HPP_INCLUDED__
#define __ZMQ_REP_HPP_INCLUDED__
#include "socket_base.hpp"
#include "yarray.hpp"

View File

@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_REQ_INCLUDED__
#define __ZMQ_REQ_INCLUDED__
#ifndef __ZMQ_REQ_HPP_INCLUDED__
#define __ZMQ_REQ_HPP_INCLUDED__
#include "socket_base.hpp"
#include "yarray.hpp"

View File

@ -17,8 +17,8 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_SUB_INCLUDED__
#define __ZMQ_SUB_INCLUDED__
#ifndef __ZMQ_SUB_HPP_INCLUDED__
#define __ZMQ_SUB_HPP_INCLUDED__
#include <set>
#include <string>

143
src/upstream.cpp Normal file
View File

@ -0,0 +1,143 @@
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "../bindings/c/zmq.h"
#include "upstream.hpp"
#include "err.hpp"
#include "pipe.hpp"
zmq::upstream_t::upstream_t (class app_thread_t *parent_) :
socket_base_t (parent_),
active (0),
current (0)
{
options.requires_in = true;
options.requires_out = false;
}
zmq::upstream_t::~upstream_t ()
{
}
void zmq::upstream_t::xattach_pipes (class reader_t *inpipe_,
class writer_t *outpipe_)
{
zmq_assert (inpipe_ && !outpipe_);
pipes.push_back (inpipe_);
pipes.swap (active, pipes.size () - 1);
active++;
}
void zmq::upstream_t::xdetach_inpipe (class reader_t *pipe_)
{
// Remove the pipe from the list; adjust number of active pipes
// accordingly.
zmq_assert (pipe_);
pipes_t::size_type index = pipes.index (pipe_);
if (index < active)
active--;
pipes.erase (index);
}
void zmq::upstream_t::xdetach_outpipe (class writer_t *pipe_)
{
// There are no outpipes, so this function shouldn't be called at all.
zmq_assert (false);
}
void zmq::upstream_t::xkill (class reader_t *pipe_)
{
// Move the pipe to the list of inactive pipes.
active--;
pipes.swap (pipes.index (pipe_), active);
}
void zmq::upstream_t::xrevive (class reader_t *pipe_)
{
// Move the pipe to the list of active pipes.
pipes.swap (pipes.index (pipe_), active);
active++;
}
int zmq::upstream_t::xsetsockopt (int option_, const void *optval_,
size_t optvallen_)
{
// No special options for this socket type.
errno = EINVAL;
return -1;
}
int zmq::upstream_t::xsend (zmq_msg_t *msg_, int flags_)
{
errno = ENOTSUP;
return -1;
}
int zmq::upstream_t::xflush ()
{
errno = ENOTSUP;
return -1;
}
int zmq::upstream_t::xrecv (zmq_msg_t *msg_, int flags_)
{
// Deallocate old content of the message.
zmq_msg_close (msg_);
// Round-robin over the pipes to get next message.
for (int count = active; count != 0; count--) {
bool fetched = pipes [current]->read (msg_);
current++;
if (current >= active)
current = 0;
if (fetched)
return 0;
}
// No message is available. Initialise the output parameter
// to be a 0-byte message.
zmq_msg_init (msg_);
errno = EAGAIN;
return -1;
}
bool zmq::upstream_t::xhas_in ()
{
// Note that messing with current doesn't break the fairness of fair
// queueing algorithm. If there are no messages available current will
// get back to its original value. Otherwise it'll point to the first
// pipe holding messages, skipping only pipes with no messages available.
for (int count = active; count != 0; count--) {
if (pipes [current]->check_read ())
return true;
current++;
if (current >= active)
current = 0;
}
return false;
}
bool zmq::upstream_t::xhas_out ()
{
return false;
}

69
src/upstream.hpp Normal file
View File

@ -0,0 +1,69 @@
/*
Copyright (c) 2007-2009 FastMQ Inc.
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the Lesser GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
Lesser GNU General Public License for more details.
You should have received a copy of the Lesser GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_UPSTREAM_HPP_INCLUDED__
#define __ZMQ_UPSTREAM_HPP_INCLUDED__
#include "socket_base.hpp"
#include "yarray.hpp"
namespace zmq
{
class upstream_t : public socket_base_t
{
public:
upstream_t (class app_thread_t *parent_);
~upstream_t ();
// Overloads of functions from socket_base_t.
void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_);
void xdetach_inpipe (class reader_t *pipe_);
void xdetach_outpipe (class writer_t *pipe_);
void xkill (class reader_t *pipe_);
void xrevive (class reader_t *pipe_);
int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();
private:
// Inbound pipes.
typedef yarray_t <class reader_t> pipes_t;
pipes_t pipes;
// Number of active pipes. All the active pipes are located at the
// beginning of the pipes array.
pipes_t::size_type active;
// Index of the next bound pipe to read a message from.
pipes_t::size_type current;
upstream_t (const upstream_t&);
void operator = (const upstream_t&);
};
}
#endif