Simplify {tcp|ipc}_connecter

The patch is meant to make the code easier to understand.
The 'wait' attribute is replaced by 'delayed_start'
and 'timer_started' attributes. The former is constant and
is initialized in the constructor. The latter is a flag
reflecting whether a timer has been started and changes during
the lifetime of the object.
This commit is contained in:
Martin Hurton 2012-06-13 02:33:02 +02:00
parent c8d0d68471
commit e0fed9d29a
4 changed files with 34 additions and 28 deletions

View File

@ -42,13 +42,14 @@
zmq::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_,
class session_base_t *session_, const options_t &options_,
const address_t *addr_, bool wait_) :
const address_t *addr_, bool delayed_start_) :
own_t (io_thread_, options_),
io_object_t (io_thread_),
addr (addr_),
s (retired_fd),
handle_valid (false),
wait (wait_),
delayed_start (delayed_start_),
timer_started (false),
session (session_),
current_reconnect_ivl(options.reconnect_ivl)
{
@ -59,24 +60,24 @@ zmq::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_,
zmq::ipc_connecter_t::~ipc_connecter_t ()
{
zmq_assert (!wait);
zmq_assert (!timer_started);
zmq_assert (!handle_valid);
zmq_assert (s == retired_fd);
}
void zmq::ipc_connecter_t::process_plug ()
{
if (wait)
add_reconnect_timer();
if (delayed_start)
add_reconnect_timer ();
else
start_connecting ();
}
void zmq::ipc_connecter_t::process_term (int linger_)
{
if (wait) {
if (timer_started) {
cancel_timer (reconnect_timer_id);
wait = false;
timer_started = false;
}
if (handle_valid) {
@ -107,7 +108,6 @@ void zmq::ipc_connecter_t::out_event ()
// Handle the error condition by attempt to reconnect.
if (fd == retired_fd) {
close ();
wait = true;
add_reconnect_timer();
return;
}
@ -128,7 +128,7 @@ void zmq::ipc_connecter_t::out_event ()
void zmq::ipc_connecter_t::timer_event (int id_)
{
zmq_assert (id_ == reconnect_timer_id);
wait = false;
timer_started = false;
start_connecting ();
}
@ -156,7 +156,6 @@ void zmq::ipc_connecter_t::start_connecting ()
// Handle any other error condition by eventual reconnect.
close ();
wait = true;
add_reconnect_timer();
}
@ -165,6 +164,7 @@ void zmq::ipc_connecter_t::add_reconnect_timer()
int rc_ivl = get_new_reconnect_ivl();
add_timer (rc_ivl, reconnect_timer_id);
session->monitor_event (ZMQ_EVENT_CONNECT_RETRIED, endpoint.c_str(), rc_ivl);
timer_started = true;
}
int zmq::ipc_connecter_t::get_new_reconnect_ivl ()

View File

@ -41,11 +41,11 @@ namespace zmq
{
public:
// If 'delay' is true connecter first waits for a while, then starts
// connection process.
// If 'delayed_start' is true connecter first waits for a while,
// then starts connection process.
ipc_connecter_t (zmq::io_thread_t *io_thread_,
zmq::session_base_t *session_, const options_t &options_,
const address_t *addr_, bool delay_);
const address_t *addr_, bool delayed_start_);
~ipc_connecter_t ();
private:
@ -99,7 +99,10 @@ namespace zmq
bool handle_valid;
// If true, connecter is waiting a while before trying to connect.
bool wait;
const bool delayed_start;
// True iff a timer has been started.
bool timer_started;
// Reference to the session we belong to.
zmq::session_base_t *session;

View File

@ -52,13 +52,14 @@
zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_,
class session_base_t *session_, const options_t &options_,
const address_t *addr_, bool wait_) :
const address_t *addr_, bool delayed_start_) :
own_t (io_thread_, options_),
io_object_t (io_thread_),
addr (addr_),
s (retired_fd),
handle_valid (false),
wait (wait_),
delayed_start (delayed_start_),
timer_started (false),
session (session_),
current_reconnect_ivl(options.reconnect_ivl)
{
@ -69,24 +70,24 @@ zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_,
zmq::tcp_connecter_t::~tcp_connecter_t ()
{
zmq_assert (!wait);
zmq_assert (!timer_started);
zmq_assert (!handle_valid);
zmq_assert (s == retired_fd);
}
void zmq::tcp_connecter_t::process_plug ()
{
if (wait)
add_reconnect_timer();
if (delayed_start)
add_reconnect_timer ();
else
start_connecting ();
}
void zmq::tcp_connecter_t::process_term (int linger_)
{
if (wait) {
if (timer_started) {
cancel_timer (reconnect_timer_id);
wait = false;
timer_started = false;
}
if (handle_valid) {
@ -117,7 +118,6 @@ void zmq::tcp_connecter_t::out_event ()
// Handle the error condition by attempt to reconnect.
if (fd == retired_fd) {
close ();
wait = true;
add_reconnect_timer();
return;
}
@ -141,7 +141,7 @@ void zmq::tcp_connecter_t::out_event ()
void zmq::tcp_connecter_t::timer_event (int id_)
{
zmq_assert (id_ == reconnect_timer_id);
wait = false;
timer_started = false;
start_connecting ();
}
@ -169,7 +169,6 @@ void zmq::tcp_connecter_t::start_connecting ()
// Handle any other error condition by eventual reconnect.
close ();
wait = true;
add_reconnect_timer();
}
@ -178,6 +177,7 @@ void zmq::tcp_connecter_t::add_reconnect_timer()
int rc_ivl = get_new_reconnect_ivl();
add_timer (rc_ivl, reconnect_timer_id);
session->monitor_event (ZMQ_EVENT_CONNECT_RETRIED, endpoint.c_str(), rc_ivl);
timer_started = true;
}
int zmq::tcp_connecter_t::get_new_reconnect_ivl ()

View File

@ -39,11 +39,11 @@ namespace zmq
{
public:
// If 'delay' is true connecter first waits for a while, then starts
// connection process.
// If 'delayed_start' is true connecter first waits for a while,
// then starts connection process.
tcp_connecter_t (zmq::io_thread_t *io_thread_,
zmq::session_base_t *session_, const options_t &options_,
const address_t *addr_, bool delay_);
const address_t *addr_, bool delayed_start_);
~tcp_connecter_t ();
private:
@ -97,7 +97,10 @@ namespace zmq
bool handle_valid;
// If true, connecter is waiting a while before trying to connect.
bool wait;
const bool delayed_start;
// True iff a timer has been started.
bool timer_started;
// Reference to the session we belong to.
zmq::session_base_t *session;