ZMQ_RECONNECT_IVL_MAX socket option added

It allows for exponential back-off strategy when reconnecting.

Signed-off-by: Thijs Terlouw <thijsterlouw@gmail.com>
This commit is contained in:
Thijs Terlouw 2011-01-26 07:01:06 +01:00 committed by Martin Sustrik
parent 8e61a11b39
commit f7f1dfc86d
8 changed files with 120 additions and 26 deletions

View File

@ -54,6 +54,7 @@ Tamara Kustarova <kustarova.tamara@gmail.com>
Taras Shpot <taras.shpot@eleks.com>
Tero Marttila <terom@fixme.fi>
Terry Wilson <terry@logivox.net>
Thijs Terlouw <thijsterlouw@gmail.com>
Toralf Wittner <toralf.wittner@gmail.com>
Vitaly Mayatskikh <v.mayatskih@gmail.com>

View File

@ -261,10 +261,11 @@ Option value unit:: milliseconds
Default value:: -1 (infinite)
Applicable socket types:: all
ZMQ_RECONNECT_IVL: Retrieve reconnection interval
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_RECONNECT_IVL' option shall retrieve the reconnection interval for the
specified 'socket'. The reconnection interval is the maximum period 0MQ shall
The 'ZMQ_RECONNECT_IVL' option shall retrieve the initial reconnection interval
for the specified 'socket'. The reconnection interval is the period 0MQ shall
wait between attempts to reconnect disconnected peers when using
connection-oriented transports.
@ -278,6 +279,24 @@ Default value:: 100
Applicable socket types:: all, only for connection-oriented transports
ZMQ_RECONNECT_IVL_MAX: Retrieve maximum reconnection interval
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_RECONNECT_IVL_MAX' option shall retrieve the maximum reconnection
interval for the specified 'socket'. This is the maximum period 0MQ shall wait
between attempts to reconnect. On each reconnect attempt, the previous interval
shall be doubled untill ZMQ_RECONNECT_IVL_MAX is reached. This allows for
exponential backoff strategy. Default value means no exponential backoff is
performed and reconnect interval calculations are only based on ZMQ_RECONNECT_IVL.
NOTE: Values less than ZMQ_RECONNECT_IVL will be ignored.
[horizontal]
Option value type:: int
Option value unit:: milliseconds
Default value:: 0 (only use ZMQ_RECONNECT_IVL)
Applicable socket types:: all, only for connection-oriented transport
ZMQ_BACKLOG: Retrieve maximum length of the queue of outstanding connections
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_BACKLOG' option shall retrieve the maximum length of the queue of

View File

@ -272,9 +272,9 @@ Applicable socket types:: all
ZMQ_RECONNECT_IVL: Set reconnection interval
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_RECONNECT_IVL' option shall set the reconnection interval for the
specified 'socket'. The reconnection interval is the maximum period 0MQ shall
wait between attempts to reconnect disconnected peers when using
The 'ZMQ_RECONNECT_IVL' option shall set the initial reconnection interval for
the specified 'socket'. The reconnection interval is the period 0MQ
shall wait between attempts to reconnect disconnected peers when using
connection-oriented transports.
NOTE: The reconnection interval may be randomized by 0MQ to prevent
@ -287,6 +287,24 @@ Default value:: 100
Applicable socket types:: all, only for connection-oriented transports
ZMQ_RECONNECT_IVL_MAX: Set maximum reconnection interval
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_RECONNECT_IVL_MAX' option shall set the maximum reconnection interval
for the specified 'socket'. This is the maximum period 0MQ shall wait between
attempts to reconnect. On each reconnect attempt, the previous interval shall be
doubled untill ZMQ_RECONNECT_IVL_MAX is reached. This allows for exponential
backoff strategy. Default value means no exponential backoff is performed and
reconnect interval calculations are only based on ZMQ_RECONNECT_IVL.
NOTE: Values less than ZMQ_RECONNECT_IVL will be ignored.
[horizontal]
Option value type:: int
Option value unit:: milliseconds
Default value:: 0 (only use ZMQ_RECONNECT_IVL)
Applicable socket types:: all, only for connection-oriented transports
ZMQ_BACKLOG: Set maximum length of the queue of outstanding connections
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_BACKLOG' option shall set the maximum length of the queue of

View File

@ -203,7 +203,8 @@ ZMQ_EXPORT int zmq_term (void *context);
#define ZMQ_RECONNECT_IVL 18
#define ZMQ_BACKLOG 19
#define ZMQ_RECOVERY_IVL_MSEC 20 /* opt. recovery time, reconcile in 3.x */
#define ZMQ_RECONNECT_IVL_MAX 21
/* Send/recv options. */
#define ZMQ_NOBLOCK 1
#define ZMQ_SNDMORE 2

View File

