From c13f1d52ff9ed51a651ad8bcc9379e82d9318e86 Mon Sep 17 00:00:00 2001 From: Ian Barber Date: Mon, 4 Jun 2012 10:27:16 +0100 Subject: [PATCH] On the advice of Martin Hurton, removed the new command type and just terminated the pipe in a reconnect situation, and notified the socket of the same. This handles the blocking properly, but at the cost of potentially losing in flight messages. However, this is a reasonable trade off given how much simpler it makes the patch. --- src/command.hpp | 6 ------ src/object.cpp | 19 ------------------- src/object.hpp | 2 -- src/session_base.cpp | 13 ++++++++----- src/socket_base.cpp | 11 ----------- src/socket_base.hpp | 3 --- 6 files changed, 8 insertions(+), 46 deletions(-) diff --git a/src/command.hpp b/src/command.hpp index b70b417c..83783695 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -47,7 +47,6 @@ namespace zmq own, attach, bind, - detach, activate_read, activate_write, hiccup, @@ -88,11 +87,6 @@ namespace zmq struct { zmq::pipe_t *pipe; } bind; - - // Sent from session to socket to disconnect a pipe - struct { - zmq::pipe_t *pipe; - } detach; // Sent by pipe writer to inform dormant pipe reader that there // are messages in the pipe. diff --git a/src/object.cpp b/src/object.cpp index 37db6f0c..80784d7a 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -92,11 +92,6 @@ void zmq::object_t::process_command (command_t &cmd_) process_seqnum (); break; - case command_t::detach: - process_detach (cmd_.args.detach.pipe); - process_seqnum (); - break; - case command_t::hiccup: process_hiccup (cmd_.args.hiccup.pipe); break; @@ -216,15 +211,6 @@ void zmq::object_t::send_bind (own_t *destination_, pipe_t *pipe_, send_command (cmd); } -void zmq::object_t::send_detach (own_t *destination_, pipe_t *pipe_) -{ - command_t cmd; - cmd.destination = destination_; - cmd.type = command_t::detach; - cmd.args.detach.pipe = pipe_; - send_command (cmd); -} - void zmq::object_t::send_activate_read (pipe_t *destination_) { command_t cmd; @@ -345,11 +331,6 @@ void zmq::object_t::process_bind (pipe_t *pipe_) zmq_assert (false); } -void zmq::object_t::process_detach (pipe_t *pipe_) -{ - zmq_assert (false); -} - void zmq::object_t::process_activate_read () { zmq_assert (false); diff --git a/src/object.hpp b/src/object.hpp index e088c14b..932cea7d 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -78,7 +78,6 @@ namespace zmq zmq::i_engine *engine_, bool inc_seqnum_ = true); void send_bind (zmq::own_t *destination_, zmq::pipe_t *pipe_, bool inc_seqnum_ = true); - void send_detach (own_t *destination_, pipe_t *pipe_); void send_activate_read (zmq::pipe_t *destination_); void send_activate_write (zmq::pipe_t *destination_, uint64_t msgs_read_); @@ -100,7 +99,6 @@ namespace zmq virtual void process_own (zmq::own_t *object_); virtual void process_attach (zmq::i_engine *engine_); virtual void process_bind (zmq::pipe_t *pipe_); - virtual void process_detach (zmq::pipe_t *pipe_); virtual void process_activate_read (); virtual void process_activate_write (uint64_t msgs_read_); virtual void process_hiccup (void *pipe_); diff --git a/src/session_base.cpp b/src/session_base.cpp index aa1fc8b9..aba42657 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -411,11 +411,14 @@ void zmq::session_base_t::detached () // the socket object to resend all the subscriptions. if (pipe && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB)) pipe->hiccup (); - - // For delayed connect situations, hiccup the socket to have it - // pause usage of this pipe - if (outpipe && options.delay_attach_on_connect == 1) - send_detach(socket, outpipe); + + // For delayed connect situations, terminate the pipe + // and reestablish later on + if (pipe && options.delay_attach_on_connect == 1) { + pipe->terminate (false); + socket->terminated (pipe); + pipe = NULL; + } } void zmq::session_base_t::start_connecting (bool wait_) diff --git a/src/socket_base.cpp b/src/socket_base.cpp index f0e6e751..50d1aa6e 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -876,17 +876,6 @@ void zmq::socket_base_t::process_destroy () destroyed = true; } -void zmq::socket_base_t::process_detach (pipe_t *pipe_) -{ - // If we are blocking connecting threads, drop this one - if (options.delay_attach_on_connect == 1) { - zmq_assert (pipe_); - pipes.erase (pipe_); - // Let derived sockets know we're ditching this pipe - xterminated (pipe_); - } -} - int zmq::socket_base_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_) { diff --git a/src/socket_base.hpp b/src/socket_base.hpp index ced475eb..ab630e35 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -185,9 +185,6 @@ namespace zmq void process_bind (zmq::pipe_t *pipe_); void process_term (int linger_); - // Allow blocking reconnecting pipes - void process_detach (pipe_t *pipe_); - // Socket's mailbox object. mailbox_t mailbox;