diff --git a/bindings/c/zmq.h b/bindings/c/zmq.h index 9b11a1d3..a65926ef 100644 --- a/bindings/c/zmq.h +++ b/bindings/c/zmq.h @@ -188,6 +188,12 @@ ZMQ_EXPORT int zmq_term (void *context); // the peer that issued the last received request. #define ZMQ_REP 4 +// Socket to receive messages from up the stream. +#define ZMQ_UPSTREAM 5 + +// Socket to send messages downstream. +#define ZMQ_DOWNSTREAM 6 + // Open a socket. 'type' is one of the socket types defined above. // // Errors: EINVAL - invalid socket type. diff --git a/bindings/java/org/zmq/Socket.java b/bindings/java/org/zmq/Socket.java index 501bc160..396a6a03 100644 --- a/bindings/java/org/zmq/Socket.java +++ b/bindings/java/org/zmq/Socket.java @@ -34,6 +34,8 @@ public class Socket public static final int SUB = 2; public static final int REQ = 3; public static final int REP = 4; + public static final int UPSTREAM = 4; + public static final int DOWNSTREAM = 4; public static final int HWM = 1; public static final int LWM = 2; diff --git a/bindings/python/pyzmq.cpp b/bindings/python/pyzmq.cpp index b180bcd5..26ca7ac3 100644 --- a/bindings/python/pyzmq.cpp +++ b/bindings/python/pyzmq.cpp @@ -498,6 +498,12 @@ PyMODINIT_FUNC initlibpyzmq () t = PyInt_FromLong (ZMQ_REP); PyDict_SetItemString (dict, "REP", t); Py_DECREF (t); + t = PyInt_FromLong (ZMQ_UPSTREAM); + PyDict_SetItemString (dict, "UPSTREAM", t); + Py_DECREF (t); + t = PyInt_FromLong (ZMQ_DOWNSTREAM); + PyDict_SetItemString (dict, "DOWNSTREAM", t); + Py_DECREF (t); t = PyInt_FromLong (ZMQ_HWM); PyDict_SetItemString (dict, "HWM", t); Py_DECREF (t); diff --git a/bindings/ruby/rbzmq.cpp b/bindings/ruby/rbzmq.cpp index 61129721..2a26ce15 100644 --- a/bindings/ruby/rbzmq.cpp +++ b/bindings/ruby/rbzmq.cpp @@ -275,6 +275,8 @@ extern "C" void Init_librbzmq () rb_define_global_const ("PUB", INT2NUM (ZMQ_PUB)); rb_define_global_const ("REQ", INT2NUM (ZMQ_REQ)); rb_define_global_const ("REP", INT2NUM (ZMQ_REP)); + rb_define_global_const ("UPSTREAM", INT2NUM (ZMQ_UPSTREAM)); + rb_define_global_const ("DOWNSTREAM", INT2NUM (ZMQ_DOWNSTREAM)); rb_define_global_const ("POLL", INT2NUM (ZMQ_POLL)); } diff --git a/configure.in b/configure.in index c2cf6780..bd3a0f4a 100644 --- a/configure.in +++ b/configure.in @@ -590,6 +590,14 @@ if test "x$with_forwarder" != "xno"; then forwarder="yes" fi +# streamer device +streamer="no" +AC_ARG_WITH([streamer], [AS_HELP_STRING([--with-streamer], + [build streamer device [default=no]])], [with_streamer=yes], [with_streamer=no]) + +if test "x$with_streamer" != "xno"; then + streamer="yes" +fi # Perf perf="no" @@ -618,7 +626,8 @@ AM_CONDITIONAL(BUILD_CPP, test "x$cppzmq" = "xyes") AM_CONDITIONAL(BUILD_PGM1, test "x$pgm1_ext" = "xyes") AM_CONDITIONAL(BUILD_PGM2, test "x$pgm2_ext" = "xyes") AM_CONDITIONAL(BUILD_NO_PGM, test "x$pgm2_ext" = "xno" -a "x$pgm1_ext" = "xno") -AM_CONDITIONAL(BUILD_FORWARDER, test "x$forwarder" = "xyes") +AM_CONDITIONAL(BUILD_FORWARDER, test "x$forwarder" = "xyes") +AM_CONDITIONAL(BUILD_STREAMER, test "x$streamer" = "xyes") AM_CONDITIONAL(BUILD_PERF, test "x$perf" = "xyes") AM_CONDITIONAL(ON_MINGW, test "x$on_mingw32" = "xyes") AM_CONDITIONAL(BUILD_PGM2_EXAMPLES, test "x$with_pgm2_ext" = "xyes") @@ -641,7 +650,8 @@ AC_OUTPUT(Makefile src/Makefile man/Makefile bindings/python/Makefile \ bindings/python/setup.py bindings/ruby/Makefile \ bindings/java/Makefile perf/Makefile perf/c/Makefile perf/cpp/Makefile \ perf/python/Makefile perf/ruby/Makefile perf/java/Makefile src/libzmq.pc \ - devices/Makefile devices/zmq_forwarder/Makefile bindings/Makefile) + devices/Makefile devices/zmq_forwarder/Makefile \ + devices/zmq_streamer/Makefile bindings/Makefile) AC_MSG_RESULT([]) AC_MSG_RESULT([ ******************************************************** ]) @@ -676,6 +686,7 @@ AC_MSG_RESULT([ PGM: no]) fi AC_MSG_RESULT([ Devices:]) AC_MSG_RESULT([ forwarder: $forwarder]) +AC_MSG_RESULT([ streamer: $streamer]) AC_MSG_RESULT([ Performance tests: $perf]) AC_MSG_RESULT([]) AC_MSG_RESULT([ ******************************************************** ]) diff --git a/devices/Makefile.am b/devices/Makefile.am index 4cbad149..ab18976a 100644 --- a/devices/Makefile.am +++ b/devices/Makefile.am @@ -2,5 +2,9 @@ if BUILD_FORWARDER FORWARDER_DIR = zmq_forwarder endif -SUBDIRS = $(FORWARDER_DIR) -DIST_SUBDIRS = zmq_forwarder +if BUILD_STREAMER +STREAMER_DIR = zmq_streamer +endif + +SUBDIRS = $(FORWARDER_DIR) $(STREAMER_DIR) +DIST_SUBDIRS = zmq_forwarder zmq_streamer diff --git a/devices/zmq_forwarder/zmq_forwarder.cpp b/devices/zmq_forwarder/zmq_forwarder.cpp index 32af5dd6..d29ed626 100644 --- a/devices/zmq_forwarder/zmq_forwarder.cpp +++ b/devices/zmq_forwarder/zmq_forwarder.cpp @@ -23,7 +23,7 @@ int main (int argc, char *argv []) { if (argc != 2) { - fprintf (stderr, "usage: forwarder \n"); + fprintf (stderr, "usage: zmq_forwarder \n"); return 1; } @@ -53,8 +53,9 @@ int main (int argc, char *argv []) // TODO: make the number of I/O threads configurable. zmq::context_t ctx (1, 1); - zmq::socket_t in_socket (ctx, ZMQ_P2P); - zmq::socket_t out_socket (ctx, ZMQ_P2P); + zmq::socket_t in_socket (ctx, ZMQ_SUB); + in_socket.setsockopt (ZMQ_SUBSCRIBE, "*", 1); + zmq::socket_t out_socket (ctx, ZMQ_PUB); int n = 0; while (true) { diff --git a/devices/zmq_streamer/Makefile.am b/devices/zmq_streamer/Makefile.am new file mode 100644 index 00000000..e3681bfe --- /dev/null +++ b/devices/zmq_streamer/Makefile.am @@ -0,0 +1,9 @@ +INCLUDES = -I$(top_builddir)/bindings/c + +bin_PROGRAMS = zmq_streamer + +zmq_streamer_LDADD = $(top_builddir)/src/libzmq.la +zmq_streamer_SOURCES = zmq_streamer.cpp +zmq_streamer_CXXFLAGS = -Wall -pedantic -Werror + + diff --git a/devices/zmq_streamer/zmq_streamer.cpp b/devices/zmq_streamer/zmq_streamer.cpp new file mode 100644 index 00000000..84e65691 --- /dev/null +++ b/devices/zmq_streamer/zmq_streamer.cpp @@ -0,0 +1,122 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include "../../bindings/cpp/zmq.hpp" +#include "../../foreign/xmlParser/xmlParser.cpp" + +int main (int argc, char *argv []) +{ + if (argc != 2) { + fprintf (stderr, "usage: zmq_streamer \n"); + return 1; + } + + XMLNode root = XMLNode::parseFile (argv [1]); + if (root.isEmpty ()) { + fprintf (stderr, "configuration file not found\n"); + return 1; + } + + if (strcmp (root.getName (), "streamer") != 0) { + fprintf (stderr, "root element in the configuration file should be " + "named 'streamer'\n"); + return 1; + } + + XMLNode in_node = root.getChildNode ("in"); + if (in_node.isEmpty ()) { + fprintf (stderr, "'in' node is missing in the configuration file\n"); + return 1; + } + + XMLNode out_node = root.getChildNode ("out"); + if (out_node.isEmpty ()) { + fprintf (stderr, "'out' node is missing in the configuration file\n"); + return 1; + } + + // TODO: make the number of I/O threads configurable. + zmq::context_t ctx (1, 1); + zmq::socket_t in_socket (ctx, ZMQ_UPSTREAM); + zmq::socket_t out_socket (ctx, ZMQ_DOWNSTREAM); + + int n = 0; + while (true) { + XMLNode bind = in_node.getChildNode ("bind", n); + if (bind.isEmpty ()) + break; + const char *addr = bind.getAttribute ("addr"); + if (!addr) { + fprintf (stderr, "'bind' node is missing 'addr' attribute\n"); + return 1; + } + in_socket.bind (addr); + n++; + } + + n = 0; + while (true) { + XMLNode connect = in_node.getChildNode ("connect", n); + if (connect.isEmpty ()) + break; + const char *addr = connect.getAttribute ("addr"); + if (!addr) { + fprintf (stderr, "'connect' node is missing 'addr' attribute\n"); + return 1; + } + in_socket.connect (addr); + n++; + } + + n = 0; + while (true) { + XMLNode bind = out_node.getChildNode ("bind", n); + if (bind.isEmpty ()) + break; + const char *addr = bind.getAttribute ("addr"); + if (!addr) { + fprintf (stderr, "'bind' node is missing 'addr' attribute\n"); + return 1; + } + out_socket.bind (addr); + n++; + } + + n = 0; + while (true) { + XMLNode connect = out_node.getChildNode ("connect", n); + if (connect.isEmpty ()) + break; + const char *addr = connect.getAttribute ("addr"); + if (!addr) { + fprintf (stderr, "'connect' node is missing 'addr' attribute\n"); + return 1; + } + out_socket.connect (addr); + n++; + } + + zmq::message_t msg; + while (true) { + in_socket.recv (&msg); + out_socket.send (msg); + } + + return 0; +} diff --git a/man/man3/zmq_socket.3 b/man/man3/zmq_socket.3 index 8b819b5c..a73bba50 100644 --- a/man/man3/zmq_socket.3 +++ b/man/man3/zmq_socket.3 @@ -37,6 +37,15 @@ Socket to receive requests and send replies. This socket type allows only an alternated sequence of recv's and send's. Each send is routed to the peer that issued the last received request. +.IP "\fBZMQ_UPSTREAM\fP" +Socket to receive messages from up the stream. Messages are fair-queued +from among all the connected peers. Send function is not implemented for +this socket type. + +.IP "\fBZMQ_DOWNSTREAM\fP" +Socket to send messages down stream. Messages are load-balanced among all the +connected peers. Send function is not implemented for this socket type. + .SH RETURN VALUE Function returns socket handle is successful. Otherwise it returns NULL and sets errno to one of the values below. diff --git a/src/Makefile.am b/src/Makefile.am index 91fb5553..3d038b7e 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -77,6 +77,7 @@ libzmq_la_SOURCES = app_thread.hpp \ decoder.hpp \ devpoll.hpp \ dispatcher.hpp \ + downstream.hpp \ encoder.hpp \ epoll.hpp \ err.hpp \ @@ -117,6 +118,7 @@ libzmq_la_SOURCES = app_thread.hpp \ tcp_listener.hpp \ tcp_socket.hpp \ thread.hpp \ + upstream.hpp \ uuid.hpp \ windows.hpp \ wire.hpp \ @@ -135,6 +137,7 @@ libzmq_la_SOURCES = app_thread.hpp \ app_thread.cpp \ devpoll.cpp \ dispatcher.cpp \ + downstream.cpp \ epoll.cpp \ err.cpp \ fd_signaler.cpp \ @@ -162,6 +165,7 @@ libzmq_la_SOURCES = app_thread.hpp \ tcp_listener.cpp \ tcp_socket.cpp \ thread.cpp \ + upstream.cpp \ uuid.cpp \ ypollset.cpp \ zmq.cpp \ diff --git a/src/app_thread.cpp b/src/app_thread.cpp index fbda3357..a6718228 100644 --- a/src/app_thread.cpp +++ b/src/app_thread.cpp @@ -40,11 +40,13 @@ #include "pipe.hpp" #include "config.hpp" #include "socket_base.hpp" +#include "p2p.hpp" #include "pub.hpp" #include "sub.hpp" #include "req.hpp" #include "rep.hpp" -#include "p2p.hpp" +#include "upstream.hpp" +#include "downstream.hpp" // If the RDTSC is available we use it to prevent excessive // polling for commands. The nice thing here is that it will work on any @@ -158,6 +160,9 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_) { socket_base_t *s = NULL; switch (type_) { + case ZMQ_P2P: + s = new p2p_t (this); + break; case ZMQ_PUB: s = new pub_t (this); break; @@ -170,8 +175,11 @@ zmq::socket_base_t *zmq::app_thread_t::create_socket (int type_) case ZMQ_REP: s = new rep_t (this); break; - case ZMQ_P2P: - s = new p2p_t (this); + case ZMQ_UPSTREAM: + s = new upstream_t (this); + break; + case ZMQ_DOWNSTREAM: + s = new downstream_t (this); break; default: // TODO: This should be EINVAL. diff --git a/src/downstream.cpp b/src/downstream.cpp new file mode 100644 index 00000000..4f994e6c --- /dev/null +++ b/src/downstream.cpp @@ -0,0 +1,131 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include "../bindings/c/zmq.h" + +#include "downstream.hpp" +#include "err.hpp" +#include "pipe.hpp" + +zmq::downstream_t::downstream_t (class app_thread_t *parent_) : + socket_base_t (parent_), + current (0) +{ + options.requires_in = false; + options.requires_out = true; +} + +zmq::downstream_t::~downstream_t () +{ +} + +void zmq::downstream_t::xattach_pipes (class reader_t *inpipe_, + class writer_t *outpipe_) +{ + zmq_assert (!inpipe_ && outpipe_); + pipes.push_back (outpipe_); +} + +void zmq::downstream_t::xdetach_inpipe (class reader_t *pipe_) +{ + // There are no inpipes, so this function shouldn't be called at all. + zmq_assert (false); +} + +void zmq::downstream_t::xdetach_outpipe (class writer_t *pipe_) +{ + zmq_assert (pipe_); + pipes.erase (pipes.index (pipe_)); +} + +void zmq::downstream_t::xkill (class reader_t *pipe_) +{ + // There are no inpipes, so this function shouldn't be called at all. + zmq_assert (false); +} + +void zmq::downstream_t::xrevive (class reader_t *pipe_) +{ + // There are no inpipes, so this function shouldn't be called at all. + zmq_assert (false); +} + +int zmq::downstream_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + // No special option for this socket type. + errno = EINVAL; + return -1; +} + +int zmq::downstream_t::xsend (zmq_msg_t *msg_, int flags_) +{ + // If there are no pipes we cannot send the message. + if (pipes.empty ()) { + errno = EAGAIN; + return -1; + } + + // Move to the next pipe (load-balancing). + current++; + if (current >= pipes.size ()) + current = 0; + + // TODO: Implement this once queue limits are in-place. + zmq_assert (pipes [current]->check_write (zmq_msg_size (msg_))); + + // Push message to the selected pipe. + pipes [current]->write (msg_); + pipes [current]->flush (); + + // Detach the message from the data buffer. + int rc = zmq_msg_init (msg_); + zmq_assert (rc == 0); + + return 0; +} + +int zmq::downstream_t::xflush () +{ + // TODO: Maybe there's a point in flushing messages downstream. + // It may be useful in the case where number of messages in a single + // transaction is much greater than the number of attached pipes. + errno = ENOTSUP; + return -1; + +} + +int zmq::downstream_t::xrecv (zmq_msg_t *msg_, int flags_) +{ + errno = ENOTSUP; + return -1; +} + +bool zmq::downstream_t::xhas_in () +{ + return false; +} + +bool zmq::downstream_t::xhas_out () +{ + // TODO: Modify this code once pipe limits are in place. + return true; +} + + diff --git a/src/downstream.hpp b/src/downstream.hpp new file mode 100644 index 00000000..c6a7ed83 --- /dev/null +++ b/src/downstream.hpp @@ -0,0 +1,64 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_DOWNSTREAM_HPP_INCLUDED__ +#define __ZMQ_DOWNSTREAM_HPP_INCLUDED__ + +#include "socket_base.hpp" +#include "yarray.hpp" + +namespace zmq +{ + + class downstream_t : public socket_base_t + { + public: + + downstream_t (class app_thread_t *parent_); + ~downstream_t (); + + // Overloads of functions from socket_base_t. + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); + void xdetach_inpipe (class reader_t *pipe_); + void xdetach_outpipe (class writer_t *pipe_); + void xkill (class reader_t *pipe_); + void xrevive (class reader_t *pipe_); + int xsetsockopt (int option_, const void *optval_, size_t optvallen_); + int xsend (zmq_msg_t *msg_, int flags_); + int xflush (); + int xrecv (zmq_msg_t *msg_, int flags_); + bool xhas_in (); + bool xhas_out (); + + private: + + // List of outbound pipes. + typedef yarray_t pipes_t; + pipes_t pipes; + + // Points to the last pipe that the most recent message was sent to. + pipes_t::size_type current; + + downstream_t (const downstream_t&); + void operator = (const downstream_t&); + }; + +} + +#endif diff --git a/src/p2p.hpp b/src/p2p.hpp index 1fd7e349..32d77557 100644 --- a/src/p2p.hpp +++ b/src/p2p.hpp @@ -17,8 +17,8 @@ along with this program. If not, see . */ -#ifndef __ZMQ_P2P_INCLUDED__ -#define __ZMQ_P2P_INCLUDED__ +#ifndef __ZMQ_P2P_HPP_INCLUDED__ +#define __ZMQ_P2P_HPP_INCLUDED__ #include "socket_base.hpp" diff --git a/src/pub.hpp b/src/pub.hpp index b3e868db..9dbcb4ad 100644 --- a/src/pub.hpp +++ b/src/pub.hpp @@ -17,8 +17,8 @@ along with this program. If not, see . */ -#ifndef __ZMQ_PUB_INCLUDED__ -#define __ZMQ_PUB_INCLUDED__ +#ifndef __ZMQ_PUB_HPP_INCLUDED__ +#define __ZMQ_PUB_HPP_INCLUDED__ #include "socket_base.hpp" #include "yarray.hpp" diff --git a/src/rep.cpp b/src/rep.cpp index 7599cb54..f06f4ab8 100644 --- a/src/rep.cpp +++ b/src/rep.cpp @@ -71,7 +71,7 @@ void zmq::rep_t::xdetach_inpipe (class reader_t *pipe_) } // Now both inpipe and outpipe are detached. Remove them from the lists. - if (in_pipes.index (pipe_) < active) + if (index < active) active--; in_pipes.erase (index); out_pipes.erase (index); diff --git a/src/rep.hpp b/src/rep.hpp index 3e87dc1f..0b327aa9 100644 --- a/src/rep.hpp +++ b/src/rep.hpp @@ -17,8 +17,8 @@ along with this program. If not, see . */ -#ifndef __ZMQ_REP_INCLUDED__ -#define __ZMQ_REP_INCLUDED__ +#ifndef __ZMQ_REP_HPP_INCLUDED__ +#define __ZMQ_REP_HPP_INCLUDED__ #include "socket_base.hpp" #include "yarray.hpp" diff --git a/src/req.hpp b/src/req.hpp index 86554b50..756cc426 100644 --- a/src/req.hpp +++ b/src/req.hpp @@ -17,8 +17,8 @@ along with this program. If not, see . */ -#ifndef __ZMQ_REQ_INCLUDED__ -#define __ZMQ_REQ_INCLUDED__ +#ifndef __ZMQ_REQ_HPP_INCLUDED__ +#define __ZMQ_REQ_HPP_INCLUDED__ #include "socket_base.hpp" #include "yarray.hpp" diff --git a/src/sub.hpp b/src/sub.hpp index fb881dc7..8ad8a181 100644 --- a/src/sub.hpp +++ b/src/sub.hpp @@ -17,8 +17,8 @@ along with this program. If not, see . */ -#ifndef __ZMQ_SUB_INCLUDED__ -#define __ZMQ_SUB_INCLUDED__ +#ifndef __ZMQ_SUB_HPP_INCLUDED__ +#define __ZMQ_SUB_HPP_INCLUDED__ #include #include diff --git a/src/upstream.cpp b/src/upstream.cpp new file mode 100644 index 00000000..da202f84 --- /dev/null +++ b/src/upstream.cpp @@ -0,0 +1,143 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#include "../bindings/c/zmq.h" + +#include "upstream.hpp" +#include "err.hpp" +#include "pipe.hpp" + +zmq::upstream_t::upstream_t (class app_thread_t *parent_) : + socket_base_t (parent_), + active (0), + current (0) +{ + options.requires_in = true; + options.requires_out = false; +} + +zmq::upstream_t::~upstream_t () +{ +} + +void zmq::upstream_t::xattach_pipes (class reader_t *inpipe_, + class writer_t *outpipe_) +{ + zmq_assert (inpipe_ && !outpipe_); + + pipes.push_back (inpipe_); + pipes.swap (active, pipes.size () - 1); + active++; +} + +void zmq::upstream_t::xdetach_inpipe (class reader_t *pipe_) +{ + // Remove the pipe from the list; adjust number of active pipes + // accordingly. + zmq_assert (pipe_); + pipes_t::size_type index = pipes.index (pipe_); + if (index < active) + active--; + pipes.erase (index); +} + +void zmq::upstream_t::xdetach_outpipe (class writer_t *pipe_) +{ + // There are no outpipes, so this function shouldn't be called at all. + zmq_assert (false); +} + +void zmq::upstream_t::xkill (class reader_t *pipe_) +{ + // Move the pipe to the list of inactive pipes. + active--; + pipes.swap (pipes.index (pipe_), active); +} + +void zmq::upstream_t::xrevive (class reader_t *pipe_) +{ + // Move the pipe to the list of active pipes. + pipes.swap (pipes.index (pipe_), active); + active++; +} + +int zmq::upstream_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + // No special options for this socket type. + errno = EINVAL; + return -1; +} + +int zmq::upstream_t::xsend (zmq_msg_t *msg_, int flags_) +{ + errno = ENOTSUP; + return -1; +} + +int zmq::upstream_t::xflush () +{ + errno = ENOTSUP; + return -1; +} + +int zmq::upstream_t::xrecv (zmq_msg_t *msg_, int flags_) +{ + // Deallocate old content of the message. + zmq_msg_close (msg_); + + // Round-robin over the pipes to get next message. + for (int count = active; count != 0; count--) { + bool fetched = pipes [current]->read (msg_); + current++; + if (current >= active) + current = 0; + if (fetched) + return 0; + } + + // No message is available. Initialise the output parameter + // to be a 0-byte message. + zmq_msg_init (msg_); + errno = EAGAIN; + return -1; +} + +bool zmq::upstream_t::xhas_in () +{ + // Note that messing with current doesn't break the fairness of fair + // queueing algorithm. If there are no messages available current will + // get back to its original value. Otherwise it'll point to the first + // pipe holding messages, skipping only pipes with no messages available. + for (int count = active; count != 0; count--) { + if (pipes [current]->check_read ()) + return true; + current++; + if (current >= active) + current = 0; + } + + return false; +} + +bool zmq::upstream_t::xhas_out () +{ + return false; +} + diff --git a/src/upstream.hpp b/src/upstream.hpp new file mode 100644 index 00000000..0e2f5ada --- /dev/null +++ b/src/upstream.hpp @@ -0,0 +1,69 @@ +/* + Copyright (c) 2007-2009 FastMQ Inc. + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the Lesser GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + Lesser GNU General Public License for more details. + + You should have received a copy of the Lesser GNU General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_UPSTREAM_HPP_INCLUDED__ +#define __ZMQ_UPSTREAM_HPP_INCLUDED__ + +#include "socket_base.hpp" +#include "yarray.hpp" + +namespace zmq +{ + + class upstream_t : public socket_base_t + { + public: + + upstream_t (class app_thread_t *parent_); + ~upstream_t (); + + // Overloads of functions from socket_base_t. + void xattach_pipes (class reader_t *inpipe_, class writer_t *outpipe_); + void xdetach_inpipe (class reader_t *pipe_); + void xdetach_outpipe (class writer_t *pipe_); + void xkill (class reader_t *pipe_); + void xrevive (class reader_t *pipe_); + int xsetsockopt (int option_, const void *optval_, size_t optvallen_); + int xsend (zmq_msg_t *msg_, int flags_); + int xflush (); + int xrecv (zmq_msg_t *msg_, int flags_); + bool xhas_in (); + bool xhas_out (); + + private: + + // Inbound pipes. + typedef yarray_t pipes_t; + pipes_t pipes; + + // Number of active pipes. All the active pipes are located at the + // beginning of the pipes array. + pipes_t::size_type active; + + // Index of the next bound pipe to read a message from. + pipes_t::size_type current; + + upstream_t (const upstream_t&); + void operator = (const upstream_t&); + + }; + +} + +#endif