@ -37,6 +37,7 @@ zmq::options_t::options_t () :
type (-1),
linger (-1),
reconnect_ivl (100),
reconnect_ivl_max (0),
backlog (100),
requires_in (false),
requires_out (false),
@ -161,6 +162,18 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
reconnect_ivl = *((int*) optval_);
return 0;
case ZMQ_RECONNECT_IVL_MAX:
if (optvallen_ != sizeof (int)) {
errno = EINVAL;
return -1;
}
if (*((int*) optval_) < 0) {
errno = EINVAL;
return -1;
}
reconnect_ivl_max = *((int*) optval_);
return 0;
case ZMQ_BACKLOG:
if (optvallen_ != sizeof (int)) {
errno = EINVAL;
@ -297,6 +310,15 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
*optvallen_ = sizeof (int);
return 0;
case ZMQ_RECONNECT_IVL_MAX:
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
return -1;
}
*((int*) optval_) = reconnect_ivl_max;
*optvallen_ = sizeof (int);
return 0;
case ZMQ_BACKLOG:
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;

View File

@ -59,8 +59,12 @@ namespace zmq
// Linger time, in milliseconds.
int linger;
// Interval between attempts to reconnect, in milliseconds.
// Minimum interval between attempts to reconnect, in milliseconds.
// Default 100ms
int reconnect_ivl;
// Maximum interval between attempts to reconnect, in milliseconds.
// Default 0 (unused)
int reconnect_ivl_max;
// Maximum backlog for pending connections.
int backlog;

View File

@ -40,10 +40,11 @@ zmq::zmq_connecter_t::zmq_connecter_t (class io_thread_t *io_thread_,
io_object_t (io_thread_),
handle_valid (false),
wait (wait_before_connect),
session (session_)
session (session_),
current_reconnect_ivl(options.reconnect_ivl)
{
int rc = tcp_connecter.set_address (protocol_, address_);
zmq_assert (rc == 0);
int rc = tcp_connecter.set_address (protocol_, address_);
zmq_assert (rc == 0);
}
zmq::zmq_connecter_t::~zmq_connecter_t ()
@ -54,21 +55,10 @@ zmq::zmq_connecter_t::~zmq_connecter_t ()
rm_fd (handle);
}
int zmq::zmq_connecter_t::get_reconnect_ivl ()
{
#if defined ZMQ_HAVE_WINDOWS
return (options.reconnect_ivl + (((int) GetCurrentProcessId () * 13)
% options.reconnect_ivl));
#else
return (options.reconnect_ivl + (((int) getpid () * 13)
% options.reconnect_ivl));
#endif
}
void zmq::zmq_connecter_t::process_plug ()
{
if (wait)
add_timer (get_reconnect_ivl (), reconnect_timer_id);
add_reconnect_timer();
else
start_connecting ();
}
@ -91,7 +81,7 @@ void zmq::zmq_connecter_t::out_event ()
if (fd == retired_fd) {
tcp_connecter.close ();
wait = true;
add_timer (get_reconnect_ivl (), reconnect_timer_id);
add_reconnect_timer();
return;
}
@ -140,5 +130,36 @@ void zmq::zmq_connecter_t::start_connecting ()
// Handle any other error condition by eventual reconnect.
wait = true;
add_timer (get_reconnect_ivl (), reconnect_timer_id);
add_reconnect_timer();
}
void zmq::zmq_connecter_t::add_reconnect_timer()
{
add_timer (get_new_reconnect_ivl(), reconnect_timer_id);
}
int zmq::zmq_connecter_t::get_new_reconnect_ivl ()
{
#if defined ZMQ_HAVE_WINDOWS
int pid = (int) GetCurrentProcessId ();
#else
int pid = (int) getpid ();
#endif
// The new interval is the current interval + random value.
int this_interval = current_reconnect_ivl +
((pid * 13) % options.reconnect_ivl);
// Only change the current reconnect interval if the maximum reconnect
// interval was set and if it's larger than the reconnect interval.
if (options.reconnect_ivl_max > 0 &&
options.reconnect_ivl_max > options.reconnect_ivl) {
// Calculate the next interval
current_reconnect_ivl = current_reconnect_ivl * 2;
if(current_reconnect_ivl >= options.reconnect_ivl_max) {
current_reconnect_ivl = options.reconnect_ivl_max;
}
}
return this_interval;
}

View File

@ -55,8 +55,13 @@ namespace zmq
// Internal function to start the actual connection establishment.
void start_connecting ();
// Internal function to return the reconnect backoff delay.
int get_reconnect_ivl ();
// Internal function to add a reconnect timer
void add_reconnect_timer();
// Internal function to return a reconnect backoff delay.
// Will modify the current_reconnect_ivl used for next call
// Returns the currently used interval
int get_new_reconnect_ivl ();
// Actual connecting socket.
tcp_connecter_t tcp_connecter;
@ -74,6 +79,9 @@ namespace zmq
// Reference to the session we belong to.
class session_t *session;
// Current reconnect ivl, updated for backoff strategy
int current_reconnect_ivl;
zmq_connecter_t (const zmq_connecter_t&);
const zmq_connecter_t &operator = (const zmq_connecter_t&);
};