diff --git a/include/zmq.h b/include/zmq.h index 170bf70a..b3146b20 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -227,7 +227,6 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval); #define ZMQ_TCP_KEEPALIVE_IDLE 36 #define ZMQ_TCP_KEEPALIVE_INTVL 37 #define ZMQ_TCP_ACCEPT_FILTER 38 -#define ZMQ_MONITOR 39 /* Message options */ #define ZMQ_MORE 1 @@ -243,17 +242,17 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval); /* Socket transport events (tcp and ipc only) */ #define ZMQ_EVENT_CONNECTED 1 #define ZMQ_EVENT_CONNECT_DELAYED 2 -#define ZMQ_EVENT_CONNECT_RETRIED 3 +#define ZMQ_EVENT_CONNECT_RETRIED 4 -#define ZMQ_EVENT_LISTENING 4 -#define ZMQ_EVENT_BIND_FAILED 5 +#define ZMQ_EVENT_LISTENING 8 +#define ZMQ_EVENT_BIND_FAILED 16 -#define ZMQ_EVENT_ACCEPTED 6 -#define ZMQ_EVENT_ACCEPT_FAILED 7 +#define ZMQ_EVENT_ACCEPTED 32 +#define ZMQ_EVENT_ACCEPT_FAILED 64 -#define ZMQ_EVENT_CLOSED 8 -#define ZMQ_EVENT_CLOSE_FAILED 9 -#define ZMQ_EVENT_DISCONNECTED 10 +#define ZMQ_EVENT_CLOSED 128 +#define ZMQ_EVENT_CLOSE_FAILED 256 +#define ZMQ_EVENT_DISCONNECTED 512 /* Socket event data (union member per event) */ typedef union { @@ -300,10 +299,9 @@ typedef union { } zmq_event_data_t; /* Callback template for socket state changes */ -typedef union { - void *object; - void (*function)(void *s, int event, zmq_event_data_t *data); -} zmq_monitor; +typedef void (zmq_monitor_fn) (void *s, int event, zmq_event_data_t *data); + +ZMQ_EXPORT int zmq_monitor (void *context, zmq_monitor_fn *monitor); ZMQ_EXPORT void *zmq_socket (void *, int type); ZMQ_EXPORT int zmq_close (void *s); diff --git a/src/ctx.cpp b/src/ctx.cpp index 520d2042..6c06ca24 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -45,7 +45,8 @@ zmq::ctx_t::ctx_t () : slot_count (0), slots (NULL), max_sockets (ZMQ_MAX_SOCKETS_DFLT), - io_thread_count (ZMQ_IO_THREADS_DFLT) + io_thread_count (ZMQ_IO_THREADS_DFLT), + monitor_fn (NULL) { } @@ -125,6 +126,12 @@ int zmq::ctx_t::terminate () return 0; } +int zmq::ctx_t::monitor (zmq_monitor_fn *monitor_) +{ + monitor_fn = monitor_; + return 0; +} + int zmq::ctx_t::set (int option_, int optval_) { int rc = 0; @@ -346,6 +353,62 @@ zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_) return endpoint; } +void zmq::ctx_t::monitor_event (zmq::socket_base_t *socket_, int event_, ...) +{ + if (monitor_fn != NULL) { + va_list args; + zmq_event_data_t data; + memset(&data, 0, sizeof (zmq_event_data_t)); + va_start (args, event_); + switch (event_) { + case ZMQ_EVENT_CONNECTED: + data.connected.addr = va_arg (args, char*); + data.connected.fd = va_arg (args, int); + break; + case ZMQ_EVENT_CONNECT_DELAYED: + data.connect_delayed.addr = va_arg (args, char*); + data.connect_delayed.err = va_arg (args, int); + break; + case ZMQ_EVENT_CONNECT_RETRIED: + data.connect_retried.addr = va_arg (args, char*); + data.connect_retried.interval = va_arg (args, int); + break; + case ZMQ_EVENT_LISTENING: + data.listening.addr = va_arg (args, char*); + data.listening.fd = va_arg (args, int); + break; + case ZMQ_EVENT_BIND_FAILED: + data.bind_failed.addr = va_arg (args, char*); + data.bind_failed.err = va_arg (args, int); + break; + case ZMQ_EVENT_ACCEPTED: + data.accepted.addr = va_arg (args, char*); + data.accepted.fd = va_arg (args, int); + break; + case ZMQ_EVENT_ACCEPT_FAILED: + data.accept_failed.addr = va_arg (args, char*); + data.accept_failed.err = va_arg (args, int); + break; + case ZMQ_EVENT_CLOSED: + data.closed.addr = va_arg (args, char*); + data.closed.fd = va_arg (args, int); + break; + case ZMQ_EVENT_CLOSE_FAILED: + data.close_failed.addr = va_arg (args, char*); + data.close_failed.err = va_arg (args, int); + break; + case ZMQ_EVENT_DISCONNECTED: + data.disconnected.addr = va_arg (args, char*); + data.disconnected.fd = va_arg (args, int); + break; + default: + zmq_assert (false); + } + monitor_fn ((void *)socket_, event_, &data); + va_end (args); + } +} + // The last used socket ID, or 0 if no socket was used so far. Note that this // is a global variable. Thus, even sockets created in different contexts have // unique IDs. diff --git a/src/ctx.hpp b/src/ctx.hpp index dcc43f01..133a8b15 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -95,12 +95,17 @@ namespace zmq void unregister_endpoints (zmq::socket_base_t *socket_); endpoint_t find_endpoint (const char *addr_); + // Monitoring specific + int monitor (zmq_monitor_fn *monitor_); + void monitor_event (zmq::socket_base_t *socket_, int event_, ...); + enum { term_tid = 0, reaper_tid = 1 }; ~ctx_t (); + private: @@ -163,6 +168,9 @@ namespace zmq // Synchronisation of access to context options. mutex_t opt_sync; + // Monitoring callback + zmq_monitor_fn *monitor_fn; + ctx_t (const ctx_t&); const ctx_t &operator = (const ctx_t&); }; diff --git a/src/options.cpp b/src/options.cpp index 82785e91..fdd5299d 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -53,7 +53,6 @@ zmq::options_t::options_t () : tcp_keepalive_cnt (-1), tcp_keepalive_idle (-1), tcp_keepalive_intvl (-1), - monitor (NULL), socket_id (0) { } @@ -314,20 +313,6 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, return 0; } } - - case ZMQ_MONITOR: - { - if (optvallen_ == 0 && optval_ == NULL) { - monitor = NULL; - return 0; - } - if (optvallen_ != sizeof (void *)) { - errno = EINVAL; - return -1; - } - monitor = *((zmq_monitor**) &optval_); - return 0; - } } errno = EINVAL; return -1; @@ -544,15 +529,6 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) memcpy (optval_, last_endpoint.c_str(), last_endpoint.size()+1); *optvallen_ = last_endpoint.size()+1; return 0; - - case ZMQ_MONITOR: - if (*optvallen_ < sizeof (void *)) { - errno = EINVAL; - return -1; - } - *((zmq_monitor**) &optval_) = monitor; - *optvallen_ = sizeof (zmq_monitor*); - return 0; } errno = EINVAL; diff --git a/src/options.hpp b/src/options.hpp index 8ae134f8..2f6e6283 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -125,9 +125,6 @@ namespace zmq typedef std::vector tcp_accept_filters_t; tcp_accept_filters_t tcp_accept_filters; - // Connection and exceptional state callback - zmq_monitor *monitor; - // ID of the socket. int socket_id; }; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 15340d15..033e4d7d 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -981,56 +981,8 @@ void zmq::socket_base_t::extract_flags (msg_t *msg_) void zmq::socket_base_t::monitor_event (int event_, ...) { - if (options.monitor != NULL) { - va_list args; - zmq_event_data_t data; - memset(&data, 0, sizeof (zmq_event_data_t)); - va_start (args, event_); - switch (event_) { - case ZMQ_EVENT_CONNECTED: - data.connected.addr = va_arg (args, char*); - data.connected.fd = va_arg (args, int); - break; - case ZMQ_EVENT_CONNECT_DELAYED: - data.connect_delayed.addr = va_arg (args, char*); - data.connect_delayed.err = va_arg (args, int); - break; - case ZMQ_EVENT_CONNECT_RETRIED: - data.connect_retried.addr = va_arg (args, char*); - data.connect_retried.interval = va_arg (args, int); - break; - case ZMQ_EVENT_LISTENING: - data.listening.addr = va_arg (args, char*); - data.listening.fd = va_arg (args, int); - break; - case ZMQ_EVENT_BIND_FAILED: - data.bind_failed.addr = va_arg (args, char*); - data.bind_failed.err = va_arg (args, int); - break; - case ZMQ_EVENT_ACCEPTED: - data.accepted.addr = va_arg (args, char*); - data.accepted.fd = va_arg (args, int); - break; - case ZMQ_EVENT_ACCEPT_FAILED: - data.accept_failed.addr = va_arg (args, char*); - data.accept_failed.err = va_arg (args, int); - break; - case ZMQ_EVENT_CLOSED: - data.closed.addr = va_arg (args, char*); - data.closed.fd = va_arg (args, int); - break; - case ZMQ_EVENT_CLOSE_FAILED: - data.close_failed.addr = va_arg (args, char*); - data.close_failed.err = va_arg (args, int); - break; - case ZMQ_EVENT_DISCONNECTED: - data.disconnected.addr = va_arg (args, char*); - data.disconnected.fd = va_arg (args, int); - break; - default: - zmq_assert (false); - } - options.monitor->function ((void *)this, event_, &data); - va_end (args); - } + va_list args; + va_start (args, event_); + get_ctx ()->monitor_event (this, event_, args); + va_end (args); } diff --git a/src/zmq.cpp b/src/zmq.cpp index 745dbf55..2f307c4a 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -205,6 +205,14 @@ int zmq_ctx_get (void *ctx_, int option_) return ((zmq::ctx_t*) ctx_)->get (option_); } +int zmq_monitor (void *ctx_, zmq_monitor_fn *monitor_) +{ + if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) { + errno = EFAULT; + return -1; + } + return ((zmq::ctx_t*) ctx_)->monitor (monitor_); +} // Stable/legacy context API diff --git a/tests/test_monitor.cpp b/tests/test_monitor.cpp index 049b1a9b..4b50d904 100644 --- a/tests/test_monitor.cpp +++ b/tests/test_monitor.cpp @@ -25,61 +25,50 @@ #include "../include/zmq.h" #include "../include/zmq_utils.h" -void listening_sock_monitor (void *s, int event_, zmq_event_data_t *data_) +static int events; + +void socket_monitor (void *s, int event_, zmq_event_data_t *data_) { const char *addr = "tcp://127.0.0.1:5560"; // Only some of the exceptional events could fire switch (event_) { + // listener specific case ZMQ_EVENT_LISTENING: assert (data_->listening.fd > 0); assert (memcmp (data_->listening.addr, addr, 22)); + events |= ZMQ_EVENT_LISTENING; break; case ZMQ_EVENT_ACCEPTED: assert (data_->accepted.fd > 0); assert (memcmp (data_->accepted.addr, addr, 22)); + events |= ZMQ_EVENT_ACCEPTED; break; - case ZMQ_EVENT_CLOSE_FAILED: - assert (data_->close_failed.err != 0); - assert (memcmp (data_->close_failed.addr, addr, 22)); - break; - case ZMQ_EVENT_CLOSED: - assert (data_->closed.fd != 0); - assert (memcmp (data_->closed.addr, addr, 22)); - break; - case ZMQ_EVENT_DISCONNECTED: - assert (data_->disconnected.fd != 0); - assert (memcmp (data_->disconnected.addr, addr, 22)); - break; - default: - // out of band / unexpected event - assert (0); - } -} - -void connecting_sock_monitor (void *s, int event_, zmq_event_data_t *data_) -{ - const char *addr = "tcp://127.0.0.1:5560"; - // Only some of the exceptional events could fire - switch (event_) { + // connecter specific case ZMQ_EVENT_CONNECTED: assert (data_->connected.fd > 0); assert (memcmp (data_->connected.addr, addr, 22)); + events |= ZMQ_EVENT_CONNECTED; break; case ZMQ_EVENT_CONNECT_DELAYED: assert (data_->connect_delayed.err != 0); assert (memcmp (data_->connect_delayed.addr, addr, 22)); + events |= ZMQ_EVENT_CONNECT_DELAYED; break; + // generic - either end of the socket case ZMQ_EVENT_CLOSE_FAILED: assert (data_->close_failed.err != 0); assert (memcmp (data_->close_failed.addr, addr, 22)); + events |= ZMQ_EVENT_CLOSE_FAILED; break; case ZMQ_EVENT_CLOSED: assert (data_->closed.fd != 0); assert (memcmp (data_->closed.addr, addr, 22)); + events |= ZMQ_EVENT_CLOSED; break; case ZMQ_EVENT_DISCONNECTED: assert (data_->disconnected.fd != 0); assert (memcmp (data_->disconnected.addr, addr, 22)); + events |= ZMQ_EVENT_DISCONNECTED; break; default: // out of band / unexpected event @@ -94,44 +83,18 @@ int main (int argc, char *argv []) // Create the infrastructure void *ctx = zmq_init (1); assert (ctx); - + // set socket monitor + rc = zmq_monitor (ctx, socket_monitor); + assert (rc == 0); void *rep = zmq_socket (ctx, ZMQ_REP); assert (rep); - // Expects failure - invalid size - zmq_monitor monitor; - monitor.function = listening_sock_monitor; - - rc = zmq_setsockopt (rep, ZMQ_MONITOR, &monitor, 20); - assert (rc == -1); - assert (errno == EINVAL); - - rc = zmq_setsockopt (rep, ZMQ_MONITOR, &monitor, sizeof (void *)); - assert (rc == 0); - - size_t sz = sizeof (void *); - rc = zmq_getsockopt (rep, ZMQ_MONITOR, &monitor, &sz); - assert (rc == 0); - assert (monitor.function == listening_sock_monitor); - - // Remove socket monitor callback - rc = zmq_setsockopt (rep, ZMQ_MONITOR, NULL, 0); - assert (rc == 0); - - rc = zmq_getsockopt (rep, ZMQ_MONITOR, &monitor, &sz); - assert (rc == 0); - assert (monitor.function == listening_sock_monitor); - rc = zmq_bind (rep, "tcp://127.0.0.1:5560"); assert (rc == 0); void *req = zmq_socket (ctx, ZMQ_REQ); assert (req); - monitor.function = connecting_sock_monitor; - rc = zmq_setsockopt (req, ZMQ_MONITOR, &monitor, sizeof (void *)); - assert (rc == 0); - rc = zmq_connect (req, "tcp://127.0.0.1:5560"); assert (rc == 0); @@ -151,5 +114,12 @@ int main (int argc, char *argv []) zmq_sleep (1); zmq_term (ctx); + + // We expect to at least observe these events + assert (events & ZMQ_EVENT_LISTENING); + assert (events & ZMQ_EVENT_ACCEPTED); + assert (events & ZMQ_EVENT_CONNECTED); + assert (events & ZMQ_EVENT_CLOSED); + return 0 ; } \ No newline at end of file