mirror of
https://github.com/zeromq/libzmq.git
synced 2025-10-24 17:30:47 +02:00
Remove the extra outpipe handling as the session is quite capable of delaying the creation of the pipe until the connection has happened. Simply don't build the pipe, and let it do that automatically.
This commit is contained in:
@@ -111,7 +111,6 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
|
||||
io_object_t (io_thread_),
|
||||
connect (connect_),
|
||||
pipe (NULL),
|
||||
outpipe (NULL),
|
||||
incomplete_in (false),
|
||||
pending (false),
|
||||
engine (NULL),
|
||||
@@ -151,13 +150,6 @@ void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
|
||||
pipe->set_event_sink (this);
|
||||
}
|
||||
|
||||
void zmq::session_base_t::onconnect_attach_pipe (pipe_t *pipe_)
|
||||
{
|
||||
zmq_assert (!is_terminating ());
|
||||
zmq_assert (pipe_);
|
||||
outpipe = pipe_;
|
||||
}
|
||||
|
||||
int zmq::session_base_t::read (msg_t *msg_)
|
||||
{
|
||||
// First message to send is identity (if required).
|
||||
@@ -237,11 +229,6 @@ void zmq::session_base_t::clean_pipes ()
|
||||
|
||||
void zmq::session_base_t::terminated (pipe_t *pipe_)
|
||||
{
|
||||
// If we get a term signal from our held outpipe
|
||||
// we can safely ignore it.
|
||||
if (pipe_ == outpipe)
|
||||
return;
|
||||
|
||||
// Drop the reference to the deallocated pipe.
|
||||
zmq_assert (pipe == pipe_);
|
||||
pipe = NULL;
|
||||
@@ -319,16 +306,10 @@ void zmq::session_base_t::process_attach (i_engine *engine_)
|
||||
zmq_assert (!pipe);
|
||||
pipe = pipes [0];
|
||||
|
||||
// Ask socket to plug into the remote end of the pipe.
|
||||
// Ask socket to plug into the pipe.
|
||||
send_bind (socket, pipes [1]);
|
||||
}
|
||||
|
||||
if (outpipe && options.delay_attach_on_connect) {
|
||||
send_bind (socket, outpipe);
|
||||
// Forget the outpipe
|
||||
outpipe = NULL;
|
||||
}
|
||||
|
||||
// Plug in the engine.
|
||||
zmq_assert (!engine);
|
||||
engine = engine_;
|
||||
@@ -378,12 +359,6 @@ void zmq::session_base_t::process_term (int linger_)
|
||||
// are processed in case the linger time is non-zero.
|
||||
pipe->terminate (linger_ != 0);
|
||||
|
||||
// If we're storing a pipe to be connected, we can clear that as well
|
||||
if (outpipe) {
|
||||
outpipe->set_event_sink (this);
|
||||
outpipe->terminate (linger_ != 0);
|
||||
}
|
||||
|
||||
// TODO: Should this go into pipe_t::terminate ?
|
||||
// In case there's no engine and there's only delimiter in the
|
||||
// pipe it wouldn't be ever read. Thus we check for it explicitly.
|
||||
@@ -410,9 +385,6 @@ void zmq::session_base_t::timer_event (int id_)
|
||||
// Ask pipe to terminate even though there may be pending messages in it.
|
||||
zmq_assert (pipe);
|
||||
pipe->terminate (false);
|
||||
|
||||
if (outpipe)
|
||||
outpipe->terminate (false);
|
||||
}
|
||||
|
||||
void zmq::session_base_t::detached ()
|
||||
|
||||
@@ -107,9 +107,6 @@ namespace zmq
|
||||
// Pipe connecting the session to its socket.
|
||||
zmq::pipe_t *pipe;
|
||||
|
||||
// Pipe connecting the socket to the client
|
||||
zmq::pipe_t *outpipe;
|
||||
|
||||
// This flag is true if the remainder of the message being processed
|
||||
// is still in the in pipe.
|
||||
bool incomplete_in;
|
||||
|
||||
@@ -530,6 +530,13 @@ int zmq::socket_base_t::connect (const char *addr_)
|
||||
options, paddr);
|
||||
errno_assert (session);
|
||||
|
||||
// PGM does not support subscription forwarding; ask for all data to be
|
||||
// sent to this pipe.
|
||||
bool icanhasall = false;
|
||||
if (protocol == "pgm" || protocol == "epgm")
|
||||
icanhasall = true;
|
||||
|
||||
if (options.delay_attach_on_connect != 1 && icanhasall != true) {
|
||||
// Create a bi-directional pipe.
|
||||
object_t *parents [2] = {this, session};
|
||||
pipe_t *pipes [2] = {NULL, NULL};
|
||||
@@ -538,20 +545,12 @@ int zmq::socket_base_t::connect (const char *addr_)
|
||||
rc = pipepair (parents, pipes, hwms, delays);
|
||||
errno_assert (rc == 0);
|
||||
|
||||
// PGM does not support subscription forwarding; ask for all data to be
|
||||
// sent to this pipe.
|
||||
bool icanhasall = false;
|
||||
if (protocol == "pgm" || protocol == "epgm")
|
||||
icanhasall = true;
|
||||
|
||||
// Attach local end of the pipe to the socket object.
|
||||
if (options.delay_attach_on_connect == 0)
|
||||
attach_pipe (pipes [0], icanhasall);
|
||||
|
||||
// Attach remote end of the pipe to the session object later on.
|
||||
session->attach_pipe (pipes [1]);
|
||||
if (options.delay_attach_on_connect == 1)
|
||||
session->onconnect_attach_pipe (pipes [0]);
|
||||
}
|
||||
|
||||
// Save last endpoint URI
|
||||
paddr->to_string (options.last_endpoint);
|
||||
|
||||
Reference in New Issue
Block a user