diff --git a/src/session_base.cpp b/src/session_base.cpp index 0e681ed2..10935a1b 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -111,6 +111,7 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_, io_object_t (io_thread_), connect (connect_), pipe (NULL), + incomplete_detach (0), incomplete_in (false), pending (false), engine (NULL), @@ -230,26 +231,31 @@ void zmq::session_base_t::clean_pipes () void zmq::session_base_t::terminated (pipe_t *pipe_) { // Drop the reference to the deallocated pipe if required. - zmq_assert (pipe == pipe_ || incomplete_pipes.size () > 0); + zmq_assert (pipe == pipe_ || incomplete_detach > 0); + + // If we still have pipes outstanding, decrement. + // This will only have been set in a disconnect situation + // where delay_attach_on_connect is 1. + if (incomplete_detach > 0) + incomplete_detach --; - if (pipe == pipe_) - // If this is our current pipe, remove it - pipe = NULL; - else - // Remove the pipe from the detached pipes set - incomplete_pipes.erase (pipe_); + // If there are still extra detached pipes, don't continue + if (incomplete_detach > 0) + return; + + pipe = NULL; // If we are waiting for pending messages to be sent, at this point // we are sure that there will be no more messages and we can proceed // with termination safely. - if (pending && !pipe && incomplete_pipes.size () == 0) + if (pending) proceed_with_term (); } void zmq::session_base_t::read_activated (pipe_t *pipe_) { // Skip activating if we're detaching this pipe - if (incomplete_pipes.size () > 0 && pipe_ != pipe) + if (incomplete_detach > 0 && pipe_ != pipe) return; zmq_assert (pipe == pipe_); @@ -263,7 +269,7 @@ void zmq::session_base_t::read_activated (pipe_t *pipe_) void zmq::session_base_t::write_activated (pipe_t *pipe_) { // Skip activating if we're detaching this pipe - if (incomplete_pipes.size () > 0 && pipe_ != pipe) + if (incomplete_detach > 0 && pipe_ != pipe) return; zmq_assert (pipe == pipe_); @@ -305,7 +311,7 @@ void zmq::session_base_t::process_attach (i_engine *engine_) zmq_assert (engine_ != NULL); // Create the pipe if it does not exist yet. - if (!pipe && !is_terminating ()) { + if ((!pipe || incomplete_detach > 0) && !is_terminating ()) { object_t *parents [2] = {this, socket}; pipe_t *pipes [2] = {NULL, NULL}; int hwms [2] = {options.rcvhwm, options.sndhwm}; @@ -415,8 +421,7 @@ void zmq::session_base_t::detached () && addr->protocol != "pgm" && addr->protocol != "epgm") { pipe->hiccup (); pipe->terminate (false); - incomplete_pipes.insert (pipe); - pipe = NULL; + incomplete_detach ++; } reset (); diff --git a/src/session_base.hpp b/src/session_base.hpp index 7b9f3fc6..73b5cf41 100644 --- a/src/session_base.hpp +++ b/src/session_base.hpp @@ -104,8 +104,8 @@ namespace zmq // Pipe connecting the session to its socket. zmq::pipe_t *pipe; - // This set is added to with pipes we are disconnecting, but haven't yet completed - std::set incomplete_pipes; + // This flag is set if we are disconnecting, but haven't yet completed + int incomplete_detach; // This flag is true if the remainder of the message being processed // is still in the in pipe. diff --git a/tests/test_connect_delay.cpp b/tests/test_connect_delay.cpp index d9b40f6f..13688093 100644 --- a/tests/test_connect_delay.cpp +++ b/tests/test_connect_delay.cpp @@ -33,6 +33,8 @@ static void *server (void *c) char buffer[16]; int rc, val; + shoulddie = *(long *)sd; + context = zmq_init (1); assert (context);