From 86697347fc32872e03f30b6c2d9d9a9674408eea Mon Sep 17 00:00:00 2001 From: Luca Boccassi Date: Tue, 5 Feb 2019 16:03:49 +0000 Subject: [PATCH 1/9] Problem: symlinking manpages in automake does not work Solution: remove them for now --- doc/zmq_poller_add.txt | 1 - doc/zmq_poller_add_fd.txt | 1 - doc/zmq_poller_destroy.txt | 1 - doc/zmq_poller_modify.txt | 1 - doc/zmq_poller_modify_fd.txt | 1 - doc/zmq_poller_new.txt | 1 - doc/zmq_poller_remove.txt | 1 - doc/zmq_poller_remove_fd.txt | 1 - doc/zmq_poller_wait_all.txt | 1 - 9 files changed, 9 deletions(-) delete mode 120000 doc/zmq_poller_add.txt delete mode 120000 doc/zmq_poller_add_fd.txt delete mode 120000 doc/zmq_poller_destroy.txt delete mode 120000 doc/zmq_poller_modify.txt delete mode 120000 doc/zmq_poller_modify_fd.txt delete mode 120000 doc/zmq_poller_new.txt delete mode 120000 doc/zmq_poller_remove.txt delete mode 120000 doc/zmq_poller_remove_fd.txt delete mode 120000 doc/zmq_poller_wait_all.txt diff --git a/doc/zmq_poller_add.txt b/doc/zmq_poller_add.txt deleted file mode 120000 index 6133b292..00000000 --- a/doc/zmq_poller_add.txt +++ /dev/null @@ -1 +0,0 @@ -zmq_poller.txt \ No newline at end of file diff --git a/doc/zmq_poller_add_fd.txt b/doc/zmq_poller_add_fd.txt deleted file mode 120000 index 6133b292..00000000 --- a/doc/zmq_poller_add_fd.txt +++ /dev/null @@ -1 +0,0 @@ -zmq_poller.txt \ No newline at end of file diff --git a/doc/zmq_poller_destroy.txt b/doc/zmq_poller_destroy.txt deleted file mode 120000 index 6133b292..00000000 --- a/doc/zmq_poller_destroy.txt +++ /dev/null @@ -1 +0,0 @@ -zmq_poller.txt \ No newline at end of file diff --git a/doc/zmq_poller_modify.txt b/doc/zmq_poller_modify.txt deleted file mode 120000 index 6133b292..00000000 --- a/doc/zmq_poller_modify.txt +++ /dev/null @@ -1 +0,0 @@ -zmq_poller.txt \ No newline at end of file diff --git a/doc/zmq_poller_modify_fd.txt b/doc/zmq_poller_modify_fd.txt deleted file mode 120000 index 6133b292..00000000 --- a/doc/zmq_poller_modify_fd.txt +++ /dev/null @@ -1 +0,0 @@ -zmq_poller.txt \ No newline at end of file diff --git a/doc/zmq_poller_new.txt b/doc/zmq_poller_new.txt deleted file mode 120000 index 6133b292..00000000 --- a/doc/zmq_poller_new.txt +++ /dev/null @@ -1 +0,0 @@ -zmq_poller.txt \ No newline at end of file diff --git a/doc/zmq_poller_remove.txt b/doc/zmq_poller_remove.txt deleted file mode 120000 index 6133b292..00000000 --- a/doc/zmq_poller_remove.txt +++ /dev/null @@ -1 +0,0 @@ -zmq_poller.txt \ No newline at end of file diff --git a/doc/zmq_poller_remove_fd.txt b/doc/zmq_poller_remove_fd.txt deleted file mode 120000 index 6133b292..00000000 --- a/doc/zmq_poller_remove_fd.txt +++ /dev/null @@ -1 +0,0 @@ -zmq_poller.txt \ No newline at end of file diff --git a/doc/zmq_poller_wait_all.txt b/doc/zmq_poller_wait_all.txt deleted file mode 120000 index 6133b292..00000000 --- a/doc/zmq_poller_wait_all.txt +++ /dev/null @@ -1 +0,0 @@ -zmq_poller.txt \ No newline at end of file From 2edba1259edaaa43cc4d22e7dcfd89f8b517dc90 Mon Sep 17 00:00:00 2001 From: Luca Boccassi Date: Tue, 5 Feb 2019 00:17:14 +0000 Subject: [PATCH 2/9] Problem: zmq_socket_monitor_versioned manpage doesn't build Solution: fix formatting --- doc/zmq_socket_monitor_versioned.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/zmq_socket_monitor_versioned.txt b/doc/zmq_socket_monitor_versioned.txt index 928166dc..e3aa9be7 100644 --- a/doc/zmq_socket_monitor_versioned.txt +++ b/doc/zmq_socket_monitor_versioned.txt @@ -1,5 +1,5 @@ zmq_socket_monitor_versioned(3) -===================== +=============================== NAME From b20cb122d9e82670624e7a7e87e3793a383afea3 Mon Sep 17 00:00:00 2001 From: Luca Boccassi Date: Mon, 4 Feb 2019 08:50:03 +0100 Subject: [PATCH 3/9] Problem: zmq_socket_monitor_versioned manpage not generated Solution: add it to doc/Makefile.am --- doc/Makefile.am | 1 + 1 file changed, 1 insertion(+) diff --git a/doc/Makefile.am b/doc/Makefile.am index f2eba812..ba63d705 100644 --- a/doc/Makefile.am +++ b/doc/Makefile.am @@ -11,6 +11,7 @@ MAN3 = zmq_bind.3 zmq_unbind.3 zmq_connect.3 zmq_disconnect.3 zmq_close.3 \ zmq_msg_get.3 zmq_msg_set.3 zmq_msg_more.3 zmq_msg_gets.3 \ zmq_getsockopt.3 zmq_setsockopt.3 \ zmq_socket.3 zmq_socket_monitor.3 zmq_poll.3 \ + zmq_socket_monitor_versioned.3 \ zmq_errno.3 zmq_strerror.3 zmq_version.3 \ zmq_sendmsg.3 zmq_recvmsg.3 \ zmq_proxy.3 zmq_proxy_steerable.3 \ From edf79dfefccba4a43067d18a4fd59a1353e750ec Mon Sep 17 00:00:00 2001 From: Luca Boccassi Date: Mon, 4 Feb 2019 08:54:58 +0100 Subject: [PATCH 4/9] Problem: test_monitor check for DRAFT has to be kept up to date Solution: check for the available version instead --- tests/test_monitor.cpp | 6 ++++-- tests/testutil_monitoring.hpp | 4 +++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/tests/test_monitor.cpp b/tests/test_monitor.cpp index 9788efe0..7b0400d3 100644 --- a/tests/test_monitor.cpp +++ b/tests/test_monitor.cpp @@ -116,7 +116,8 @@ void test_monitor_basic () test_context_socket_close_zero_linger (server_mon); } -#ifdef ZMQ_BUILD_DRAFT_API +#if (defined ZMQ_CURRENT_EVENT_VERSION && ZMQ_CURRENT_EVENT_VERSION >= 2) || \ + (defined ZMQ_CURRENT_EVENT_VERSION && ZMQ_CURRENT_EVENT_VERSION_DRAFT >= 2) void test_monitor_versioned_basic (bind_function_t bind_function_, const char *expected_prefix_) { @@ -240,7 +241,8 @@ int main () RUN_TEST (test_monitor_invalid_protocol_fails); RUN_TEST (test_monitor_basic); -#ifdef ZMQ_BUILD_DRAFT_API +#if (defined ZMQ_CURRENT_EVENT_VERSION && ZMQ_CURRENT_EVENT_VERSION >= 2) || \ + (defined ZMQ_CURRENT_EVENT_VERSION && ZMQ_CURRENT_EVENT_VERSION_DRAFT >= 2) RUN_TEST (test_monitor_versioned_basic_tcp_ipv4); RUN_TEST (test_monitor_versioned_basic_tcp_ipv6); RUN_TEST (test_monitor_versioned_basic_ipc); diff --git a/tests/testutil_monitoring.hpp b/tests/testutil_monitoring.hpp index 1554391a..2de6dcca 100644 --- a/tests/testutil_monitoring.hpp +++ b/tests/testutil_monitoring.hpp @@ -190,7 +190,9 @@ int expect_monitor_event_multiple (void *server_mon_, return count_of_expected_events; } -#ifdef ZMQ_BUILD_DRAFT_API +#if (defined ZMQ_CURRENT_EVENT_VERSION && ZMQ_CURRENT_EVENT_VERSION >= 2) \ + || (defined ZMQ_CURRENT_EVENT_VERSION \ + && ZMQ_CURRENT_EVENT_VERSION_DRAFT >= 2) static int64_t get_monitor_event_internal_v2 (void *monitor_, uint64_t *value_, char **local_address_, From 83946d5c98d55433d8ebad9e4bae1ebb55dc6da7 Mon Sep 17 00:00:00 2001 From: Luca Boccassi Date: Tue, 5 Feb 2019 15:20:15 +0000 Subject: [PATCH 5/9] Problem: testutil_monitoring does not close received messages Solution: do it, as above 32 bytes they might be on the heap --- tests/testutil_monitoring.hpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/testutil_monitoring.hpp b/tests/testutil_monitoring.hpp index 2de6dcca..7384ca1a 100644 --- a/tests/testutil_monitoring.hpp +++ b/tests/testutil_monitoring.hpp @@ -211,6 +211,7 @@ static int64_t get_monitor_event_internal_v2 (void *monitor_, uint64_t event; memcpy (&event, zmq_msg_data (&msg), sizeof event); + zmq_msg_close (&msg); // Second frame in message contains event value zmq_msg_init (&msg); @@ -223,6 +224,7 @@ static int64_t get_monitor_event_internal_v2 (void *monitor_, if (value_) memcpy (value_, zmq_msg_data (&msg), sizeof *value_); + zmq_msg_close (&msg); // Third frame in message contains local address zmq_msg_init (&msg); @@ -237,6 +239,7 @@ static int64_t get_monitor_event_internal_v2 (void *monitor_, memcpy (*local_address_, data, size); (*local_address_)[size] = 0; } + zmq_msg_close (&msg); // Fourth and last frame in message contains remote address zmq_msg_init (&msg); @@ -251,6 +254,7 @@ static int64_t get_monitor_event_internal_v2 (void *monitor_, memcpy (*remote_address_, data, size); (*remote_address_)[size] = 0; } + zmq_msg_close (&msg); return event; } From c1b374fa6a4a2a4542d8b20ec2ebd3f7c04a529d Mon Sep 17 00:00:00 2001 From: Luca Boccassi Date: Tue, 5 Feb 2019 15:57:24 +0000 Subject: [PATCH 6/9] Problem: manpage for zmq_socket_monitor_versioned has old example Solution: update it --- doc/zmq_socket_monitor_versioned.txt | 62 ++++++++++++++++++++-------- 1 file changed, 45 insertions(+), 17 deletions(-) diff --git a/doc/zmq_socket_monitor_versioned.txt b/doc/zmq_socket_monitor_versioned.txt index e3aa9be7..18d3ed39 100644 --- a/doc/zmq_socket_monitor_versioned.txt +++ b/doc/zmq_socket_monitor_versioned.txt @@ -41,8 +41,9 @@ Each event is sent in multiple frames. The first frame contains an event number (64 bits). The number and content of further frames depend on this event number. -For all currently defined event types, the second frame contains an event +Unless it is specified differently, the second frame contains an event value (64 bits) that provides additional data according to the event number. +Some events might define additional value frames following the second one. The third and fourth frames contain strings that specifies the affected connection or endpoint. The third frame contains a string denoting the local endpoint, while the fourth frame contains a string denoting the remote endpoint. @@ -62,7 +63,7 @@ to multiple endpoints using different transports. ---- -Supported events +Supported events (v1) ---------------- ZMQ_EVENT_CONNECTED @@ -196,34 +197,61 @@ EXAMPLE // by reference, if not null, and event number by value. Returns -1 // in case of error. -static int -get_monitor_event (void *monitor, int *value, char **address) +static uint64_t +get_monitor_event (void *monitor, uint64_t *value, char **local_address, char **remote_address) { - // First frame in message contains event number and value + // First frame in message contains event number zmq_msg_t msg; zmq_msg_init (&msg); if (zmq_msg_recv (&msg, monitor, 0) == -1) return -1; // Interrupted, presumably assert (zmq_msg_more (&msg)); - uint8_t *data = (uint8_t *) zmq_msg_data (&msg); - uint16_t event = *(uint16_t *) (data); - if (value) - *value = *(uint32_t *) (data + 2); + uint64_t event; + memcpy (&event, zmq_msg_data (&msg), sizeof (event)); + zmq_msg_close (&msg); - // Second frame in message contains event address + // Second frame to Nth frame (depends on the event) in message + // contains event value + zmq_msg_init (&msg); + if (zmq_msg_recv (&msg, monitor, 0) == -1) + return -1; // Interrupted, presumably + assert (zmq_msg_more (&msg)); + + if (value_) + memcpy (value_, zmq_msg_data (&msg), sizeof *value_); + zmq_msg_close (&msg); + + // Third frame in message contains local address + zmq_msg_init (&msg); + if (zmq_msg_recv (&msg, monitor, 0) == -1) + return -1; // Interrupted, presumably + assert (zmq_msg_more (&msg)); + + if (local_address_) { + uint8_t *data = (uint8_t *) zmq_msg_data (&msg); + size_t size = zmq_msg_size (&msg); + *local_address_ = (char *) malloc (size + 1); + memcpy (*local_address_, data, size); + (*local_address_)[size] = 0; + } + zmq_msg_close (&msg); + + // Fourth frame in message contains remote address zmq_msg_init (&msg); if (zmq_msg_recv (&msg, monitor, 0) == -1) return -1; // Interrupted, presumably assert (!zmq_msg_more (&msg)); - if (address) { + if (remote_address_) { uint8_t *data = (uint8_t *) zmq_msg_data (&msg); size_t size = zmq_msg_size (&msg); - *address = (char *) malloc (size + 1); - memcpy (*address, data, size); - (*address)[size] = 0; + *remote_address_ = (char *) malloc (size + 1); + memcpy (*remote_address_, data, size); + (*remote_address_)[size] = 0; } + zmq_msg_close (&msg); + return event; } @@ -239,14 +267,14 @@ int main (void) assert (server); // Socket monitoring only works over inproc:// - int rc = zmq_socket_monitor (client, "tcp://127.0.0.1:9999", 0); + int rc = zmq_socket_monitor_versioned (client, "tcp://127.0.0.1:9999", 0, 2); assert (rc == -1); assert (zmq_errno () == EPROTONOSUPPORT); // Monitor all events on client and server sockets - rc = zmq_socket_monitor (client, "inproc://monitor-client", ZMQ_EVENT_ALL); + rc = zmq_socket_monitor_versioned (client, "inproc://monitor-client", ZMQ_EVENT_ALL, 2); assert (rc == 0); - rc = zmq_socket_monitor (server, "inproc://monitor-server", ZMQ_EVENT_ALL); + rc = zmq_socket_monitor_versioned (server, "inproc://monitor-server", ZMQ_EVENT_ALL, 2); assert (rc == 0); // Create two sockets for collecting monitor events From 1e26a93ce26a20c4a78d4158b8063caa27bcef34 Mon Sep 17 00:00:00 2001 From: Luca Boccassi Date: Wed, 6 Feb 2019 22:29:30 +0000 Subject: [PATCH 7/9] Problem: test_monitor fails in valgrind Solution: expect additional events --- tests/test_monitor.cpp | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/tests/test_monitor.cpp b/tests/test_monitor.cpp index 7b0400d3..5204f513 100644 --- a/tests/test_monitor.cpp +++ b/tests/test_monitor.cpp @@ -94,7 +94,12 @@ void test_monitor_basic () event = get_monitor_event (client_mon, NULL, NULL); assert (event == ZMQ_EVENT_CONNECTED); expect_monitor_event (client_mon, ZMQ_EVENT_HANDSHAKE_SUCCEEDED); - expect_monitor_event (client_mon, ZMQ_EVENT_MONITOR_STOPPED); + event = get_monitor_event (client_mon, NULL, NULL); + if (event == ZMQ_EVENT_DISCONNECTED) { + expect_monitor_event (client_mon, ZMQ_EVENT_CONNECT_RETRIED); + expect_monitor_event (client_mon, ZMQ_EVENT_MONITOR_STOPPED); + } else + TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_MONITOR_STOPPED, event); // This is the flow of server events expect_monitor_event (server_mon, ZMQ_EVENT_LISTENING); @@ -181,7 +186,13 @@ void test_monitor_versioned_basic (bind_function_t bind_function_, expect_monitor_event_v2 (client_mon, ZMQ_EVENT_HANDSHAKE_SUCCEEDED, client_local_address, client_remote_address); - expect_monitor_event_v2 (client_mon, ZMQ_EVENT_MONITOR_STOPPED, "", ""); + event = get_monitor_event_v2 (client_mon, NULL, NULL, NULL); + if (event == ZMQ_EVENT_DISCONNECTED) { + expect_monitor_event_v2 (client_mon, ZMQ_EVENT_CONNECT_RETRIED, + client_local_address, client_remote_address); + expect_monitor_event_v2 (client_mon, ZMQ_EVENT_MONITOR_STOPPED, "", ""); + } else + TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_MONITOR_STOPPED, event); // This is the flow of server events expect_monitor_event_v2 (server_mon, ZMQ_EVENT_LISTENING, From cb73745250dce53aa6e059751a47940b7518a1c3 Mon Sep 17 00:00:00 2001 From: Luca Boccassi Date: Tue, 5 Feb 2019 19:23:21 +0000 Subject: [PATCH 8/9] Problem: cannot send more than one value per v2 event Solution: refactor code and add extra frame with value count before the values in v2 --- src/socket_base.cpp | 86 +++++++++++++++++++++++------------ src/socket_base.hpp | 6 ++- tests/testutil_monitoring.hpp | 23 ++++++++-- 3 files changed, 81 insertions(+), 34 deletions(-) diff --git a/src/socket_base.cpp b/src/socket_base.cpp index c5c0c296..27aa23da 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -1658,101 +1658,117 @@ int zmq::socket_base_t::monitor (const char *endpoint_, void zmq::socket_base_t::event_connected ( const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_) { - event (endpoint_uri_pair_, fd_, ZMQ_EVENT_CONNECTED); + uint64_t values[1] = {(uint64_t) fd_}; + event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_CONNECTED); } void zmq::socket_base_t::event_connect_delayed ( const endpoint_uri_pair_t &endpoint_uri_pair_, int err_) { - event (endpoint_uri_pair_, err_, ZMQ_EVENT_CONNECT_DELAYED); + uint64_t values[1] = {(uint64_t) err_}; + event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_CONNECT_DELAYED); } void zmq::socket_base_t::event_connect_retried ( const endpoint_uri_pair_t &endpoint_uri_pair_, int interval_) { - event (endpoint_uri_pair_, interval_, ZMQ_EVENT_CONNECT_RETRIED); + uint64_t values[1] = {(uint64_t) interval_}; + event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_CONNECT_RETRIED); } void zmq::socket_base_t::event_listening ( const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_) { - event (endpoint_uri_pair_, fd_, ZMQ_EVENT_LISTENING); + uint64_t values[1] = {(uint64_t) fd_}; + event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_LISTENING); } void zmq::socket_base_t::event_bind_failed ( const endpoint_uri_pair_t &endpoint_uri_pair_, int err_) { - event (endpoint_uri_pair_, err_, ZMQ_EVENT_BIND_FAILED); + uint64_t values[1] = {(uint64_t) err_}; + event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_BIND_FAILED); } void zmq::socket_base_t::event_accepted ( const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_) { - event (endpoint_uri_pair_, fd_, ZMQ_EVENT_ACCEPTED); + uint64_t values[1] = {(uint64_t) fd_}; + event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_ACCEPTED); } void zmq::socket_base_t::event_accept_failed ( const endpoint_uri_pair_t &endpoint_uri_pair_, int err_) { - event (endpoint_uri_pair_, err_, ZMQ_EVENT_ACCEPT_FAILED); + uint64_t values[1] = {(uint64_t) err_}; + event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_ACCEPT_FAILED); } void zmq::socket_base_t::event_closed ( const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_) { - event (endpoint_uri_pair_, fd_, ZMQ_EVENT_CLOSED); + uint64_t values[1] = {(uint64_t) fd_}; + event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_CLOSED); } void zmq::socket_base_t::event_close_failed ( const endpoint_uri_pair_t &endpoint_uri_pair_, int err_) { - event (endpoint_uri_pair_, err_, ZMQ_EVENT_CLOSE_FAILED); + uint64_t values[1] = {(uint64_t) err_}; + event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_CLOSE_FAILED); } void zmq::socket_base_t::event_disconnected ( const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_) { - event (endpoint_uri_pair_, fd_, ZMQ_EVENT_DISCONNECTED); + uint64_t values[1] = {(uint64_t) fd_}; + event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_DISCONNECTED); } void zmq::socket_base_t::event_handshake_failed_no_detail ( const endpoint_uri_pair_t &endpoint_uri_pair_, int err_) { - event (endpoint_uri_pair_, err_, ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL); + uint64_t values[1] = {(uint64_t) err_}; + event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL); } void zmq::socket_base_t::event_handshake_failed_protocol ( const endpoint_uri_pair_t &endpoint_uri_pair_, int err_) { - event (endpoint_uri_pair_, err_, ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL); + uint64_t values[1] = {(uint64_t) err_}; + event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL); } void zmq::socket_base_t::event_handshake_failed_auth ( const endpoint_uri_pair_t &endpoint_uri_pair_, int err_) { - event (endpoint_uri_pair_, err_, ZMQ_EVENT_HANDSHAKE_FAILED_AUTH); + uint64_t values[1] = {(uint64_t) err_}; + event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_HANDSHAKE_FAILED_AUTH); } void zmq::socket_base_t::event_handshake_succeeded ( const endpoint_uri_pair_t &endpoint_uri_pair_, int err_) { - event (endpoint_uri_pair_, err_, ZMQ_EVENT_HANDSHAKE_SUCCEEDED); + uint64_t values[1] = {(uint64_t) err_}; + event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_HANDSHAKE_SUCCEEDED); } void zmq::socket_base_t::event (const endpoint_uri_pair_t &endpoint_uri_pair_, - uint64_t value_, + uint64_t values_[], + uint64_t values_count_, uint64_t type_) { scoped_lock_t lock (_monitor_sync); if (_monitor_events & type_) { - monitor_event (type_, value_, endpoint_uri_pair_); + monitor_event (type_, values_, values_count_, endpoint_uri_pair_); } } // Send a monitor event void zmq::socket_base_t::monitor_event ( uint64_t event_, - uint64_t value_, + uint64_t values_[], + uint64_t values_count_, const endpoint_uri_pair_t &endpoint_uri_pair_) const { // this is a private method which is only called from @@ -1765,11 +1781,14 @@ void zmq::socket_base_t::monitor_event ( case 1: { // The API should not allow to activate unsupported events zmq_assert (event_ <= std::numeric_limits::max ()); - zmq_assert (value_ <= std::numeric_limits::max ()); + // v1 only allows one value + zmq_assert (values_count_ == 1); + zmq_assert (values_[0] + <= std::numeric_limits::max ()); // Send event and value in first frame const uint16_t event = static_cast (event_); - const uint32_t value = static_cast (value_); + const uint32_t value = static_cast (values_[0]); zmq_msg_init_size (&msg, sizeof (event) + sizeof (value)); uint8_t *data = static_cast (zmq_msg_data (&msg)); // Avoid dereferencing uint32_t on unaligned address @@ -1788,22 +1807,31 @@ void zmq::socket_base_t::monitor_event ( } break; case 2: { // Send event in first frame (64bit unsigned) - zmq_msg_init_size (&msg, sizeof event_); - memcpy (zmq_msg_data (&msg), &event_, sizeof event_); + zmq_msg_init_size (&msg, sizeof (event_)); + memcpy (zmq_msg_data (&msg), &event_, sizeof (event_)); zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE); - // Send value in second frame (64bit unsigned) - zmq_msg_init_size (&msg, sizeof value_); - memcpy (zmq_msg_data (&msg), &value_, sizeof value_); + // Send number of values that will follow in second frame + zmq_msg_init_size (&msg, sizeof (values_count_)); + memcpy (zmq_msg_data (&msg), &values_count_, + sizeof (values_count_)); zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE); - // Send local endpoint URI in third frame (string) + // Send values in third-Nth frames (64bit unsigned) + for (uint64_t i = 0; i < values_count_; ++i) { + zmq_msg_init_size (&msg, sizeof (values_[i])); + memcpy (zmq_msg_data (&msg), &values_[i], + sizeof (values_[i])); + zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE); + } + + // Send local endpoint URI in second-to-last frame (string) zmq_msg_init_size (&msg, endpoint_uri_pair_.local.size ()); memcpy (zmq_msg_data (&msg), endpoint_uri_pair_.local.c_str (), endpoint_uri_pair_.local.size ()); zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE); - // Send remote endpoint URI in fourth frame (string) + // Send remote endpoint URI in last frame (string) zmq_msg_init_size (&msg, endpoint_uri_pair_.remote.size ()); memcpy (zmq_msg_data (&msg), endpoint_uri_pair_.remote.c_str (), endpoint_uri_pair_.remote.size ()); @@ -1820,9 +1848,11 @@ void zmq::socket_base_t::stop_monitor (bool send_monitor_stopped_event_) if (_monitor_socket) { if ((_monitor_events & ZMQ_EVENT_MONITOR_STOPPED) - && send_monitor_stopped_event_) - monitor_event (ZMQ_EVENT_MONITOR_STOPPED, 0, + && send_monitor_stopped_event_) { + uint64_t values[1] = {0}; + monitor_event (ZMQ_EVENT_MONITOR_STOPPED, values, 1, endpoint_uri_pair_t ()); + } zmq_close (_monitor_socket); _monitor_socket = NULL; _monitor_events = 0; diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 0751f92e..38e31e32 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -200,12 +200,14 @@ class socket_base_t : public own_t, private: // test if event should be sent and then dispatch it void event (const endpoint_uri_pair_t &endpoint_uri_pair_, - uint64_t value_, + uint64_t values_[], + uint64_t values_count_, uint64_t type_); // Socket event data dispatch void monitor_event (uint64_t event_, - uint64_t value_, + uint64_t values_[], + uint64_t values_count_, const endpoint_uri_pair_t &endpoint_uri_pair_) const; // Monitor socket cleanup diff --git a/tests/testutil_monitoring.hpp b/tests/testutil_monitoring.hpp index 7384ca1a..3e03638e 100644 --- a/tests/testutil_monitoring.hpp +++ b/tests/testutil_monitoring.hpp @@ -210,10 +210,10 @@ static int64_t get_monitor_event_internal_v2 (void *monitor_, assert (sizeof (uint64_t) == zmq_msg_size (&msg)); uint64_t event; - memcpy (&event, zmq_msg_data (&msg), sizeof event); + memcpy (&event, zmq_msg_data (&msg), sizeof (event)); zmq_msg_close (&msg); - // Second frame in message contains event value + // Second frame in message contains the number of values zmq_msg_init (&msg); if (zmq_msg_recv (&msg, monitor_, recv_flag_) == -1) { assert (errno == EAGAIN); @@ -222,10 +222,25 @@ static int64_t get_monitor_event_internal_v2 (void *monitor_, assert (zmq_msg_more (&msg)); assert (sizeof (uint64_t) == zmq_msg_size (&msg)); - if (value_) - memcpy (value_, zmq_msg_data (&msg), sizeof *value_); + uint64_t value_count; + memcpy (&value_count, zmq_msg_data (&msg), sizeof (value_count)); zmq_msg_close (&msg); + for (uint64_t i = 0; i < value_count; ++i) { + // Subsequent frames in message contain event values + zmq_msg_init (&msg); + if (zmq_msg_recv (&msg, monitor_, recv_flag_) == -1) { + assert (errno == EAGAIN); + return -1; // timed out or no message available + } + assert (zmq_msg_more (&msg)); + assert (sizeof (uint64_t) == zmq_msg_size (&msg)); + + if (value_ && value_ + i) + memcpy (value_ + i, zmq_msg_data (&msg), sizeof (*value_)); + zmq_msg_close (&msg); + } + // Third frame in message contains local address zmq_msg_init (&msg); int res = zmq_msg_recv (&msg, monitor_, recv_flag_) == -1; From feadf6d40f302aa556b6e5b47e2737379bc46f81 Mon Sep 17 00:00:00 2001 From: Luca Boccassi Date: Thu, 31 Jan 2019 17:23:42 +0100 Subject: [PATCH 9/9] Problem: cannot monitor state of queues at runtime Solution: add API and ZMQ_EVENT_PIPES_STATS event which generates 2 values, one for the egress and one for the ingress pipes respectively. Refactor the events code to be able to send multiple values. --- doc/zmq_socket_monitor_versioned.txt | 84 +++++++++++---- include/zmq.h | 6 +- src/command.hpp | 20 ++++ src/object.cpp | 56 ++++++++++ src/object.hpp | 16 +++ src/pipe.cpp | 16 +++ src/pipe.hpp | 5 + src/session_base.cpp | 5 + src/socket_base.cpp | 39 +++++++ src/socket_base.hpp | 8 ++ src/zmq.cpp | 8 ++ src/zmq_draft.h | 10 ++ tests/test_monitor.cpp | 149 ++++++++++++++++++++++++++- tests/testutil_monitoring.hpp | 4 +- 14 files changed, 401 insertions(+), 25 deletions(-) diff --git a/doc/zmq_socket_monitor_versioned.txt b/doc/zmq_socket_monitor_versioned.txt index 18d3ed39..72646100 100644 --- a/doc/zmq_socket_monitor_versioned.txt +++ b/doc/zmq_socket_monitor_versioned.txt @@ -12,6 +12,8 @@ SYNOPSIS -------- *int zmq_socket_monitor_versioned (void '*socket', char '*endpoint', uint64_t 'events', int 'event_version');* +*int zmq_socket_monitor_pipes_stats (void '*socket');* + DESCRIPTION ----------- @@ -41,18 +43,23 @@ Each event is sent in multiple frames. The first frame contains an event number (64 bits). The number and content of further frames depend on this event number. -Unless it is specified differently, the second frame contains an event -value (64 bits) that provides additional data according to the event number. -Some events might define additional value frames following the second one. -The third and fourth frames contain strings that specifies the affected -connection or endpoint. The third frame contains a string denoting the local -endpoint, while the fourth frame contains a string denoting the remote endpoint. +Unless it is specified differently, the second frame contains the number of +value frames that will follow it as a 64 bits integer. The third frame to N-th +frames contain an event value (64 bits) that provides additional data according +to the event number. Each event type might have a different number of values. +The second-to-last and last frames contain strings that specifies the affected +connection or endpoint. The former frame contains a string denoting the local +endpoint, while the latter frame contains a string denoting the remote endpoint. Either of these may be empty, depending on the event type and whether the connection uses a bound or connected local endpoint. Note that the format of the second and further frames, and also the number of frames, may be different for events added in the future. +The _zmq_socket_monitor_pipes_stats()_ method triggers an event of type +ZMQ_EVENT_PIPES_STATS for each connected peer of the monitored socket. +NOTE: _zmq_socket_monitor_pipes_stats()_ is in DRAFT state. + ---- Monitoring events are only generated by some transports: At the moment these are SOCKS, TCP, IPC, and TIPC. Note that it is not an error to call @@ -168,17 +175,35 @@ The ZMTP security mechanism handshake failed due to an authentication failure. The event value is the status code returned by the ZAP handler (i.e. 300, 400 or 500). +---- + +Supported events (v2) +---------------- + +ZMQ_EVENT_PIPE_STATS +~~~~~~~~~~~~~~~~~~~~ +This event provides two values, the number of messages in each of the two +queues associated with the returned endpoint (respectively egress and ingress). +This event only triggers after calling the function +_zmq_socket_monitor_pipes_stats()_. +NOTE: this measurement is asynchronous, so by the time the message is received +the internal state might have already changed. +NOTE: when the monitored socket and the monitor are not used in a poll, the +event might not be delivered until an API has been called on the monitored +socket, like zmq_getsockopt for example (the option is irrelevant). +NOTE: in DRAFT state, not yet available in stable releases. + RETURN VALUE ------------ -The _zmq_socket_monitor()_ function returns a value of 0 or greater if -successful. Otherwise it returns `-1` and sets 'errno' to one of the values -defined below. +The _zmq_socket_monitor()_ and _zmq_socket_monitor_pipes_stats()_ functions +return a value of 0 or greater if successful. Otherwise they return `-1` and +set 'errno' to one of the values defined below. -ERRORS ------- +ERRORS - _zmq_socket_monitor()_ +------------------------------- *ETERM*:: The 0MQ 'context' associated with the specified 'socket' was terminated. @@ -189,11 +214,23 @@ sockets are required to use the inproc:// transport. *EINVAL*:: The monitor 'endpoint' supplied does not exist. + +ERRORS - _zmq_socket_monitor_pipes_stats()_ +------------------------------------------- +*ENOTSOCK*:: +The 'socket' parameter was not a valid 0MQ socket. + +*EINVAL*:: +The socket did not have monitoring enabled. + +*EAGAIN*:: +The monitored socket did not have any connections to monitor yet. + EXAMPLE ------- .Monitoring client and server sockets ---- -// Read one event off the monitor socket; return value and address +// Read one event off the monitor socket; return values and addresses // by reference, if not null, and event number by value. Returns -1 // in case of error. @@ -211,18 +248,29 @@ get_monitor_event (void *monitor, uint64_t *value, char **local_address, char ** memcpy (&event, zmq_msg_data (&msg), sizeof (event)); zmq_msg_close (&msg); - // Second frame to Nth frame (depends on the event) in message - // contains event value + // Second frame in message contains the number of values zmq_msg_init (&msg); if (zmq_msg_recv (&msg, monitor, 0) == -1) return -1; // Interrupted, presumably assert (zmq_msg_more (&msg)); - if (value_) - memcpy (value_, zmq_msg_data (&msg), sizeof *value_); + uint64_t value_count; + memcpy (&value_count, zmq_msg_data (&msg), sizeof (value_count)); zmq_msg_close (&msg); - // Third frame in message contains local address + for (uint64_t i = 0; i < value_count; ++i) { + // Subsequent frames in message contain event values + zmq_msg_init (&msg); + if (zmq_msg_recv (&msg, monitor, 0) == -1) + return -1; // Interrupted, presumably + assert (zmq_msg_more (&msg)); + + if (value_ && value_ + i) + memcpy (value_ + i, zmq_msg_data (&msg), sizeof (*value_)); + zmq_msg_close (&msg); + } + + // Second-to-last frame in message contains local address zmq_msg_init (&msg); if (zmq_msg_recv (&msg, monitor, 0) == -1) return -1; // Interrupted, presumably @@ -237,7 +285,7 @@ get_monitor_event (void *monitor, uint64_t *value, char **local_address, char ** } zmq_msg_close (&msg); - // Fourth frame in message contains remote address + // Last frame in message contains remote address zmq_msg_init (&msg); if (zmq_msg_recv (&msg, monitor, 0) == -1) return -1; // Interrupted, presumably diff --git a/include/zmq.h b/include/zmq.h index 56cc75b8..2b3a5ec7 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -726,16 +726,20 @@ ZMQ_EXPORT int zmq_socket_get_peer_state (void *socket, const void *routing_id, size_t routing_id_size); +/* DRAFT Socket monitoring events */ +#define ZMQ_EVENT_PIPES_STATS 0x10000 + #define ZMQ_CURRENT_EVENT_VERSION 1 #define ZMQ_CURRENT_EVENT_VERSION_DRAFT 2 #define ZMQ_EVENT_ALL_V1 ZMQ_EVENT_ALL -#define ZMQ_EVENT_ALL_V2 ZMQ_EVENT_ALL_V1 +#define ZMQ_EVENT_ALL_V2 ZMQ_EVENT_ALL_V1 | ZMQ_EVENT_PIPES_STATS ZMQ_EXPORT int zmq_socket_monitor_versioned (void *s_, const char *addr_, uint64_t events_, int event_version_); +ZMQ_EXPORT int zmq_socket_monitor_pipes_stats (void *s); #endif // ZMQ_BUILD_DRAFT_API diff --git a/src/command.hpp b/src/command.hpp index ed095e1c..8c75005b 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -32,6 +32,7 @@ #include #include "stdint.hpp" +#include "endpoint.hpp" namespace zmq { @@ -73,6 +74,8 @@ __declspec(align (64)) reap, reaped, inproc_connected, + pipe_peer_stats, + pipe_stats_publish, done } type; @@ -186,6 +189,23 @@ __declspec(align (64)) { } reaped; + // Send application-side pipe count and ask to send monitor event + struct + { + uint64_t queue_count; + zmq::own_t *socket_base; + endpoint_uri_pair_t *endpoint_pair; + } pipe_peer_stats; + + // Collate application thread and I/O thread pipe counts and endpoints + // and send as event + struct + { + uint64_t outbound_queue_count; + uint64_t inbound_queue_count; + endpoint_uri_pair_t *endpoint_pair; + } pipe_stats_publish; + // Sent by reaper thread to the term thread when all the sockets // are successfully deallocated. struct diff --git a/src/object.cpp b/src/object.cpp index eb264427..92b74557 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -107,6 +107,19 @@ void zmq::object_t::process_command (command_t &cmd_) process_hiccup (cmd_.args.hiccup.pipe); break; + case command_t::pipe_peer_stats: + process_pipe_peer_stats (cmd_.args.pipe_peer_stats.queue_count, + cmd_.args.pipe_peer_stats.socket_base, + cmd_.args.pipe_peer_stats.endpoint_pair); + break; + + case command_t::pipe_stats_publish: + process_pipe_stats_publish ( + cmd_.args.pipe_stats_publish.outbound_queue_count, + cmd_.args.pipe_stats_publish.inbound_queue_count, + cmd_.args.pipe_stats_publish.endpoint_pair); + break; + case command_t::pipe_term: process_pipe_term (); break; @@ -285,6 +298,35 @@ void zmq::object_t::send_hiccup (pipe_t *destination_, void *pipe_) send_command (cmd); } +void zmq::object_t::send_pipe_peer_stats (pipe_t *destination_, + uint64_t queue_count_, + own_t *socket_base_, + endpoint_uri_pair_t *endpoint_pair_) +{ + command_t cmd; + cmd.destination = destination_; + cmd.type = command_t::pipe_peer_stats; + cmd.args.pipe_peer_stats.queue_count = queue_count_; + cmd.args.pipe_peer_stats.socket_base = socket_base_; + cmd.args.pipe_peer_stats.endpoint_pair = endpoint_pair_; + send_command (cmd); +} + +void zmq::object_t::send_pipe_stats_publish ( + own_t *destination_, + uint64_t outbound_queue_count_, + uint64_t inbound_queue_count_, + endpoint_uri_pair_t *endpoint_pair_) +{ + command_t cmd; + cmd.destination = destination_; + cmd.type = command_t::pipe_stats_publish; + cmd.args.pipe_stats_publish.outbound_queue_count = outbound_queue_count_; + cmd.args.pipe_stats_publish.inbound_queue_count = inbound_queue_count_; + cmd.args.pipe_stats_publish.endpoint_pair = endpoint_pair_; + send_command (cmd); +} + void zmq::object_t::send_pipe_term (pipe_t *destination_) { command_t cmd; @@ -422,6 +464,20 @@ void zmq::object_t::process_hiccup (void *) zmq_assert (false); } +void zmq::object_t::process_pipe_peer_stats (uint64_t, + own_t *, + endpoint_uri_pair_t *) +{ + zmq_assert (false); +} + +void zmq::object_t::process_pipe_stats_publish (uint64_t, + uint64_t, + endpoint_uri_pair_t *) +{ + zmq_assert (false); +} + void zmq::object_t::process_pipe_term () { zmq_assert (false); diff --git a/src/object.hpp b/src/object.hpp index 10d7bffe..70cc5cd4 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -32,6 +32,7 @@ #include #include "stdint.hpp" +#include "endpoint.hpp" namespace zmq { @@ -96,6 +97,14 @@ class object_t void send_activate_read (zmq::pipe_t *destination_); void send_activate_write (zmq::pipe_t *destination_, uint64_t msgs_read_); void send_hiccup (zmq::pipe_t *destination_, void *pipe_); + void send_pipe_peer_stats (zmq::pipe_t *destination_, + uint64_t queue_count_, + zmq::own_t *socket_base, + endpoint_uri_pair_t *endpoint_pair_); + void send_pipe_stats_publish (zmq::own_t *destination_, + uint64_t outbound_queue_count_, + uint64_t inbound_queue_count_, + endpoint_uri_pair_t *endpoint_pair_); void send_pipe_term (zmq::pipe_t *destination_); void send_pipe_term_ack (zmq::pipe_t *destination_); void send_pipe_hwm (zmq::pipe_t *destination_, int inhwm_, int outhwm_); @@ -117,6 +126,13 @@ class object_t virtual void process_activate_read (); virtual void process_activate_write (uint64_t msgs_read_); virtual void process_hiccup (void *pipe_); + virtual void process_pipe_peer_stats (uint64_t queue_count_, + zmq::own_t *socket_base_, + endpoint_uri_pair_t *endpoint_pair_); + virtual void + process_pipe_stats_publish (uint64_t outbound_queue_count_, + uint64_t inbound_queue_count_, + endpoint_uri_pair_t *endpoint_pair_); virtual void process_pipe_term (); virtual void process_pipe_term_ack (); virtual void process_pipe_hwm (int inhwm_, int outhwm_); diff --git a/src/pipe.cpp b/src/pipe.cpp index 855ba0af..bcf1cff5 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -563,3 +563,19 @@ const zmq::endpoint_uri_pair_t &zmq::pipe_t::get_endpoint_pair () const { return _endpoint_pair; } + +void zmq::pipe_t::send_stats_to_peer (own_t *socket_base_) +{ + endpoint_uri_pair_t *ep = + new (std::nothrow) endpoint_uri_pair_t (_endpoint_pair); + send_pipe_peer_stats (_peer, _msgs_written - _peers_msgs_read, socket_base_, + ep); +} + +void zmq::pipe_t::process_pipe_peer_stats (uint64_t queue_count_, + own_t *socket_base_, + endpoint_uri_pair_t *endpoint_pair_) +{ + send_pipe_stats_publish (socket_base_, queue_count_, + _msgs_written - _peers_msgs_read, endpoint_pair_); +} diff --git a/src/pipe.hpp b/src/pipe.hpp index 76f71758..d7019bc5 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -145,6 +145,8 @@ class pipe_t : public object_t, void set_endpoint_pair (endpoint_uri_pair_t endpoint_pair_); const endpoint_uri_pair_t &get_endpoint_pair () const; + void send_stats_to_peer (own_t *socket_base_); + private: // Type of the underlying lock-free pipe. typedef ypipe_base_t upipe_t; @@ -153,6 +155,9 @@ class pipe_t : public object_t, void process_activate_read (); void process_activate_write (uint64_t msgs_read_); void process_hiccup (void *pipe_); + void process_pipe_peer_stats (uint64_t queue_count_, + own_t *socket_base_, + endpoint_uri_pair_t *endpoint_pair_); void process_pipe_term (); void process_pipe_term_ack (); void process_pipe_hwm (int inhwm_, int outhwm_); diff --git a/src/session_base.cpp b/src/session_base.cpp index 2de67eca..2b4a6daa 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -409,6 +409,11 @@ void zmq::session_base_t::process_attach (i_engine *engine_) zmq_assert (!_pipe); _pipe = pipes[0]; + // The endpoints strings are not set on bind, set them here so that + // events can use them. + pipes[0]->set_endpoint_pair (engine_->get_endpoint ()); + pipes[1]->set_endpoint_pair (engine_->get_endpoint ()); + // Ask socket to plug into the remote end of the pipe. send_bind (_socket, pipes[1]); } diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 27aa23da..84dfc674 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -1421,6 +1421,45 @@ void zmq::socket_base_t::process_term_endpoint (std::string *endpoint_) delete endpoint_; } +void zmq::socket_base_t::process_pipe_stats_publish ( + uint64_t outbound_queue_count_, + uint64_t inbound_queue_count_, + endpoint_uri_pair_t *endpoint_pair_) +{ + uint64_t values[2] = {outbound_queue_count_, inbound_queue_count_}; + event (*endpoint_pair_, values, 2, ZMQ_EVENT_PIPES_STATS); + delete endpoint_pair_; +} + +/* + * There are 2 pipes per connection, and the inbound one _must_ be queried from + * the I/O thread. So ask the outbound pipe, in the application thread, to send + * a message (pipe_peer_stats) to its peer. The message will carry the outbound + * pipe stats and endpoint, and the reference to the socket object. + * The inbound pipe on the I/O thread will then add its own stats and endpoint, + * and write back a message to the socket object (pipe_stats_publish) which + * will raise an event with the data. + */ +int zmq::socket_base_t::query_pipes_stats () +{ + { + scoped_lock_t lock (_monitor_sync); + if (!(_monitor_events & ZMQ_EVENT_PIPES_STATS)) { + errno = EINVAL; + return -1; + } + } + if (_pipes.size () == 0) { + errno = EAGAIN; + return -1; + } + for (pipes_t::size_type i = 0; i != _pipes.size (); ++i) { + _pipes[i]->send_stats_to_peer (this); + } + + return 0; +} + void zmq::socket_base_t::update_pipe_options (int option_) { if (option_ == ZMQ_SNDHWM || option_ == ZMQ_RCVHWM) { diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 38e31e32..ce6c57ed 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -157,6 +157,11 @@ class socket_base_t : public own_t, virtual int get_peer_state (const void *routing_id_, size_t routing_id_size_) const; + // Request for pipes statistics - will generate a ZMQ_EVENT_PIPES_STATS + // after gathering the data asynchronously. Requires event monitoring to + // be enabled. + int query_pipes_stats (); + protected: socket_base_t (zmq::ctx_t *parent_, uint32_t tid_, @@ -278,6 +283,9 @@ class socket_base_t : public own_t, // Handlers for incoming commands. void process_stop (); void process_bind (zmq::pipe_t *pipe_); + void process_pipe_stats_publish (uint64_t outbound_queue_count_, + uint64_t inbound_queue_count_, + endpoint_uri_pair_t *endpoint_pair_); void process_term (int linger_); void process_term_endpoint (std::string *endpoint_); diff --git a/src/zmq.cpp b/src/zmq.cpp index 5e32dfd0..5d186283 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -1452,3 +1452,11 @@ int zmq_has (const char *capability_) // Whatever the application asked for, we don't have return false; } + +int zmq_socket_monitor_pipes_stats (void *s_) +{ + zmq::socket_base_t *s = as_socket_base_t (s_); + if (!s) + return -1; + return s->query_pipes_stats (); +} diff --git a/src/zmq_draft.h b/src/zmq_draft.h index d31f37da..c8934d45 100644 --- a/src/zmq_draft.h +++ b/src/zmq_draft.h @@ -123,10 +123,20 @@ int zmq_socket_get_peer_state (void *socket_, const void *routing_id_, size_t routing_id_size_); +/* DRAFT Socket monitoring events */ +#define ZMQ_EVENT_PIPES_STATS 0x10000 + +#define ZMQ_CURRENT_EVENT_VERSION 1 +#define ZMQ_CURRENT_EVENT_VERSION_DRAFT 2 + +#define ZMQ_EVENT_ALL_V1 ZMQ_EVENT_ALL +#define ZMQ_EVENT_ALL_V2 ZMQ_EVENT_ALL_V1 | ZMQ_EVENT_PIPES_STATS + int zmq_socket_monitor_versioned (void *s_, const char *addr_, uint64_t events_, int event_version_); +int zmq_socket_monitor_pipes_stats (void *s_); #endif // ZMQ_BUILD_DRAFT_API diff --git a/tests/test_monitor.cpp b/tests/test_monitor.cpp index 5204f513..8cfd4674 100644 --- a/tests/test_monitor.cpp +++ b/tests/test_monitor.cpp @@ -50,6 +50,13 @@ void test_monitor_invalid_protocol_fails () TEST_ASSERT_FAILURE_ERRNO ( EPROTONOSUPPORT, zmq_socket_monitor (client, "tcp://127.0.0.1:*", 0)); +#ifdef ZMQ_EVENT_PIPES_STATS + // Stats command needs to be called on a valid socket with monitoring + // enabled + TEST_ASSERT_FAILURE_ERRNO (ENOTSOCK, zmq_socket_monitor_pipes_stats (NULL)); + TEST_ASSERT_FAILURE_ERRNO (EINVAL, zmq_socket_monitor_pipes_stats (client)); +#endif + test_context_socket_close_zero_linger (client); } @@ -121,8 +128,9 @@ void test_monitor_basic () test_context_socket_close_zero_linger (server_mon); } -#if (defined ZMQ_CURRENT_EVENT_VERSION && ZMQ_CURRENT_EVENT_VERSION >= 2) || \ - (defined ZMQ_CURRENT_EVENT_VERSION && ZMQ_CURRENT_EVENT_VERSION_DRAFT >= 2) +#if (defined ZMQ_CURRENT_EVENT_VERSION && ZMQ_CURRENT_EVENT_VERSION >= 2) \ + || (defined ZMQ_CURRENT_EVENT_VERSION \ + && ZMQ_CURRENT_EVENT_VERSION_DRAFT >= 2) void test_monitor_versioned_basic (bind_function_t bind_function_, const char *expected_prefix_) { @@ -242,6 +250,133 @@ void test_monitor_versioned_basic_tipc () static const char prefix[] = "tipc://"; test_monitor_versioned_basic (bind_loopback_tipc, prefix); } + +#ifdef ZMQ_EVENT_PIPES_STATS +void test_monitor_versioned_stats (bind_function_t bind_function_, + const char *expected_prefix_) +{ + char server_endpoint[MAX_SOCKET_STRING]; + const int pulls_count = 4; + void *pulls[pulls_count]; + + // We'll monitor these two sockets + void *push = test_context_socket (ZMQ_PUSH); + + TEST_ASSERT_SUCCESS_ERRNO (zmq_socket_monitor_versioned ( + push, "inproc://monitor-push", ZMQ_EVENT_PIPES_STATS, 2)); + + // Should fail if there are no pipes to monitor + TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_socket_monitor_pipes_stats (push)); + + void *push_mon = test_context_socket (ZMQ_PAIR); + + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (push_mon, "inproc://monitor-push")); + + // Set lower HWM - queues will be filled so we should see it in the stats + int send_hwm = 500; + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (push, ZMQ_SNDHWM, &send_hwm, sizeof (send_hwm))); + // Set very low TCP buffers so that messages cannot be stored in-flight + const int tcp_buffer_size = 4096; + TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt ( + push, ZMQ_SNDBUF, &tcp_buffer_size, sizeof (tcp_buffer_size))); + bind_function_ (push, server_endpoint, sizeof (server_endpoint)); + + int ipv6_; + size_t ipv6_size_ = sizeof (ipv6_); + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_getsockopt (push, ZMQ_IPV6, &ipv6_, &ipv6_size_)); + for (int i = 0; i < pulls_count; ++i) { + pulls[i] = test_context_socket (ZMQ_PULL); + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (pulls[i], ZMQ_IPV6, &ipv6_, sizeof (int))); + int timeout_ms = 10; + TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt ( + pulls[i], ZMQ_RCVTIMEO, &timeout_ms, sizeof (timeout_ms))); + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (pulls[i], ZMQ_RCVHWM, &send_hwm, sizeof (send_hwm))); + TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt ( + pulls[i], ZMQ_RCVBUF, &tcp_buffer_size, sizeof (tcp_buffer_size))); + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (pulls[i], server_endpoint)); + } + + // Send until we block + int send_count = 0; + // Saturate the TCP buffers too + char data[tcp_buffer_size * 2]; + memset (data, 0, sizeof (data)); + // Saturate all pipes - send + receive - on all connections + while (send_count < send_hwm * 2 * pulls_count) { + TEST_ASSERT_EQUAL_INT (sizeof (data), + zmq_send (push, data, sizeof (data), 0)); + ++send_count; + } + + // Drain one of the pulls - doesn't matter how many messages, at least one + send_count = send_count / 4; + do { + zmq_recv (pulls[0], data, sizeof (data), 0); + --send_count; + } while (send_count > 0); + + // To kick the application thread, do a dummy getsockopt - users here + // should use the monitor and the other sockets in a poll. + unsigned long int dummy; + size_t dummy_size = sizeof (dummy); + msleep (SETTLE_TIME); + // Note that the pipe stats on the sender will not get updated until the + // receiver has processed at least lwm ((hwm + 1) / 2) messages AND until + // the application thread has ran through the mailbox, as the update is + // delivered via a message (send_activate_write) + zmq_getsockopt (push, ZMQ_EVENTS, &dummy, &dummy_size); + + // Ask for stats and check that they match + zmq_socket_monitor_pipes_stats (push); + + msleep (SETTLE_TIME); + zmq_getsockopt (push, ZMQ_EVENTS, &dummy, &dummy_size); + + for (int i = 0; i < pulls_count; ++i) { + char *push_local_address = NULL; + char *push_remote_address = NULL; + uint64_t queue_stat[2]; + int64_t event = get_monitor_event_v2 ( + push_mon, queue_stat, &push_local_address, &push_remote_address); + TEST_ASSERT_EQUAL_STRING (server_endpoint, push_local_address); + TEST_ASSERT_EQUAL_STRING_LEN (expected_prefix_, push_remote_address, + strlen (expected_prefix_)); + TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_PIPES_STATS, event); + TEST_ASSERT_EQUAL_INT (i == 0 ? 0 : send_hwm, queue_stat[0]); + TEST_ASSERT_EQUAL_INT (0, queue_stat[1]); + free (push_local_address); + free (push_remote_address); + } + + // Close client and server + test_context_socket_close_zero_linger (push_mon); + test_context_socket_close_zero_linger (push); + for (int i = 0; i < pulls_count; ++i) + test_context_socket_close_zero_linger (pulls[i]); +} + +void test_monitor_versioned_stats_tcp_ipv4 () +{ + static const char prefix[] = "tcp://127.0.0.1:"; + test_monitor_versioned_stats (bind_loopback_ipv4, prefix); +} + +void test_monitor_versioned_stats_tcp_ipv6 () +{ + static const char prefix[] = "tcp://[::1]:"; + test_monitor_versioned_stats (bind_loopback_ipv6, prefix); +} + +void test_monitor_versioned_stats_ipc () +{ + static const char prefix[] = "ipc://"; + test_monitor_versioned_stats (bind_loopback_ipc, prefix); +} +#endif // ZMQ_EVENT_PIPES_STATS #endif int main () @@ -252,12 +387,18 @@ int main () RUN_TEST (test_monitor_invalid_protocol_fails); RUN_TEST (test_monitor_basic); -#if (defined ZMQ_CURRENT_EVENT_VERSION && ZMQ_CURRENT_EVENT_VERSION >= 2) || \ - (defined ZMQ_CURRENT_EVENT_VERSION && ZMQ_CURRENT_EVENT_VERSION_DRAFT >= 2) +#if (defined ZMQ_CURRENT_EVENT_VERSION && ZMQ_CURRENT_EVENT_VERSION >= 2) \ + || (defined ZMQ_CURRENT_EVENT_VERSION \ + && ZMQ_CURRENT_EVENT_VERSION_DRAFT >= 2) RUN_TEST (test_monitor_versioned_basic_tcp_ipv4); RUN_TEST (test_monitor_versioned_basic_tcp_ipv6); RUN_TEST (test_monitor_versioned_basic_ipc); RUN_TEST (test_monitor_versioned_basic_tipc); +#ifdef ZMQ_EVENT_PIPES_STATS + RUN_TEST (test_monitor_versioned_stats_tcp_ipv4); + RUN_TEST (test_monitor_versioned_stats_tcp_ipv6); + RUN_TEST (test_monitor_versioned_stats_ipc); +#endif #endif return UNITY_END (); diff --git a/tests/testutil_monitoring.hpp b/tests/testutil_monitoring.hpp index 3e03638e..03830d44 100644 --- a/tests/testutil_monitoring.hpp +++ b/tests/testutil_monitoring.hpp @@ -241,7 +241,7 @@ static int64_t get_monitor_event_internal_v2 (void *monitor_, zmq_msg_close (&msg); } - // Third frame in message contains local address + // Second-to-last frame in message contains local address zmq_msg_init (&msg); int res = zmq_msg_recv (&msg, monitor_, recv_flag_) == -1; assert (res != -1); @@ -256,7 +256,7 @@ static int64_t get_monitor_event_internal_v2 (void *monitor_, } zmq_msg_close (&msg); - // Fourth and last frame in message contains remote address + // Last frame in message contains remote address zmq_msg_init (&msg); res = zmq_msg_recv (&msg, monitor_, recv_flag_) == -1; assert (res != -1);