ZMQ_RECONNECT_IVL socket options added.

Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
This commit is contained in:
Martin Sustrik
2010-10-17 09:54:12 +02:00
parent 8b8837688a
commit e8e2944f45
8 changed files with 80 additions and 31 deletions

View File

@@ -212,8 +212,8 @@ Default value:: 0
Applicable socket types:: all
ZMQ_LINGER: Set linger period for socket shutdown
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
ZMQ_LINGER: Retrieve linger period for socket shutdown
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_LINGER' option shall retrieve the period for pending outbound
messages to linger in memory after closing the socket. Value of -1 means
infinite. Pending messages will be kept until they are fully transferred to
@@ -227,6 +227,17 @@ Option value unit:: milliseconds
Default value:: -1
Applicable socket types:: all
ZMQ_RECONNECT_IVL: Retrieve reconnect period for connection-based transports
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_RECONNECT' option shall retrieve the period indicating how long it
takes for a disconnected underlying connection to attempt to reconnect.
[horizontal]
Option value type:: int
Option value unit:: milliseconds
Default value:: 100
Applicable socket types:: all
ZMQ_FD: Retrieve file descriptor associated with the socket
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

View File

@@ -232,6 +232,19 @@ Default value:: -1
Applicable socket types:: all
ZMQ_RECONNECT_IVL: Set reconnect period for connection-based transports
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_RECONNECT' option shall be set to specify how long it takes for a
disconnected underlying connection to attempt to reconnect. The interval
can be randomised to some extent by 0MQ to prevent reconnection storms.
[horizontal]
Option value type:: int
Option value unit:: milliseconds
Default value:: 100
Applicable socket types:: all
RETURN VALUE
------------
The _zmq_setsockopt()_ function shall return zero if successful. Otherwise it

View File

@@ -192,6 +192,7 @@ ZMQ_EXPORT int zmq_term (void *context);
#define ZMQ_EVENTS 15
#define ZMQ_TYPE 16
#define ZMQ_LINGER 17
#define ZMQ_RECONNECT_IVL 18
/* Send/recv options. */
#define ZMQ_NOBLOCK 1

View File

@@ -69,9 +69,6 @@ namespace zmq
// Maximum number of events the I/O thread can process in one go.
max_io_events = 256,
// How long to wait (milliseconds) till reattempting to connect.
reconnect_period = 100,
// Should initial connection attempts be delayed?
wait_before_connect = false,

View File

@@ -35,6 +35,7 @@ zmq::options_t::options_t () :
rcvbuf (0),
type (-1),
linger (-1),
reconnect_ivl (100),
requires_in (false),
requires_out (false),
immediate_connect (true)
@@ -137,6 +138,18 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
}
linger = *((int*) optval_);
return 0;
case ZMQ_RECONNECT_IVL:
if (optvallen_ != sizeof (int)) {
errno = EINVAL;
return -1;
}
if (*((int*) optval_) < 0) {
errno = EINVAL;
return -1;
}
reconnect_ivl = *((int*) optval_);
return 0;
}
errno = EINVAL;
@@ -147,24 +160,6 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
{
switch (option_) {
case ZMQ_LINGER:
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
return -1;
}
*((int*) optval_) = linger;
*optvallen_ = sizeof (int);
return 0;
case ZMQ_TYPE:
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
return -1;
}
*((int*) optval_) = type;
*optvallen_ = sizeof (int);
return 0;
case ZMQ_HWM:
if (*optvallen_ < sizeof (uint64_t)) {
errno = EINVAL;
@@ -246,6 +241,34 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
*((uint64_t*) optval_) = rcvbuf;
*optvallen_ = sizeof (uint64_t);
return 0;
case ZMQ_TYPE:
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
return -1;
}
*((int*) optval_) = type;
*optvallen_ = sizeof (int);
return 0;
case ZMQ_LINGER:
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
return -1;
}
*((int*) optval_) = linger;
*optvallen_ = sizeof (int);
return 0;
case ZMQ_RECONNECT_IVL:
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
return -1;
}
*((int*) optval_) = reconnect_ivl;
*optvallen_ = sizeof (int);
return 0;
}
errno = EINVAL;

View File

@@ -57,6 +57,9 @@ namespace zmq
// Linger time, in milliseconds.
int linger;
// Interval between attempts to reconnect, in milliseconds.
int reconnect_ivl;
// These options are never set by the user directly. Instead they are
// provided by the specific socket type.
bool requires_in;

View File

@@ -54,20 +54,21 @@ zmq::zmq_connecter_t::~zmq_connecter_t ()
rm_fd (handle);
}
int zmq::zmq_connecter_t::get_reconnect_period ()
int zmq::zmq_connecter_t::get_reconnect_ivl ()
{
#if defined ZMQ_HAVE_WINDOWS
return (reconnect_period + (((int)GetCurrentProcessId () * 13)
% reconnect_period));
return (options.reconnect_ivl + (((int) GetCurrentProcessId () * 13)
% options.reconnect_ivl));
#else
return (reconnect_period + (((int)getpid () * 13) % reconnect_period));
return (options.reconnect_ivl + (((int) getpid () * 13)
% options.reconnect_ivl));
#endif
}
void zmq::zmq_connecter_t::process_plug ()
{
if (wait)
add_timer (get_reconnect_period (), reconnect_timer_id);
add_timer (get_reconnect_ivl (), reconnect_timer_id);
else
start_connecting ();
}
@@ -90,7 +91,7 @@ void zmq::zmq_connecter_t::out_event ()
if (fd == retired_fd) {
tcp_connecter.close ();
wait = true;
add_timer (get_reconnect_period (), reconnect_timer_id);
add_timer (get_reconnect_ivl (), reconnect_timer_id);
return;
}
@@ -139,5 +140,5 @@ void zmq::zmq_connecter_t::start_connecting ()
// Handle any other error condition by eventual reconnect.
wait = true;
add_timer (get_reconnect_period (), reconnect_timer_id);
add_timer (get_reconnect_ivl (), reconnect_timer_id);
}

View File

@@ -56,7 +56,7 @@ namespace zmq
void start_connecting ();
// Internal function to return the reconnect backoff delay.
int get_reconnect_period ();
int get_reconnect_ivl ();
// Actual connecting socket.
tcp_connecter_t tcp_connecter;