From feadf6d40f302aa556b6e5b47e2737379bc46f81 Mon Sep 17 00:00:00 2001 From: Luca Boccassi Date: Thu, 31 Jan 2019 17:23:42 +0100 Subject: [PATCH] Problem: cannot monitor state of queues at runtime Solution: add API and ZMQ_EVENT_PIPES_STATS event which generates 2 values, one for the egress and one for the ingress pipes respectively. Refactor the events code to be able to send multiple values. --- doc/zmq_socket_monitor_versioned.txt | 84 +++++++++++---- include/zmq.h | 6 +- src/command.hpp | 20 ++++ src/object.cpp | 56 ++++++++++ src/object.hpp | 16 +++ src/pipe.cpp | 16 +++ src/pipe.hpp | 5 + src/session_base.cpp | 5 + src/socket_base.cpp | 39 +++++++ src/socket_base.hpp | 8 ++ src/zmq.cpp | 8 ++ src/zmq_draft.h | 10 ++ tests/test_monitor.cpp | 149 ++++++++++++++++++++++++++- tests/testutil_monitoring.hpp | 4 +- 14 files changed, 401 insertions(+), 25 deletions(-) diff --git a/doc/zmq_socket_monitor_versioned.txt b/doc/zmq_socket_monitor_versioned.txt index 18d3ed39..72646100 100644 --- a/doc/zmq_socket_monitor_versioned.txt +++ b/doc/zmq_socket_monitor_versioned.txt @@ -12,6 +12,8 @@ SYNOPSIS -------- *int zmq_socket_monitor_versioned (void '*socket', char '*endpoint', uint64_t 'events', int 'event_version');* +*int zmq_socket_monitor_pipes_stats (void '*socket');* + DESCRIPTION ----------- @@ -41,18 +43,23 @@ Each event is sent in multiple frames. The first frame contains an event number (64 bits). The number and content of further frames depend on this event number. -Unless it is specified differently, the second frame contains an event -value (64 bits) that provides additional data according to the event number. -Some events might define additional value frames following the second one. -The third and fourth frames contain strings that specifies the affected -connection or endpoint. The third frame contains a string denoting the local -endpoint, while the fourth frame contains a string denoting the remote endpoint. +Unless it is specified differently, the second frame contains the number of +value frames that will follow it as a 64 bits integer. The third frame to N-th +frames contain an event value (64 bits) that provides additional data according +to the event number. Each event type might have a different number of values. +The second-to-last and last frames contain strings that specifies the affected +connection or endpoint. The former frame contains a string denoting the local +endpoint, while the latter frame contains a string denoting the remote endpoint. Either of these may be empty, depending on the event type and whether the connection uses a bound or connected local endpoint. Note that the format of the second and further frames, and also the number of frames, may be different for events added in the future. +The _zmq_socket_monitor_pipes_stats()_ method triggers an event of type +ZMQ_EVENT_PIPES_STATS for each connected peer of the monitored socket. +NOTE: _zmq_socket_monitor_pipes_stats()_ is in DRAFT state. + ---- Monitoring events are only generated by some transports: At the moment these are SOCKS, TCP, IPC, and TIPC. Note that it is not an error to call @@ -168,17 +175,35 @@ The ZMTP security mechanism handshake failed due to an authentication failure. The event value is the status code returned by the ZAP handler (i.e. 300, 400 or 500). +---- + +Supported events (v2) +---------------- + +ZMQ_EVENT_PIPE_STATS +~~~~~~~~~~~~~~~~~~~~ +This event provides two values, the number of messages in each of the two +queues associated with the returned endpoint (respectively egress and ingress). +This event only triggers after calling the function +_zmq_socket_monitor_pipes_stats()_. +NOTE: this measurement is asynchronous, so by the time the message is received +the internal state might have already changed. +NOTE: when the monitored socket and the monitor are not used in a poll, the +event might not be delivered until an API has been called on the monitored +socket, like zmq_getsockopt for example (the option is irrelevant). +NOTE: in DRAFT state, not yet available in stable releases. + RETURN VALUE ------------ -The _zmq_socket_monitor()_ function returns a value of 0 or greater if -successful. Otherwise it returns `-1` and sets 'errno' to one of the values -defined below. +The _zmq_socket_monitor()_ and _zmq_socket_monitor_pipes_stats()_ functions +return a value of 0 or greater if successful. Otherwise they return `-1` and +set 'errno' to one of the values defined below. -ERRORS ------- +ERRORS - _zmq_socket_monitor()_ +------------------------------- *ETERM*:: The 0MQ 'context' associated with the specified 'socket' was terminated. @@ -189,11 +214,23 @@ sockets are required to use the inproc:// transport. *EINVAL*:: The monitor 'endpoint' supplied does not exist. + +ERRORS - _zmq_socket_monitor_pipes_stats()_ +------------------------------------------- +*ENOTSOCK*:: +The 'socket' parameter was not a valid 0MQ socket. + +*EINVAL*:: +The socket did not have monitoring enabled. + +*EAGAIN*:: +The monitored socket did not have any connections to monitor yet. + EXAMPLE ------- .Monitoring client and server sockets ---- -// Read one event off the monitor socket; return value and address +// Read one event off the monitor socket; return values and addresses // by reference, if not null, and event number by value. Returns -1 // in case of error. @@ -211,18 +248,29 @@ get_monitor_event (void *monitor, uint64_t *value, char **local_address, char ** memcpy (&event, zmq_msg_data (&msg), sizeof (event)); zmq_msg_close (&msg); - // Second frame to Nth frame (depends on the event) in message - // contains event value + // Second frame in message contains the number of values zmq_msg_init (&msg); if (zmq_msg_recv (&msg, monitor, 0) == -1) return -1; // Interrupted, presumably assert (zmq_msg_more (&msg)); - if (value_) - memcpy (value_, zmq_msg_data (&msg), sizeof *value_); + uint64_t value_count; + memcpy (&value_count, zmq_msg_data (&msg), sizeof (value_count)); zmq_msg_close (&msg); - // Third frame in message contains local address + for (uint64_t i = 0; i < value_count; ++i) { + // Subsequent frames in message contain event values + zmq_msg_init (&msg); + if (zmq_msg_recv (&msg, monitor, 0) == -1) + return -1; // Interrupted, presumably + assert (zmq_msg_more (&msg)); + + if (value_ && value_ + i) + memcpy (value_ + i, zmq_msg_data (&msg), sizeof (*value_)); + zmq_msg_close (&msg); + } + + // Second-to-last frame in message contains local address zmq_msg_init (&msg); if (zmq_msg_recv (&msg, monitor, 0) == -1) return -1; // Interrupted, presumably @@ -237,7 +285,7 @@ get_monitor_event (void *monitor, uint64_t *value, char **local_address, char ** } zmq_msg_close (&msg); - // Fourth frame in message contains remote address + // Last frame in message contains remote address zmq_msg_init (&msg); if (zmq_msg_recv (&msg, monitor, 0) == -1) return -1; // Interrupted, presumably diff --git a/include/zmq.h b/include/zmq.h index 56cc75b8..2b3a5ec7 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -726,16 +726,20 @@ ZMQ_EXPORT int zmq_socket_get_peer_state (void *socket, const void *routing_id, size_t routing_id_size); +/* DRAFT Socket monitoring events */ +#define ZMQ_EVENT_PIPES_STATS 0x10000 + #define ZMQ_CURRENT_EVENT_VERSION 1 #define ZMQ_CURRENT_EVENT_VERSION_DRAFT 2 #define ZMQ_EVENT_ALL_V1 ZMQ_EVENT_ALL -#define ZMQ_EVENT_ALL_V2 ZMQ_EVENT_ALL_V1 +#define ZMQ_EVENT_ALL_V2 ZMQ_EVENT_ALL_V1 | ZMQ_EVENT_PIPES_STATS ZMQ_EXPORT int zmq_socket_monitor_versioned (void *s_, const char *addr_, uint64_t events_, int event_version_); +ZMQ_EXPORT int zmq_socket_monitor_pipes_stats (void *s); #endif // ZMQ_BUILD_DRAFT_API diff --git a/src/command.hpp b/src/command.hpp index ed095e1c..8c75005b 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -32,6 +32,7 @@ #include #include "stdint.hpp" +#include "endpoint.hpp" namespace zmq { @@ -73,6 +74,8 @@ __declspec(align (64)) reap, reaped, inproc_connected, + pipe_peer_stats, + pipe_stats_publish, done } type; @@ -186,6 +189,23 @@ __declspec(align (64)) { } reaped; + // Send application-side pipe count and ask to send monitor event + struct + { + uint64_t queue_count; + zmq::own_t *socket_base; + endpoint_uri_pair_t *endpoint_pair; + } pipe_peer_stats; + + // Collate application thread and I/O thread pipe counts and endpoints + // and send as event + struct + { + uint64_t outbound_queue_count; + uint64_t inbound_queue_count; + endpoint_uri_pair_t *endpoint_pair; + } pipe_stats_publish; + // Sent by reaper thread to the term thread when all the sockets // are successfully deallocated. struct diff --git a/src/object.cpp b/src/object.cpp index eb264427..92b74557 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -107,6 +107,19 @@ void zmq::object_t::process_command (command_t &cmd_) process_hiccup (cmd_.args.hiccup.pipe); break; + case command_t::pipe_peer_stats: + process_pipe_peer_stats (cmd_.args.pipe_peer_stats.queue_count, + cmd_.args.pipe_peer_stats.socket_base, + cmd_.args.pipe_peer_stats.endpoint_pair); + break; + + case command_t::pipe_stats_publish: + process_pipe_stats_publish ( + cmd_.args.pipe_stats_publish.outbound_queue_count, + cmd_.args.pipe_stats_publish.inbound_queue_count, + cmd_.args.pipe_stats_publish.endpoint_pair); + break; + case command_t::pipe_term: process_pipe_term (); break; @@ -285,6 +298,35 @@ void zmq::object_t::send_hiccup (pipe_t *destination_, void *pipe_) send_command (cmd); } +void zmq::object_t::send_pipe_peer_stats (pipe_t *destination_, + uint64_t queue_count_, + own_t *socket_base_, + endpoint_uri_pair_t *endpoint_pair_) +{ + command_t cmd; + cmd.destination = destination_; + cmd.type = command_t::pipe_peer_stats; + cmd.args.pipe_peer_stats.queue_count = queue_count_; + cmd.args.pipe_peer_stats.socket_base = socket_base_; + cmd.args.pipe_peer_stats.endpoint_pair = endpoint_pair_; + send_command (cmd); +} + +void zmq::object_t::send_pipe_stats_publish ( + own_t *destination_, + uint64_t outbound_queue_count_, + uint64_t inbound_queue_count_, + endpoint_uri_pair_t *endpoint_pair_) +{ + command_t cmd; + cmd.destination = destination_; + cmd.type = command_t::pipe_stats_publish; + cmd.args.pipe_stats_publish.outbound_queue_count = outbound_queue_count_; + cmd.args.pipe_stats_publish.inbound_queue_count = inbound_queue_count_; + cmd.args.pipe_stats_publish.endpoint_pair = endpoint_pair_; + send_command (cmd); +} + void zmq::object_t::send_pipe_term (pipe_t *destination_) { command_t cmd; @@ -422,6 +464,20 @@ void zmq::object_t::process_hiccup (void *) zmq_assert (false); } +void zmq::object_t::process_pipe_peer_stats (uint64_t, + own_t *, + endpoint_uri_pair_t *) +{ + zmq_assert (false); +} + +void zmq::object_t::process_pipe_stats_publish (uint64_t, + uint64_t, + endpoint_uri_pair_t *) +{ + zmq_assert (false); +} + void zmq::object_t::process_pipe_term () { zmq_assert (false); diff --git a/src/object.hpp b/src/object.hpp index 10d7bffe..70cc5cd4 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -32,6 +32,7 @@ #include #include "stdint.hpp" +#include "endpoint.hpp" namespace zmq { @@ -96,6 +97,14 @@ class object_t void send_activate_read (zmq::pipe_t *destination_); void send_activate_write (zmq::pipe_t *destination_, uint64_t msgs_read_); void send_hiccup (zmq::pipe_t *destination_, void *pipe_); + void send_pipe_peer_stats (zmq::pipe_t *destination_, + uint64_t queue_count_, + zmq::own_t *socket_base, + endpoint_uri_pair_t *endpoint_pair_); + void send_pipe_stats_publish (zmq::own_t *destination_, + uint64_t outbound_queue_count_, + uint64_t inbound_queue_count_, + endpoint_uri_pair_t *endpoint_pair_); void send_pipe_term (zmq::pipe_t *destination_); void send_pipe_term_ack (zmq::pipe_t *destination_); void send_pipe_hwm (zmq::pipe_t *destination_, int inhwm_, int outhwm_); @@ -117,6 +126,13 @@ class object_t virtual void process_activate_read (); virtual void process_activate_write (uint64_t msgs_read_); virtual void process_hiccup (void *pipe_); + virtual void process_pipe_peer_stats (uint64_t queue_count_, + zmq::own_t *socket_base_, + endpoint_uri_pair_t *endpoint_pair_); + virtual void + process_pipe_stats_publish (uint64_t outbound_queue_count_, + uint64_t inbound_queue_count_, + endpoint_uri_pair_t *endpoint_pair_); virtual void process_pipe_term (); virtual void process_pipe_term_ack (); virtual void process_pipe_hwm (int inhwm_, int outhwm_); diff --git a/src/pipe.cpp b/src/pipe.cpp index 855ba0af..bcf1cff5 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -563,3 +563,19 @@ const zmq::endpoint_uri_pair_t &zmq::pipe_t::get_endpoint_pair () const { return _endpoint_pair; } + +void zmq::pipe_t::send_stats_to_peer (own_t *socket_base_) +{ + endpoint_uri_pair_t *ep = + new (std::nothrow) endpoint_uri_pair_t (_endpoint_pair); + send_pipe_peer_stats (_peer, _msgs_written - _peers_msgs_read, socket_base_, + ep); +} + +void zmq::pipe_t::process_pipe_peer_stats (uint64_t queue_count_, + own_t *socket_base_, + endpoint_uri_pair_t *endpoint_pair_) +{ + send_pipe_stats_publish (socket_base_, queue_count_, + _msgs_written - _peers_msgs_read, endpoint_pair_); +} diff --git a/src/pipe.hpp b/src/pipe.hpp index 76f71758..d7019bc5 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -145,6 +145,8 @@ class pipe_t : public object_t, void set_endpoint_pair (endpoint_uri_pair_t endpoint_pair_); const endpoint_uri_pair_t &get_endpoint_pair () const; + void send_stats_to_peer (own_t *socket_base_); + private: // Type of the underlying lock-free pipe. typedef ypipe_base_t upipe_t; @@ -153,6 +155,9 @@ class pipe_t : public object_t, void process_activate_read (); void process_activate_write (uint64_t msgs_read_); void process_hiccup (void *pipe_); + void process_pipe_peer_stats (uint64_t queue_count_, + own_t *socket_base_, + endpoint_uri_pair_t *endpoint_pair_); void process_pipe_term (); void process_pipe_term_ack (); void process_pipe_hwm (int inhwm_, int outhwm_); diff --git a/src/session_base.cpp b/src/session_base.cpp index 2de67eca..2b4a6daa 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -409,6 +409,11 @@ void zmq::session_base_t::process_attach (i_engine *engine_) zmq_assert (!_pipe); _pipe = pipes[0]; + // The endpoints strings are not set on bind, set them here so that + // events can use them. + pipes[0]->set_endpoint_pair (engine_->get_endpoint ()); + pipes[1]->set_endpoint_pair (engine_->get_endpoint ()); + // Ask socket to plug into the remote end of the pipe. send_bind (_socket, pipes[1]); } diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 27aa23da..84dfc674 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -1421,6 +1421,45 @@ void zmq::socket_base_t::process_term_endpoint (std::string *endpoint_) delete endpoint_; } +void zmq::socket_base_t::process_pipe_stats_publish ( + uint64_t outbound_queue_count_, + uint64_t inbound_queue_count_, + endpoint_uri_pair_t *endpoint_pair_) +{ + uint64_t values[2] = {outbound_queue_count_, inbound_queue_count_}; + event (*endpoint_pair_, values, 2, ZMQ_EVENT_PIPES_STATS); + delete endpoint_pair_; +} + +/* + * There are 2 pipes per connection, and the inbound one _must_ be queried from + * the I/O thread. So ask the outbound pipe, in the application thread, to send + * a message (pipe_peer_stats) to its peer. The message will carry the outbound + * pipe stats and endpoint, and the reference to the socket object. + * The inbound pipe on the I/O thread will then add its own stats and endpoint, + * and write back a message to the socket object (pipe_stats_publish) which + * will raise an event with the data. + */ +int zmq::socket_base_t::query_pipes_stats () +{ + { + scoped_lock_t lock (_monitor_sync); + if (!(_monitor_events & ZMQ_EVENT_PIPES_STATS)) { + errno = EINVAL; + return -1; + } + } + if (_pipes.size () == 0) { + errno = EAGAIN; + return -1; + } + for (pipes_t::size_type i = 0; i != _pipes.size (); ++i) { + _pipes[i]->send_stats_to_peer (this); + } + + return 0; +} + void zmq::socket_base_t::update_pipe_options (int option_) { if (option_ == ZMQ_SNDHWM || option_ == ZMQ_RCVHWM) { diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 38e31e32..ce6c57ed 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -157,6 +157,11 @@ class socket_base_t : public own_t, virtual int get_peer_state (const void *routing_id_, size_t routing_id_size_) const; + // Request for pipes statistics - will generate a ZMQ_EVENT_PIPES_STATS + // after gathering the data asynchronously. Requires event monitoring to + // be enabled. + int query_pipes_stats (); + protected: socket_base_t (zmq::ctx_t *parent_, uint32_t tid_, @@ -278,6 +283,9 @@ class socket_base_t : public own_t, // Handlers for incoming commands. void process_stop (); void process_bind (zmq::pipe_t *pipe_); + void process_pipe_stats_publish (uint64_t outbound_queue_count_, + uint64_t inbound_queue_count_, + endpoint_uri_pair_t *endpoint_pair_); void process_term (int linger_); void process_term_endpoint (std::string *endpoint_); diff --git a/src/zmq.cpp b/src/zmq.cpp index 5e32dfd0..5d186283 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -1452,3 +1452,11 @@ int zmq_has (const char *capability_) // Whatever the application asked for, we don't have return false; } + +int zmq_socket_monitor_pipes_stats (void *s_) +{ + zmq::socket_base_t *s = as_socket_base_t (s_); + if (!s) + return -1; + return s->query_pipes_stats (); +} diff --git a/src/zmq_draft.h b/src/zmq_draft.h index d31f37da..c8934d45 100644 --- a/src/zmq_draft.h +++ b/src/zmq_draft.h @@ -123,10 +123,20 @@ int zmq_socket_get_peer_state (void *socket_, const void *routing_id_, size_t routing_id_size_); +/* DRAFT Socket monitoring events */ +#define ZMQ_EVENT_PIPES_STATS 0x10000 + +#define ZMQ_CURRENT_EVENT_VERSION 1 +#define ZMQ_CURRENT_EVENT_VERSION_DRAFT 2 + +#define ZMQ_EVENT_ALL_V1 ZMQ_EVENT_ALL +#define ZMQ_EVENT_ALL_V2 ZMQ_EVENT_ALL_V1 | ZMQ_EVENT_PIPES_STATS + int zmq_socket_monitor_versioned (void *s_, const char *addr_, uint64_t events_, int event_version_); +int zmq_socket_monitor_pipes_stats (void *s_); #endif // ZMQ_BUILD_DRAFT_API diff --git a/tests/test_monitor.cpp b/tests/test_monitor.cpp index 5204f513..8cfd4674 100644 --- a/tests/test_monitor.cpp +++ b/tests/test_monitor.cpp @@ -50,6 +50,13 @@ void test_monitor_invalid_protocol_fails () TEST_ASSERT_FAILURE_ERRNO ( EPROTONOSUPPORT, zmq_socket_monitor (client, "tcp://127.0.0.1:*", 0)); +#ifdef ZMQ_EVENT_PIPES_STATS + // Stats command needs to be called on a valid socket with monitoring + // enabled + TEST_ASSERT_FAILURE_ERRNO (ENOTSOCK, zmq_socket_monitor_pipes_stats (NULL)); + TEST_ASSERT_FAILURE_ERRNO (EINVAL, zmq_socket_monitor_pipes_stats (client)); +#endif + test_context_socket_close_zero_linger (client); } @@ -121,8 +128,9 @@ void test_monitor_basic () test_context_socket_close_zero_linger (server_mon); } -#if (defined ZMQ_CURRENT_EVENT_VERSION && ZMQ_CURRENT_EVENT_VERSION >= 2) || \ - (defined ZMQ_CURRENT_EVENT_VERSION && ZMQ_CURRENT_EVENT_VERSION_DRAFT >= 2) +#if (defined ZMQ_CURRENT_EVENT_VERSION && ZMQ_CURRENT_EVENT_VERSION >= 2) \ + || (defined ZMQ_CURRENT_EVENT_VERSION \ + && ZMQ_CURRENT_EVENT_VERSION_DRAFT >= 2) void test_monitor_versioned_basic (bind_function_t bind_function_, const char *expected_prefix_) { @@ -242,6 +250,133 @@ void test_monitor_versioned_basic_tipc () static const char prefix[] = "tipc://"; test_monitor_versioned_basic (bind_loopback_tipc, prefix); } + +#ifdef ZMQ_EVENT_PIPES_STATS +void test_monitor_versioned_stats (bind_function_t bind_function_, + const char *expected_prefix_) +{ + char server_endpoint[MAX_SOCKET_STRING]; + const int pulls_count = 4; + void *pulls[pulls_count]; + + // We'll monitor these two sockets + void *push = test_context_socket (ZMQ_PUSH); + + TEST_ASSERT_SUCCESS_ERRNO (zmq_socket_monitor_versioned ( + push, "inproc://monitor-push", ZMQ_EVENT_PIPES_STATS, 2)); + + // Should fail if there are no pipes to monitor + TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_socket_monitor_pipes_stats (push)); + + void *push_mon = test_context_socket (ZMQ_PAIR); + + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (push_mon, "inproc://monitor-push")); + + // Set lower HWM - queues will be filled so we should see it in the stats + int send_hwm = 500; + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (push, ZMQ_SNDHWM, &send_hwm, sizeof (send_hwm))); + // Set very low TCP buffers so that messages cannot be stored in-flight + const int tcp_buffer_size = 4096; + TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt ( + push, ZMQ_SNDBUF, &tcp_buffer_size, sizeof (tcp_buffer_size))); + bind_function_ (push, server_endpoint, sizeof (server_endpoint)); + + int ipv6_; + size_t ipv6_size_ = sizeof (ipv6_); + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_getsockopt (push, ZMQ_IPV6, &ipv6_, &ipv6_size_)); + for (int i = 0; i < pulls_count; ++i) { + pulls[i] = test_context_socket (ZMQ_PULL); + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (pulls[i], ZMQ_IPV6, &ipv6_, sizeof (int))); + int timeout_ms = 10; + TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt ( + pulls[i], ZMQ_RCVTIMEO, &timeout_ms, sizeof (timeout_ms))); + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (pulls[i], ZMQ_RCVHWM, &send_hwm, sizeof (send_hwm))); + TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt ( + pulls[i], ZMQ_RCVBUF, &tcp_buffer_size, sizeof (tcp_buffer_size))); + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (pulls[i], server_endpoint)); + } + + // Send until we block + int send_count = 0; + // Saturate the TCP buffers too + char data[tcp_buffer_size * 2]; + memset (data, 0, sizeof (data)); + // Saturate all pipes - send + receive - on all connections + while (send_count < send_hwm * 2 * pulls_count) { + TEST_ASSERT_EQUAL_INT (sizeof (data), + zmq_send (push, data, sizeof (data), 0)); + ++send_count; + } + + // Drain one of the pulls - doesn't matter how many messages, at least one + send_count = send_count / 4; + do { + zmq_recv (pulls[0], data, sizeof (data), 0); + --send_count; + } while (send_count > 0); + + // To kick the application thread, do a dummy getsockopt - users here + // should use the monitor and the other sockets in a poll. + unsigned long int dummy; + size_t dummy_size = sizeof (dummy); + msleep (SETTLE_TIME); + // Note that the pipe stats on the sender will not get updated until the + // receiver has processed at least lwm ((hwm + 1) / 2) messages AND until + // the application thread has ran through the mailbox, as the update is + // delivered via a message (send_activate_write) + zmq_getsockopt (push, ZMQ_EVENTS, &dummy, &dummy_size); + + // Ask for stats and check that they match + zmq_socket_monitor_pipes_stats (push); + + msleep (SETTLE_TIME); + zmq_getsockopt (push, ZMQ_EVENTS, &dummy, &dummy_size); + + for (int i = 0; i < pulls_count; ++i) { + char *push_local_address = NULL; + char *push_remote_address = NULL; + uint64_t queue_stat[2]; + int64_t event = get_monitor_event_v2 ( + push_mon, queue_stat, &push_local_address, &push_remote_address); + TEST_ASSERT_EQUAL_STRING (server_endpoint, push_local_address); + TEST_ASSERT_EQUAL_STRING_LEN (expected_prefix_, push_remote_address, + strlen (expected_prefix_)); + TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_PIPES_STATS, event); + TEST_ASSERT_EQUAL_INT (i == 0 ? 0 : send_hwm, queue_stat[0]); + TEST_ASSERT_EQUAL_INT (0, queue_stat[1]); + free (push_local_address); + free (push_remote_address); + } + + // Close client and server + test_context_socket_close_zero_linger (push_mon); + test_context_socket_close_zero_linger (push); + for (int i = 0; i < pulls_count; ++i) + test_context_socket_close_zero_linger (pulls[i]); +} + +void test_monitor_versioned_stats_tcp_ipv4 () +{ + static const char prefix[] = "tcp://127.0.0.1:"; + test_monitor_versioned_stats (bind_loopback_ipv4, prefix); +} + +void test_monitor_versioned_stats_tcp_ipv6 () +{ + static const char prefix[] = "tcp://[::1]:"; + test_monitor_versioned_stats (bind_loopback_ipv6, prefix); +} + +void test_monitor_versioned_stats_ipc () +{ + static const char prefix[] = "ipc://"; + test_monitor_versioned_stats (bind_loopback_ipc, prefix); +} +#endif // ZMQ_EVENT_PIPES_STATS #endif int main () @@ -252,12 +387,18 @@ int main () RUN_TEST (test_monitor_invalid_protocol_fails); RUN_TEST (test_monitor_basic); -#if (defined ZMQ_CURRENT_EVENT_VERSION && ZMQ_CURRENT_EVENT_VERSION >= 2) || \ - (defined ZMQ_CURRENT_EVENT_VERSION && ZMQ_CURRENT_EVENT_VERSION_DRAFT >= 2) +#if (defined ZMQ_CURRENT_EVENT_VERSION && ZMQ_CURRENT_EVENT_VERSION >= 2) \ + || (defined ZMQ_CURRENT_EVENT_VERSION \ + && ZMQ_CURRENT_EVENT_VERSION_DRAFT >= 2) RUN_TEST (test_monitor_versioned_basic_tcp_ipv4); RUN_TEST (test_monitor_versioned_basic_tcp_ipv6); RUN_TEST (test_monitor_versioned_basic_ipc); RUN_TEST (test_monitor_versioned_basic_tipc); +#ifdef ZMQ_EVENT_PIPES_STATS + RUN_TEST (test_monitor_versioned_stats_tcp_ipv4); + RUN_TEST (test_monitor_versioned_stats_tcp_ipv6); + RUN_TEST (test_monitor_versioned_stats_ipc); +#endif #endif return UNITY_END (); diff --git a/tests/testutil_monitoring.hpp b/tests/testutil_monitoring.hpp index 3e03638e..03830d44 100644 --- a/tests/testutil_monitoring.hpp +++ b/tests/testutil_monitoring.hpp @@ -241,7 +241,7 @@ static int64_t get_monitor_event_internal_v2 (void *monitor_, zmq_msg_close (&msg); } - // Third frame in message contains local address + // Second-to-last frame in message contains local address zmq_msg_init (&msg); int res = zmq_msg_recv (&msg, monitor_, recv_flag_) == -1; assert (res != -1); @@ -256,7 +256,7 @@ static int64_t get_monitor_event_internal_v2 (void *monitor_, } zmq_msg_close (&msg); - // Fourth and last frame in message contains remote address + // Last frame in message contains remote address zmq_msg_init (&msg); res = zmq_msg_recv (&msg, monitor_, recv_flag_) == -1; assert (res != -1);