diff --git a/src/session_base.cpp b/src/session_base.cpp index 10935a1b..0e681ed2 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -111,7 +111,6 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_, io_object_t (io_thread_), connect (connect_), pipe (NULL), - incomplete_detach (0), incomplete_in (false), pending (false), engine (NULL), @@ -231,31 +230,26 @@ void zmq::session_base_t::clean_pipes () void zmq::session_base_t::terminated (pipe_t *pipe_) { // Drop the reference to the deallocated pipe if required. - zmq_assert (pipe == pipe_ || incomplete_detach > 0); - - // If we still have pipes outstanding, decrement. - // This will only have been set in a disconnect situation - // where delay_attach_on_connect is 1. - if (incomplete_detach > 0) - incomplete_detach --; + zmq_assert (pipe == pipe_ || incomplete_pipes.size () > 0); - // If there are still extra detached pipes, don't continue - if (incomplete_detach > 0) - return; - - pipe = NULL; + if (pipe == pipe_) + // If this is our current pipe, remove it + pipe = NULL; + else + // Remove the pipe from the detached pipes set + incomplete_pipes.erase (pipe_); // If we are waiting for pending messages to be sent, at this point // we are sure that there will be no more messages and we can proceed // with termination safely. - if (pending) + if (pending && !pipe && incomplete_pipes.size () == 0) proceed_with_term (); } void zmq::session_base_t::read_activated (pipe_t *pipe_) { // Skip activating if we're detaching this pipe - if (incomplete_detach > 0 && pipe_ != pipe) + if (incomplete_pipes.size () > 0 && pipe_ != pipe) return; zmq_assert (pipe == pipe_); @@ -269,7 +263,7 @@ void zmq::session_base_t::read_activated (pipe_t *pipe_) void zmq::session_base_t::write_activated (pipe_t *pipe_) { // Skip activating if we're detaching this pipe - if (incomplete_detach > 0 && pipe_ != pipe) + if (incomplete_pipes.size () > 0 && pipe_ != pipe) return; zmq_assert (pipe == pipe_); @@ -311,7 +305,7 @@ void zmq::session_base_t::process_attach (i_engine *engine_) zmq_assert (engine_ != NULL); // Create the pipe if it does not exist yet. - if ((!pipe || incomplete_detach > 0) && !is_terminating ()) { + if (!pipe && !is_terminating ()) { object_t *parents [2] = {this, socket}; pipe_t *pipes [2] = {NULL, NULL}; int hwms [2] = {options.rcvhwm, options.sndhwm}; @@ -421,7 +415,8 @@ void zmq::session_base_t::detached () && addr->protocol != "pgm" && addr->protocol != "epgm") { pipe->hiccup (); pipe->terminate (false); - incomplete_detach ++; + incomplete_pipes.insert (pipe); + pipe = NULL; } reset (); diff --git a/src/session_base.hpp b/src/session_base.hpp index 73b5cf41..7b9f3fc6 100644 --- a/src/session_base.hpp +++ b/src/session_base.hpp @@ -104,8 +104,8 @@ namespace zmq // Pipe connecting the session to its socket. zmq::pipe_t *pipe; - // This flag is set if we are disconnecting, but haven't yet completed - int incomplete_detach; + // This set is added to with pipes we are disconnecting, but haven't yet completed + std::set incomplete_pipes; // This flag is true if the remainder of the message being processed // is still in the in pipe. diff --git a/tests/test_connect_delay.cpp b/tests/test_connect_delay.cpp index 13688093..d9b40f6f 100644 --- a/tests/test_connect_delay.cpp +++ b/tests/test_connect_delay.cpp @@ -33,8 +33,6 @@ static void *server (void *c) char buffer[16]; int rc, val; - shoulddie = *(long *)sd; - context = zmq_init (1); assert (context);