diff --git a/doc/zmq_socket_monitor_versioned.txt b/doc/zmq_socket_monitor_versioned.txt index 72646100..f70c9a19 100644 --- a/doc/zmq_socket_monitor_versioned.txt +++ b/doc/zmq_socket_monitor_versioned.txt @@ -11,6 +11,8 @@ zmq_socket_monitor_versioned - monitor socket events SYNOPSIS -------- *int zmq_socket_monitor_versioned (void '*socket', char '*endpoint', uint64_t 'events', int 'event_version');* +*int zmq_socket_monitor_versioned_typed ( + void '*socket', char '*endpoint', uint64_t 'events', int 'event_version', int 'type');* *int zmq_socket_monitor_pipes_stats (void '*socket');* @@ -56,6 +58,17 @@ connection uses a bound or connected local endpoint. Note that the format of the second and further frames, and also the number of frames, may be different for events added in the future. +The _zmq_socket_monitor_versioned_typed()_ is a generalisation of +_zmq_socket_monitor_versioned_ that supports more monitoring socket types. +The 'type' argument is used to specify the type of the monitoring socket. +Supported types are 'ZMQ_PAIR' (which is the equivalent of +_zmq_socket_monitor_versioned_), 'ZMQ_PUB' and 'ZMQ_PUSH'. Note that consumers +of the events will have to be compatible with the socket type, for instance a +monitoring socket of type 'ZMQ_PUB' will require consumers of type 'ZMQ_SUB'. +In the case that the monitoring socket type is of 'ZMQ_PUB', the multipart +message topic is the event number, thus consumers should subscribe to the +events they want to receive. + The _zmq_socket_monitor_pipes_stats()_ method triggers an event of type ZMQ_EVENT_PIPES_STATS for each connected peer of the monitored socket. NOTE: _zmq_socket_monitor_pipes_stats()_ is in DRAFT state. @@ -215,6 +228,20 @@ sockets are required to use the inproc:// transport. The monitor 'endpoint' supplied does not exist. +ERRORS - _zmq_socket_monitor_typed()_ +------------------------------- +*ETERM*:: +The 0MQ 'context' associated with the specified 'socket' was terminated. + +*EPROTONOSUPPORT*:: +The transport protocol of the monitor 'endpoint' is not supported. Monitor +sockets are required to use the inproc:// transport. + +*EINVAL*:: +The monitor 'endpoint' supplied does not exist or the specified socket 'type' +is not supported. + + ERRORS - _zmq_socket_monitor_pipes_stats()_ ------------------------------------------- *ENOTSOCK*:: diff --git a/include/zmq.h b/include/zmq.h index 1f768cc3..b614855a 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -735,6 +735,8 @@ ZMQ_EXPORT int zmq_socket_monitor_versioned (void *s_, const char *addr_, uint64_t events_, int event_version_); +ZMQ_EXPORT int zmq_socket_monitor_versioned_typed ( + void *s_, const char *addr_, uint64_t events_, int event_version_, int type_); ZMQ_EXPORT int zmq_socket_monitor_pipes_stats (void *s); #endif // ZMQ_BUILD_DRAFT_API diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 2f6fd332..f1934fd2 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -1639,7 +1639,8 @@ void zmq::socket_base_t::extract_flags (msg_t *msg_) int zmq::socket_base_t::monitor (const char *endpoint_, uint64_t events_, - int event_version_) + int event_version_, + int type_) { scoped_lock_t lock (_monitor_sync); @@ -1670,14 +1671,31 @@ int zmq::socket_base_t::monitor (const char *endpoint_, errno = EPROTONOSUPPORT; return -1; } + // already monitoring. Stop previous monitor before starting new one. if (_monitor_socket != NULL) { stop_monitor (true); } + + // Check if the specified socket type is supported. It must be a + // one-way socket types that support the SNDMORE flag. + switch (type_) { + case ZMQ_PAIR: + break; + case ZMQ_PUB: + break; + case ZMQ_PUSH: + break; + default: + errno = EINVAL; + return -1; + } + // Register events to monitor _monitor_events = events_; options.monitor_event_version = event_version_; - _monitor_socket = zmq_socket (get_ctx (), ZMQ_PAIR); + // Create a monitor socket of the specified type. + _monitor_socket = zmq_socket (get_ctx (), type_); if (_monitor_socket == NULL) return -1; diff --git a/src/socket_base.hpp b/src/socket_base.hpp index ce6c57ed..881b8323 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -119,7 +119,10 @@ class socket_base_t : public own_t, void lock (); void unlock (); - int monitor (const char *endpoint_, uint64_t events_, int event_version_); + int monitor (const char *endpoint_, + uint64_t events_, + int event_version_, + int type_); void event_connected (const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_); diff --git a/src/zmq.cpp b/src/zmq.cpp index 9461788e..5bdef399 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -275,7 +275,7 @@ int zmq_socket_monitor_versioned (void *s_, zmq::socket_base_t *s = as_socket_base_t (s_); if (!s) return -1; - return s->monitor (addr_, events_, event_version_); + return s->monitor (addr_, events_, event_version_, ZMQ_PAIR); } int zmq_socket_monitor (void *s_, const char *addr_, int events_) @@ -283,6 +283,16 @@ int zmq_socket_monitor (void *s_, const char *addr_, int events_) return zmq_socket_monitor_versioned (s_, addr_, events_, 1); } +int zmq_socket_monitor_versioned_typed ( + void *s_, const char *addr_, uint64_t events_, int event_version_, int type_) +{ + zmq::socket_base_t *s = as_socket_base_t (s_); + if (!s) + return -1; + + return s->monitor (addr_, events_, event_version_, type_); +} + int zmq_join (void *s_, const char *group_) { zmq::socket_base_t *s = as_socket_base_t (s_); diff --git a/src/zmq_draft.h b/src/zmq_draft.h index ce00bb29..13f94562 100644 --- a/src/zmq_draft.h +++ b/src/zmq_draft.h @@ -133,6 +133,8 @@ int zmq_socket_monitor_versioned (void *s_, const char *addr_, uint64_t events_, int event_version_); +int zmq_socket_monitor_versioned_typed ( + void *s_, const char *addr_, uint64_t events_, int event_version_, int type_); int zmq_socket_monitor_pipes_stats (void *s_); #endif // ZMQ_BUILD_DRAFT_API diff --git a/tests/test_monitor.cpp b/tests/test_monitor.cpp index 0aa9af0b..c4447742 100644 --- a/tests/test_monitor.cpp +++ b/tests/test_monitor.cpp @@ -126,8 +126,21 @@ void test_monitor_basic () #if (defined ZMQ_CURRENT_EVENT_VERSION && ZMQ_CURRENT_EVENT_VERSION >= 2) \ || (defined ZMQ_CURRENT_EVENT_VERSION \ && ZMQ_CURRENT_EVENT_VERSION_DRAFT >= 2) -void test_monitor_versioned_basic (bind_function_t bind_function_, - const char *expected_prefix_) +void test_monitor_versioned_typed_invalid_socket_type () +{ + void *client = test_context_socket (ZMQ_DEALER); + + // Socket monitoring only works with ZMQ_PAIR, ZMQ_PUB and ZMQ_PUSH. + TEST_ASSERT_FAILURE_ERRNO ( + EINVAL, zmq_socket_monitor_versioned_typed ( + client, "inproc://invalid-socket-type", 0, 2, ZMQ_CLIENT)); + + test_context_socket_close_zero_linger (client); +} + +void test_monitor_versioned_typed_basic (bind_function_t bind_function_, + const char *expected_prefix_, + int type_) { char server_endpoint[MAX_SOCKET_STRING]; @@ -136,14 +149,36 @@ void test_monitor_versioned_basic (bind_function_t bind_function_, void *server = test_context_socket (ZMQ_DEALER); // Monitor all events on client and server sockets - TEST_ASSERT_SUCCESS_ERRNO (zmq_socket_monitor_versioned ( - client, "inproc://monitor-client", ZMQ_EVENT_ALL_V2, 2)); - TEST_ASSERT_SUCCESS_ERRNO (zmq_socket_monitor_versioned ( - server, "inproc://monitor-server", ZMQ_EVENT_ALL_V2, 2)); + TEST_ASSERT_SUCCESS_ERRNO (zmq_socket_monitor_versioned_typed ( + client, "inproc://monitor-client", ZMQ_EVENT_ALL_V2, 2, type_)); + TEST_ASSERT_SUCCESS_ERRNO (zmq_socket_monitor_versioned_typed ( + server, "inproc://monitor-server", ZMQ_EVENT_ALL_V2, 2, type_)); + + // Choose the appropriate consumer socket type. + int mon_type; + switch (type_) { + case ZMQ_PAIR: + mon_type = ZMQ_PAIR; + break; + case ZMQ_PUSH: + mon_type = ZMQ_PULL; + break; + case ZMQ_PUB: + mon_type = ZMQ_SUB; + break; + } // Create two sockets for collecting monitor events - void *client_mon = test_context_socket (ZMQ_PAIR); - void *server_mon = test_context_socket (ZMQ_PAIR); + void *client_mon = test_context_socket (mon_type); + void *server_mon = test_context_socket (mon_type); + + // Additionally subscribe to all events if a PUB socket is used. + if (type_ == ZMQ_PUB) { + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (client_mon, ZMQ_SUBSCRIBE, "", 0)); + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (server_mon, ZMQ_SUBSCRIBE, "", 0)); + } // Connect these to the inproc endpoints so they'll get events TEST_ASSERT_SUCCESS_ERRNO ( @@ -220,30 +255,44 @@ void test_monitor_versioned_basic (bind_function_t bind_function_, // TODO why does this use zero_linger? test_context_socket_close_zero_linger (client_mon); test_context_socket_close_zero_linger (server_mon); + + // Wait for the monitor socket's endpoint to be available + // for reuse. + msleep (SETTLE_TIME); } void test_monitor_versioned_basic_tcp_ipv4 () { static const char prefix[] = "tcp://127.0.0.1:"; - test_monitor_versioned_basic (bind_loopback_ipv4, prefix); + // Calling 'monitor_versioned_typed' with ZMQ_PAIR is the equivalent of + // calling 'monitor_versioned'. + test_monitor_versioned_typed_basic (bind_loopback_ipv4, prefix, ZMQ_PAIR); + test_monitor_versioned_typed_basic (bind_loopback_ipv4, prefix, ZMQ_PUB); + test_monitor_versioned_typed_basic (bind_loopback_ipv4, prefix, ZMQ_PUSH); } void test_monitor_versioned_basic_tcp_ipv6 () { static const char prefix[] = "tcp://[::1]:"; - test_monitor_versioned_basic (bind_loopback_ipv6, prefix); + test_monitor_versioned_typed_basic (bind_loopback_ipv6, prefix, ZMQ_PAIR); + test_monitor_versioned_typed_basic (bind_loopback_ipv6, prefix, ZMQ_PUB); + test_monitor_versioned_typed_basic (bind_loopback_ipv6, prefix, ZMQ_PUSH); } void test_monitor_versioned_basic_ipc () { static const char prefix[] = "ipc://"; - test_monitor_versioned_basic (bind_loopback_ipc, prefix); + test_monitor_versioned_typed_basic (bind_loopback_ipc, prefix, ZMQ_PAIR); + test_monitor_versioned_typed_basic (bind_loopback_ipc, prefix, ZMQ_PUB); + test_monitor_versioned_typed_basic (bind_loopback_ipc, prefix, ZMQ_PUSH); } void test_monitor_versioned_basic_tipc () { static const char prefix[] = "tipc://"; - test_monitor_versioned_basic (bind_loopback_tipc, prefix); + test_monitor_versioned_typed_basic (bind_loopback_tipc, prefix, ZMQ_PAIR); + test_monitor_versioned_typed_basic (bind_loopback_tipc, prefix, ZMQ_PUB); + test_monitor_versioned_typed_basic (bind_loopback_tipc, prefix, ZMQ_PUSH); } #ifdef ZMQ_EVENT_PIPES_STATS @@ -385,6 +434,7 @@ int main () #if (defined ZMQ_CURRENT_EVENT_VERSION && ZMQ_CURRENT_EVENT_VERSION >= 2) \ || (defined ZMQ_CURRENT_EVENT_VERSION \ && ZMQ_CURRENT_EVENT_VERSION_DRAFT >= 2) + RUN_TEST (test_monitor_versioned_typed_invalid_socket_type); RUN_TEST (test_monitor_versioned_basic_tcp_ipv4); RUN_TEST (test_monitor_versioned_basic_tcp_ipv6); RUN_TEST (test_monitor_versioned_basic_ipc);