From 973d13d545f9b5a18ad49d82ceed49bb6409ea8f Mon Sep 17 00:00:00 2001 From: Martin Hurton Date: Tue, 3 Dec 2013 09:59:26 +0100 Subject: [PATCH] Code cleanup --- src/session_base.cpp | 69 +++++++++++++++++++------------------------ src/session_base.hpp | 10 +++---- src/stream_engine.cpp | 2 +- 3 files changed, 37 insertions(+), 44 deletions(-) diff --git a/src/session_base.cpp b/src/session_base.cpp index e7c3e819..b760db8c 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -33,13 +33,13 @@ #include "req.hpp" zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_, - bool connect_, class socket_base_t *socket_, const options_t &options_, + bool active_, class socket_base_t *socket_, const options_t &options_, const address_t *addr_) { session_base_t *s = NULL; switch (options_.type) { case ZMQ_REQ: - s = new (std::nothrow) req_session_t (io_thread_, connect_, + s = new (std::nothrow) req_session_t (io_thread_, active_, socket_, options_, addr_); break; case ZMQ_DEALER: @@ -53,7 +53,7 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_, case ZMQ_PULL: case ZMQ_PAIR: case ZMQ_STREAM: - s = new (std::nothrow) session_base_t (io_thread_, connect_, + s = new (std::nothrow) session_base_t (io_thread_, active_, socket_, options_, addr_); break; default: @@ -65,11 +65,11 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_, } zmq::session_base_t::session_base_t (class io_thread_t *io_thread_, - bool connect_, class socket_base_t *socket_, const options_t &options_, + bool active_, class socket_base_t *socket_, const options_t &options_, const address_t *addr_) : own_t (io_thread_, options_), io_object_t (io_thread_), - connect (connect_), + active (active_), pipe (NULL), zap_pipe (NULL), incomplete_in (false), @@ -177,23 +177,22 @@ void zmq::session_base_t::flush () void zmq::session_base_t::clean_pipes () { - if (pipe) { + zmq_assert (pipe != NULL); - // Get rid of half-processed messages in the out pipe. Flush any - // unflushed messages upstream. - pipe->rollback (); - pipe->flush (); + // Get rid of half-processed messages in the out pipe. Flush any + // unflushed messages upstream. + pipe->rollback (); + pipe->flush (); - // Remove any half-read message from the in pipe. - while (incomplete_in) { - msg_t msg; - int rc = msg.init (); - errno_assert (rc == 0); - rc = pull_msg (&msg); - errno_assert (rc == 0); - rc = msg.close (); - errno_assert (rc == 0); - } + // Remove any half-read message from the in pipe. + while (incomplete_in) { + msg_t msg; + int rc = msg.init (); + errno_assert (rc == 0); + rc = pull_msg (&msg); + errno_assert (rc == 0); + rc = msg.close (); + errno_assert (rc == 0); } } @@ -208,9 +207,8 @@ void zmq::session_base_t::pipe_terminated (pipe_t *pipe_) // If this is our current pipe, remove it pipe = NULL; else - if (pipe_ == zap_pipe) { + if (pipe_ == zap_pipe) zap_pipe = NULL; - } else // Remove the pipe from the detached pipes set terminating_pipes.erase (pipe_); @@ -275,7 +273,7 @@ zmq::socket_base_t *zmq::session_base_t::get_socket () void zmq::session_base_t::process_plug () { - if (connect) + if (active) start_connecting (false); } @@ -364,16 +362,19 @@ void zmq::session_base_t::process_attach (i_engine *engine_) engine->plug (io_thread, this); } -void zmq::session_base_t::detach () +void zmq::session_base_t::engine_error () { // Engine is dead. Let's forget about it. engine = NULL; // Remove any half-done messages from the pipes. - clean_pipes (); + if (pipe) + clean_pipes (); - // Send the event to the derived class. - detached (); + if (active) + reconnect (); + else + terminate (); // Just in case there's only a delimiter in the pipe. if (pipe) @@ -432,7 +433,6 @@ void zmq::session_base_t::proceed_with_term () void zmq::session_base_t::timer_event (int id_) { - // Linger period expired. We can proceed with termination even though // there are still pending messages to be sent. zmq_assert (id_ == linger_timer_id); @@ -443,14 +443,8 @@ void zmq::session_base_t::timer_event (int id_) pipe->terminate (false); } -void zmq::session_base_t::detached () +void zmq::session_base_t::reconnect () { - // Transient session self-destructs after peer disconnects. - if (!connect) { - terminate (); - return; - } - // For delayed connect situations, terminate the pipe // and reestablish later on if (pipe && options.immediate == 1 @@ -475,7 +469,7 @@ void zmq::session_base_t::detached () void zmq::session_base_t::start_connecting (bool wait_) { - zmq_assert (connect); + zmq_assert (active); // Choose I/O thread to run connecter in. Given that we are already // running in an I/O thread, there must be at least one available. @@ -506,12 +500,11 @@ void zmq::session_base_t::start_connecting (bool wait_) tipc_connecter_t *connecter = new (std::nothrow) tipc_connecter_t ( io_thread, this, options, addr, wait_); alloc_assert (connecter); - launch_child(connecter); + launch_child (connecter); return; } #endif - #ifdef ZMQ_HAVE_OPENPGM // Both PGM and EPGM transports are using the same infrastructure. diff --git a/src/session_base.hpp b/src/session_base.hpp index 2ef7dc50..7cd8c192 100644 --- a/src/session_base.hpp +++ b/src/session_base.hpp @@ -46,7 +46,7 @@ namespace zmq // Create a session of the particular type. static session_base_t *create (zmq::io_thread_t *io_thread_, - bool connect_, zmq::socket_base_t *socket_, + bool active_, zmq::socket_base_t *socket_, const options_t &options_, const address_t *addr_); // To be used once only, when creating the session. @@ -55,7 +55,7 @@ namespace zmq // Following functions are the interface exposed towards the engine. virtual void reset (); void flush (); - void detach (); + void engine_error (); // i_pipe_events interface implementation. void read_activated (zmq::pipe_t *pipe_); @@ -88,7 +88,7 @@ namespace zmq protected: - session_base_t (zmq::io_thread_t *io_thread_, bool connect_, + session_base_t (zmq::io_thread_t *io_thread_, bool active_, zmq::socket_base_t *socket_, const options_t &options_, const address_t *addr_); virtual ~session_base_t (); @@ -97,7 +97,7 @@ namespace zmq void start_connecting (bool wait_); - void detached (); + void reconnect (); // Handlers for incoming commands. void process_plug (); @@ -116,7 +116,7 @@ namespace zmq // If true, this session (re)connects to the peer. Otherwise, it's // a transient session created by the listener. - bool connect; + bool active; // Pipe connecting the session to its socket. zmq::pipe_t *pipe; diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index cb70680b..447052ed 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -743,7 +743,7 @@ void zmq::stream_engine_t::error () zmq_assert (session); socket->event_disconnected (endpoint, s); session->flush (); - session->detach (); + session->engine_error (); unplug (); delete this; }