From f0920caf0226f724e46f0b843b72841be3dcbc1f Mon Sep 17 00:00:00 2001 From: Ian Barber Date: Tue, 12 Jun 2012 14:50:50 +0100 Subject: [PATCH] Revert "On the advice of Martin Hurton, removed the new command type and just terminated the pipe in a reconnect situation, and notified the socket of the same. This handles the blocking properly, but at the cost of potentially losing in flight messages. However, this is a reasonable trade off given how much simpler it makes the patch." This reverts commit c13f1d52ff9ed51a651ad8bcc9379e82d9318e86. --- src/command.hpp | 6 ++++++ src/object.cpp | 19 +++++++++++++++++++ src/object.hpp | 2 ++ src/session_base.cpp | 13 +++++-------- src/socket_base.cpp | 11 +++++++++++ src/socket_base.hpp | 3 +++ 6 files changed, 46 insertions(+), 8 deletions(-) diff --git a/src/command.hpp b/src/command.hpp index 83783695..b70b417c 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -47,6 +47,7 @@ namespace zmq own, attach, bind, + detach, activate_read, activate_write, hiccup, @@ -87,6 +88,11 @@ namespace zmq struct { zmq::pipe_t *pipe; } bind; + + // Sent from session to socket to disconnect a pipe + struct { + zmq::pipe_t *pipe; + } detach; // Sent by pipe writer to inform dormant pipe reader that there // are messages in the pipe. diff --git a/src/object.cpp b/src/object.cpp index 80784d7a..37db6f0c 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -92,6 +92,11 @@ void zmq::object_t::process_command (command_t &cmd_) process_seqnum (); break; + case command_t::detach: + process_detach (cmd_.args.detach.pipe); + process_seqnum (); + break; + case command_t::hiccup: process_hiccup (cmd_.args.hiccup.pipe); break; @@ -211,6 +216,15 @@ void zmq::object_t::send_bind (own_t *destination_, pipe_t *pipe_, send_command (cmd); } +void zmq::object_t::send_detach (own_t *destination_, pipe_t *pipe_) +{ + command_t cmd; + cmd.destination = destination_; + cmd.type = command_t::detach; + cmd.args.detach.pipe = pipe_; + send_command (cmd); +} + void zmq::object_t::send_activate_read (pipe_t *destination_) { command_t cmd; @@ -331,6 +345,11 @@ void zmq::object_t::process_bind (pipe_t *pipe_) zmq_assert (false); } +void zmq::object_t::process_detach (pipe_t *pipe_) +{ + zmq_assert (false); +} + void zmq::object_t::process_activate_read () { zmq_assert (false); diff --git a/src/object.hpp b/src/object.hpp index 932cea7d..e088c14b 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -78,6 +78,7 @@ namespace zmq zmq::i_engine *engine_, bool inc_seqnum_ = true); void send_bind (zmq::own_t *destination_, zmq::pipe_t *pipe_, bool inc_seqnum_ = true); + void send_detach (own_t *destination_, pipe_t *pipe_); void send_activate_read (zmq::pipe_t *destination_); void send_activate_write (zmq::pipe_t *destination_, uint64_t msgs_read_); @@ -99,6 +100,7 @@ namespace zmq virtual void process_own (zmq::own_t *object_); virtual void process_attach (zmq::i_engine *engine_); virtual void process_bind (zmq::pipe_t *pipe_); + virtual void process_detach (zmq::pipe_t *pipe_); virtual void process_activate_read (); virtual void process_activate_write (uint64_t msgs_read_); virtual void process_hiccup (void *pipe_); diff --git a/src/session_base.cpp b/src/session_base.cpp index aba42657..aa1fc8b9 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -411,14 +411,11 @@ void zmq::session_base_t::detached () // the socket object to resend all the subscriptions. if (pipe && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB)) pipe->hiccup (); - - // For delayed connect situations, terminate the pipe - // and reestablish later on - if (pipe && options.delay_attach_on_connect == 1) { - pipe->terminate (false); - socket->terminated (pipe); - pipe = NULL; - } + + // For delayed connect situations, hiccup the socket to have it + // pause usage of this pipe + if (outpipe && options.delay_attach_on_connect == 1) + send_detach(socket, outpipe); } void zmq::session_base_t::start_connecting (bool wait_) diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 50d1aa6e..f0e6e751 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -876,6 +876,17 @@ void zmq::socket_base_t::process_destroy () destroyed = true; } +void zmq::socket_base_t::process_detach (pipe_t *pipe_) +{ + // If we are blocking connecting threads, drop this one + if (options.delay_attach_on_connect == 1) { + zmq_assert (pipe_); + pipes.erase (pipe_); + // Let derived sockets know we're ditching this pipe + xterminated (pipe_); + } +} + int zmq::socket_base_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_) { diff --git a/src/socket_base.hpp b/src/socket_base.hpp index ab630e35..ced475eb 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -185,6 +185,9 @@ namespace zmq void process_bind (zmq::pipe_t *pipe_); void process_term (int linger_); + // Allow blocking reconnecting pipes + void process_detach (pipe_t *pipe_); + // Socket's mailbox object. mailbox_t mailbox;