Revert "Revert "Merge branch 'master' of github.com:ianbarber/libzmq""

This reverts commit 029d3dfae2.
This commit is contained in:
Ian Barber
2012-06-12 14:43:18 +01:00
parent bdd4e1351d
commit eb14890d23
15 changed files with 331 additions and 119 deletions

View File

@@ -111,6 +111,7 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
io_object_t (io_thread_),
connect (connect_),
pipe (NULL),
incomplete_detach (0),
incomplete_in (false),
pending (false),
engine (NULL),
@@ -229,8 +230,19 @@ void zmq::session_base_t::clean_pipes ()
void zmq::session_base_t::terminated (pipe_t *pipe_)
{
// Drop the reference to the deallocated pipe.
zmq_assert (pipe == pipe_);
// Drop the reference to the deallocated pipe if required.
zmq_assert (pipe == pipe_ || incomplete_detach > 0);
// If we still have pipes outstanding, decrement.
// This will only have been set in a disconnect situation
// where delay_attach_on_connect is 1.
if (incomplete_detach > 0)
incomplete_detach --;
// If there are still extra detached pipes, don't continue
if (incomplete_detach > 0)
return;
pipe = NULL;
// If we are waiting for pending messages to be sent, at this point
@@ -242,6 +254,10 @@ void zmq::session_base_t::terminated (pipe_t *pipe_)
void zmq::session_base_t::read_activated (pipe_t *pipe_)
{
// Skip activating if we're detaching this pipe
if (incomplete_detach > 0 && pipe_ != pipe)
return;
zmq_assert (pipe == pipe_);
if (likely (engine != NULL))
@@ -252,6 +268,10 @@ void zmq::session_base_t::read_activated (pipe_t *pipe_)
void zmq::session_base_t::write_activated (pipe_t *pipe_)
{
// Skip activating if we're detaching this pipe
if (incomplete_detach > 0 && pipe_ != pipe)
return;
zmq_assert (pipe == pipe_);
if (engine)
@@ -291,7 +311,7 @@ void zmq::session_base_t::process_attach (i_engine *engine_)
zmq_assert (engine_ != NULL);
// Create the pipe if it does not exist yet.
if (!pipe && !is_terminating ()) {
if ((!pipe || incomplete_detach > 0) && !is_terminating ()) {
object_t *parents [2] = {this, socket};
pipe_t *pipes [2] = {NULL, NULL};
int hwms [2] = {options.rcvhwm, options.sndhwm};
@@ -308,9 +328,6 @@ void zmq::session_base_t::process_attach (i_engine *engine_)
// Ask socket to plug into the remote end of the pipe.
send_bind (socket, pipes [1]);
// Store the outpipe for disconnect situations
outpipe = pipes [1];
}
// Plug in the engine.
@@ -398,8 +415,17 @@ void zmq::session_base_t::detached ()
return;
}
reset ();
// For delayed connect situations, terminate the pipe
// and reestablish later on
if (pipe && options.delay_attach_on_connect == 1
&& addr->protocol != "pgm" && addr->protocol != "epgm") {
pipe->hiccup ();
pipe->terminate (false);
incomplete_detach ++;
}
reset ();
// Reconnect.
if (options.reconnect_ivl != -1)
start_connecting (true);
@@ -408,14 +434,6 @@ void zmq::session_base_t::detached ()
// the socket object to resend all the subscriptions.
if (pipe && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB))
pipe->hiccup ();
// For delayed connect situations, terminate the pipe
// and reestablish later on
if (pipe && options.delay_attach_on_connect == 1
&& addr->protocol != "pgm" && addr->protocol != "epgm") {
pipe->terminate (false);
outpipe->terminate (false);
}
}
void zmq::session_base_t::start_connecting (bool wait_)