diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index 8728472f..3a8c83bb 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -278,11 +278,13 @@ ZMQ_HEARTBEAT_TTL: Set the TTL value for ZMTP heartbeats The 'ZMQ_HEARTBEAT_TTL' option shall set the timeout on the remote peer for ZMTP heartbeats. If this option is greater than 0, the remote side shall time out the connection if it does not receive any more traffic within the TTL period. This option -does not have any effect if 'ZMQ_HEARTBEAT_IVL' is not set or is 0. +does not have any effect if 'ZMQ_HEARTBEAT_IVL' is not set or is 0. Internally, this +value is rounded down to the nearest decisecond, any value less than 100 will have +no effect. [horizontal] -Option value type:: uint16_t -Option value unit:: deciseconds (1/10th of a second) +Option value type:: int +Option value unit:: milliseconds Default value:: 0 Applicable socket types:: all, when using connection-oriented transports diff --git a/src/options.cpp b/src/options.cpp index ebc35290..6742f92e 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -72,7 +72,7 @@ zmq::options_t::options_t () : connected (false), heartbeat_ttl (0), heartbeat_interval (0), - heartbeat_timeout (0) + heartbeat_timeout (-1) { } @@ -530,7 +530,9 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, break; case ZMQ_HEARTBEAT_TTL: - if (is_int && value >= 0 && value < 0xffff) { + // Convert this to deciseconds from milliseconds + value = value / 100; + if (is_int && value >= 0 && value <= 6553) { heartbeat_ttl = (uint16_t)value; return 0; } @@ -905,7 +907,8 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) case ZMQ_HEARTBEAT_TTL: if (is_int) { - *(uint16_t*)value = heartbeat_ttl; + // Convert the internal deciseconds value to milliseconds + *value = heartbeat_ttl * 100; return 0; } break; diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 6190395c..96b6bde6 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -98,6 +98,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, has_ttl_timer (false), has_timeout_timer (false), has_heartbeat_timer (false), + heartbeat_timeout (0), socket (NULL) { int rc = tx_msg.init (); @@ -144,6 +145,11 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, rc = setsockopt (s, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof (int)); errno_assert (rc == 0); #endif + if(options.heartbeat_interval > 0) { + heartbeat_timeout = options.heartbeat_timeout; + if(heartbeat_timeout == -1) + heartbeat_timeout = options.heartbeat_interval; + } } zmq::stream_engine_t::~stream_engine_t () @@ -1032,8 +1038,8 @@ int zmq::stream_engine_t::produce_ping_message(msg_t * msg_) rc = mechanism->encode (msg_); next_msg = &stream_engine_t::pull_and_encode; - if(!has_timeout_timer && options.heartbeat_timeout > 0) { - add_timer(options.heartbeat_timeout, heartbeat_timeout_timer_id); + if(!has_timeout_timer && heartbeat_timeout > 0) { + add_timer(heartbeat_timeout, heartbeat_timeout_timer_id); has_timeout_timer = true; } return rc; @@ -1062,8 +1068,8 @@ int zmq::stream_engine_t::process_heartbeat_message(msg_t * msg_) memcpy(&remote_heartbeat_ttl, (uint8_t*)msg_->data() + 5, 2); remote_heartbeat_ttl = ntohs(remote_heartbeat_ttl); // The remote heartbeat is in 10ths of a second - // so we multiply it by 10 to get the timer interval. - remote_heartbeat_ttl *= 10; + // so we multiply it by 100 to get the timer interval in ms. + remote_heartbeat_ttl *= 100; if(!has_ttl_timer && remote_heartbeat_ttl > 0) { add_timer(remote_heartbeat_ttl, heartbeat_ttl_timer_id); diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp index 5d661d89..61cc3838 100644 --- a/src/stream_engine.hpp +++ b/src/stream_engine.hpp @@ -219,6 +219,7 @@ namespace zmq bool has_ttl_timer; bool has_timeout_timer; bool has_heartbeat_timer; + int heartbeat_timeout; // Socket zmq::socket_base_t *socket; diff --git a/tests/test_heartbeats.cpp b/tests/test_heartbeats.cpp index 0264a8c1..c5aa060b 100644 --- a/tests/test_heartbeats.cpp +++ b/tests/test_heartbeats.cpp @@ -131,10 +131,6 @@ prep_server_socket(void * ctx, int set_heartbeats, int is_curve, void ** server_ value = 50; rc = zmq_setsockopt (server, ZMQ_HEARTBEAT_IVL, &value, sizeof(value)); assert (rc == 0); - - value = 50; - rc = zmq_setsockopt (server, ZMQ_HEARTBEAT_TIMEOUT, &value, sizeof(value)); - assert (rc == 0); } if(is_curve) @@ -216,59 +212,49 @@ test_heartbeat_timeout (void) static void test_heartbeat_ttl (void) { - int rc; + int rc, value; // Set up our context and sockets void *ctx = zmq_ctx_new (); assert (ctx); - void * server, * server_mon; + void * server, * server_mon, *client; prep_server_socket(ctx, 0, 0, &server, &server_mon); - struct sockaddr_in ip4addr; - int s; + client = zmq_socket(ctx, ZMQ_DEALER); + assert(client != NULL); - ip4addr.sin_family = AF_INET; - ip4addr.sin_port = htons(5556); - inet_pton(AF_INET, "127.0.0.1", &ip4addr.sin_addr); + // Set the heartbeat TTL to 0.1 seconds + value = 100; + zmq_setsockopt(client, ZMQ_HEARTBEAT_TTL, &value, sizeof(value)); - s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); - rc = connect (s, (struct sockaddr*) &ip4addr, sizeof ip4addr); - assert (rc > -1); + // Set the heartbeat interval to much longer than the TTL so that + // the socket times out oon the remote side. + value = 250; + zmq_setsockopt(client, ZMQ_HEARTBEAT_IVL, &value, sizeof(value)); - // Mock a ZMTP 3 client so we can forcibly time out a connection - mock_handshake(s); + rc = zmq_connect(client, "tcp://localhost:5556"); + assert(rc == 0); // By now everything should report as connected rc = get_monitor_event(server_mon); assert(rc == ZMQ_EVENT_ACCEPTED); - // This is a ping message with a 0.5 second TTL. - uint8_t ping_message[] = { - 0x4, // This specifies that this is a command message - 0x7, // The total payload length is 8 bytes - 0x4, 'P', 'I', 'N', 'G', // The command name - 0, 10 // This is a network-order 16-bit TTL value - }; - rc = send(s, (const char*)ping_message, sizeof(ping_message), 0); - assert(rc == sizeof(ping_message)); - - uint8_t pong_buffer[8] = { 0 }; - rc = recv(s, (char*)pong_buffer, 7, 0); - assert(rc == 7 && memcmp(pong_buffer, "\4\5\4PONG", 7) == 0); + msleep(100); // We should have been disconnected rc = get_monitor_event(server_mon); assert(rc == ZMQ_EVENT_DISCONNECTED); - close(s); - rc = zmq_close (server); assert (rc == 0); rc = zmq_close (server_mon); assert (rc == 0); + rc = zmq_close (client); + assert (rc == 0); + rc = zmq_ctx_term (ctx); assert (rc == 0); }