mirror of
https://github.com/zeromq/libzmq.git
synced 2025-10-18 11:39:02 +02:00
Revert "Merge branch 'master' of github.com:ianbarber/libzmq"
This reverts commit3345902979
, reversing changes made to889b0e6f29
.
This commit is contained in:
@@ -111,7 +111,6 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
|
||||
io_object_t (io_thread_),
|
||||
connect (connect_),
|
||||
pipe (NULL),
|
||||
incomplete_detach (0),
|
||||
incomplete_in (false),
|
||||
pending (false),
|
||||
engine (NULL),
|
||||
@@ -230,19 +229,8 @@ void zmq::session_base_t::clean_pipes ()
|
||||
|
||||
void zmq::session_base_t::terminated (pipe_t *pipe_)
|
||||
{
|
||||
// Drop the reference to the deallocated pipe if required.
|
||||
zmq_assert (pipe == pipe_ || incomplete_detach > 0);
|
||||
|
||||
// If we still have pipes outstanding, decrement.
|
||||
// This will only have been set in a disconnect situation
|
||||
// where delay_attach_on_connect is 1.
|
||||
if (incomplete_detach > 0)
|
||||
incomplete_detach --;
|
||||
|
||||
// If there are still extra detached pipes, don't continue
|
||||
if (incomplete_detach > 0)
|
||||
return;
|
||||
|
||||
// Drop the reference to the deallocated pipe.
|
||||
zmq_assert (pipe == pipe_);
|
||||
pipe = NULL;
|
||||
|
||||
// If we are waiting for pending messages to be sent, at this point
|
||||
@@ -254,10 +242,6 @@ void zmq::session_base_t::terminated (pipe_t *pipe_)
|
||||
|
||||
void zmq::session_base_t::read_activated (pipe_t *pipe_)
|
||||
{
|
||||
// Skip activating if we're detaching this pipe
|
||||
if (incomplete_detach > 0 && pipe_ != pipe)
|
||||
return;
|
||||
|
||||
zmq_assert (pipe == pipe_);
|
||||
|
||||
if (likely (engine != NULL))
|
||||
@@ -268,10 +252,6 @@ void zmq::session_base_t::read_activated (pipe_t *pipe_)
|
||||
|
||||
void zmq::session_base_t::write_activated (pipe_t *pipe_)
|
||||
{
|
||||
// Skip activating if we're detaching this pipe
|
||||
if (incomplete_detach > 0 && pipe_ != pipe)
|
||||
return;
|
||||
|
||||
zmq_assert (pipe == pipe_);
|
||||
|
||||
if (engine)
|
||||
@@ -311,7 +291,7 @@ void zmq::session_base_t::process_attach (i_engine *engine_)
|
||||
zmq_assert (engine_ != NULL);
|
||||
|
||||
// Create the pipe if it does not exist yet.
|
||||
if ((!pipe || incomplete_detach > 0) && !is_terminating ()) {
|
||||
if (!pipe && !is_terminating ()) {
|
||||
object_t *parents [2] = {this, socket};
|
||||
pipe_t *pipes [2] = {NULL, NULL};
|
||||
int hwms [2] = {options.rcvhwm, options.sndhwm};
|
||||
@@ -328,6 +308,9 @@ void zmq::session_base_t::process_attach (i_engine *engine_)
|
||||
|
||||
// Ask socket to plug into the remote end of the pipe.
|
||||
send_bind (socket, pipes [1]);
|
||||
|
||||
// Store the outpipe for disconnect situations
|
||||
outpipe = pipes [1];
|
||||
}
|
||||
|
||||
// Plug in the engine.
|
||||
@@ -415,17 +398,8 @@ void zmq::session_base_t::detached ()
|
||||
return;
|
||||
}
|
||||
|
||||
// For delayed connect situations, terminate the pipe
|
||||
// and reestablish later on
|
||||
if (pipe && options.delay_attach_on_connect == 1
|
||||
&& addr->protocol != "pgm" && addr->protocol != "epgm") {
|
||||
pipe->hiccup ();
|
||||
pipe->terminate (false);
|
||||
incomplete_detach ++;
|
||||
}
|
||||
|
||||
reset ();
|
||||
|
||||
|
||||
// Reconnect.
|
||||
if (options.reconnect_ivl != -1)
|
||||
start_connecting (true);
|
||||
@@ -434,6 +408,14 @@ void zmq::session_base_t::detached ()
|
||||
// the socket object to resend all the subscriptions.
|
||||
if (pipe && (options.type == ZMQ_SUB || options.type == ZMQ_XSUB))
|
||||
pipe->hiccup ();
|
||||
|
||||
// For delayed connect situations, terminate the pipe
|
||||
// and reestablish later on
|
||||
if (pipe && options.delay_attach_on_connect == 1
|
||||
&& addr->protocol != "pgm" && addr->protocol != "epgm") {
|
||||
pipe->terminate (false);
|
||||
outpipe->terminate (false);
|
||||
}
|
||||
}
|
||||
|
||||
void zmq::session_base_t::start_connecting (bool wait_)
|
||||
|
Reference in New Issue
Block a user