mirror of
https://github.com/zeromq/libzmq.git
synced 2024-12-13 18:55:10 +01:00
Merge pull request #1451 from jbreams/heartbeat-defaults
Fix units and default values for heartbeats options
This commit is contained in:
commit
6ab66ca51a
@ -278,11 +278,13 @@ ZMQ_HEARTBEAT_TTL: Set the TTL value for ZMTP heartbeats
|
||||
The 'ZMQ_HEARTBEAT_TTL' option shall set the timeout on the remote peer for ZMTP
|
||||
heartbeats. If this option is greater than 0, the remote side shall time out the
|
||||
connection if it does not receive any more traffic within the TTL period. This option
|
||||
does not have any effect if 'ZMQ_HEARTBEAT_IVL' is not set or is 0.
|
||||
does not have any effect if 'ZMQ_HEARTBEAT_IVL' is not set or is 0. Internally, this
|
||||
value is rounded down to the nearest decisecond, any value less than 100 will have
|
||||
no effect.
|
||||
|
||||
[horizontal]
|
||||
Option value type:: uint16_t
|
||||
Option value unit:: deciseconds (1/10th of a second)
|
||||
Option value type:: int
|
||||
Option value unit:: milliseconds
|
||||
Default value:: 0
|
||||
Applicable socket types:: all, when using connection-oriented transports
|
||||
|
||||
|
@ -72,7 +72,7 @@ zmq::options_t::options_t () :
|
||||
connected (false),
|
||||
heartbeat_ttl (0),
|
||||
heartbeat_interval (0),
|
||||
heartbeat_timeout (0)
|
||||
heartbeat_timeout (-1)
|
||||
{
|
||||
}
|
||||
|
||||
@ -530,7 +530,9 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
|
||||
break;
|
||||
|
||||
case ZMQ_HEARTBEAT_TTL:
|
||||
if (is_int && value >= 0 && value < 0xffff) {
|
||||
// Convert this to deciseconds from milliseconds
|
||||
value = value / 100;
|
||||
if (is_int && value >= 0 && value <= 6553) {
|
||||
heartbeat_ttl = (uint16_t)value;
|
||||
return 0;
|
||||
}
|
||||
@ -905,7 +907,8 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
|
||||
|
||||
case ZMQ_HEARTBEAT_TTL:
|
||||
if (is_int) {
|
||||
*(uint16_t*)value = heartbeat_ttl;
|
||||
// Convert the internal deciseconds value to milliseconds
|
||||
*value = heartbeat_ttl * 100;
|
||||
return 0;
|
||||
}
|
||||
break;
|
||||
|
@ -98,6 +98,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_,
|
||||
has_ttl_timer (false),
|
||||
has_timeout_timer (false),
|
||||
has_heartbeat_timer (false),
|
||||
heartbeat_timeout (0),
|
||||
socket (NULL)
|
||||
{
|
||||
int rc = tx_msg.init ();
|
||||
@ -144,6 +145,11 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_,
|
||||
rc = setsockopt (s, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof (int));
|
||||
errno_assert (rc == 0);
|
||||
#endif
|
||||
if(options.heartbeat_interval > 0) {
|
||||
heartbeat_timeout = options.heartbeat_timeout;
|
||||
if(heartbeat_timeout == -1)
|
||||
heartbeat_timeout = options.heartbeat_interval;
|
||||
}
|
||||
}
|
||||
|
||||
zmq::stream_engine_t::~stream_engine_t ()
|
||||
@ -1032,8 +1038,8 @@ int zmq::stream_engine_t::produce_ping_message(msg_t * msg_)
|
||||
|
||||
rc = mechanism->encode (msg_);
|
||||
next_msg = &stream_engine_t::pull_and_encode;
|
||||
if(!has_timeout_timer && options.heartbeat_timeout > 0) {
|
||||
add_timer(options.heartbeat_timeout, heartbeat_timeout_timer_id);
|
||||
if(!has_timeout_timer && heartbeat_timeout > 0) {
|
||||
add_timer(heartbeat_timeout, heartbeat_timeout_timer_id);
|
||||
has_timeout_timer = true;
|
||||
}
|
||||
return rc;
|
||||
@ -1062,8 +1068,8 @@ int zmq::stream_engine_t::process_heartbeat_message(msg_t * msg_)
|
||||
memcpy(&remote_heartbeat_ttl, (uint8_t*)msg_->data() + 5, 2);
|
||||
remote_heartbeat_ttl = ntohs(remote_heartbeat_ttl);
|
||||
// The remote heartbeat is in 10ths of a second
|
||||
// so we multiply it by 10 to get the timer interval.
|
||||
remote_heartbeat_ttl *= 10;
|
||||
// so we multiply it by 100 to get the timer interval in ms.
|
||||
remote_heartbeat_ttl *= 100;
|
||||
|
||||
if(!has_ttl_timer && remote_heartbeat_ttl > 0) {
|
||||
add_timer(remote_heartbeat_ttl, heartbeat_ttl_timer_id);
|
||||
|
@ -219,6 +219,7 @@ namespace zmq
|
||||
bool has_ttl_timer;
|
||||
bool has_timeout_timer;
|
||||
bool has_heartbeat_timer;
|
||||
int heartbeat_timeout;
|
||||
|
||||
// Socket
|
||||
zmq::socket_base_t *socket;
|
||||
|
@ -131,10 +131,6 @@ prep_server_socket(void * ctx, int set_heartbeats, int is_curve, void ** server_
|
||||
value = 50;
|
||||
rc = zmq_setsockopt (server, ZMQ_HEARTBEAT_IVL, &value, sizeof(value));
|
||||
assert (rc == 0);
|
||||
|
||||
value = 50;
|
||||
rc = zmq_setsockopt (server, ZMQ_HEARTBEAT_TIMEOUT, &value, sizeof(value));
|
||||
assert (rc == 0);
|
||||
}
|
||||
|
||||
if(is_curve)
|
||||
@ -216,59 +212,49 @@ test_heartbeat_timeout (void)
|
||||
static void
|
||||
test_heartbeat_ttl (void)
|
||||
{
|
||||
int rc;
|
||||
int rc, value;
|
||||
|
||||
// Set up our context and sockets
|
||||
void *ctx = zmq_ctx_new ();
|
||||
assert (ctx);
|
||||
|
||||
void * server, * server_mon;
|
||||
void * server, * server_mon, *client;
|
||||
prep_server_socket(ctx, 0, 0, &server, &server_mon);
|
||||
|
||||
struct sockaddr_in ip4addr;
|
||||
int s;
|
||||
client = zmq_socket(ctx, ZMQ_DEALER);
|
||||
assert(client != NULL);
|
||||
|
||||
ip4addr.sin_family = AF_INET;
|
||||
ip4addr.sin_port = htons(5556);
|
||||
inet_pton(AF_INET, "127.0.0.1", &ip4addr.sin_addr);
|
||||
// Set the heartbeat TTL to 0.1 seconds
|
||||
value = 100;
|
||||
zmq_setsockopt(client, ZMQ_HEARTBEAT_TTL, &value, sizeof(value));
|
||||
|
||||
s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
|
||||
rc = connect (s, (struct sockaddr*) &ip4addr, sizeof ip4addr);
|
||||
assert (rc > -1);
|
||||
// Set the heartbeat interval to much longer than the TTL so that
|
||||
// the socket times out oon the remote side.
|
||||
value = 250;
|
||||
zmq_setsockopt(client, ZMQ_HEARTBEAT_IVL, &value, sizeof(value));
|
||||
|
||||
// Mock a ZMTP 3 client so we can forcibly time out a connection
|
||||
mock_handshake(s);
|
||||
rc = zmq_connect(client, "tcp://localhost:5556");
|
||||
assert(rc == 0);
|
||||
|
||||
// By now everything should report as connected
|
||||
rc = get_monitor_event(server_mon);
|
||||
assert(rc == ZMQ_EVENT_ACCEPTED);
|
||||
|
||||
// This is a ping message with a 0.5 second TTL.
|
||||
uint8_t ping_message[] = {
|
||||
0x4, // This specifies that this is a command message
|
||||
0x7, // The total payload length is 8 bytes
|
||||
0x4, 'P', 'I', 'N', 'G', // The command name
|
||||
0, 10 // This is a network-order 16-bit TTL value
|
||||
};
|
||||
rc = send(s, (const char*)ping_message, sizeof(ping_message), 0);
|
||||
assert(rc == sizeof(ping_message));
|
||||
|
||||
uint8_t pong_buffer[8] = { 0 };
|
||||
rc = recv(s, (char*)pong_buffer, 7, 0);
|
||||
assert(rc == 7 && memcmp(pong_buffer, "\4\5\4PONG", 7) == 0);
|
||||
msleep(100);
|
||||
|
||||
// We should have been disconnected
|
||||
rc = get_monitor_event(server_mon);
|
||||
assert(rc == ZMQ_EVENT_DISCONNECTED);
|
||||
|
||||
close(s);
|
||||
|
||||
rc = zmq_close (server);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_close (server_mon);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_close (client);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_ctx_term (ctx);
|
||||
assert (rc == 0);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user