Add ZMTP heartbeats

This commit adds ZMTP connection heartbeats described in
http://rfc.zeromq.org/spec:37/ZMTP.
This commit is contained in:
Jonathan Reams
2015-03-16 21:39:16 -04:00
committed by Jonathan Reams
parent 4b4e00bde0
commit cbb3b176a6
12 changed files with 541 additions and 8 deletions

View File

@@ -95,6 +95,9 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_,
input_stopped (false),
output_stopped (false),
has_handshake_timer (false),
has_ttl_timer (false),
has_timeout_timer (false),
has_heartbeat_timer (false),
socket (NULL)
{
int rc = tx_msg.init ();
@@ -250,6 +253,20 @@ void zmq::stream_engine_t::unplug ()
has_handshake_timer = false;
}
if (has_ttl_timer) {
cancel_timer (heartbeat_ttl_timer_id);
has_ttl_timer = false;
}
if (has_timeout_timer) {
cancel_timer (heartbeat_timeout_timer_id);
has_timeout_timer = false;
}
if (has_heartbeat_timer) {
cancel_timer (heartbeat_ivl_timer_id);
has_heartbeat_timer = false;
}
// Cancel all fd subscriptions.
if (!io_error)
rm_fd (handle);
@@ -686,6 +703,11 @@ bool zmq::stream_engine_t::handshake ()
}
next_msg = &stream_engine_t::next_handshake_command;
process_msg = &stream_engine_t::process_handshake_command;
if(options.heartbeat_interval > 0) {
add_timer(options.heartbeat_interval, heartbeat_ivl_timer_id);
has_heartbeat_timer = true;
}
}
// Start polling for output if necessary.
@@ -883,6 +905,23 @@ int zmq::stream_engine_t::decode_and_push (msg_t *msg_)
if (mechanism->decode (msg_) == -1)
return -1;
if(has_timeout_timer) {
has_timeout_timer = false;
cancel_timer(heartbeat_timeout_timer_id);
}
if(has_ttl_timer) {
has_ttl_timer = false;
cancel_timer(heartbeat_ttl_timer_id);
}
if(msg_->flags() & msg_t::command) {
uint8_t cmd_id = *((uint8_t*)msg_->data());
if(cmd_id == 4)
process_heartbeat_message(msg_);
}
if (metadata)
msg_->set_metadata (metadata);
if (session->push_msg (msg_) == -1) {
@@ -954,9 +993,86 @@ bool zmq::stream_engine_t::init_properties (properties_t & properties) {
void zmq::stream_engine_t::timer_event (int id_)
{
zmq_assert (id_ == handshake_timer_id);
has_handshake_timer = false;
// handshake timer expired before handshake completed, so engine fails
error (timeout_error);
if(id_ == handshake_timer_id) {
has_handshake_timer = false;
// handshake timer expired before handshake completed, so engine fail
error (timeout_error);
}
else if(id_ == heartbeat_ivl_timer_id) {
next_msg = &stream_engine_t::produce_ping_message;
out_event();
add_timer(options.heartbeat_interval, heartbeat_ivl_timer_id);
}
else if(id_ == heartbeat_ttl_timer_id) {
has_ttl_timer = false;
error(timeout_error);
}
else if(id_ == heartbeat_timeout_timer_id) {
has_timeout_timer = false;
error(timeout_error);
}
else
// There are no other valid timer ids!
assert(false);
}
int zmq::stream_engine_t::produce_ping_message(msg_t * msg_)
{
int rc = 0;
zmq_assert (mechanism != NULL);
// 16-bit TTL + \4PING == 7
msg_->init_size(7);
msg_->set_flags(msg_t::command);
// Copy in the command message
memcpy(msg_->data(), "\4PING", 5);
uint16_t ttl_val = htons(options.heartbeat_ttl);
memcpy(((uint8_t*)msg_->data()) + 5, &ttl_val, sizeof(ttl_val));
rc = mechanism->encode (msg_);
next_msg = &stream_engine_t::pull_and_encode;
if(!has_timeout_timer && options.heartbeat_timeout > 0) {
add_timer(options.heartbeat_timeout, heartbeat_timeout_timer_id);
has_timeout_timer = true;
}
return rc;
}
int zmq::stream_engine_t::produce_pong_message(msg_t * msg_)
{
int rc = 0;
zmq_assert (mechanism != NULL);
msg_->init_size(5);
msg_->set_flags(msg_t::command);
memcpy(msg_->data(), "\4PONG", 5);
rc = mechanism->encode (msg_);
next_msg = &stream_engine_t::pull_and_encode;
return rc;
}
int zmq::stream_engine_t::process_heartbeat_message(msg_t * msg_)
{
if(memcmp(msg_->data(), "\4PING", 5) == 0) {
uint16_t remote_heartbeat_ttl;
// Get the remote heartbeat TTL to setup the timer
memcpy(&remote_heartbeat_ttl, (uint8_t*)msg_->data() + 5, 2);
remote_heartbeat_ttl = ntohs(remote_heartbeat_ttl);
// The remote heartbeat is in 10ths of a second
// so we multiply it by 10 to get the timer interval.
remote_heartbeat_ttl *= 10;
if(!has_ttl_timer && remote_heartbeat_ttl > 0) {
add_timer(remote_heartbeat_ttl, heartbeat_ttl_timer_id);
has_ttl_timer = true;
}
next_msg = &stream_engine_t::produce_pong_message;
out_event();
}
return 0;
}