From 0319cb2cd16aa40911855a1765312886bf081db2 Mon Sep 17 00:00:00 2001 From: Martin Lucina Date: Thu, 2 Feb 2012 13:07:48 +0100 Subject: [PATCH] Fix data loss for PUB/SUB and unidirectional transports (LIBZMQ-268) With the introduction of subscription forwarding, the first message sent on a PUB socket using a unidirectional transport (e.g. PGM) is always lost due to the "subscribe to all" being done asynchronously. This patch fixes the problem and also refactors the code to have a single point where the "subscribe to all" is performed. Signed-off-by: Martin Lucina --- src/pair.cpp | 2 +- src/pair.hpp | 2 +- src/pgm_sender.cpp | 10 ---------- src/pull.cpp | 2 +- src/pull.hpp | 2 +- src/push.cpp | 2 +- src/push.hpp | 2 +- src/socket_base.cpp | 12 +++++++++--- src/socket_base.hpp | 5 +++-- src/xpub.cpp | 7 ++++++- src/xpub.hpp | 2 +- src/xrep.cpp | 2 +- src/xrep.hpp | 2 +- src/xreq.cpp | 2 +- src/xreq.hpp | 2 +- src/xsub.cpp | 2 +- src/xsub.hpp | 2 +- 17 files changed, 31 insertions(+), 29 deletions(-) diff --git a/src/pair.cpp b/src/pair.cpp index 6c652db0..82fd39b5 100644 --- a/src/pair.cpp +++ b/src/pair.cpp @@ -36,7 +36,7 @@ zmq::pair_t::~pair_t () zmq_assert (!pipe); } -void zmq::pair_t::xattach_pipe (pipe_t *pipe_) +void zmq::pair_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { zmq_assert (!pipe); pipe = pipe_; diff --git a/src/pair.hpp b/src/pair.hpp index fb447f1c..5deea184 100644 --- a/src/pair.hpp +++ b/src/pair.hpp @@ -42,7 +42,7 @@ namespace zmq ~pair_t (); // Overloads of functions from socket_base_t. - void xattach_pipe (zmq::pipe_t *pipe_); + void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_); int xsend (zmq::msg_t *msg_, int flags_); int xrecv (zmq::msg_t *msg_, int flags_); bool xhas_in (); diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index 1a1ae9a9..1d2083d3 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -91,16 +91,6 @@ void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, session_base_t *session_) // Set POLLOUT for downlink_socket_handle. set_pollout (handle); - - // PGM is not able to pass subscriptions upstream, thus we have no idea - // what messages are peers interested in. Because of that we have to - // subscribe for all the messages. - msg_t msg; - msg.init_size (1); - *(unsigned char*) msg.data () = 1; - int rc = session_->write (&msg); - errno_assert (rc == 0); - session_->flush (); } void zmq::pgm_sender_t::unplug () diff --git a/src/pull.cpp b/src/pull.cpp index 6028118f..ee840d91 100644 --- a/src/pull.cpp +++ b/src/pull.cpp @@ -34,7 +34,7 @@ zmq::pull_t::~pull_t () { } -void zmq::pull_t::xattach_pipe (pipe_t *pipe_) +void zmq::pull_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { zmq_assert (pipe_); fq.attach (pipe_); diff --git a/src/pull.hpp b/src/pull.hpp index da3bee16..11c16b86 100644 --- a/src/pull.hpp +++ b/src/pull.hpp @@ -45,7 +45,7 @@ namespace zmq protected: // Overloads of functions from socket_base_t. - void xattach_pipe (zmq::pipe_t *pipe_); + void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_); int xrecv (zmq::msg_t *msg_, int flags_); bool xhas_in (); void xread_activated (zmq::pipe_t *pipe_); diff --git a/src/push.cpp b/src/push.cpp index a0ed992b..2a29d353 100644 --- a/src/push.cpp +++ b/src/push.cpp @@ -34,7 +34,7 @@ zmq::push_t::~push_t () { } -void zmq::push_t::xattach_pipe (pipe_t *pipe_) +void zmq::push_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { zmq_assert (pipe_); lb.attach (pipe_); diff --git a/src/push.hpp b/src/push.hpp index edee19db..18eaf095 100644 --- a/src/push.hpp +++ b/src/push.hpp @@ -45,7 +45,7 @@ namespace zmq protected: // Overloads of functions from socket_base_t. - void xattach_pipe (zmq::pipe_t *pipe_); + void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_); int xsend (zmq::msg_t *msg_, int flags_); bool xhas_out (); void xwrite_activated (zmq::pipe_t *pipe_); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 72934193..5a7f1453 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -208,14 +208,14 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_) return 0; } -void zmq::socket_base_t::attach_pipe (pipe_t *pipe_) +void zmq::socket_base_t::attach_pipe (pipe_t *pipe_, bool icanhasall_) { // First, register the pipe so that we can terminate it later on. pipe_->set_event_sink (this); pipes.push_back (pipe_); // Let the derived socket type know about new pipe. - xattach_pipe (pipe_); + xattach_pipe (pipe_, icanhasall_); // If the socket is already being closed, ask any new pipes to terminate // straight away. @@ -454,8 +454,14 @@ int zmq::socket_base_t::connect (const char *addr_) rc = pipepair (parents, pipes, hwms, delays); errno_assert (rc == 0); + // PGM does not support subscription forwarding; ask for all data to be + // sent to this pipe. + bool icanhasall = false; + if (protocol == "pgm" || protocol == "epgm") + icanhasall = true; + // Attach local end of the pipe to the socket object. - attach_pipe (pipes [0]); + attach_pipe (pipes [0], icanhasall); // Attach remote end of the pipe to the session object later on. session->attach_pipe (pipes [1]); diff --git a/src/socket_base.hpp b/src/socket_base.hpp index dda87f4a..b74f2511 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -103,7 +103,8 @@ namespace zmq // Concrete algorithms for the x- methods are to be defined by // individual socket types. - virtual void xattach_pipe (zmq::pipe_t *pipe_) = 0; + virtual void xattach_pipe (zmq::pipe_t *pipe_, + bool icanhasall_ = false) = 0; // The default implementation assumes there are no specific socket // options for the particular socket type. If not so, overload this @@ -158,7 +159,7 @@ namespace zmq int check_protocol (const std::string &protocol_); // Register the pipe with this socket. - void attach_pipe (zmq::pipe_t *pipe_); + void attach_pipe (zmq::pipe_t *pipe_, bool icanhasall_ = false); // Processes commands sent to this socket (if any). If timeout is -1, // returns only after at least one command was processed. diff --git a/src/xpub.cpp b/src/xpub.cpp index 5d7a97c4..4754b28d 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -37,11 +37,16 @@ zmq::xpub_t::~xpub_t () { } -void zmq::xpub_t::xattach_pipe (pipe_t *pipe_) +void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { zmq_assert (pipe_); dist.attach (pipe_); + // If icanhasall_ is specified, the caller would like to subscribe + // to all data on this pipe, implicitly. + if (icanhasall_) + subscriptions.add (NULL, 0, pipe_); + // The pipe is active when attached. Let's read the subscriptions from // it, if any. xread_activated (pipe_); diff --git a/src/xpub.hpp b/src/xpub.hpp index 3f80d4a3..3e6721b7 100644 --- a/src/xpub.hpp +++ b/src/xpub.hpp @@ -47,7 +47,7 @@ namespace zmq ~xpub_t (); // Implementations of virtual functions from socket_base_t. - void xattach_pipe (zmq::pipe_t *pipe_); + void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_ = false); int xsend (zmq::msg_t *msg_, int flags_); bool xhas_out (); int xrecv (zmq::msg_t *msg_, int flags_); diff --git a/src/xrep.cpp b/src/xrep.cpp index 520fa243..4d9e5937 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -55,7 +55,7 @@ zmq::xrep_t::~xrep_t () prefetched_msg.close (); } -void zmq::xrep_t::xattach_pipe (pipe_t *pipe_) +void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { zmq_assert (pipe_); diff --git a/src/xrep.hpp b/src/xrep.hpp index 65bd5641..9fb19b7c 100644 --- a/src/xrep.hpp +++ b/src/xrep.hpp @@ -48,7 +48,7 @@ namespace zmq ~xrep_t (); // Overloads of functions from socket_base_t. - void xattach_pipe (zmq::pipe_t *pipe_); + void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_); int xsend (msg_t *msg_, int flags_); int xrecv (msg_t *msg_, int flags_); bool xhas_in (); diff --git a/src/xreq.cpp b/src/xreq.cpp index bf288eee..13809d9f 100644 --- a/src/xreq.cpp +++ b/src/xreq.cpp @@ -46,7 +46,7 @@ zmq::xreq_t::~xreq_t () prefetched_msg.close (); } -void zmq::xreq_t::xattach_pipe (pipe_t *pipe_) +void zmq::xreq_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { zmq_assert (pipe_); fq.attach (pipe_); diff --git a/src/xreq.hpp b/src/xreq.hpp index 897c197c..4a213446 100644 --- a/src/xreq.hpp +++ b/src/xreq.hpp @@ -46,7 +46,7 @@ namespace zmq protected: // Overloads of functions from socket_base_t. - void xattach_pipe (zmq::pipe_t *pipe_); + void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_); int xsend (zmq::msg_t *msg_, int flags_); int xrecv (zmq::msg_t *msg_, int flags_); bool xhas_in (); diff --git a/src/xsub.cpp b/src/xsub.cpp index 454dfb07..80ca1f2d 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -45,7 +45,7 @@ zmq::xsub_t::~xsub_t () errno_assert (rc == 0); } -void zmq::xsub_t::xattach_pipe (pipe_t *pipe_) +void zmq::xsub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { zmq_assert (pipe_); fq.attach (pipe_); diff --git a/src/xsub.hpp b/src/xsub.hpp index 24f81572..fd199367 100644 --- a/src/xsub.hpp +++ b/src/xsub.hpp @@ -45,7 +45,7 @@ namespace zmq protected: // Overloads of functions from socket_base_t. - void xattach_pipe (zmq::pipe_t *pipe_); + void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_); int xsend (zmq::msg_t *msg_, int flags_); bool xhas_out (); int xrecv (zmq::msg_t *msg_, int flags_);