From 06cce1547996c0aa42038d0eccfbf63efe18f0bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lourens=20Naud=C3=A9?= Date: Sun, 20 May 2012 18:16:23 +0100 Subject: [PATCH 1/7] Change zmq_monitor_fn type to cast between pointer-to-object and pointer-to-function in a more standards compliant way --- doc/zmq_setsockopt.txt | 2 +- include/zmq.h | 5 ++++- src/socket_base.cpp | 2 +- tests/test_monitor.cpp | 16 ++++++++-------- 4 files changed, 14 insertions(+), 11 deletions(-) diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index 2bbb7590..e291de35 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -434,7 +434,7 @@ Applicable socket types:: all listening sockets, when using TCP transports. ZMQ_MONITOR: Registers a callback for socket state changes ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Registers a callback function / event sink for changes in underlying socket state. -Expected signature is `void (zmq_monitor_fn) (void *s, int event, zmq_event_data_t *data)` +Expected signature of function member of zmq_monitor_fn union is `void (*function)(void *s, int event, zmq_event_data_t *data);` To remove the callback function call `zmq_setsockopt(socket, ZMQ_MONITOR, NULL, 0)` The default value of `NULL` means no monitor callback function. Supported events are : diff --git a/include/zmq.h b/include/zmq.h index 8d556057..59cac8df 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -300,7 +300,10 @@ typedef union { } zmq_event_data_t; /* Callback template for socket state changes */ -typedef void (zmq_monitor_fn) (void *s, int event, zmq_event_data_t *data); +typedef union { + void *object; + void (*function)(void *s, int event, zmq_event_data_t *data); +} zmq_monitor_fn; ZMQ_EXPORT void *zmq_socket (void *, int type); ZMQ_EXPORT int zmq_close (void *s); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index b095f2b5..15340d15 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -1030,7 +1030,7 @@ void zmq::socket_base_t::monitor_event (int event_, ...) default: zmq_assert (false); } - options.monitor ((void *)this, event_, &data); + options.monitor->function ((void *)this, event_, &data); va_end (args); } } diff --git a/tests/test_monitor.cpp b/tests/test_monitor.cpp index 28197e15..5f31ebdd 100644 --- a/tests/test_monitor.cpp +++ b/tests/test_monitor.cpp @@ -99,20 +99,20 @@ int main (int argc, char *argv []) assert (rep); // Expects failure - invalid size - zmq_monitor_fn *monitor; - monitor = listening_sock_monitor; + zmq_monitor_fn monitor; + monitor.function = listening_sock_monitor; - rc = zmq_setsockopt (rep, ZMQ_MONITOR, *(void **)&monitor, 20); + rc = zmq_setsockopt (rep, ZMQ_MONITOR, &monitor, 20); assert (rc == -1); assert (errno == EINVAL); - rc = zmq_setsockopt (rep, ZMQ_MONITOR, *(void **)&monitor, sizeof (void *)); + rc = zmq_setsockopt (rep, ZMQ_MONITOR, &monitor, sizeof (void *)); assert (rc == 0); size_t sz = sizeof (void *); rc = zmq_getsockopt (rep, ZMQ_MONITOR, &monitor, &sz); assert (rc == 0); - assert (monitor == listening_sock_monitor); + assert (monitor.function == listening_sock_monitor); // Remove socket monitor callback rc = zmq_setsockopt (rep, ZMQ_MONITOR, NULL, 0); @@ -120,7 +120,7 @@ int main (int argc, char *argv []) rc = zmq_getsockopt (rep, ZMQ_MONITOR, &monitor, &sz); assert (rc == 0); - assert (monitor == listening_sock_monitor); + assert (monitor.function == listening_sock_monitor); rc = zmq_bind (rep, "tcp://127.0.0.1:5560"); assert (rc == 0); @@ -128,8 +128,8 @@ int main (int argc, char *argv []) void *req = zmq_socket (ctx, ZMQ_REQ); assert (req); - monitor = connecting_sock_monitor; - rc = zmq_setsockopt (req, ZMQ_MONITOR, *(void **)&monitor, sizeof (void *)); + monitor.function = connecting_sock_monitor; + rc = zmq_setsockopt (req, ZMQ_MONITOR, &monitor, sizeof (void *)); assert (rc == 0); rc = zmq_connect (req, "tcp://127.0.0.1:5560"); From e13b3723b8246b0526298698f9070d077b3322e3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lourens=20Naud=C3=A9?= Date: Sun, 20 May 2012 18:27:59 +0100 Subject: [PATCH 2/7] Rename type zmq_monitor_fn -> zmq_monitor for a more natural callback definition API (zmq_monitor type, monitor.function callback) --- doc/zmq_getsockopt.txt | 2 +- doc/zmq_setsockopt.txt | 4 ++-- include/zmq.h | 2 +- src/options.cpp | 6 +++--- src/options.hpp | 4 ++-- tests/test_monitor.cpp | 2 +- 6 files changed, 10 insertions(+), 10 deletions(-) diff --git a/doc/zmq_getsockopt.txt b/doc/zmq_getsockopt.txt index e7ccd4a9..a2258411 100644 --- a/doc/zmq_getsockopt.txt +++ b/doc/zmq_getsockopt.txt @@ -461,7 +461,7 @@ Registers a callback function / event sink for changes in underlying socket stat The default value of `NULL` means no monitor callback function. [horizontal] -Option value type:: zmq_monitor_fn +Option value type:: zmq_monitor Option value unit:: N/A Default value:: no callback function Applicable socket types:: all diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index e291de35..d4a48ea7 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -434,7 +434,7 @@ Applicable socket types:: all listening sockets, when using TCP transports. ZMQ_MONITOR: Registers a callback for socket state changes ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Registers a callback function / event sink for changes in underlying socket state. -Expected signature of function member of zmq_monitor_fn union is `void (*function)(void *s, int event, zmq_event_data_t *data);` +Expected signature of function member of zmq_monitor union is `void (*function)(void *s, int event, zmq_event_data_t *data);` To remove the callback function call `zmq_setsockopt(socket, ZMQ_MONITOR, NULL, 0)` The default value of `NULL` means no monitor callback function. Supported events are : @@ -466,7 +466,7 @@ callback function as severe latency there will block the socket's application th Only tcp and ipc specific transport events are supported in this initial implementation. [horizontal] -Option value type:: zmq_monitor_fn +Option value type:: zmq_monitor Option value unit:: N/A Default value:: no callback function Applicable socket types:: all diff --git a/include/zmq.h b/include/zmq.h index 59cac8df..170bf70a 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -303,7 +303,7 @@ typedef union { typedef union { void *object; void (*function)(void *s, int event, zmq_event_data_t *data); -} zmq_monitor_fn; +} zmq_monitor; ZMQ_EXPORT void *zmq_socket (void *, int type); ZMQ_EXPORT int zmq_close (void *s); diff --git a/src/options.cpp b/src/options.cpp index d24834c1..82785e91 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -325,7 +325,7 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, errno = EINVAL; return -1; } - monitor = *((zmq_monitor_fn**) &optval_); + monitor = *((zmq_monitor**) &optval_); return 0; } } @@ -550,8 +550,8 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) errno = EINVAL; return -1; } - *((zmq_monitor_fn**) &optval_) = monitor; - *optvallen_ = sizeof (zmq_monitor_fn*); + *((zmq_monitor**) &optval_) = monitor; + *optvallen_ = sizeof (zmq_monitor*); return 0; } diff --git a/src/options.hpp b/src/options.hpp index c1248c76..8ae134f8 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -125,8 +125,8 @@ namespace zmq typedef std::vector tcp_accept_filters_t; tcp_accept_filters_t tcp_accept_filters; - // Connection and exceptional state callback function - zmq_monitor_fn *monitor; + // Connection and exceptional state callback + zmq_monitor *monitor; // ID of the socket. int socket_id; diff --git a/tests/test_monitor.cpp b/tests/test_monitor.cpp index 5f31ebdd..049b1a9b 100644 --- a/tests/test_monitor.cpp +++ b/tests/test_monitor.cpp @@ -99,7 +99,7 @@ int main (int argc, char *argv []) assert (rep); // Expects failure - invalid size - zmq_monitor_fn monitor; + zmq_monitor monitor; monitor.function = listening_sock_monitor; rc = zmq_setsockopt (rep, ZMQ_MONITOR, &monitor, 20); From 4767159f39001b7f5a2cc6b9d63a548a57c286d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lourens=20Naud=C3=A9?= Date: Mon, 21 May 2012 20:47:11 +0100 Subject: [PATCH 3/7] Initial stab at a context level monitor callback and registration API --- include/zmq.h | 24 ++++++------- src/ctx.cpp | 65 +++++++++++++++++++++++++++++++++++- src/ctx.hpp | 8 +++++ src/options.cpp | 24 ------------- src/options.hpp | 3 -- src/socket_base.cpp | 56 +++---------------------------- src/zmq.cpp | 8 +++++ tests/test_monitor.cpp | 76 +++++++++++++----------------------------- 8 files changed, 118 insertions(+), 146 deletions(-) diff --git a/include/zmq.h b/include/zmq.h index 170bf70a..b3146b20 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -227,7 +227,6 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval); #define ZMQ_TCP_KEEPALIVE_IDLE 36 #define ZMQ_TCP_KEEPALIVE_INTVL 37 #define ZMQ_TCP_ACCEPT_FILTER 38 -#define ZMQ_MONITOR 39 /* Message options */ #define ZMQ_MORE 1 @@ -243,17 +242,17 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval); /* Socket transport events (tcp and ipc only) */ #define ZMQ_EVENT_CONNECTED 1 #define ZMQ_EVENT_CONNECT_DELAYED 2 -#define ZMQ_EVENT_CONNECT_RETRIED 3 +#define ZMQ_EVENT_CONNECT_RETRIED 4 -#define ZMQ_EVENT_LISTENING 4 -#define ZMQ_EVENT_BIND_FAILED 5 +#define ZMQ_EVENT_LISTENING 8 +#define ZMQ_EVENT_BIND_FAILED 16 -#define ZMQ_EVENT_ACCEPTED 6 -#define ZMQ_EVENT_ACCEPT_FAILED 7 +#define ZMQ_EVENT_ACCEPTED 32 +#define ZMQ_EVENT_ACCEPT_FAILED 64 -#define ZMQ_EVENT_CLOSED 8 -#define ZMQ_EVENT_CLOSE_FAILED 9 -#define ZMQ_EVENT_DISCONNECTED 10 +#define ZMQ_EVENT_CLOSED 128 +#define ZMQ_EVENT_CLOSE_FAILED 256 +#define ZMQ_EVENT_DISCONNECTED 512 /* Socket event data (union member per event) */ typedef union { @@ -300,10 +299,9 @@ typedef union { } zmq_event_data_t; /* Callback template for socket state changes */ -typedef union { - void *object; - void (*function)(void *s, int event, zmq_event_data_t *data); -} zmq_monitor; +typedef void (zmq_monitor_fn) (void *s, int event, zmq_event_data_t *data); + +ZMQ_EXPORT int zmq_monitor (void *context, zmq_monitor_fn *monitor); ZMQ_EXPORT void *zmq_socket (void *, int type); ZMQ_EXPORT int zmq_close (void *s); diff --git a/src/ctx.cpp b/src/ctx.cpp index 520d2042..6c06ca24 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -45,7 +45,8 @@ zmq::ctx_t::ctx_t () : slot_count (0), slots (NULL), max_sockets (ZMQ_MAX_SOCKETS_DFLT), - io_thread_count (ZMQ_IO_THREADS_DFLT) + io_thread_count (ZMQ_IO_THREADS_DFLT), + monitor_fn (NULL) { } @@ -125,6 +126,12 @@ int zmq::ctx_t::terminate () return 0; } +int zmq::ctx_t::monitor (zmq_monitor_fn *monitor_) +{ + monitor_fn = monitor_; + return 0; +} + int zmq::ctx_t::set (int option_, int optval_) { int rc = 0; @@ -346,6 +353,62 @@ zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_) return endpoint; } +void zmq::ctx_t::monitor_event (zmq::socket_base_t *socket_, int event_, ...) +{ + if (monitor_fn != NULL) { + va_list args; + zmq_event_data_t data; + memset(&data, 0, sizeof (zmq_event_data_t)); + va_start (args, event_); + switch (event_) { + case ZMQ_EVENT_CONNECTED: + data.connected.addr = va_arg (args, char*); + data.connected.fd = va_arg (args, int); + break; + case ZMQ_EVENT_CONNECT_DELAYED: + data.connect_delayed.addr = va_arg (args, char*); + data.connect_delayed.err = va_arg (args, int); + break; + case ZMQ_EVENT_CONNECT_RETRIED: + data.connect_retried.addr = va_arg (args, char*); + data.connect_retried.interval = va_arg (args, int); + break; + case ZMQ_EVENT_LISTENING: + data.listening.addr = va_arg (args, char*); + data.listening.fd = va_arg (args, int); + break; + case ZMQ_EVENT_BIND_FAILED: + data.bind_failed.addr = va_arg (args, char*); + data.bind_failed.err = va_arg (args, int); + break; + case ZMQ_EVENT_ACCEPTED: + data.accepted.addr = va_arg (args, char*); + data.accepted.fd = va_arg (args, int); + break; + case ZMQ_EVENT_ACCEPT_FAILED: + data.accept_failed.addr = va_arg (args, char*); + data.accept_failed.err = va_arg (args, int); + break; + case ZMQ_EVENT_CLOSED: + data.closed.addr = va_arg (args, char*); + data.closed.fd = va_arg (args, int); + break; + case ZMQ_EVENT_CLOSE_FAILED: + data.close_failed.addr = va_arg (args, char*); + data.close_failed.err = va_arg (args, int); + break; + case ZMQ_EVENT_DISCONNECTED: + data.disconnected.addr = va_arg (args, char*); + data.disconnected.fd = va_arg (args, int); + break; + default: + zmq_assert (false); + } + monitor_fn ((void *)socket_, event_, &data); + va_end (args); + } +} + // The last used socket ID, or 0 if no socket was used so far. Note that this // is a global variable. Thus, even sockets created in different contexts have // unique IDs. diff --git a/src/ctx.hpp b/src/ctx.hpp index dcc43f01..133a8b15 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -95,12 +95,17 @@ namespace zmq void unregister_endpoints (zmq::socket_base_t *socket_); endpoint_t find_endpoint (const char *addr_); + // Monitoring specific + int monitor (zmq_monitor_fn *monitor_); + void monitor_event (zmq::socket_base_t *socket_, int event_, ...); + enum { term_tid = 0, reaper_tid = 1 }; ~ctx_t (); + private: @@ -163,6 +168,9 @@ namespace zmq // Synchronisation of access to context options. mutex_t opt_sync; + // Monitoring callback + zmq_monitor_fn *monitor_fn; + ctx_t (const ctx_t&); const ctx_t &operator = (const ctx_t&); }; diff --git a/src/options.cpp b/src/options.cpp index 82785e91..fdd5299d 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -53,7 +53,6 @@ zmq::options_t::options_t () : tcp_keepalive_cnt (-1), tcp_keepalive_idle (-1), tcp_keepalive_intvl (-1), - monitor (NULL), socket_id (0) { } @@ -314,20 +313,6 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, return 0; } } - - case ZMQ_MONITOR: - { - if (optvallen_ == 0 && optval_ == NULL) { - monitor = NULL; - return 0; - } - if (optvallen_ != sizeof (void *)) { - errno = EINVAL; - return -1; - } - monitor = *((zmq_monitor**) &optval_); - return 0; - } } errno = EINVAL; return -1; @@ -544,15 +529,6 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) memcpy (optval_, last_endpoint.c_str(), last_endpoint.size()+1); *optvallen_ = last_endpoint.size()+1; return 0; - - case ZMQ_MONITOR: - if (*optvallen_ < sizeof (void *)) { - errno = EINVAL; - return -1; - } - *((zmq_monitor**) &optval_) = monitor; - *optvallen_ = sizeof (zmq_monitor*); - return 0; } errno = EINVAL; diff --git a/src/options.hpp b/src/options.hpp index 8ae134f8..2f6e6283 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -125,9 +125,6 @@ namespace zmq typedef std::vector tcp_accept_filters_t; tcp_accept_filters_t tcp_accept_filters; - // Connection and exceptional state callback - zmq_monitor *monitor; - // ID of the socket. int socket_id; }; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 15340d15..033e4d7d 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -981,56 +981,8 @@ void zmq::socket_base_t::extract_flags (msg_t *msg_) void zmq::socket_base_t::monitor_event (int event_, ...) { - if (options.monitor != NULL) { - va_list args; - zmq_event_data_t data; - memset(&data, 0, sizeof (zmq_event_data_t)); - va_start (args, event_); - switch (event_) { - case ZMQ_EVENT_CONNECTED: - data.connected.addr = va_arg (args, char*); - data.connected.fd = va_arg (args, int); - break; - case ZMQ_EVENT_CONNECT_DELAYED: - data.connect_delayed.addr = va_arg (args, char*); - data.connect_delayed.err = va_arg (args, int); - break; - case ZMQ_EVENT_CONNECT_RETRIED: - data.connect_retried.addr = va_arg (args, char*); - data.connect_retried.interval = va_arg (args, int); - break; - case ZMQ_EVENT_LISTENING: - data.listening.addr = va_arg (args, char*); - data.listening.fd = va_arg (args, int); - break; - case ZMQ_EVENT_BIND_FAILED: - data.bind_failed.addr = va_arg (args, char*); - data.bind_failed.err = va_arg (args, int); - break; - case ZMQ_EVENT_ACCEPTED: - data.accepted.addr = va_arg (args, char*); - data.accepted.fd = va_arg (args, int); - break; - case ZMQ_EVENT_ACCEPT_FAILED: - data.accept_failed.addr = va_arg (args, char*); - data.accept_failed.err = va_arg (args, int); - break; - case ZMQ_EVENT_CLOSED: - data.closed.addr = va_arg (args, char*); - data.closed.fd = va_arg (args, int); - break; - case ZMQ_EVENT_CLOSE_FAILED: - data.close_failed.addr = va_arg (args, char*); - data.close_failed.err = va_arg (args, int); - break; - case ZMQ_EVENT_DISCONNECTED: - data.disconnected.addr = va_arg (args, char*); - data.disconnected.fd = va_arg (args, int); - break; - default: - zmq_assert (false); - } - options.monitor->function ((void *)this, event_, &data); - va_end (args); - } + va_list args; + va_start (args, event_); + get_ctx ()->monitor_event (this, event_, args); + va_end (args); } diff --git a/src/zmq.cpp b/src/zmq.cpp index 745dbf55..2f307c4a 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -205,6 +205,14 @@ int zmq_ctx_get (void *ctx_, int option_) return ((zmq::ctx_t*) ctx_)->get (option_); } +int zmq_monitor (void *ctx_, zmq_monitor_fn *monitor_) +{ + if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) { + errno = EFAULT; + return -1; + } + return ((zmq::ctx_t*) ctx_)->monitor (monitor_); +} // Stable/legacy context API diff --git a/tests/test_monitor.cpp b/tests/test_monitor.cpp index 049b1a9b..4b50d904 100644 --- a/tests/test_monitor.cpp +++ b/tests/test_monitor.cpp @@ -25,61 +25,50 @@ #include "../include/zmq.h" #include "../include/zmq_utils.h" -void listening_sock_monitor (void *s, int event_, zmq_event_data_t *data_) +static int events; + +void socket_monitor (void *s, int event_, zmq_event_data_t *data_) { const char *addr = "tcp://127.0.0.1:5560"; // Only some of the exceptional events could fire switch (event_) { + // listener specific case ZMQ_EVENT_LISTENING: assert (data_->listening.fd > 0); assert (memcmp (data_->listening.addr, addr, 22)); + events |= ZMQ_EVENT_LISTENING; break; case ZMQ_EVENT_ACCEPTED: assert (data_->accepted.fd > 0); assert (memcmp (data_->accepted.addr, addr, 22)); + events |= ZMQ_EVENT_ACCEPTED; break; - case ZMQ_EVENT_CLOSE_FAILED: - assert (data_->close_failed.err != 0); - assert (memcmp (data_->close_failed.addr, addr, 22)); - break; - case ZMQ_EVENT_CLOSED: - assert (data_->closed.fd != 0); - assert (memcmp (data_->closed.addr, addr, 22)); - break; - case ZMQ_EVENT_DISCONNECTED: - assert (data_->disconnected.fd != 0); - assert (memcmp (data_->disconnected.addr, addr, 22)); - break; - default: - // out of band / unexpected event - assert (0); - } -} - -void connecting_sock_monitor (void *s, int event_, zmq_event_data_t *data_) -{ - const char *addr = "tcp://127.0.0.1:5560"; - // Only some of the exceptional events could fire - switch (event_) { + // connecter specific case ZMQ_EVENT_CONNECTED: assert (data_->connected.fd > 0); assert (memcmp (data_->connected.addr, addr, 22)); + events |= ZMQ_EVENT_CONNECTED; break; case ZMQ_EVENT_CONNECT_DELAYED: assert (data_->connect_delayed.err != 0); assert (memcmp (data_->connect_delayed.addr, addr, 22)); + events |= ZMQ_EVENT_CONNECT_DELAYED; break; + // generic - either end of the socket case ZMQ_EVENT_CLOSE_FAILED: assert (data_->close_failed.err != 0); assert (memcmp (data_->close_failed.addr, addr, 22)); + events |= ZMQ_EVENT_CLOSE_FAILED; break; case ZMQ_EVENT_CLOSED: assert (data_->closed.fd != 0); assert (memcmp (data_->closed.addr, addr, 22)); + events |= ZMQ_EVENT_CLOSED; break; case ZMQ_EVENT_DISCONNECTED: assert (data_->disconnected.fd != 0); assert (memcmp (data_->disconnected.addr, addr, 22)); + events |= ZMQ_EVENT_DISCONNECTED; break; default: // out of band / unexpected event @@ -94,44 +83,18 @@ int main (int argc, char *argv []) // Create the infrastructure void *ctx = zmq_init (1); assert (ctx); - + // set socket monitor + rc = zmq_monitor (ctx, socket_monitor); + assert (rc == 0); void *rep = zmq_socket (ctx, ZMQ_REP); assert (rep); - // Expects failure - invalid size - zmq_monitor monitor; - monitor.function = listening_sock_monitor; - - rc = zmq_setsockopt (rep, ZMQ_MONITOR, &monitor, 20); - assert (rc == -1); - assert (errno == EINVAL); - - rc = zmq_setsockopt (rep, ZMQ_MONITOR, &monitor, sizeof (void *)); - assert (rc == 0); - - size_t sz = sizeof (void *); - rc = zmq_getsockopt (rep, ZMQ_MONITOR, &monitor, &sz); - assert (rc == 0); - assert (monitor.function == listening_sock_monitor); - - // Remove socket monitor callback - rc = zmq_setsockopt (rep, ZMQ_MONITOR, NULL, 0); - assert (rc == 0); - - rc = zmq_getsockopt (rep, ZMQ_MONITOR, &monitor, &sz); - assert (rc == 0); - assert (monitor.function == listening_sock_monitor); - rc = zmq_bind (rep, "tcp://127.0.0.1:5560"); assert (rc == 0); void *req = zmq_socket (ctx, ZMQ_REQ); assert (req); - monitor.function = connecting_sock_monitor; - rc = zmq_setsockopt (req, ZMQ_MONITOR, &monitor, sizeof (void *)); - assert (rc == 0); - rc = zmq_connect (req, "tcp://127.0.0.1:5560"); assert (rc == 0); @@ -151,5 +114,12 @@ int main (int argc, char *argv []) zmq_sleep (1); zmq_term (ctx); + + // We expect to at least observe these events + assert (events & ZMQ_EVENT_LISTENING); + assert (events & ZMQ_EVENT_ACCEPTED); + assert (events & ZMQ_EVENT_CONNECTED); + assert (events & ZMQ_EVENT_CLOSED); + return 0 ; } \ No newline at end of file From f27c02d01e323c2d5d2d3d889f5e8d5a7cbff73d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lourens=20Naud=C3=A9?= Date: Mon, 21 May 2012 21:22:16 +0100 Subject: [PATCH 4/7] Change context monitor_event prototype to accept a va_list instead --- src/ctx.cpp | 45 +++++++++++++++++++++------------------------ src/ctx.hpp | 2 +- 2 files changed, 22 insertions(+), 25 deletions(-) diff --git a/src/ctx.cpp b/src/ctx.cpp index 6c06ca24..1dda8538 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -353,59 +353,56 @@ zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_) return endpoint; } -void zmq::ctx_t::monitor_event (zmq::socket_base_t *socket_, int event_, ...) +void zmq::ctx_t::monitor_event (zmq::socket_base_t *socket_, int event_, va_list args_) { if (monitor_fn != NULL) { - va_list args; zmq_event_data_t data; memset(&data, 0, sizeof (zmq_event_data_t)); - va_start (args, event_); switch (event_) { case ZMQ_EVENT_CONNECTED: - data.connected.addr = va_arg (args, char*); - data.connected.fd = va_arg (args, int); + data.connected.addr = va_arg (args_, char*); + data.connected.fd = va_arg (args_, int); break; case ZMQ_EVENT_CONNECT_DELAYED: - data.connect_delayed.addr = va_arg (args, char*); - data.connect_delayed.err = va_arg (args, int); + data.connect_delayed.addr = va_arg (args_, char*); + data.connect_delayed.err = va_arg (args_, int); break; case ZMQ_EVENT_CONNECT_RETRIED: - data.connect_retried.addr = va_arg (args, char*); - data.connect_retried.interval = va_arg (args, int); + data.connect_retried.addr = va_arg (args_, char*); + data.connect_retried.interval = va_arg (args_, int); break; case ZMQ_EVENT_LISTENING: - data.listening.addr = va_arg (args, char*); - data.listening.fd = va_arg (args, int); + data.listening.addr = va_arg (args_, char*); + data.listening.fd = va_arg (args_, int); break; case ZMQ_EVENT_BIND_FAILED: - data.bind_failed.addr = va_arg (args, char*); - data.bind_failed.err = va_arg (args, int); + data.bind_failed.addr = va_arg (args_, char*); + data.bind_failed.err = va_arg (args_, int); break; case ZMQ_EVENT_ACCEPTED: - data.accepted.addr = va_arg (args, char*); - data.accepted.fd = va_arg (args, int); + data.accepted.addr = va_arg (args_, char*); + data.accepted.fd = va_arg (args_, int); break; case ZMQ_EVENT_ACCEPT_FAILED: - data.accept_failed.addr = va_arg (args, char*); - data.accept_failed.err = va_arg (args, int); + data.accept_failed.addr = va_arg (args_, char*); + data.accept_failed.err = va_arg (args_, int); break; case ZMQ_EVENT_CLOSED: - data.closed.addr = va_arg (args, char*); - data.closed.fd = va_arg (args, int); + data.closed.addr = va_arg (args_, char*); + data.closed.fd = va_arg (args_, int); break; case ZMQ_EVENT_CLOSE_FAILED: - data.close_failed.addr = va_arg (args, char*); - data.close_failed.err = va_arg (args, int); + data.close_failed.addr = va_arg (args_, char*); + data.close_failed.err = va_arg (args_, int); break; case ZMQ_EVENT_DISCONNECTED: - data.disconnected.addr = va_arg (args, char*); - data.disconnected.fd = va_arg (args, int); + data.disconnected.addr = va_arg (args_, char*); + data.disconnected.fd = va_arg (args_, int); break; default: zmq_assert (false); } monitor_fn ((void *)socket_, event_, &data); - va_end (args); } } diff --git a/src/ctx.hpp b/src/ctx.hpp index 133a8b15..ab5bf897 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -97,7 +97,7 @@ namespace zmq // Monitoring specific int monitor (zmq_monitor_fn *monitor_); - void monitor_event (zmq::socket_base_t *socket_, int event_, ...); + void monitor_event (zmq::socket_base_t *socket_, int event_, va_list args_); enum { term_tid = 0, From 04f0e7f26e18a2802d8683fc12ef8a67197fda6c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lourens=20Naud=C3=A9?= Date: Tue, 22 May 2012 20:08:02 +0100 Subject: [PATCH 5/7] Documentation for zmq_monitor --- doc/Makefile.am | 2 +- doc/zmq.txt | 3 + doc/zmq_getsockopt.txt | 11 -- doc/zmq_monitor.txt | 221 +++++++++++++++++++++++++++++++++++++++++ doc/zmq_setsockopt.txt | 40 -------- 5 files changed, 225 insertions(+), 52 deletions(-) create mode 100644 doc/zmq_monitor.txt diff --git a/doc/Makefile.am b/doc/Makefile.am index eba0f998..58c84aa7 100644 --- a/doc/Makefile.am +++ b/doc/Makefile.am @@ -1,6 +1,6 @@ MAN3 = zmq_bind.3 zmq_close.3 zmq_connect.3 zmq_device.3 \ zmq_ctx_new.3 zmq_ctx_destroy.3 zmq_ctx_get.3 zmq_ctx_set.3 \ - zmq_init.3 zmq_term.3 \ + zmq_init.3 zmq_term.3 zmq_monitor.3\ zmq_msg_close.3 zmq_msg_copy.3 zmq_msg_data.3 zmq_msg_init.3 \ zmq_msg_init_data.3 zmq_msg_init_size.3 zmq_msg_move.3 zmq_msg_size.3 \ zmq_msg_send.3 zmq_msg_recv.3 \ diff --git a/doc/zmq.txt b/doc/zmq.txt index 4dfe0d1f..a2635874 100644 --- a/doc/zmq.txt +++ b/doc/zmq.txt @@ -44,6 +44,9 @@ Work with context properties:: Destroy a 0MQ context:: linkzmq:zmq_ctx_destroy[3] +Monitor a 0MQ context:: + linkzmq:zmq_monitor[3] + These deprecated functions let you create and destroy 'contexts': Initialise 0MQ context:: diff --git a/doc/zmq_getsockopt.txt b/doc/zmq_getsockopt.txt index a2258411..0eae8d91 100644 --- a/doc/zmq_getsockopt.txt +++ b/doc/zmq_getsockopt.txt @@ -455,17 +455,6 @@ Option value unit:: -1,>0 Default value:: -1 (leave to OS default) Applicable socket types:: all, when using TCP transports. -ZMQ_MONITOR: Registers a callback for socket state changes -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -Registers a callback function / event sink for changes in underlying socket state. -The default value of `NULL` means no monitor callback function. - -[horizontal] -Option value type:: zmq_monitor -Option value unit:: N/A -Default value:: no callback function -Applicable socket types:: all - RETURN VALUE ------------ The _zmq_getsockopt()_ function shall return zero if successful. Otherwise it diff --git a/doc/zmq_monitor.txt b/doc/zmq_monitor.txt new file mode 100644 index 00000000..1570f5ba --- /dev/null +++ b/doc/zmq_monitor.txt @@ -0,0 +1,221 @@ +zmq_monitor(3) +============== + + +NAME +---- + +zmq_monitor - register a monitoring callback + + +SYNOPSIS +-------- +*int zmq_monitor (void '*context', zmq_monitor_fn '*monitor');* + + +DESCRIPTION +----------- +The _zmq_monitor()_ function shall register a callback function specified by +the 'monitor' argument. This is an event sink for changes in per socket +connection and mailbox (work in progress) states. + +.The _zmq_monitor()_ callback function is expected to have this prototype: +---- +typedef void (zmq_monitor_fn) (void *s, int event, zmq_event_data_t *data); +---- + +The callback is global (per context), with the socket that triggered the event +passed to the handler as well. Each event also populates a 'zmq_event_data_t' +union with additional metadata which can be used for correlation. + +CAUTION: _zmq_monitor()_ is intended for monitoring infrastructure / operations +concerns only - NOT BUSINESS LOGIC. An event is a representation of something +that happened - you cannot change the past, but only react to them. The +implementation is also only concerned with a single session. No state of peers, +other sessions etc. are tracked - this will only pollute internals and is the +responsibility of application authors to either implement or correlate in +another datastore. Monitor events are exceptional conditions and are thus not +directly in the messaging critical path. However, still be careful with what +you're doing in the callback function as excess time spent in the handler will +block the socket's application thread. + +Only tcp and ipc specific transport events are supported in this initial +implementation. + +Supported events are: + + +ZMQ_EVENT_CONNECTED: connection established +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_EVENT_CONNECTED' event triggers when a connection has been established +to a remote peer. This can happen either synchronous or asynchronous. + +.Callback metadata: +---- +data.connected.addr // peer address +data.connected.fd // socket descriptor +---- + +ZMQ_EVENT_CONNECT_DELAYED: synchronous connect failed, it's being polled +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_EVENT_CONNECT_DELAYED' event triggers when an immediate connection +attempt is delayed and it's completion's being polled for. + +.Callback metadata: +---- +data.connect_delayed.addr // peer address +data.connect_delayed.err // errno +---- + +ZMQ_EVENT_CONNECT_RETRIED: asynchronous connect / reconnection attempt +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_EVENT_CONNECT_RETRIED' event triggers when a connection attempt +is being handled by reconnect timer. The reconnect interval's recomputed +for each attempt. + +.Callback metadata: +---- +data.connect_retried.addr // peer address +data.connect_retried.interval // computed reconnect interval +---- + +ZMQ_EVENT_LISTENING: socket bound to an address, ready to accept connections +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_EVENT_LISTENING' event triggers when a socket's successfully bound +to a an interface. + +.Callback metadata: +---- +data.listening.addr // listen address +data.listening.fd // socket descriptor +---- + +ZMQ_EVENT_BIND_FAILED: socket could not bind to an address +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_EVENT_BIND_FAILED' event triggers when a socket could not bind to +a given interface. + +.Callback metadata: +---- +data.bind_failed.addr // listen address +data.bind_failed.err // errno +---- + +ZMQ_EVENT_ACCEPTED: connection accepted to bound interface +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_EVENT_ACCEPTED' event triggers when a connection from a remote peer +has been established with a socket's listen address. + +.Callback metadata: +---- +data.accepted.addr // listen address +data.accepted.fd // socket descriptor +---- + +ZMQ_EVENT_ACCEPT_FAILED: could not accept client connection +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_EVENT_ACCEPT_FAILED' event triggers when a connection attempt to +a socket's bound address fails. + +.Callback metadata: +---- +data.accept_failed.addr // listen address +data.accept_failed.err // errno +---- + +ZMQ_EVENT_CLOSED: connection closed +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_EVENT_CLOSED' event triggers when a connection's underlying descriptor +has been closed. + +.Callback metadata: +---- +data.closed.addr // address +data.closed.fd // socket descriptor +---- + +ZMQ_EVENT_CLOSE_FAILED: connection couldn't be closed +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_EVENT_CLOSE_FAILED' event triggers when a descriptor could not be +released back to the OS. + +.Callback metadata: +---- +data.close_failed.addr // address +data.close_failed.err // errno +---- + +ZMQ_EVENT_DISCONNECTED: broken session +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_EVENT_DISCONNECTED' event triggers when the stream engine (tcp and ipc +specific) detects a corrupted / broken session. + +.Callback metadata: +---- +data.disconnected.addr // address +data.disconnected.fd // socket descriptor +---- + +RETURN VALUE +------------ +The _zmq_monitor()_ function returns a value of 0 or greater if successful. +Otherwise it returns `-1` and sets 'errno' to one of the values defined +below. + + +ERRORS +------ +*EINVAL*:: +The requested callback function _monitor_ is invalid. + + +EXAMPLE +------- +.Observing a 'PUB' socket's connection state +---- +void socket_monitor (void *s, int event_, zmq_event_data_t *data_) +{ + switch (event_) { + case ZMQ_EVENT_LISTENING: + printf ("Socket bound to %s, socket descriptor is %d\n", + data.listening.addr, data.listening.fd); + break; + case ZMQ_EVENT_ACCEPTED: + printf ("Accepted connection to %s, socket descriptor is %d\n", + data.accepted.addr, data.accepted.fd); + break; + } +} + +void *context = zmq_ctx_new (); +int rc = zmq_monitor (context, socket_monitor); +assert (rc == 0); +void *pub = zmq_socket (context, ZMQ_PUB); +assert (pub); +void *sub = zmq_socket (context, ZMQ_SUB); +assert (pub); +rc = zmq_bind (pub, "tcp://127.0.0.1:5560"); +assert (rc == 0); +rc = zmq_connect (sub, "tcp://127.0.0.1:5560"); +assert (rc == 0); + +// Allow a window for socket events as connect can be async +zmq_sleep (1); + +rc = zmq_close (pub); +assert (rc == 0); +rc = zmq_close (sub); +assert (rc == 0); + +zmq_term (context); +---- + + +SEE ALSO +-------- +linkzmq:zmq[7] + + +AUTHORS +------- +This 0MQ manual page was written by Lourens Naudé \ No newline at end of file diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index d4a48ea7..fee30cc8 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -431,46 +431,6 @@ Default value:: no filters (allow from all) Applicable socket types:: all listening sockets, when using TCP transports. -ZMQ_MONITOR: Registers a callback for socket state changes -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -Registers a callback function / event sink for changes in underlying socket state. -Expected signature of function member of zmq_monitor union is `void (*function)(void *s, int event, zmq_event_data_t *data);` -To remove the callback function call `zmq_setsockopt(socket, ZMQ_MONITOR, NULL, 0)` -The default value of `NULL` means no monitor callback function. -Supported events are : - -* 'ZMQ_EVENT_CONNECTED' - connection established -* 'ZMQ_EVENT_CONNECT_DELAYED' - connection could not be established synchronously, it's being polled -* 'ZMQ_EVENT_CONNECT_RETRIED' - asynchronous connect / reconnection attempt - -* 'ZMQ_EVENT_LISTENING' - socket bound to an address, ready to accept connections -* 'ZMQ_EVENT_BIND_FAILED' - socket couldn't bind to an address - -* 'ZMQ_EVENT_ACCEPTED' - connection accepted to bound interface -* 'ZMQ_EVENT_ACCEPT_FAILED' - could not accept client connection - -* 'ZMQ_EVENT_CLOSED' - connection closed -* 'ZMQ_EVENT_CLOSE_FAILED' - connection couldn't be closed -* 'ZMQ_EVENT_DISCONNECTED' - broken session - -See `zmq_event_data_t` and `ZMQ_EVENT_*` constants in zmq.h for event specific data (third argument to callback). - -Please note that both events and their context data aren't stable contracts. The 'ZMQ_MONITOR' socket option is -intended for monitoring infrastructure / operations concerns only - NOT BUSINESS LOGIC. An event is a representation -of something that happened - you cannot change the past, but only react to them. The implementation also only concerned -with a single session. No state of peers, other sessions etc. are tracked - this will only pollute internals and is the -responsibility of application authors to either implement or correlate in another datastore. Monitor events are exceptional -conditions and are thus not directly in the messaging critical path. However, still be careful with what you're doing in the -callback function as severe latency there will block the socket's application thread. - -Only tcp and ipc specific transport events are supported in this initial implementation. - -[horizontal] -Option value type:: zmq_monitor -Option value unit:: N/A -Default value:: no callback function -Applicable socket types:: all - RETURN VALUE ------------ The _zmq_setsockopt()_ function shall return zero if successful. Otherwise it From 991b7fcc04de7d694dc54430c43f6bb01494086a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lourens=20Naud=C3=A9?= Date: Tue, 22 May 2012 20:15:18 +0100 Subject: [PATCH 6/7] Rename zmq_monitor to zmq_ctx_set_monitor for compat with existing context specific APIs --- doc/Makefile.am | 2 +- doc/zmq.txt | 2 +- doc/zmq_monitor.txt | 18 +++++++++--------- include/zmq.h | 2 +- src/zmq.cpp | 2 +- tests/test_monitor.cpp | 2 +- 6 files changed, 14 insertions(+), 14 deletions(-) diff --git a/doc/Makefile.am b/doc/Makefile.am index 58c84aa7..eb75e62d 100644 --- a/doc/Makefile.am +++ b/doc/Makefile.am @@ -1,6 +1,6 @@ MAN3 = zmq_bind.3 zmq_close.3 zmq_connect.3 zmq_device.3 \ zmq_ctx_new.3 zmq_ctx_destroy.3 zmq_ctx_get.3 zmq_ctx_set.3 \ - zmq_init.3 zmq_term.3 zmq_monitor.3\ + zmq_init.3 zmq_term.3 zmq_ctx_set_monitor.3\ zmq_msg_close.3 zmq_msg_copy.3 zmq_msg_data.3 zmq_msg_init.3 \ zmq_msg_init_data.3 zmq_msg_init_size.3 zmq_msg_move.3 zmq_msg_size.3 \ zmq_msg_send.3 zmq_msg_recv.3 \ diff --git a/doc/zmq.txt b/doc/zmq.txt index a2635874..5370e7de 100644 --- a/doc/zmq.txt +++ b/doc/zmq.txt @@ -45,7 +45,7 @@ Destroy a 0MQ context:: linkzmq:zmq_ctx_destroy[3] Monitor a 0MQ context:: - linkzmq:zmq_monitor[3] + linkzmq:zmq_ctx_set_monitor[3] These deprecated functions let you create and destroy 'contexts': diff --git a/doc/zmq_monitor.txt b/doc/zmq_monitor.txt index 1570f5ba..c72fe921 100644 --- a/doc/zmq_monitor.txt +++ b/doc/zmq_monitor.txt @@ -1,25 +1,25 @@ -zmq_monitor(3) -============== +zmq_ctx_set_monitor(3) +====================== NAME ---- -zmq_monitor - register a monitoring callback +zmq_ctx_set_monitor - register a monitoring callback SYNOPSIS -------- -*int zmq_monitor (void '*context', zmq_monitor_fn '*monitor');* +*int zmq_ctx_set_monitor (void '*context', zmq_monitor_fn '*monitor');* DESCRIPTION ----------- -The _zmq_monitor()_ function shall register a callback function specified by +The _zmq_ctx_set_monitor()_ function shall register a callback function specified by the 'monitor' argument. This is an event sink for changes in per socket connection and mailbox (work in progress) states. -.The _zmq_monitor()_ callback function is expected to have this prototype: +.The _zmq_ctx_set_monitor()_ callback function is expected to have this prototype: ---- typedef void (zmq_monitor_fn) (void *s, int event, zmq_event_data_t *data); ---- @@ -28,7 +28,7 @@ The callback is global (per context), with the socket that triggered the event passed to the handler as well. Each event also populates a 'zmq_event_data_t' union with additional metadata which can be used for correlation. -CAUTION: _zmq_monitor()_ is intended for monitoring infrastructure / operations +CAUTION: _zmq_ctx_set_monitor()_ is intended for monitoring infrastructure / operations concerns only - NOT BUSINESS LOGIC. An event is a representation of something that happened - you cannot change the past, but only react to them. The implementation is also only concerned with a single session. No state of peers, @@ -158,7 +158,7 @@ data.disconnected.fd // socket descriptor RETURN VALUE ------------ -The _zmq_monitor()_ function returns a value of 0 or greater if successful. +The _zmq_ctx_set_monitor()_ function returns a value of 0 or greater if successful. Otherwise it returns `-1` and sets 'errno' to one of the values defined below. @@ -188,7 +188,7 @@ void socket_monitor (void *s, int event_, zmq_event_data_t *data_) } void *context = zmq_ctx_new (); -int rc = zmq_monitor (context, socket_monitor); +int rc = zmq_ctx_set_monitor (context, socket_monitor); assert (rc == 0); void *pub = zmq_socket (context, ZMQ_PUB); assert (pub); diff --git a/include/zmq.h b/include/zmq.h index b3146b20..83f9209a 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -301,7 +301,7 @@ typedef union { /* Callback template for socket state changes */ typedef void (zmq_monitor_fn) (void *s, int event, zmq_event_data_t *data); -ZMQ_EXPORT int zmq_monitor (void *context, zmq_monitor_fn *monitor); +ZMQ_EXPORT int zmq_ctx_set_monitor (void *context, zmq_monitor_fn *monitor); ZMQ_EXPORT void *zmq_socket (void *, int type); ZMQ_EXPORT int zmq_close (void *s); diff --git a/src/zmq.cpp b/src/zmq.cpp index 2f307c4a..a9de5ecb 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -205,7 +205,7 @@ int zmq_ctx_get (void *ctx_, int option_) return ((zmq::ctx_t*) ctx_)->get (option_); } -int zmq_monitor (void *ctx_, zmq_monitor_fn *monitor_) +int zmq_ctx_set_monitor (void *ctx_, zmq_monitor_fn *monitor_) { if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) { errno = EFAULT; diff --git a/tests/test_monitor.cpp b/tests/test_monitor.cpp index 4b50d904..394d6a5d 100644 --- a/tests/test_monitor.cpp +++ b/tests/test_monitor.cpp @@ -84,7 +84,7 @@ int main (int argc, char *argv []) void *ctx = zmq_init (1); assert (ctx); // set socket monitor - rc = zmq_monitor (ctx, socket_monitor); + rc = zmq_ctx_set_monitor (ctx, socket_monitor); assert (rc == 0); void *rep = zmq_socket (ctx, ZMQ_REP); assert (rep); From 1e92ee0a0e9f4996259c662d4a6e8f71f6f45843 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lourens=20Naud=C3=A9?= Date: Tue, 22 May 2012 23:45:15 +0100 Subject: [PATCH 7/7] Oust last remaning ZMQ_MONITOR reference from NEWS as well --- NEWS | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/NEWS b/NEWS index fd4af84e..8af0ab7a 100644 --- a/NEWS +++ b/NEWS @@ -57,7 +57,8 @@ Building New functionality ----------------- -* ZMQ_MONITOR socket option registers a callback / event sink for changes in socket state. +* A zmq_ctx_set_monitor() API to register a callback / event sink for changes + in socket state. * POSIX-compliant zmq_send and zmq_recv introduced (uses raw buffer instead of message object).