Revert "As Martin pointed out, there is a race condition in the old code where a pipe could start shutting down after disconnection, but the new one could connect first. This connection would not get a pipe created for it, so the messages could never flow. The simplest way round this would be a flag, but it is possibly for a very bouncy but fast connection to go up and down twice I imagine, so instead I have added a counter. This starts at zero, and will null out the pipe if terminate is called while it is zero. On a disconnect situation the counter is incremented, and the pipe is the not nulled if the value is non zero. In the terminated function it is decremented for each pipe that is shut down, and the assertion that the terminated pipe == the current pipe is skipped while it is non-zero. This should deal with the race condition and not allow any extra terminated() calls without hitting the assertion."

This reverts commit a5f7300da6.
This commit is contained in:
Ian Barber
2012-06-12 14:45:14 +01:00
parent 19da88be67
commit c9926f6f24
2 changed files with 4 additions and 14 deletions

View File

@@ -111,7 +111,6 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
io_object_t (io_thread_),
connect (connect_),
pipe (NULL),
incomplete_detach (0),
incomplete_in (false),
pending (false),
engine (NULL),
@@ -230,14 +229,9 @@ void zmq::session_base_t::clean_pipes ()
void zmq::session_base_t::terminated (pipe_t *pipe_)
{
// Drop the reference to the deallocated pipe if required.
zmq_assert (pipe == pipe_ || incomplete_detach > 0);
if (incomplete_detach > 0)
incomplete_detach --;
if ( incomplete_detach == 0 )
pipe = NULL;
// Drop the reference to the deallocated pipe.
zmq_assert (pipe == pipe_);
pipe = NULL;
// If we are waiting for pending messages to be sent, at this point
// we are sure that there will be no more messages and we can proceed
@@ -297,7 +291,7 @@ void zmq::session_base_t::process_attach (i_engine *engine_)
zmq_assert (engine_ != NULL);
// Create the pipe if it does not exist yet.
if ((!pipe || incomplete_detach > 0) && !is_terminating ()) {
if (!pipe && !is_terminating ()) {
object_t *parents [2] = {this, socket};
pipe_t *pipes [2] = {NULL, NULL};
int hwms [2] = {options.rcvhwm, options.sndhwm};
@@ -407,7 +401,6 @@ void zmq::session_base_t::detached ()
&& addr->protocol != "pgm" && addr->protocol != "epgm") {
pipe->hiccup ();
pipe->terminate (false);
incomplete_detach ++;
}
reset ();