mirror of
https://github.com/zeromq/libzmq.git
synced 2025-10-14 23:07:59 +02:00
Fix data loss for PUB/SUB and unidirectional transports (LIBZMQ-268)
With the introduction of subscription forwarding, the first message sent on a PUB socket using a unidirectional transport (e.g. PGM) is always lost due to the "subscribe to all" being done asynchronously. This patch fixes the problem and also refactors the code to have a single point where the "subscribe to all" is performed. Signed-off-by: Martin Lucina <martin@lucina.net>
This commit is contained in:
@@ -36,7 +36,7 @@ zmq::pair_t::~pair_t ()
|
|||||||
zmq_assert (!pipe);
|
zmq_assert (!pipe);
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::pair_t::xattach_pipe (pipe_t *pipe_)
|
void zmq::pair_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
|
||||||
{
|
{
|
||||||
zmq_assert (!pipe);
|
zmq_assert (!pipe);
|
||||||
pipe = pipe_;
|
pipe = pipe_;
|
||||||
|
@@ -42,7 +42,7 @@ namespace zmq
|
|||||||
~pair_t ();
|
~pair_t ();
|
||||||
|
|
||||||
// Overloads of functions from socket_base_t.
|
// Overloads of functions from socket_base_t.
|
||||||
void xattach_pipe (zmq::pipe_t *pipe_);
|
void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_);
|
||||||
int xsend (zmq::msg_t *msg_, int flags_);
|
int xsend (zmq::msg_t *msg_, int flags_);
|
||||||
int xrecv (zmq::msg_t *msg_, int flags_);
|
int xrecv (zmq::msg_t *msg_, int flags_);
|
||||||
bool xhas_in ();
|
bool xhas_in ();
|
||||||
|
@@ -91,16 +91,6 @@ void zmq::pgm_sender_t::plug (io_thread_t *io_thread_, session_base_t *session_)
|
|||||||
|
|
||||||
// Set POLLOUT for downlink_socket_handle.
|
// Set POLLOUT for downlink_socket_handle.
|
||||||
set_pollout (handle);
|
set_pollout (handle);
|
||||||
|
|
||||||
// PGM is not able to pass subscriptions upstream, thus we have no idea
|
|
||||||
// what messages are peers interested in. Because of that we have to
|
|
||||||
// subscribe for all the messages.
|
|
||||||
msg_t msg;
|
|
||||||
msg.init_size (1);
|
|
||||||
*(unsigned char*) msg.data () = 1;
|
|
||||||
int rc = session_->write (&msg);
|
|
||||||
errno_assert (rc == 0);
|
|
||||||
session_->flush ();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::pgm_sender_t::unplug ()
|
void zmq::pgm_sender_t::unplug ()
|
||||||
|
@@ -34,7 +34,7 @@ zmq::pull_t::~pull_t ()
|
|||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::pull_t::xattach_pipe (pipe_t *pipe_)
|
void zmq::pull_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
|
||||||
{
|
{
|
||||||
zmq_assert (pipe_);
|
zmq_assert (pipe_);
|
||||||
fq.attach (pipe_);
|
fq.attach (pipe_);
|
||||||
|
@@ -45,7 +45,7 @@ namespace zmq
|
|||||||
protected:
|
protected:
|
||||||
|
|
||||||
// Overloads of functions from socket_base_t.
|
// Overloads of functions from socket_base_t.
|
||||||
void xattach_pipe (zmq::pipe_t *pipe_);
|
void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_);
|
||||||
int xrecv (zmq::msg_t *msg_, int flags_);
|
int xrecv (zmq::msg_t *msg_, int flags_);
|
||||||
bool xhas_in ();
|
bool xhas_in ();
|
||||||
void xread_activated (zmq::pipe_t *pipe_);
|
void xread_activated (zmq::pipe_t *pipe_);
|
||||||
|
@@ -34,7 +34,7 @@ zmq::push_t::~push_t ()
|
|||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::push_t::xattach_pipe (pipe_t *pipe_)
|
void zmq::push_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
|
||||||
{
|
{
|
||||||
zmq_assert (pipe_);
|
zmq_assert (pipe_);
|
||||||
lb.attach (pipe_);
|
lb.attach (pipe_);
|
||||||
|
@@ -45,7 +45,7 @@ namespace zmq
|
|||||||
protected:
|
protected:
|
||||||
|
|
||||||
// Overloads of functions from socket_base_t.
|
// Overloads of functions from socket_base_t.
|
||||||
void xattach_pipe (zmq::pipe_t *pipe_);
|
void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_);
|
||||||
int xsend (zmq::msg_t *msg_, int flags_);
|
int xsend (zmq::msg_t *msg_, int flags_);
|
||||||
bool xhas_out ();
|
bool xhas_out ();
|
||||||
void xwrite_activated (zmq::pipe_t *pipe_);
|
void xwrite_activated (zmq::pipe_t *pipe_);
|
||||||
|
@@ -208,14 +208,14 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_)
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::socket_base_t::attach_pipe (pipe_t *pipe_)
|
void zmq::socket_base_t::attach_pipe (pipe_t *pipe_, bool icanhasall_)
|
||||||
{
|
{
|
||||||
// First, register the pipe so that we can terminate it later on.
|
// First, register the pipe so that we can terminate it later on.
|
||||||
pipe_->set_event_sink (this);
|
pipe_->set_event_sink (this);
|
||||||
pipes.push_back (pipe_);
|
pipes.push_back (pipe_);
|
||||||
|
|
||||||
// Let the derived socket type know about new pipe.
|
// Let the derived socket type know about new pipe.
|
||||||
xattach_pipe (pipe_);
|
xattach_pipe (pipe_, icanhasall_);
|
||||||
|
|
||||||
// If the socket is already being closed, ask any new pipes to terminate
|
// If the socket is already being closed, ask any new pipes to terminate
|
||||||
// straight away.
|
// straight away.
|
||||||
@@ -454,8 +454,14 @@ int zmq::socket_base_t::connect (const char *addr_)
|
|||||||
rc = pipepair (parents, pipes, hwms, delays);
|
rc = pipepair (parents, pipes, hwms, delays);
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
|
|
||||||
|
// PGM does not support subscription forwarding; ask for all data to be
|
||||||
|
// sent to this pipe.
|
||||||
|
bool icanhasall = false;
|
||||||
|
if (protocol == "pgm" || protocol == "epgm")
|
||||||
|
icanhasall = true;
|
||||||
|
|
||||||
// Attach local end of the pipe to the socket object.
|
// Attach local end of the pipe to the socket object.
|
||||||
attach_pipe (pipes [0]);
|
attach_pipe (pipes [0], icanhasall);
|
||||||
|
|
||||||
// Attach remote end of the pipe to the session object later on.
|
// Attach remote end of the pipe to the session object later on.
|
||||||
session->attach_pipe (pipes [1]);
|
session->attach_pipe (pipes [1]);
|
||||||
|
@@ -103,7 +103,8 @@ namespace zmq
|
|||||||
|
|
||||||
// Concrete algorithms for the x- methods are to be defined by
|
// Concrete algorithms for the x- methods are to be defined by
|
||||||
// individual socket types.
|
// individual socket types.
|
||||||
virtual void xattach_pipe (zmq::pipe_t *pipe_) = 0;
|
virtual void xattach_pipe (zmq::pipe_t *pipe_,
|
||||||
|
bool icanhasall_ = false) = 0;
|
||||||
|
|
||||||
// The default implementation assumes there are no specific socket
|
// The default implementation assumes there are no specific socket
|
||||||
// options for the particular socket type. If not so, overload this
|
// options for the particular socket type. If not so, overload this
|
||||||
@@ -158,7 +159,7 @@ namespace zmq
|
|||||||
int check_protocol (const std::string &protocol_);
|
int check_protocol (const std::string &protocol_);
|
||||||
|
|
||||||
// Register the pipe with this socket.
|
// Register the pipe with this socket.
|
||||||
void attach_pipe (zmq::pipe_t *pipe_);
|
void attach_pipe (zmq::pipe_t *pipe_, bool icanhasall_ = false);
|
||||||
|
|
||||||
// Processes commands sent to this socket (if any). If timeout is -1,
|
// Processes commands sent to this socket (if any). If timeout is -1,
|
||||||
// returns only after at least one command was processed.
|
// returns only after at least one command was processed.
|
||||||
|
@@ -37,11 +37,16 @@ zmq::xpub_t::~xpub_t ()
|
|||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::xpub_t::xattach_pipe (pipe_t *pipe_)
|
void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
|
||||||
{
|
{
|
||||||
zmq_assert (pipe_);
|
zmq_assert (pipe_);
|
||||||
dist.attach (pipe_);
|
dist.attach (pipe_);
|
||||||
|
|
||||||
|
// If icanhasall_ is specified, the caller would like to subscribe
|
||||||
|
// to all data on this pipe, implicitly.
|
||||||
|
if (icanhasall_)
|
||||||
|
subscriptions.add (NULL, 0, pipe_);
|
||||||
|
|
||||||
// The pipe is active when attached. Let's read the subscriptions from
|
// The pipe is active when attached. Let's read the subscriptions from
|
||||||
// it, if any.
|
// it, if any.
|
||||||
xread_activated (pipe_);
|
xread_activated (pipe_);
|
||||||
|
@@ -47,7 +47,7 @@ namespace zmq
|
|||||||
~xpub_t ();
|
~xpub_t ();
|
||||||
|
|
||||||
// Implementations of virtual functions from socket_base_t.
|
// Implementations of virtual functions from socket_base_t.
|
||||||
void xattach_pipe (zmq::pipe_t *pipe_);
|
void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_ = false);
|
||||||
int xsend (zmq::msg_t *msg_, int flags_);
|
int xsend (zmq::msg_t *msg_, int flags_);
|
||||||
bool xhas_out ();
|
bool xhas_out ();
|
||||||
int xrecv (zmq::msg_t *msg_, int flags_);
|
int xrecv (zmq::msg_t *msg_, int flags_);
|
||||||
|
@@ -55,7 +55,7 @@ zmq::xrep_t::~xrep_t ()
|
|||||||
prefetched_msg.close ();
|
prefetched_msg.close ();
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::xrep_t::xattach_pipe (pipe_t *pipe_)
|
void zmq::xrep_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
|
||||||
{
|
{
|
||||||
zmq_assert (pipe_);
|
zmq_assert (pipe_);
|
||||||
|
|
||||||
|
@@ -48,7 +48,7 @@ namespace zmq
|
|||||||
~xrep_t ();
|
~xrep_t ();
|
||||||
|
|
||||||
// Overloads of functions from socket_base_t.
|
// Overloads of functions from socket_base_t.
|
||||||
void xattach_pipe (zmq::pipe_t *pipe_);
|
void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_);
|
||||||
int xsend (msg_t *msg_, int flags_);
|
int xsend (msg_t *msg_, int flags_);
|
||||||
int xrecv (msg_t *msg_, int flags_);
|
int xrecv (msg_t *msg_, int flags_);
|
||||||
bool xhas_in ();
|
bool xhas_in ();
|
||||||
|
@@ -46,7 +46,7 @@ zmq::xreq_t::~xreq_t ()
|
|||||||
prefetched_msg.close ();
|
prefetched_msg.close ();
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::xreq_t::xattach_pipe (pipe_t *pipe_)
|
void zmq::xreq_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
|
||||||
{
|
{
|
||||||
zmq_assert (pipe_);
|
zmq_assert (pipe_);
|
||||||
fq.attach (pipe_);
|
fq.attach (pipe_);
|
||||||
|
@@ -46,7 +46,7 @@ namespace zmq
|
|||||||
protected:
|
protected:
|
||||||
|
|
||||||
// Overloads of functions from socket_base_t.
|
// Overloads of functions from socket_base_t.
|
||||||
void xattach_pipe (zmq::pipe_t *pipe_);
|
void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_);
|
||||||
int xsend (zmq::msg_t *msg_, int flags_);
|
int xsend (zmq::msg_t *msg_, int flags_);
|
||||||
int xrecv (zmq::msg_t *msg_, int flags_);
|
int xrecv (zmq::msg_t *msg_, int flags_);
|
||||||
bool xhas_in ();
|
bool xhas_in ();
|
||||||
|
@@ -45,7 +45,7 @@ zmq::xsub_t::~xsub_t ()
|
|||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::xsub_t::xattach_pipe (pipe_t *pipe_)
|
void zmq::xsub_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
|
||||||
{
|
{
|
||||||
zmq_assert (pipe_);
|
zmq_assert (pipe_);
|
||||||
fq.attach (pipe_);
|
fq.attach (pipe_);
|
||||||
|
@@ -45,7 +45,7 @@ namespace zmq
|
|||||||
protected:
|
protected:
|
||||||
|
|
||||||
// Overloads of functions from socket_base_t.
|
// Overloads of functions from socket_base_t.
|
||||||
void xattach_pipe (zmq::pipe_t *pipe_);
|
void xattach_pipe (zmq::pipe_t *pipe_, bool icanhasall_);
|
||||||
int xsend (zmq::msg_t *msg_, int flags_);
|
int xsend (zmq::msg_t *msg_, int flags_);
|
||||||
bool xhas_out ();
|
bool xhas_out ();
|
||||||
int xrecv (zmq::msg_t *msg_, int flags_);
|
int xrecv (zmq::msg_t *msg_, int flags_);
|
||||||
|
Reference in New Issue
Block a user