libzmq/tests/test_heartbeats.cpp
Jonathan Reams e9a5bc8d1e Fix units and default values for heartbeats options
Set the ZMQ_HEARTBEAT_TIMEOUT to default to the value of
ZMQ_HEARTBEAT_IVL if it's not explicitly set.
Change the units of ZMQ_HEARTBEAT_TTL to milliseconds in the API
and round down to the nearest decisecond so that all the options
are using the same units.
Make the maximum heartbeat TTL match the spec (6553 seconds)
2015-06-26 14:25:58 -04:00

316 lines
8.8 KiB
C++

/*
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "testutil.hpp"
#if defined (ZMQ_HAVE_WINDOWS)
# include <winsock2.h>
# include <ws2tcpip.h>
# include <stdexcept>
# define close closesocket
#else
# include <sys/socket.h>
# include <netinet/in.h>
# include <arpa/inet.h>
# include <unistd.h>
#endif
// Read one event off the monitor socket; return value and address
// by reference, if not null, and event number by value. Returns -1
// in case of error.
static int
get_monitor_event (void *monitor)
{
for(int i = 0; i < 2; i++) {
// First frame in message contains event number and value
zmq_msg_t msg;
zmq_msg_init (&msg);
if (zmq_msg_recv (&msg, monitor, ZMQ_DONTWAIT) == -1) {
msleep(150);
continue; // Interruped, presumably
}
assert (zmq_msg_more (&msg));
uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
uint16_t event = *(uint16_t *) (data);
// Second frame in message contains event address
zmq_msg_init (&msg);
if (zmq_msg_recv (&msg, monitor, 0) == -1) {
return -1; // Interruped, presumably
}
assert (!zmq_msg_more (&msg));
return event;
}
return -1;
}
static void
mock_handshake (int fd) {
const uint8_t zmtp_greeting[33] = { 0xff, 0, 0, 0, 0, 0, 0, 0, 0, 0x7f, 3, 0, 'N', 'U', 'L', 'L', 0 };
char buffer[128];
memset(buffer, 0, sizeof(buffer));
memcpy(buffer, zmtp_greeting, sizeof(zmtp_greeting));
int rc = send(fd, buffer, 64, 0);
assert(rc == 64);
rc = recv(fd, buffer, 64, 0);
assert(rc == 64);
const uint8_t zmtp_ready[43] = {
4, 41, 5, 'R', 'E', 'A', 'D', 'Y', 11, 'S', 'o', 'c', 'k', 'e', 't', '-', 'T', 'y', 'p', 'e',
0, 0, 0, 6, 'D', 'E', 'A', 'L', 'E', 'R', 8, 'I', 'd', 'e', 'n', 't', 'i', 't', 'y',
0, 0, 0, 0
};
memset(buffer, 0, sizeof(buffer));
memcpy(buffer, zmtp_ready, 43);
rc = send(fd, buffer, 43, 0);
assert(rc == 43);
rc = recv(fd, buffer, 43, 0);
assert(rc == 43);
}
// Apply a fixed set of CURVE test keys to a socket.
// A non-zero is_server installs the server key pair and turns on
// ZMQ_CURVE_SERVER; otherwise the client key pair is installed together
// with the server's public key as ZMQ_CURVE_SERVERKEY.
// The zmq_setsockopt results are not checked here.
static void
setup_curve(void * socket, int is_server) {
    // Public half of the server key pair; the client also needs it.
    const char *const server_public = "rq:rM>}U?@Lns47E1%kR.o@n%FcmmsL/@{H8]yf7";

    const char *own_secret = is_server
        ? "JTKVSB%%)wK0E.X)V>+}o?pNmC{O&4W4b!Ni{Lh6"
        : "D:)Q[IlAW!ahhC2ac:9*A}h:p?([4%wOTJ%JR%cs";
    const char *own_public = is_server
        ? server_public
        : "Yne@$w-vo<fVvi]a<NY6T1ed:M$fCG*[IaLV{hID";

    zmq_setsockopt(socket, ZMQ_CURVE_SECRETKEY, own_secret, strlen(own_secret));
    zmq_setsockopt(socket, ZMQ_CURVE_PUBLICKEY, own_public, strlen(own_public));

    if (!is_server) {
        zmq_setsockopt(socket, ZMQ_CURVE_SERVERKEY, server_public, strlen(server_public));
        return;
    }
    zmq_setsockopt(socket, ZMQ_CURVE_SERVER, &is_server, sizeof(is_server));
}
// Create the ROUTER socket the tests connect to, plus a PAIR socket that
// receives its monitor events.
//   ctx            - context both sockets are created in
//   set_heartbeats - non-zero sets ZMQ_HEARTBEAT_IVL to 50 on the server
//   is_curve       - non-zero configures the server side of CURVE
//   server_out     - receives the ROUTER socket, bound to tcp://127.0.0.1:5556
//   mon_out        - receives the PAIR socket connected to the monitor endpoint
// Every step is asserted; the caller closes both sockets.
static void
prep_server_socket(void * ctx, int set_heartbeats, int is_curve, void ** server_out, void ** mon_out)
{
    void *router = zmq_socket (ctx, ZMQ_ROUTER);
    assert (router);

    // Zero linger, set before binding
    int linger = 0;
    int rc = zmq_setsockopt (router, ZMQ_LINGER, &linger, sizeof (linger));
    assert (rc == 0);

    if (set_heartbeats) {
        int interval = 50;
        rc = zmq_setsockopt (router, ZMQ_HEARTBEAT_IVL, &interval, sizeof (interval));
        assert (rc == 0);
    }

    if (is_curve)
        setup_curve (router, 1);

    rc = zmq_bind (router, "tcp://127.0.0.1:5556");
    assert (rc == 0);

    // Publish connect/disconnect/accept events of the ROUTER over inproc ...
    void *monitor = zmq_socket (ctx, ZMQ_PAIR);
    assert (monitor);
    rc = zmq_socket_monitor (router, "inproc://monitor-dealer",
                             ZMQ_EVENT_CONNECTED | ZMQ_EVENT_DISCONNECTED | ZMQ_EVENT_ACCEPTED);
    assert (rc == 0);

    // ... and attach the PAIR socket so the tests can read them
    rc = zmq_connect (monitor, "inproc://monitor-dealer");
    assert (rc == 0);

    *server_out = router;
    *mon_out = monitor;
}
// This checks for a broken TCP connection (or, in this case, a stuck one
// where the peer never responds to PINGs). A raw TCP socket completes the
// handshake and then stays silent; the server's monitor should report an
// accepted event followed by a disconnected event.
static void
test_heartbeat_timeout (void)
{
    int rc;

    // Set up our context and sockets
    void *ctx = zmq_ctx_new ();
    assert (ctx);

    void *server, *server_mon;
    prep_server_socket (ctx, 1, 0, &server, &server_mon);

    // Zero the whole address first so no field is left indeterminate
    struct sockaddr_in ip4addr;
    memset (&ip4addr, 0, sizeof (ip4addr));
    ip4addr.sin_family = AF_INET;
    ip4addr.sin_port = htons (5556);
    rc = inet_pton (AF_INET, "127.0.0.1", &ip4addr.sin_addr);
    assert (rc == 1);

    int s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
    assert (s != -1);
    rc = connect (s, (struct sockaddr *) &ip4addr, sizeof ip4addr);
    assert (rc > -1);

    // Mock a ZMTP 3 client so we can forcibly time out a connection
    mock_handshake (s);

    // By now everything should report as connected
    rc = get_monitor_event (server_mon);
    assert (rc == ZMQ_EVENT_ACCEPTED);

    // We never answer the server's pings, so we should have been disconnected
    rc = get_monitor_event (server_mon);
    assert (rc == ZMQ_EVENT_DISCONNECTED);

    close (s);

    rc = zmq_close (server);
    assert (rc == 0);
    rc = zmq_close (server_mon);
    assert (rc == 0);
    rc = zmq_ctx_term (ctx);
    assert (rc == 0);
}
// This checks that peers respect the TTL value in ping messages.
// A DEALER client is configured with a heartbeat TTL that is shorter than
// its heartbeat interval and connects to a server that is not doing any
// heartbeating itself. Then we sleep; if the server disconnects the
// client, we know the TTL did its thing correctly.
static void
test_heartbeat_ttl (void)
{
    int rc, value;

    // Set up our context and sockets
    void *ctx = zmq_ctx_new ();
    assert (ctx);

    void *server, *server_mon, *client;
    prep_server_socket (ctx, 0, 0, &server, &server_mon);

    client = zmq_socket (ctx, ZMQ_DEALER);
    assert (client != NULL);

    // Set the heartbeat TTL to 0.1 seconds
    value = 100;
    rc = zmq_setsockopt (client, ZMQ_HEARTBEAT_TTL, &value, sizeof (value));
    assert (rc == 0);

    // Set the heartbeat interval to much longer than the TTL so that
    // the socket times out on the remote side.
    value = 250;
    rc = zmq_setsockopt (client, ZMQ_HEARTBEAT_IVL, &value, sizeof (value));
    assert (rc == 0);

    rc = zmq_connect (client, "tcp://localhost:5556");
    assert (rc == 0);

    // By now everything should report as connected
    rc = get_monitor_event (server_mon);
    assert (rc == ZMQ_EVENT_ACCEPTED);

    msleep (100);

    // We should have been disconnected
    rc = get_monitor_event (server_mon);
    assert (rc == ZMQ_EVENT_DISCONNECTED);

    rc = zmq_close (server);
    assert (rc == 0);
    rc = zmq_close (server_mon);
    assert (rc == 0);
    rc = zmq_close (client);
    assert (rc == 0);
    rc = zmq_ctx_term (ctx);
    assert (rc == 0);
}
// This checks for normal operation - that is, pings and pongs being
// exchanged normally. There should be an accepted event on the server,
// and then no event afterwards. A non-zero is_curve runs the same check
// with CURVE configured on both the server and the client.
static void
test_heartbeat_notimeout (int is_curve)
{
    int rc;

    // Set up our context and sockets
    void *ctx = zmq_ctx_new ();
    assert (ctx);

    void *server, *server_mon;
    prep_server_socket (ctx, 1, is_curve, &server, &server_mon);

    void *client = zmq_socket (ctx, ZMQ_DEALER);
    assert (client);
    if (is_curve)
        setup_curve (client, 0);
    rc = zmq_connect (client, "tcp://127.0.0.1:5556");
    assert (rc == 0);

    // Give it a sec to connect and handshake
    msleep (100);

    // By now everything should report as connected
    rc = get_monitor_event (server_mon);
    assert (rc == ZMQ_EVENT_ACCEPTED);

    // We should still be connected because pings and pongs are happening,
    // so no further monitor event is expected
    rc = get_monitor_event (server_mon);
    assert (rc == -1);

    rc = zmq_close (client);
    assert (rc == 0);
    rc = zmq_close (server);
    assert (rc == 0);
    rc = zmq_close (server_mon);
    assert (rc == 0);
    rc = zmq_ctx_term (ctx);
    assert (rc == 0);
}
// Test driver: runs the timeout and TTL scenarios, then the no-timeout
// scenario twice - first without CURVE, then with it.
int main (void)
{
    setup_test_environment ();

    test_heartbeat_timeout ();
    test_heartbeat_ttl ();

    // is_curve = 0 first (plain), then is_curve = 1 (CURVE)
    for (int is_curve = 0; is_curve <= 1; is_curve++)
        test_heartbeat_notimeout (is_curve);

    return 0;
}