diff --git a/src/session_base.cpp b/src/session_base.cpp index 60c530fa..a10ab0df 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -38,7 +38,6 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_, bool active_, class socket_base_t *socket_, const options_t &options_, address_t *addr_) { - session_base_t *s = NULL; switch (options_.type) { case ZMQ_REQ: @@ -228,14 +227,16 @@ void zmq::session_base_t::pipe_terminated (pipe_t *pipe_) // If we are waiting for pending messages to be sent, at this point // we are sure that there will be no more messages and we can proceed // with termination safely. - if (pending && !pipe && !zap_pipe && terminating_pipes.empty ()) - proceed_with_term (); + if (pending && !pipe && !zap_pipe && terminating_pipes.empty ()) { + pending = false; + own_t::process_term (0); + } } void zmq::session_base_t::read_activated (pipe_t *pipe_) { // Skip activating if we're detaching this pipe - if (unlikely(pipe_ != pipe && pipe_ != zap_pipe)) { + if (unlikely (pipe_ != pipe && pipe_ != zap_pipe)) { zmq_assert (terminating_pipes.count (pipe_) == 1); return; } @@ -354,9 +355,9 @@ void zmq::session_base_t::process_attach (i_engine *engine_) // Remember the local end of the pipe. zmq_assert (!pipe); pipe = pipes [0]; - // Store engine assoc_fd for lilnking pipe to fd - pipe->assoc_fd=engine_->get_assoc_fd(); - pipes[1]->assoc_fd=pipe->assoc_fd; + // Store engine assoc_fd for linking pipe to fd + pipe->assoc_fd = engine_->get_assoc_fd (); + pipes [1]->assoc_fd = pipe->assoc_fd; // Ask socket to plug into the remote end of the pipe. send_bind (socket, pipes [1]); } @@ -409,8 +410,8 @@ void zmq::session_base_t::process_term (int linger_) // If the termination of the pipe happens before the term command is // delivered there's nothing much to do. We can proceed with the // standard termination immediately. - if (!pipe && !zap_pipe) { - proceed_with_term (); + if (!pipe && !zap_pipe && terminating_pipes.empty ()) { + own_t::process_term (0); return; } @@ -440,15 +441,6 @@ void zmq::session_base_t::process_term (int linger_) zap_pipe->terminate (false); } -void zmq::session_base_t::proceed_with_term () -{ - // The pending phase has just ended. - pending = false; - - // Continue with standard termination. - own_t::process_term (0); -} - void zmq::session_base_t::timer_event (int id_) { // Linger period expired. We can proceed with termination even though @@ -465,8 +457,8 @@ void zmq::session_base_t::reconnect () { // For delayed connect situations, terminate the pipe // and reestablish later on - if (pipe && options.immediate == 1 - && addr->protocol != "pgm" && addr->protocol != "epgm" + if (pipe && options.immediate == 1 + && addr->protocol != "pgm" && addr->protocol != "epgm" && addr->protocol != "norm") { pipe->hiccup (); pipe->terminate (false); @@ -578,10 +570,9 @@ void zmq::session_base_t::start_connecting (bool wait_) return; } #endif - + #ifdef ZMQ_HAVE_NORM - if (addr->protocol == "norm") - { + if (addr->protocol == "norm") { // At this point we'll create message pipes to the session straight // away. There's no point in delaying it as no concept of 'connect' // exists with NORM anyway. diff --git a/src/session_base.hpp b/src/session_base.hpp index c90ac468..8491827f 100644 --- a/src/session_base.hpp +++ b/src/session_base.hpp @@ -112,12 +112,9 @@ namespace zmq // Call this function when engine disconnect to get rid of leftovers. void clean_pipes (); - // Call this function to move on with the delayed process_term. - void proceed_with_term (); - // If true, this session (re)connects to the peer. Otherwise, it's // a transient session created by the listener. - bool active; + const bool active; // Pipe connecting the session to its socket. zmq::pipe_t *pipe; diff --git a/src/xpub.cpp b/src/xpub.cpp index 3a5ed233..379a4367 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -26,11 +26,11 @@ zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : socket_base_t (parent_, tid_, sid_), - verbose(false), - more (false) + verbose (false), + more (false), + lossy (true) { options.type = ZMQ_XPUB; - lossy = true; } zmq::xpub_t::~xpub_t ()