diff --git a/src/session_base.cpp b/src/session_base.cpp index 681b1455..c13308aa 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -280,13 +280,7 @@ void zmq::session_base_t::process_plug () void zmq::session_base_t::process_attach (i_engine *engine_) { - // If some other object (e.g. init) notifies us that the connection failed - // without creating an engine we need to start the reconnection process. - if (!engine_) { - zmq_assert (!engine); - detached (); - return; - } + zmq_assert (engine_ != NULL); // Create the pipe if it does not exist yet. if (!pipe && !is_terminating ()) { diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 1161c292..8142ce5d 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -52,7 +52,6 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_) : outsize (0), encoder (out_batch_size), session (NULL), - leftover_session (NULL), options (options_), plugged (false) { @@ -109,7 +108,6 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_, { zmq_assert (!plugged); plugged = true; - leftover_session = NULL; // Connect to session object. zmq_assert (!session); @@ -144,7 +142,6 @@ void zmq::stream_engine_t::unplug () // Disconnect from session object. encoder.set_session (NULL); decoder.set_session (NULL); - leftover_session = session; session = NULL; endpoint.clear(); } @@ -185,12 +182,8 @@ void zmq::stream_engine_t::in_event () else { // Stop polling for input if we got stuck. - if (processed < insize) { - - // This may happen if queue limits are in effect. - if (plugged) - reset_pollin (handle); - } + if (processed < insize) + reset_pollin (handle); // Adjust the buffer. inpos += processed; @@ -198,20 +191,14 @@ void zmq::stream_engine_t::in_event () } // Flush all messages the decoder may have produced. - // If IO handler has unplugged engine, flush transient IO handler. - if (unlikely (!plugged)) { - zmq_assert (leftover_session); - leftover_session->flush (); - } else { - session->flush (); - } + session->flush (); // Input error has occurred. If the last decoded // message has already been accepted, we terminate // the engine immediately. Otherwise, we stop // waiting for input events and postpone the termination // until after the session has accepted the message. - if (session != NULL && disconnection) { + if (disconnection) { input_error = true; if (decoder.stalled ()) reset_pollin (handle); @@ -228,13 +215,6 @@ void zmq::stream_engine_t::out_event () outpos = NULL; encoder.get_data (&outpos, &outsize); - // If IO handler has unplugged engine, flush transient IO handler. - if (unlikely (!plugged)) { - zmq_assert (leftover_session); - leftover_session->flush (); - return; - } - // If there is no data to send, stop polling for output. if (outsize == 0) { reset_pollout (handle);