From 97935c582ee3cadaa6c2f7848941591f5af6bc7e Mon Sep 17 00:00:00 2001 From: Pieter Hintjens Date: Thu, 10 Apr 2014 11:55:40 +0200 Subject: [PATCH 1/2] Added link to zmq_msg_gets --- doc/zmq.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/doc/zmq.txt b/doc/zmq.txt index 2f63edb5..7dd17c14 100644 --- a/doc/zmq.txt +++ b/doc/zmq.txt @@ -102,6 +102,7 @@ Access message content:: linkzmq:zmq_msg_more[3] Work with message properties:: + linkzmq:zmq_msg_gets[3] linkzmq:zmq_msg_get[3] linkzmq:zmq_msg_set[3] From 9753de8566d335703c96160aa4e5f9c6e55208a9 Mon Sep 17 00:00:00 2001 From: Pieter Hintjens Date: Mon, 28 Apr 2014 11:30:04 +0200 Subject: [PATCH 2/2] Problem: zmq_socket_monitor code is dirty Specifically: * zmq_event_t should not be used internally in libzmq, it was meant to be an outward facing structure. * In 4.x, zmq_event_t does not correspond to monitor events, so I removed the structure entirely. * man page for zmq_socket_monitor is incomplete and the example code was particularly nasty. * test_monitor.cpp needed rewriting, it was not clean. --- doc/zmq_socket_monitor.txt | 318 +++++++++++++++++-------------------- include/zmq.h | 40 ++--- src/socket_base.cpp | 128 +++++---------- src/socket_base.hpp | 2 +- tests/test_monitor.cpp | 302 +++++++++-------------------------- tests/testutil.hpp | 2 +- 6 files changed, 282 insertions(+), 510 deletions(-) diff --git a/doc/zmq_socket_monitor.txt b/doc/zmq_socket_monitor.txt index 90cc75d0..ec6028ef 100644 --- a/doc/zmq_socket_monitor.txt +++ b/doc/zmq_socket_monitor.txt @@ -1,115 +1,100 @@ -zmq_ctx_socket_monitor(3) -========================= +zmq_socket_monitor(3) +===================== NAME ---- -zmq_socket_monitor - register a monitoring callback +zmq_socket_monitor - monitor socket events SYNOPSIS -------- -*int zmq_socket_monitor (void '*socket', char * '*addr', int 'events');* +*int zmq_socket_monitor (void '*socket', char '*endpoint', int 'events');* DESCRIPTION ----------- -The _zmq_socket_monitor()_ function shall spawn a 'PAIR' socket that publishes -socket state changes (events) over the inproc:// transport to a given endpoint. +The _zmq_socket_monitor()_ method lets an application thread track +socket events (like connects) on a ZeroMQ socket. Each call to this +method creates a 'ZMQ_PAIR' socket and binds that to the specified +inproc:// 'endpoint'. To collect the socket events, you must create +your own 'ZMQ_PAIR' socket, and connect that to the endpoint. -Messages consist of 2 Frames, the first containing the event-id and the -associated value. The second frame holds the affected endpoint as string. +The 'events' argument is a bitmask of the socket events you wish to +monitor, see 'Supported events' below. To monitor all events, use the +event value ZMQ_EVENT_ALL. -The layout of the first Frame is: - 16 bit event id - 32 bit event value - -event id and value are in the native byte order (for the machine the -application is running on). There is no padding between the fields. - -The event value has to be interpreted in the context of the event id. -See 'Supported events' below for details. +Each event is sent as two frames. The first frame contains an event +number (16 bits), and an event value (32 bits) that provides additional +data according to the event number. The second frame contains a string +that specifies the affected TCP or IPC endpoint. ---- - -Only connection oriented (tcp and ipc) transports are supported in this initial -implementation. - +The _zmq_socket_monitor()_ method supports only connection-oriented +transports, that is, TCP, IPC, and TIPC. ---- Supported events ---------------- -ZMQ_EVENT_CONNECTED: connection established -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -The 'ZMQ_EVENT_CONNECTED' event triggers when a connection has been established -to a remote peer. This can happen either synchronous or asynchronous. -Value is the FD of the newly connected socket. +ZMQ_EVENT_CONNECTED +~~~~~~~~~~~~~~~~~~~ +The socket has successfully connected to a remote peer. The event value +is the file descriptor (FD) of the underlying network socket. Warning: +there is no guarantee that the FD is still valid by the time your code +receives this event. +ZMQ_EVENT_CONNECT_DELAYED +~~~~~~~~~~~~~~~~~~~~~~~~~ +A connect request on the socket is pending. The event value is unspecified. -ZMQ_EVENT_CONNECT_DELAYED: synchronous connect failed, it's being polled -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -The 'ZMQ_EVENT_CONNECT_DELAYED' event triggers when an immediate connection -attempt is delayed and its completion is being polled for. -Value has no meaning. +ZMQ_EVENT_CONNECT_RETRIED +~~~~~~~~~~~~~~~~~~~~~~~~~ +A connect request failed, and is now being retried. The event value is the +reconnect interval in milliseconds. Note that the reconnect interval is +recalculated at each retry. +ZMQ_EVENT_LISTENING +~~~~~~~~~~~~~~~~~~~ +The socket was successfully bound to a network interface. The event value +is the FD of the underlying network socket. Warning: there is no guarantee +that the FD is still valid by the time your code receives this event. -ZMQ_EVENT_CONNECT_RETRIED: asynchronous connect / reconnection attempt -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -The 'ZMQ_EVENT_CONNECT_RETRIED' event triggers when a connection attempt -is being handled by reconnect timer. The reconnect interval's recomputed -for each attempt. -Value is the reconnect interval. +ZMQ_EVENT_BIND_FAILED +~~~~~~~~~~~~~~~~~~~~~ +The socket could not bind to a given interface. The event value is the +errno generated by the system bind call. +ZMQ_EVENT_ACCEPTED +~~~~~~~~~~~~~~~~~~ +The socket has accepted a connection from a remote peer. The event value is +the FD of the underlying network socket. Warning: there is no guarantee that +the FD is still valid by the time your code receives this event. -ZMQ_EVENT_LISTENING: socket bound to an address, ready to accept connections -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -The 'ZMQ_EVENT_LISTENING' event triggers when a socket's successfully bound -to a an interface. -Value is the FD of the newly bound socket. +ZMQ_EVENT_ACCEPT_FAILED +~~~~~~~~~~~~~~~~~~~~~~~ +The socket has rejected a connection from a remote peer. The event value is +the errno generated by the accept call. +ZMQ_EVENT_CLOSED +~~~~~~~~~~~~~~~~ +The socket was closed. The event value is the FD of the (now closed) network +socket. -ZMQ_EVENT_BIND_FAILED: socket could not bind to an address -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -The 'ZMQ_EVENT_BIND_FAILED' event triggers when a socket could not bind to -a given interface. -Value is the errno generated by the bind call. +ZMQ_EVENT_CLOSE_FAILED +~~~~~~~~~~~~~~~~~~~~~~ +The socket close failed. The event value is the errno returned by the system +call. Note that this event occurs only on IPC transports. +ZMQ_EVENT_DISCONNECTED +~~~~~~~~~~~~~~~~~~~~~~ +The socket was disconnected unexpectedly. The event value is the FD of the +underlying network socket. Warning: this socket will be closed. -ZMQ_EVENT_ACCEPTED: connection accepted to bound interface -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -The 'ZMQ_EVENT_ACCEPTED' event triggers when a connection from a remote peer -has been established with a socket's listen address. -Value is the FD of the accepted socket. - - -ZMQ_EVENT_ACCEPT_FAILED: could not accept client connection -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -The 'ZMQ_EVENT_ACCEPT_FAILED' event triggers when a connection attempt to -a socket's bound address fails. -Value is the errno generated by accept. - - -ZMQ_EVENT_CLOSED: connection closed -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -The 'ZMQ_EVENT_CLOSED' event triggers when a connection's underlying descriptor -has been closed. -Value is the former FD of the for the closed socket. FD has been closed already! - - -ZMQ_EVENT_CLOSE_FAILED: connection couldn't be closed -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -The 'ZMQ_EVENT_CLOSE_FAILED' event triggers when a descriptor could not be -released back to the OS. Implementation note: ONLY FOR IPC SOCKETS. -Value is the errno generated by unlink. - - -ZMQ_EVENT_DISCONNECTED: broken session -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -The 'ZMQ_EVENT_DISCONNECTED' event triggers when the stream engine (tcp and ipc -specific) detects a corrupted / broken session. -Value is the FD of the socket. +ZMQ_EVENT_MONITOR_STOPPED +~~~~~~~~~~~~~~~~~~~~~~~~~ +Monitoring on this socket ended. RETURN VALUE @@ -133,115 +118,110 @@ The endpoint supplied is invalid. EXAMPLE ------- -.Observing a 'REP' socket's connection state +.Monitoring client and server sockets ---- -#include -#include -#include -#include -#include +// Read one event off the monitor socket; return value and address +// by reference, if not null, and event number by value. Returns -1 +// in case of error. -static int read_msg(void* s, zmq_event_t* event, char* ep) +static int +get_monitor_event (void *monitor, int *value, char **address) { - int rc ; - zmq_msg_t msg1; // binary part - zmq_msg_init (&msg1); - zmq_msg_t msg2; // address part - zmq_msg_init (&msg2); - rc = zmq_msg_recv (&msg1, s, 0); - if (rc == -1 && zmq_errno() == ETERM) - return 1 ; - assert (rc != -1); - assert (zmq_msg_more(&msg1) != 0); - rc = zmq_msg_recv (&msg2, s, 0); - if (rc == -1 && zmq_errno() == ETERM) - return 1; - assert (rc != -1); - assert (zmq_msg_more(&msg2) == 0); - // copy binary data to event struct - const char* data = (char*)zmq_msg_data(&msg1); - memcpy(&(event->event), data, sizeof(event->event)); - memcpy(&(event->value), data+sizeof(event->event), sizeof(event->value)); - // copy address part - const size_t len = zmq_msg_size(&msg2) ; - ep = memcpy(ep, zmq_msg_data(&msg2), len); - *(ep + len) = 0 ; - return 0 ; -} + // First frame in message contains event number and value + zmq_msg_t msg; + zmq_msg_init (&msg); + if (zmq_msg_recv (&msg, monitor, 0) == -1) + return -1; // Interruped, presumably + assert (zmq_msg_more (&msg)); -// REP socket monitor thread -static void *rep_socket_monitor (void *ctx) -{ - zmq_event_t event; - static char addr[1025] ; - int rc; + uint8_t *data = (uint8_t *) zmq_msg_data (&msg); + uint16_t event = *(uint16_t *) (data); + if (value) + *value = *(uint32_t *) (data + 2); - printf("starting monitor...\n"); + // Second frame in message contains event address + zmq_msg_init (&msg); + if (zmq_msg_recv (&msg, monitor, 0) == -1) + return -1; // Interruped, presumably + assert (!zmq_msg_more (&msg)); - void *s = zmq_socket (ctx, ZMQ_PAIR); - assert (s); - - rc = zmq_connect (s, "inproc://monitor.rep"); - assert (rc == 0); - while (!read_msg(s, &event, addr)) { - switch (event.event) { - case ZMQ_EVENT_LISTENING: - printf ("listening socket descriptor %d\n", event.value); - printf ("listening socket address %s\n", addr); - break; - case ZMQ_EVENT_ACCEPTED: - printf ("accepted socket descriptor %d\n", event.value); - printf ("accepted socket address %s\n", addr); - break; - case ZMQ_EVENT_CLOSE_FAILED: - printf ("socket close failure error code %d\n", event.value); - printf ("socket address %s\n", addr); - break; - case ZMQ_EVENT_CLOSED: - printf ("closed socket descriptor %d\n", event.value); - printf ("closed socket address %s\n", addr); - break; - case ZMQ_EVENT_DISCONNECTED: - printf ("disconnected socket descriptor %d\n", event.value); - printf ("disconnected socket address %s\n", addr); - break; - } + if (address) { + uint8_t *data = (uint8_t *) zmq_msg_data (&msg); + size_t size = zmq_msg_size (&msg); + *address = (char *) malloc (size + 1); + memcpy (*address, data, size); + *address [size] = 0; } - zmq_close (s); - return NULL; + return event; } - -int main() +int main (void) { - const char* addr = "tcp://127.0.0.1:6666" ; - pthread_t thread ; - - // Create the infrastructure - void *ctx = zmq_init (1); + void *ctx = zmq_ctx_new (); assert (ctx); - // REP socket - void* rep = zmq_socket (ctx, ZMQ_REP); - assert (rep); + // We'll monitor these two sockets + void *client = zmq_socket (ctx, ZMQ_DEALER); + assert (client); + void *server = zmq_socket (ctx, ZMQ_DEALER); + assert (server); - // REP socket monitor, all events - int rc = zmq_socket_monitor (rep, "inproc://monitor.rep", ZMQ_EVENT_ALL); + // Socket monitoring only works over inproc:// + int rc = zmq_socket_monitor (client, "tcp://127.0.0.1:9999", 0); + assert (rc == -1); + assert (zmq_errno () == EPROTONOSUPPORT); + + // Monitor all events on client and server sockets + rc = zmq_socket_monitor (client, "inproc://monitor-client", ZMQ_EVENT_ALL); assert (rc == 0); - rc = pthread_create (&thread, NULL, rep_socket_monitor, ctx); + rc = zmq_socket_monitor (server, "inproc://monitor-server", ZMQ_EVENT_ALL); assert (rc == 0); - rc = zmq_bind (rep, addr); + // Create two sockets for collecting monitor events + void *client_mon = zmq_socket (ctx, ZMQ_PAIR); + assert (client_mon); + void *server_mon = zmq_socket (ctx, ZMQ_PAIR); + assert (server_mon); + + // Connect these to the inproc endpoints so they'll get events + rc = zmq_connect (client_mon, "inproc://monitor-client"); + assert (rc == 0); + rc = zmq_connect (server_mon, "inproc://monitor-server"); assert (rc == 0); - // Allow some time for event detection - zmq_sleep (1); - - // Close the REP socket - rc = zmq_close (rep); + // Now do a basic ping test + rc = zmq_bind (server, "tcp://127.0.0.1:9998"); assert (rc == 0); + rc = zmq_connect (client, "tcp://127.0.0.1:9998"); + assert (rc == 0); + bounce (client, server); - zmq_term (ctx); + // Close client and server + close_zero_linger (client); + close_zero_linger (server); + + // Now collect and check events from both sockets + int event = get_monitor_event (client_mon, NULL, NULL); + if (event == ZMQ_EVENT_CONNECT_DELAYED) + event = get_monitor_event (client_mon, NULL, NULL); + assert (event == ZMQ_EVENT_CONNECTED); + event = get_monitor_event (client_mon, NULL, NULL); + assert (event == ZMQ_EVENT_MONITOR_STOPPED); + + // This is the flow of server events + event = get_monitor_event (server_mon, NULL, NULL); + assert (event == ZMQ_EVENT_LISTENING); + event = get_monitor_event (server_mon, NULL, NULL); + assert (event == ZMQ_EVENT_ACCEPTED); + event = get_monitor_event (server_mon, NULL, NULL); + assert (event == ZMQ_EVENT_CLOSED); + event = get_monitor_event (server_mon, NULL, NULL); + assert (event == ZMQ_EVENT_MONITOR_STOPPED); + + // Close down the sockets + close_zero_linger (client_mon); + close_zero_linger (server_mon); + zmq_ctx_term (ctx); return 0 ; } diff --git a/include/zmq.h b/include/zmq.h index f93b7a7c..a6465d9c 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -326,34 +326,20 @@ ZMQ_EXPORT char *zmq_msg_gets (zmq_msg_t *msg, char *property); /* 0MQ socket events and monitoring */ /******************************************************************************/ -/* Socket transport events (tcp and ipc only) */ -#define ZMQ_EVENT_CONNECTED 1 -#define ZMQ_EVENT_CONNECT_DELAYED 2 -#define ZMQ_EVENT_CONNECT_RETRIED 4 +/* Socket transport events (TCP and IPC only) */ -#define ZMQ_EVENT_LISTENING 8 -#define ZMQ_EVENT_BIND_FAILED 16 - -#define ZMQ_EVENT_ACCEPTED 32 -#define ZMQ_EVENT_ACCEPT_FAILED 64 - -#define ZMQ_EVENT_CLOSED 128 -#define ZMQ_EVENT_CLOSE_FAILED 256 -#define ZMQ_EVENT_DISCONNECTED 512 -#define ZMQ_EVENT_MONITOR_STOPPED 1024 - -#define ZMQ_EVENT_ALL ( ZMQ_EVENT_CONNECTED | ZMQ_EVENT_CONNECT_DELAYED | \ - ZMQ_EVENT_CONNECT_RETRIED | ZMQ_EVENT_LISTENING | \ - ZMQ_EVENT_BIND_FAILED | ZMQ_EVENT_ACCEPTED | \ - ZMQ_EVENT_ACCEPT_FAILED | ZMQ_EVENT_CLOSED | \ - ZMQ_EVENT_CLOSE_FAILED | ZMQ_EVENT_DISCONNECTED | \ - ZMQ_EVENT_MONITOR_STOPPED) - -/* Socket event data */ -typedef struct { - uint16_t event; // id of the event as bitfield - int32_t value ; // value is either error code, fd or reconnect interval -} zmq_event_t; +#define ZMQ_EVENT_CONNECTED 0x0001 +#define ZMQ_EVENT_CONNECT_DELAYED 0x0002 +#define ZMQ_EVENT_CONNECT_RETRIED 0x0004 +#define ZMQ_EVENT_LISTENING 0x0008 +#define ZMQ_EVENT_BIND_FAILED 0x0010 +#define ZMQ_EVENT_ACCEPTED 0x0020 +#define ZMQ_EVENT_ACCEPT_FAILED 0x0040 +#define ZMQ_EVENT_CLOSED 0x0080 +#define ZMQ_EVENT_CLOSE_FAILED 0x0100 +#define ZMQ_EVENT_DISCONNECTED 0x0200 +#define ZMQ_EVENT_MONITOR_STOPPED 0x0400 +#define ZMQ_EVENT_ALL 0xFFFF ZMQ_EXPORT void *zmq_socket (void *, int type); ZMQ_EXPORT int zmq_close (void *s); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index d467ceb8..06b3a6b8 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -1169,22 +1169,19 @@ void zmq::socket_base_t::extract_flags (msg_t *msg_) int zmq::socket_base_t::monitor (const char *addr_, int events_) { - int rc; if (unlikely (ctx_terminated)) { errno = ETERM; return -1; } - - // Support deregistering monitoring endpoints as well + // Support deregistering monitoring endpoints as well if (addr_ == NULL) { stop_monitor (); return 0; } - // Parse addr_ string. std::string protocol; std::string address; - rc = parse_uri (addr_, protocol, address); + int rc = parse_uri (addr_, protocol, address); if (rc != 0) return -1; @@ -1192,25 +1189,24 @@ int zmq::socket_base_t::monitor (const char *addr_, int events_) if (rc != 0) return -1; - // Event notification only supported over inproc:// + // Event notification only supported over inproc:// if (protocol != "inproc") { errno = EPROTONOSUPPORT; return -1; } - - // Register events to monitor + // Register events to monitor monitor_events = events_; monitor_socket = zmq_socket (get_ctx (), ZMQ_PAIR); if (monitor_socket == NULL) return -1; - // Never block context termination on pending event messages + // Never block context termination on pending event messages int linger = 0; rc = zmq_setsockopt (monitor_socket, ZMQ_LINGER, &linger, sizeof (linger)); if (rc == -1) stop_monitor (); - // Spawn the monitor socket endpoint + // Spawn the monitor socket endpoint rc = zmq_bind (monitor_socket, addr_); if (rc == -1) stop_monitor (); @@ -1229,134 +1225,88 @@ zmq::fd_t zmq::socket_base_t::fd() void zmq::socket_base_t::event_connected (std::string &addr_, int fd_) { - if (monitor_events & ZMQ_EVENT_CONNECTED) { - zmq_event_t event; - event.event = ZMQ_EVENT_CONNECTED; - event.value = fd_; - monitor_event (event, addr_); - } + if (monitor_events & ZMQ_EVENT_CONNECTED) + monitor_event (ZMQ_EVENT_CONNECTED, fd_, addr_); } void zmq::socket_base_t::event_connect_delayed (std::string &addr_, int err_) { - if (monitor_events & ZMQ_EVENT_CONNECT_DELAYED) { - zmq_event_t event; - event.event = ZMQ_EVENT_CONNECT_DELAYED; - event.value = err_; - monitor_event (event, addr_); - } + if (monitor_events & ZMQ_EVENT_CONNECT_DELAYED) + monitor_event (ZMQ_EVENT_CONNECT_DELAYED, err_, addr_); } void zmq::socket_base_t::event_connect_retried (std::string &addr_, int interval_) { - if (monitor_events & ZMQ_EVENT_CONNECT_RETRIED) { - zmq_event_t event; - event.event = ZMQ_EVENT_CONNECT_RETRIED; - event.value = interval_; - monitor_event (event, addr_); - } + if (monitor_events & ZMQ_EVENT_CONNECT_RETRIED) + monitor_event (ZMQ_EVENT_CONNECT_RETRIED, interval_, addr_); } void zmq::socket_base_t::event_listening (std::string &addr_, int fd_) { - if (monitor_events & ZMQ_EVENT_LISTENING) { - zmq_event_t event; - event.event = ZMQ_EVENT_LISTENING; - event.value = fd_; - monitor_event (event, addr_); - } + if (monitor_events & ZMQ_EVENT_LISTENING) + monitor_event (ZMQ_EVENT_LISTENING, fd_, addr_); } void zmq::socket_base_t::event_bind_failed (std::string &addr_, int err_) { - if (monitor_events & ZMQ_EVENT_BIND_FAILED) { - zmq_event_t event; - event.event = ZMQ_EVENT_BIND_FAILED; - event.value = err_; - monitor_event (event, addr_); - } + if (monitor_events & ZMQ_EVENT_BIND_FAILED) + monitor_event (ZMQ_EVENT_BIND_FAILED, err_, addr_); } void zmq::socket_base_t::event_accepted (std::string &addr_, int fd_) { - if (monitor_events & ZMQ_EVENT_ACCEPTED) { - zmq_event_t event; - event.event = ZMQ_EVENT_ACCEPTED; - event.value = fd_; - monitor_event (event, addr_); - } + if (monitor_events & ZMQ_EVENT_ACCEPTED) + monitor_event (ZMQ_EVENT_ACCEPTED, fd_, addr_); } void zmq::socket_base_t::event_accept_failed (std::string &addr_, int err_) { - if (monitor_events & ZMQ_EVENT_ACCEPT_FAILED) { - zmq_event_t event; - event.event = ZMQ_EVENT_ACCEPT_FAILED; - event.value= err_; - monitor_event (event, addr_); - } + if (monitor_events & ZMQ_EVENT_ACCEPT_FAILED) + monitor_event (ZMQ_EVENT_ACCEPT_FAILED, err_, addr_); } void zmq::socket_base_t::event_closed (std::string &addr_, int fd_) { - if (monitor_events & ZMQ_EVENT_CLOSED) { - zmq_event_t event; - event.event = ZMQ_EVENT_CLOSED; - event.value = fd_; - monitor_event (event, addr_); - } + if (monitor_events & ZMQ_EVENT_CLOSED) + monitor_event (ZMQ_EVENT_CLOSED, fd_, addr_); } void zmq::socket_base_t::event_close_failed (std::string &addr_, int err_) { - if (monitor_events & ZMQ_EVENT_CLOSE_FAILED) { - zmq_event_t event; - event.event = ZMQ_EVENT_CLOSE_FAILED; - event.value = err_; - monitor_event (event, addr_); - } + if (monitor_events & ZMQ_EVENT_CLOSE_FAILED) + monitor_event (ZMQ_EVENT_CLOSE_FAILED, err_, addr_); } void zmq::socket_base_t::event_disconnected (std::string &addr_, int fd_) { - if (monitor_events & ZMQ_EVENT_DISCONNECTED) { - zmq_event_t event; - event.event = ZMQ_EVENT_DISCONNECTED; - event.value = fd_; - monitor_event (event, addr_); - } + if (monitor_events & ZMQ_EVENT_DISCONNECTED) + monitor_event (ZMQ_EVENT_DISCONNECTED, fd_, addr_); } -void zmq::socket_base_t::monitor_event (zmq_event_t event_, const std::string& addr_) +// Send a monitor event +void zmq::socket_base_t::monitor_event (int event_, int value_, const std::string &addr_) { if (monitor_socket) { - const uint16_t eid = (uint16_t)event_.event; - const uint32_t value = (uint32_t)event_.value; - // prepare and send first message frame - // containing event id and value + // Send event in first frame zmq_msg_t msg; - zmq_msg_init_size (&msg, sizeof(eid) + sizeof(value)); - char* data1 = (char*)zmq_msg_data(&msg); - memcpy (data1, &eid, sizeof(eid)); - memcpy (data1+sizeof(eid), &value, sizeof(value)); + zmq_msg_init_size (&msg, 6); + uint8_t *data = (uint8_t *) zmq_msg_data (&msg); + *(uint16_t *) (data + 0) = (uint16_t) event_; + *(uint32_t *) (data + 2) = (uint32_t) value_; zmq_sendmsg (monitor_socket, &msg, ZMQ_SNDMORE); - // prepare and send second message frame - // containing the address (endpoint) + + // Send address in second frame zmq_msg_init_size (&msg, addr_.size()); - memcpy(zmq_msg_data(&msg), addr_.c_str(), addr_.size()); + memcpy (zmq_msg_data (&msg), addr_.c_str (), addr_.size ()); zmq_sendmsg (monitor_socket, &msg, 0); } } -void zmq::socket_base_t::stop_monitor() +void zmq::socket_base_t::stop_monitor (void) { if (monitor_socket) { - if (monitor_events & ZMQ_EVENT_MONITOR_STOPPED) { - zmq_event_t event; - event.event = ZMQ_EVENT_MONITOR_STOPPED; - event.value = 0; - monitor_event (event, ""); - } + if (monitor_events & ZMQ_EVENT_MONITOR_STOPPED) + monitor_event (ZMQ_EVENT_MONITOR_STOPPED, 0, ""); zmq_close (monitor_socket); monitor_socket = NULL; monitor_events = 0; diff --git a/src/socket_base.hpp b/src/socket_base.hpp index efb0cced..51cf2074 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -160,7 +160,7 @@ namespace zmq void process_destroy (); // Socket event data dispath - void monitor_event (zmq_event_t data_, const std::string& addr_); + void monitor_event (int event_, int value_, const std::string& addr_); // Monitor socket cleanup void stop_monitor (); diff --git a/tests/test_monitor.cpp b/tests/test_monitor.cpp index 6a16a8ed..90b314b5 100644 --- a/tests/test_monitor.cpp +++ b/tests/test_monitor.cpp @@ -19,254 +19,110 @@ #include "testutil.hpp" -// REQ socket events handled -static int req_socket_events; -// 2nd REQ socket events handled -static int req2_socket_events; -// REP socket events handled -static int rep_socket_events; +// Read one event off the monitor socket; return value and address +// by reference, if not null, and event number by value. Returns -1 +// in case of error. -std::string addr ; - -static bool read_msg(void* s, zmq_event_t& event, std::string& ep) +static int +get_monitor_event (void *monitor, int *value, char **address) { - int rc ; - zmq_msg_t msg1; // binary part - zmq_msg_init (&msg1); - zmq_msg_t msg2; // address part - zmq_msg_init (&msg2); - rc = zmq_msg_recv (&msg1, s, 0); - if (rc == -1 && zmq_errno() == ETERM) - return true ; + // First frame in message contains event number and value + zmq_msg_t msg; + zmq_msg_init (&msg); + if (zmq_msg_recv (&msg, monitor, 0) == -1) + return -1; // Interruped, presumably + assert (zmq_msg_more (&msg)); + + uint8_t *data = (uint8_t *) zmq_msg_data (&msg); + uint16_t event = *(uint16_t *) (data); + if (value) + *value = *(uint32_t *) (data + 2); - assert (rc != -1); - assert (zmq_msg_more(&msg1) != 0); - rc = zmq_msg_recv (&msg2, s, 0); - if (rc == -1 && zmq_errno() == ETERM) - return true; - - assert (rc != -1); - assert (zmq_msg_more(&msg2) == 0); - // copy binary data to event struct - const char* data = (char*)zmq_msg_data(&msg1); - memcpy(&event.event, data, sizeof(event.event)); - memcpy(&event.value, data+sizeof(event.event), sizeof(event.value)); - // copy address part - ep = std::string((char*)zmq_msg_data(&msg2), zmq_msg_size(&msg2)); - - if (event.event == ZMQ_EVENT_MONITOR_STOPPED) - return true; - - return false; -} - - -// REQ socket monitor thread -static void req_socket_monitor (void *ctx) -{ - zmq_event_t event; - std::string ep ; - int rc; - - void *s = zmq_socket (ctx, ZMQ_PAIR); - assert (s); - - rc = zmq_connect (s, "inproc://monitor.req"); - assert (rc == 0); - while (!read_msg(s, event, ep)) { - assert (ep == addr); - switch (event.event) { - case ZMQ_EVENT_CONNECTED: - assert (event.value > 0); - req_socket_events |= ZMQ_EVENT_CONNECTED; - req2_socket_events |= ZMQ_EVENT_CONNECTED; - break; - case ZMQ_EVENT_CONNECT_DELAYED: - assert (event.value != 0); - req_socket_events |= ZMQ_EVENT_CONNECT_DELAYED; - break; - case ZMQ_EVENT_CLOSE_FAILED: - assert (event.value != 0); - req_socket_events |= ZMQ_EVENT_CLOSE_FAILED; - break; - case ZMQ_EVENT_CLOSED: - assert (event.value != 0); - req_socket_events |= ZMQ_EVENT_CLOSED; - break; - case ZMQ_EVENT_DISCONNECTED: - assert (event.value != 0); - req_socket_events |= ZMQ_EVENT_DISCONNECTED; - break; - } + // Second frame in message contains event address + zmq_msg_init (&msg); + if (zmq_msg_recv (&msg, monitor, 0) == -1) + return -1; // Interruped, presumably + assert (!zmq_msg_more (&msg)); + + if (address) { + uint8_t *data = (uint8_t *) zmq_msg_data (&msg); + size_t size = zmq_msg_size (&msg); + *address = (char *) malloc (size + 1); + memcpy (*address, data, size); + *address [size] = 0; } - zmq_close (s); -} - -// 2nd REQ socket monitor thread -static void req2_socket_monitor (void *ctx) -{ - zmq_event_t event; - std::string ep ; - int rc; - - void *s = zmq_socket (ctx, ZMQ_PAIR); - assert (s); - - rc = zmq_connect (s, "inproc://monitor.req2"); - assert (rc == 0); - while (!read_msg(s, event, ep)) { - assert (ep == addr); - switch (event.event) { - case ZMQ_EVENT_CONNECTED: - assert (event.value > 0); - req2_socket_events |= ZMQ_EVENT_CONNECTED; - break; - case ZMQ_EVENT_CLOSED: - assert (event.value != 0); - req2_socket_events |= ZMQ_EVENT_CLOSED; - break; - } - } - zmq_close (s); -} - -// REP socket monitor thread -static void rep_socket_monitor (void *ctx) -{ - zmq_event_t event; - std::string ep ; - int rc; - - void *s = zmq_socket (ctx, ZMQ_PAIR); - assert (s); - - rc = zmq_connect (s, "inproc://monitor.rep"); - assert (rc == 0); - while (!read_msg(s, event, ep)) { - assert (ep == addr); - switch (event.event) { - case ZMQ_EVENT_LISTENING: - assert (event.value > 0); - rep_socket_events |= ZMQ_EVENT_LISTENING; - break; - case ZMQ_EVENT_ACCEPTED: - assert (event.value > 0); - rep_socket_events |= ZMQ_EVENT_ACCEPTED; - break; - case ZMQ_EVENT_CLOSE_FAILED: - assert (event.value != 0); - rep_socket_events |= ZMQ_EVENT_CLOSE_FAILED; - break; - case ZMQ_EVENT_CLOSED: - assert (event.value != 0); - rep_socket_events |= ZMQ_EVENT_CLOSED; - break; - case ZMQ_EVENT_DISCONNECTED: - assert (event.value != 0); - rep_socket_events |= ZMQ_EVENT_DISCONNECTED; - break; - } - } - zmq_close (s); + return event; } int main (void) { setup_test_environment(); - int rc; - void *req; - void *req2; - void *rep; - void* threads [3]; - addr = "tcp://127.0.0.1:5560"; - - // Create the infrastructure void *ctx = zmq_ctx_new (); assert (ctx); + + // We'll monitor these two sockets + void *client = zmq_socket (ctx, ZMQ_DEALER); + assert (client); + void *server = zmq_socket (ctx, ZMQ_DEALER); + assert (server); - // REP socket - rep = zmq_socket (ctx, ZMQ_REP); - assert (rep); - - // Assert supported protocols - rc = zmq_socket_monitor (rep, addr.c_str(), 0); + // Socket monitoring only works over inproc:// + int rc = zmq_socket_monitor (client, "tcp://127.0.0.1:9999", 0); assert (rc == -1); - assert (zmq_errno() == EPROTONOSUPPORT); + assert (zmq_errno () == EPROTONOSUPPORT); - // Deregister monitor - rc = zmq_socket_monitor (rep, NULL, 0); + // Monitor all events on client and server sockets + rc = zmq_socket_monitor (client, "inproc://monitor-client", ZMQ_EVENT_ALL); + assert (rc == 0); + rc = zmq_socket_monitor (server, "inproc://monitor-server", ZMQ_EVENT_ALL); assert (rc == 0); - // REP socket monitor, all events - rc = zmq_socket_monitor (rep, "inproc://monitor.rep", ZMQ_EVENT_ALL); + // Create two sockets for collecting monitor events + void *client_mon = zmq_socket (ctx, ZMQ_PAIR); + assert (client_mon); + void *server_mon = zmq_socket (ctx, ZMQ_PAIR); + assert (server_mon); + + // Connect these to the inproc endpoints so they'll get events + rc = zmq_connect (client_mon, "inproc://monitor-client"); + assert (rc == 0); + rc = zmq_connect (server_mon, "inproc://monitor-server"); assert (rc == 0); - threads [0] = zmq_threadstart(&rep_socket_monitor, ctx); - // REQ socket - req = zmq_socket (ctx, ZMQ_REQ); - assert (req); - - // REQ socket monitor, all events - rc = zmq_socket_monitor (req, "inproc://monitor.req", ZMQ_EVENT_ALL); + // Now do a basic ping test + rc = zmq_bind (server, "tcp://127.0.0.1:9998"); assert (rc == 0); - threads [1] = zmq_threadstart(&req_socket_monitor, ctx); - msleep (SETTLE_TIME); - - // Bind REQ and REP - rc = zmq_bind (rep, addr.c_str()); + rc = zmq_connect (client, "tcp://127.0.0.1:9998"); assert (rc == 0); + bounce (client, server); - rc = zmq_connect (req, addr.c_str()); - assert (rc == 0); - - bounce (rep, req); + // Close client and server + close_zero_linger (client); + close_zero_linger (server); - // 2nd REQ socket - req2 = zmq_socket (ctx, ZMQ_REQ); - assert (req2); - - // 2nd REQ socket monitor, connected event only - rc = zmq_socket_monitor (req2, "inproc://monitor.req2", ZMQ_EVENT_CONNECTED); - assert (rc == 0); - threads [2] = zmq_threadstart(&req2_socket_monitor, ctx); - - rc = zmq_connect (req2, addr.c_str()); - assert (rc == 0); - - // Close the REP socket - rc = zmq_close (rep); - assert (rc == 0); - - // Allow enough time for detecting error states - msleep (250); - - // Close the REQ socket - rc = zmq_close (req); - assert (rc == 0); - - // Close the 2nd REQ socket - rc = zmq_close (req2); - assert (rc == 0); + // Now collect and check events from both sockets + int event = get_monitor_event (client_mon, NULL, NULL); + if (event == ZMQ_EVENT_CONNECT_DELAYED) + event = get_monitor_event (client_mon, NULL, NULL); + assert (event == ZMQ_EVENT_CONNECTED); + event = get_monitor_event (client_mon, NULL, NULL); + assert (event == ZMQ_EVENT_MONITOR_STOPPED); + // This is the flow of server events + event = get_monitor_event (server_mon, NULL, NULL); + assert (event == ZMQ_EVENT_LISTENING); + event = get_monitor_event (server_mon, NULL, NULL); + assert (event == ZMQ_EVENT_ACCEPTED); + event = get_monitor_event (server_mon, NULL, NULL); + assert (event == ZMQ_EVENT_CLOSED); + event = get_monitor_event (server_mon, NULL, NULL); + assert (event == ZMQ_EVENT_MONITOR_STOPPED); + + // Close down the sockets + close_zero_linger (client_mon); + close_zero_linger (server_mon); zmq_ctx_term (ctx); - // Expected REP socket events - assert (rep_socket_events & ZMQ_EVENT_LISTENING); - assert (rep_socket_events & ZMQ_EVENT_ACCEPTED); - assert (rep_socket_events & ZMQ_EVENT_CLOSED); - - // Expected REQ socket events - assert (req_socket_events & ZMQ_EVENT_CONNECTED); - assert (req_socket_events & ZMQ_EVENT_DISCONNECTED); - assert (req_socket_events & ZMQ_EVENT_CLOSED); - - // Expected 2nd REQ socket events - assert (req2_socket_events & ZMQ_EVENT_CONNECTED); - assert (!(req2_socket_events & ZMQ_EVENT_CLOSED)); - - for (unsigned int i = 0; i < 3; ++i) - zmq_threadclose(threads [i]); - return 0 ; } diff --git a/tests/testutil.hpp b/tests/testutil.hpp index 7c070bfd..7e468673 100644 --- a/tests/testutil.hpp +++ b/tests/testutil.hpp @@ -244,7 +244,7 @@ void s_recv_seq (void *socket, ...) } -// Sets a zero linger period on a socket and closes it. +// Sets a zero linger period on a socket and closes it. void close_zero_linger (void *socket) { int linger = 0;