As Martin pointed out, there is a race condition in the old code where a pipe could start shutting down after disconnection, but the new one could connect first. This connection would not get a pipe created for it, so the messages could never flow. The simplest way round this would be a flag, but it is possibly for a very bouncy but fast connection to go up and down twice I imagine, so instead I have added a counter. This starts at zero, and will null out the pipe if terminate is called while it is zero. On a disconnect situation the counter is incremented, and the pipe is the not nulled if the value is non zero. In the terminated function it is decremented for each pipe that is shut down, and the assertion that the terminated pipe == the current pipe is skipped while it is non-zero. This should deal with the race condition and not allow any extra terminated() calls without hitting the assertion.

This commit is contained in:
Ian Barber 2012-06-10 19:57:02 +01:00
parent 841cf69eb7
commit a5f7300da6
2 changed files with 14 additions and 4 deletions

View File

@ -111,6 +111,7 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
io_object_t (io_thread_),
connect (connect_),
pipe (NULL),
incomplete_detach (0),
incomplete_in (false),
pending (false),
engine (NULL),
@ -229,8 +230,13 @@ void zmq::session_base_t::clean_pipes ()
void zmq::session_base_t::terminated (pipe_t *pipe_)
{
// Drop the reference to the deallocated pipe.
zmq_assert (pipe == pipe_);
// Drop the reference to the deallocated pipe if required.
zmq_assert (pipe == pipe_ || incomplete_detach > 0);
if (incomplete_detach > 0)
incomplete_detach --;
if ( incomplete_detach == 0 )
pipe = NULL;
// If we are waiting for pending messages to be sent, at this point
@ -291,7 +297,7 @@ void zmq::session_base_t::process_attach (i_engine *engine_)
zmq_assert (engine_ != NULL);
// Create the pipe if it does not exist yet.
if (!pipe && !is_terminating ()) {
if ((!pipe || incomplete_detach > 0) && !is_terminating ()) {
object_t *parents [2] = {this, socket};
pipe_t *pipes [2] = {NULL, NULL};
int hwms [2] = {options.rcvhwm, options.sndhwm};
@ -401,6 +407,7 @@ void zmq::session_base_t::detached ()
&& addr->protocol != "pgm" && addr->protocol != "epgm") {
pipe->hiccup ();
pipe->terminate (false);
incomplete_detach ++;
}
reset ();

View File

@ -104,6 +104,9 @@ namespace zmq
// Pipe connecting the session to its socket.
zmq::pipe_t *pipe;
// This flag is set if we are disconnecting, but haven't yet completed
int incomplete_detach;
// This flag is true if the remainder of the message being processed
// is still in the in pipe.
bool incomplete_in;