Merge pull request #345 from methodmissing/monitor-regressions

Moves the monitoring infrastructure to a global zmq_ctx_set_monitor () API to avoid strict aliasing issues with function pointers and socket options.
This commit is contained in:
Pieter Hintjens 2012-05-22 16:18:54 -07:00
commit 22b4388e29
14 changed files with 341 additions and 195 deletions

3
NEWS
View File

@ -57,7 +57,8 @@ Building
New functionality
-----------------
* ZMQ_MONITOR socket option registers a callback / event sink for changes in socket state.
* A zmq_ctx_set_monitor() API to register a callback / event sink for changes
in socket state.
* POSIX-compliant zmq_send and zmq_recv introduced (uses raw buffer
instead of message object).

View File

@ -1,6 +1,6 @@
MAN3 = zmq_bind.3 zmq_close.3 zmq_connect.3 zmq_device.3 \
zmq_ctx_new.3 zmq_ctx_destroy.3 zmq_ctx_get.3 zmq_ctx_set.3 \
zmq_init.3 zmq_term.3 \
zmq_init.3 zmq_term.3 zmq_ctx_set_monitor.3\
zmq_msg_close.3 zmq_msg_copy.3 zmq_msg_data.3 zmq_msg_init.3 \
zmq_msg_init_data.3 zmq_msg_init_size.3 zmq_msg_move.3 zmq_msg_size.3 \
zmq_msg_send.3 zmq_msg_recv.3 \

View File

@ -44,6 +44,9 @@ Work with context properties::
Destroy a 0MQ context::
linkzmq:zmq_ctx_destroy[3]
Monitor a 0MQ context::
linkzmq:zmq_ctx_set_monitor[3]
These deprecated functions let you create and destroy 'contexts':
Initialise 0MQ context::

View File

@ -455,17 +455,6 @@ Option value unit:: -1,>0
Default value:: -1 (leave to OS default)
Applicable socket types:: all, when using TCP transports.
ZMQ_MONITOR: Registers a callback for socket state changes
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Registers a callback function / event sink for changes in underlying socket state.
The default value of `NULL` means no monitor callback function.
[horizontal]
Option value type:: zmq_monitor_fn
Option value unit:: N/A
Default value:: no callback function
Applicable socket types:: all
RETURN VALUE
------------
The _zmq_getsockopt()_ function shall return zero if successful. Otherwise it

221
doc/zmq_monitor.txt Normal file
View File

@ -0,0 +1,221 @@
zmq_ctx_set_monitor(3)
======================
NAME
----
zmq_ctx_set_monitor - register a monitoring callback
SYNOPSIS
--------
*int zmq_ctx_set_monitor (void '*context', zmq_monitor_fn '*monitor');*
DESCRIPTION
-----------
The _zmq_ctx_set_monitor()_ function shall register a callback function specified by
the 'monitor' argument. This is an event sink for changes in per socket
connection and mailbox (work in progress) states.
.The _zmq_ctx_set_monitor()_ callback function is expected to have this prototype:
----
typedef void (zmq_monitor_fn) (void *s, int event, zmq_event_data_t *data);
----
The callback is global (per context), with the socket that triggered the event
passed to the handler as well. Each event also populates a 'zmq_event_data_t'
union with additional metadata which can be used for correlation.
CAUTION: _zmq_ctx_set_monitor()_ is intended for monitoring infrastructure / operations
concerns only - NOT BUSINESS LOGIC. An event is a representation of something
that happened - you cannot change the past, but only react to them. The
implementation is also only concerned with a single session. No state of peers,
other sessions etc. are tracked - this will only pollute internals and is the
responsibility of application authors to either implement or correlate in
another datastore. Monitor events are exceptional conditions and are thus not
directly in the messaging critical path. However, still be careful with what
you're doing in the callback function as excess time spent in the handler will
block the socket's application thread.
Only tcp and ipc specific transport events are supported in this initial
implementation.
Supported events are:
ZMQ_EVENT_CONNECTED: connection established
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_EVENT_CONNECTED' event triggers when a connection has been established
to a remote peer. This can happen either synchronous or asynchronous.
.Callback metadata:
----
data.connected.addr // peer address
data.connected.fd // socket descriptor
----
ZMQ_EVENT_CONNECT_DELAYED: synchronous connect failed, it's being polled
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_EVENT_CONNECT_DELAYED' event triggers when an immediate connection
attempt is delayed and it's completion's being polled for.
.Callback metadata:
----
data.connect_delayed.addr // peer address
data.connect_delayed.err // errno
----
ZMQ_EVENT_CONNECT_RETRIED: asynchronous connect / reconnection attempt
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_EVENT_CONNECT_RETRIED' event triggers when a connection attempt
is being handled by reconnect timer. The reconnect interval's recomputed
for each attempt.
.Callback metadata:
----
data.connect_retried.addr // peer address
data.connect_retried.interval // computed reconnect interval
----
ZMQ_EVENT_LISTENING: socket bound to an address, ready to accept connections
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_EVENT_LISTENING' event triggers when a socket's successfully bound
to a an interface.
.Callback metadata:
----
data.listening.addr // listen address
data.listening.fd // socket descriptor
----
ZMQ_EVENT_BIND_FAILED: socket could not bind to an address
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_EVENT_BIND_FAILED' event triggers when a socket could not bind to
a given interface.
.Callback metadata:
----
data.bind_failed.addr // listen address
data.bind_failed.err // errno
----
ZMQ_EVENT_ACCEPTED: connection accepted to bound interface
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_EVENT_ACCEPTED' event triggers when a connection from a remote peer
has been established with a socket's listen address.
.Callback metadata:
----
data.accepted.addr // listen address
data.accepted.fd // socket descriptor
----
ZMQ_EVENT_ACCEPT_FAILED: could not accept client connection
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_EVENT_ACCEPT_FAILED' event triggers when a connection attempt to
a socket's bound address fails.
.Callback metadata:
----
data.accept_failed.addr // listen address
data.accept_failed.err // errno
----
ZMQ_EVENT_CLOSED: connection closed
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_EVENT_CLOSED' event triggers when a connection's underlying descriptor
has been closed.
.Callback metadata:
----
data.closed.addr // address
data.closed.fd // socket descriptor
----
ZMQ_EVENT_CLOSE_FAILED: connection couldn't be closed
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_EVENT_CLOSE_FAILED' event triggers when a descriptor could not be
released back to the OS.
.Callback metadata:
----
data.close_failed.addr // address
data.close_failed.err // errno
----
ZMQ_EVENT_DISCONNECTED: broken session
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_EVENT_DISCONNECTED' event triggers when the stream engine (tcp and ipc
specific) detects a corrupted / broken session.
.Callback metadata:
----
data.disconnected.addr // address
data.disconnected.fd // socket descriptor
----
RETURN VALUE
------------
The _zmq_ctx_set_monitor()_ function returns a value of 0 or greater if successful.
Otherwise it returns `-1` and sets 'errno' to one of the values defined
below.
ERRORS
------
*EINVAL*::
The requested callback function _monitor_ is invalid.
EXAMPLE
-------
.Observing a 'PUB' socket's connection state
----
void socket_monitor (void *s, int event_, zmq_event_data_t *data_)
{
switch (event_) {
case ZMQ_EVENT_LISTENING:
printf ("Socket bound to %s, socket descriptor is %d\n",
data.listening.addr, data.listening.fd);
break;
case ZMQ_EVENT_ACCEPTED:
printf ("Accepted connection to %s, socket descriptor is %d\n",
data.accepted.addr, data.accepted.fd);
break;
}
}
void *context = zmq_ctx_new ();
int rc = zmq_ctx_set_monitor (context, socket_monitor);
assert (rc == 0);
void *pub = zmq_socket (context, ZMQ_PUB);
assert (pub);
void *sub = zmq_socket (context, ZMQ_SUB);
assert (pub);
rc = zmq_bind (pub, "tcp://127.0.0.1:5560");
assert (rc == 0);
rc = zmq_connect (sub, "tcp://127.0.0.1:5560");
assert (rc == 0);
// Allow a window for socket events as connect can be async
zmq_sleep (1);
rc = zmq_close (pub);
assert (rc == 0);
rc = zmq_close (sub);
assert (rc == 0);
zmq_term (context);
----
SEE ALSO
--------
linkzmq:zmq[7]
AUTHORS
-------
This 0MQ manual page was written by Lourens Naudé <lourens@methodmissing.com>

View File

@ -431,46 +431,6 @@ Default value:: no filters (allow from all)
Applicable socket types:: all listening sockets, when using TCP transports.
ZMQ_MONITOR: Registers a callback for socket state changes
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Registers a callback function / event sink for changes in underlying socket state.
Expected signature is `void (zmq_monitor_fn) (void *s, int event, zmq_event_data_t *data)`
To remove the callback function call `zmq_setsockopt(socket, ZMQ_MONITOR, NULL, 0)`
The default value of `NULL` means no monitor callback function.
Supported events are :
* 'ZMQ_EVENT_CONNECTED' - connection established
* 'ZMQ_EVENT_CONNECT_DELAYED' - connection could not be established synchronously, it's being polled
* 'ZMQ_EVENT_CONNECT_RETRIED' - asynchronous connect / reconnection attempt
* 'ZMQ_EVENT_LISTENING' - socket bound to an address, ready to accept connections
* 'ZMQ_EVENT_BIND_FAILED' - socket couldn't bind to an address
* 'ZMQ_EVENT_ACCEPTED' - connection accepted to bound interface
* 'ZMQ_EVENT_ACCEPT_FAILED' - could not accept client connection
* 'ZMQ_EVENT_CLOSED' - connection closed
* 'ZMQ_EVENT_CLOSE_FAILED' - connection couldn't be closed
* 'ZMQ_EVENT_DISCONNECTED' - broken session
See `zmq_event_data_t` and `ZMQ_EVENT_*` constants in zmq.h for event specific data (third argument to callback).
Please note that both events and their context data aren't stable contracts. The 'ZMQ_MONITOR' socket option is
intended for monitoring infrastructure / operations concerns only - NOT BUSINESS LOGIC. An event is a representation
of something that happened - you cannot change the past, but only react to them. The implementation also only concerned
with a single session. No state of peers, other sessions etc. are tracked - this will only pollute internals and is the
responsibility of application authors to either implement or correlate in another datastore. Monitor events are exceptional
conditions and are thus not directly in the messaging critical path. However, still be careful with what you're doing in the
callback function as severe latency there will block the socket's application thread.
Only tcp and ipc specific transport events are supported in this initial implementation.
[horizontal]
Option value type:: zmq_monitor_fn
Option value unit:: N/A
Default value:: no callback function
Applicable socket types:: all
RETURN VALUE
------------
The _zmq_setsockopt()_ function shall return zero if successful. Otherwise it

View File

@ -227,7 +227,6 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval);
#define ZMQ_TCP_KEEPALIVE_IDLE 36
#define ZMQ_TCP_KEEPALIVE_INTVL 37
#define ZMQ_TCP_ACCEPT_FILTER 38
#define ZMQ_MONITOR 39
/* Message options */
#define ZMQ_MORE 1
@ -243,17 +242,17 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval);
/* Socket transport events (tcp and ipc only) */
#define ZMQ_EVENT_CONNECTED 1
#define ZMQ_EVENT_CONNECT_DELAYED 2
#define ZMQ_EVENT_CONNECT_RETRIED 3
#define ZMQ_EVENT_CONNECT_RETRIED 4
#define ZMQ_EVENT_LISTENING 4
#define ZMQ_EVENT_BIND_FAILED 5
#define ZMQ_EVENT_LISTENING 8
#define ZMQ_EVENT_BIND_FAILED 16
#define ZMQ_EVENT_ACCEPTED 6
#define ZMQ_EVENT_ACCEPT_FAILED 7
#define ZMQ_EVENT_ACCEPTED 32
#define ZMQ_EVENT_ACCEPT_FAILED 64
#define ZMQ_EVENT_CLOSED 8
#define ZMQ_EVENT_CLOSE_FAILED 9
#define ZMQ_EVENT_DISCONNECTED 10
#define ZMQ_EVENT_CLOSED 128
#define ZMQ_EVENT_CLOSE_FAILED 256
#define ZMQ_EVENT_DISCONNECTED 512
/* Socket event data (union member per event) */
typedef union {
@ -302,6 +301,8 @@ typedef union {
/* Callback template for socket state changes */
typedef void (zmq_monitor_fn) (void *s, int event, zmq_event_data_t *data);
ZMQ_EXPORT int zmq_ctx_set_monitor (void *context, zmq_monitor_fn *monitor);
ZMQ_EXPORT void *zmq_socket (void *, int type);
ZMQ_EXPORT int zmq_close (void *s);
ZMQ_EXPORT int zmq_setsockopt (void *s, int option, const void *optval,

View File

@ -45,7 +45,8 @@ zmq::ctx_t::ctx_t () :
slot_count (0),
slots (NULL),
max_sockets (ZMQ_MAX_SOCKETS_DFLT),
io_thread_count (ZMQ_IO_THREADS_DFLT)
io_thread_count (ZMQ_IO_THREADS_DFLT),
monitor_fn (NULL)
{
}
@ -125,6 +126,12 @@ int zmq::ctx_t::terminate ()
return 0;
}
int zmq::ctx_t::monitor (zmq_monitor_fn *monitor_)
{
monitor_fn = monitor_;
return 0;
}
int zmq::ctx_t::set (int option_, int optval_)
{
int rc = 0;
@ -346,6 +353,59 @@ zmq::endpoint_t zmq::ctx_t::find_endpoint (const char *addr_)
return endpoint;
}
void zmq::ctx_t::monitor_event (zmq::socket_base_t *socket_, int event_, va_list args_)
{
if (monitor_fn != NULL) {
zmq_event_data_t data;
memset(&data, 0, sizeof (zmq_event_data_t));
switch (event_) {
case ZMQ_EVENT_CONNECTED:
data.connected.addr = va_arg (args_, char*);
data.connected.fd = va_arg (args_, int);
break;
case ZMQ_EVENT_CONNECT_DELAYED:
data.connect_delayed.addr = va_arg (args_, char*);
data.connect_delayed.err = va_arg (args_, int);
break;
case ZMQ_EVENT_CONNECT_RETRIED:
data.connect_retried.addr = va_arg (args_, char*);
data.connect_retried.interval = va_arg (args_, int);
break;
case ZMQ_EVENT_LISTENING:
data.listening.addr = va_arg (args_, char*);
data.listening.fd = va_arg (args_, int);
break;
case ZMQ_EVENT_BIND_FAILED:
data.bind_failed.addr = va_arg (args_, char*);
data.bind_failed.err = va_arg (args_, int);
break;
case ZMQ_EVENT_ACCEPTED:
data.accepted.addr = va_arg (args_, char*);
data.accepted.fd = va_arg (args_, int);
break;
case ZMQ_EVENT_ACCEPT_FAILED:
data.accept_failed.addr = va_arg (args_, char*);
data.accept_failed.err = va_arg (args_, int);
break;
case ZMQ_EVENT_CLOSED:
data.closed.addr = va_arg (args_, char*);
data.closed.fd = va_arg (args_, int);
break;
case ZMQ_EVENT_CLOSE_FAILED:
data.close_failed.addr = va_arg (args_, char*);
data.close_failed.err = va_arg (args_, int);
break;
case ZMQ_EVENT_DISCONNECTED:
data.disconnected.addr = va_arg (args_, char*);
data.disconnected.fd = va_arg (args_, int);
break;
default:
zmq_assert (false);
}
monitor_fn ((void *)socket_, event_, &data);
}
}
// The last used socket ID, or 0 if no socket was used so far. Note that this
// is a global variable. Thus, even sockets created in different contexts have
// unique IDs.

View File

@ -95,12 +95,17 @@ namespace zmq
void unregister_endpoints (zmq::socket_base_t *socket_);
endpoint_t find_endpoint (const char *addr_);
// Monitoring specific
int monitor (zmq_monitor_fn *monitor_);
void monitor_event (zmq::socket_base_t *socket_, int event_, va_list args_);
enum {
term_tid = 0,
reaper_tid = 1
};
~ctx_t ();
private:
@ -163,6 +168,9 @@ namespace zmq
// Synchronisation of access to context options.
mutex_t opt_sync;
// Monitoring callback
zmq_monitor_fn *monitor_fn;
ctx_t (const ctx_t&);
const ctx_t &operator = (const ctx_t&);
};

View File

@ -53,7 +53,6 @@ zmq::options_t::options_t () :
tcp_keepalive_cnt (-1),
tcp_keepalive_idle (-1),
tcp_keepalive_intvl (-1),
monitor (NULL),
socket_id (0)
{
}
@ -314,20 +313,6 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
return 0;
}
}
case ZMQ_MONITOR:
{
if (optvallen_ == 0 && optval_ == NULL) {
monitor = NULL;
return 0;
}
if (optvallen_ != sizeof (void *)) {
errno = EINVAL;
return -1;
}
monitor = *((zmq_monitor_fn**) &optval_);
return 0;
}
}
errno = EINVAL;
return -1;
@ -544,15 +529,6 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
memcpy (optval_, last_endpoint.c_str(), last_endpoint.size()+1);
*optvallen_ = last_endpoint.size()+1;
return 0;
case ZMQ_MONITOR:
if (*optvallen_ < sizeof (void *)) {
errno = EINVAL;
return -1;
}
*((zmq_monitor_fn**) &optval_) = monitor;
*optvallen_ = sizeof (zmq_monitor_fn*);
return 0;
}
errno = EINVAL;

View File

@ -125,9 +125,6 @@ namespace zmq
typedef std::vector <tcp_address_mask_t> tcp_accept_filters_t;
tcp_accept_filters_t tcp_accept_filters;
// Connection and exceptional state callback function
zmq_monitor_fn *monitor;
// ID of the socket.
int socket_id;
};

View File

@ -981,56 +981,8 @@ void zmq::socket_base_t::extract_flags (msg_t *msg_)
void zmq::socket_base_t::monitor_event (int event_, ...)
{
if (options.monitor != NULL) {
va_list args;
zmq_event_data_t data;
memset(&data, 0, sizeof (zmq_event_data_t));
va_start (args, event_);
switch (event_) {
case ZMQ_EVENT_CONNECTED:
data.connected.addr = va_arg (args, char*);
data.connected.fd = va_arg (args, int);
break;
case ZMQ_EVENT_CONNECT_DELAYED:
data.connect_delayed.addr = va_arg (args, char*);
data.connect_delayed.err = va_arg (args, int);
break;
case ZMQ_EVENT_CONNECT_RETRIED:
data.connect_retried.addr = va_arg (args, char*);
data.connect_retried.interval = va_arg (args, int);
break;
case ZMQ_EVENT_LISTENING:
data.listening.addr = va_arg (args, char*);
data.listening.fd = va_arg (args, int);
break;
case ZMQ_EVENT_BIND_FAILED:
data.bind_failed.addr = va_arg (args, char*);
data.bind_failed.err = va_arg (args, int);
break;
case ZMQ_EVENT_ACCEPTED:
data.accepted.addr = va_arg (args, char*);
data.accepted.fd = va_arg (args, int);
break;
case ZMQ_EVENT_ACCEPT_FAILED:
data.accept_failed.addr = va_arg (args, char*);
data.accept_failed.err = va_arg (args, int);
break;
case ZMQ_EVENT_CLOSED:
data.closed.addr = va_arg (args, char*);
data.closed.fd = va_arg (args, int);
break;
case ZMQ_EVENT_CLOSE_FAILED:
data.close_failed.addr = va_arg (args, char*);
data.close_failed.err = va_arg (args, int);
break;
case ZMQ_EVENT_DISCONNECTED:
data.disconnected.addr = va_arg (args, char*);
data.disconnected.fd = va_arg (args, int);
break;
default:
zmq_assert (false);
}
options.monitor ((void *)this, event_, &data);
va_end (args);
}
va_list args;
va_start (args, event_);
get_ctx ()->monitor_event (this, event_, args);
va_end (args);
}

View File

@ -205,6 +205,14 @@ int zmq_ctx_get (void *ctx_, int option_)
return ((zmq::ctx_t*) ctx_)->get (option_);
}
int zmq_ctx_set_monitor (void *ctx_, zmq_monitor_fn *monitor_)
{
if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) {
errno = EFAULT;
return -1;
}
return ((zmq::ctx_t*) ctx_)->monitor (monitor_);
}
// Stable/legacy context API

View File

@ -25,61 +25,50 @@
#include "../include/zmq.h"
#include "../include/zmq_utils.h"
void listening_sock_monitor (void *s, int event_, zmq_event_data_t *data_)
static int events;
void socket_monitor (void *s, int event_, zmq_event_data_t *data_)
{
const char *addr = "tcp://127.0.0.1:5560";
// Only some of the exceptional events could fire
switch (event_) {
// listener specific
case ZMQ_EVENT_LISTENING:
assert (data_->listening.fd > 0);
assert (memcmp (data_->listening.addr, addr, 22));
events |= ZMQ_EVENT_LISTENING;
break;
case ZMQ_EVENT_ACCEPTED:
assert (data_->accepted.fd > 0);
assert (memcmp (data_->accepted.addr, addr, 22));
events |= ZMQ_EVENT_ACCEPTED;
break;
case ZMQ_EVENT_CLOSE_FAILED:
assert (data_->close_failed.err != 0);
assert (memcmp (data_->close_failed.addr, addr, 22));
break;
case ZMQ_EVENT_CLOSED:
assert (data_->closed.fd != 0);
assert (memcmp (data_->closed.addr, addr, 22));
break;
case ZMQ_EVENT_DISCONNECTED:
assert (data_->disconnected.fd != 0);
assert (memcmp (data_->disconnected.addr, addr, 22));
break;
default:
// out of band / unexpected event
assert (0);
}
}
void connecting_sock_monitor (void *s, int event_, zmq_event_data_t *data_)
{
const char *addr = "tcp://127.0.0.1:5560";
// Only some of the exceptional events could fire
switch (event_) {
// connecter specific
case ZMQ_EVENT_CONNECTED:
assert (data_->connected.fd > 0);
assert (memcmp (data_->connected.addr, addr, 22));
events |= ZMQ_EVENT_CONNECTED;
break;
case ZMQ_EVENT_CONNECT_DELAYED:
assert (data_->connect_delayed.err != 0);
assert (memcmp (data_->connect_delayed.addr, addr, 22));
events |= ZMQ_EVENT_CONNECT_DELAYED;
break;
// generic - either end of the socket
case ZMQ_EVENT_CLOSE_FAILED:
assert (data_->close_failed.err != 0);
assert (memcmp (data_->close_failed.addr, addr, 22));
events |= ZMQ_EVENT_CLOSE_FAILED;
break;
case ZMQ_EVENT_CLOSED:
assert (data_->closed.fd != 0);
assert (memcmp (data_->closed.addr, addr, 22));
events |= ZMQ_EVENT_CLOSED;
break;
case ZMQ_EVENT_DISCONNECTED:
assert (data_->disconnected.fd != 0);
assert (memcmp (data_->disconnected.addr, addr, 22));
events |= ZMQ_EVENT_DISCONNECTED;
break;
default:
// out of band / unexpected event
@ -94,44 +83,18 @@ int main (int argc, char *argv [])
// Create the infrastructure
void *ctx = zmq_init (1);
assert (ctx);
// set socket monitor
rc = zmq_ctx_set_monitor (ctx, socket_monitor);
assert (rc == 0);
void *rep = zmq_socket (ctx, ZMQ_REP);
assert (rep);
// Expects failure - invalid size
zmq_monitor_fn *monitor;
monitor = listening_sock_monitor;
rc = zmq_setsockopt (rep, ZMQ_MONITOR, *(void **)&monitor, 20);
assert (rc == -1);
assert (errno == EINVAL);
rc = zmq_setsockopt (rep, ZMQ_MONITOR, *(void **)&monitor, sizeof (void *));
assert (rc == 0);
size_t sz = sizeof (void *);
rc = zmq_getsockopt (rep, ZMQ_MONITOR, &monitor, &sz);
assert (rc == 0);
assert (monitor == listening_sock_monitor);
// Remove socket monitor callback
rc = zmq_setsockopt (rep, ZMQ_MONITOR, NULL, 0);
assert (rc == 0);
rc = zmq_getsockopt (rep, ZMQ_MONITOR, &monitor, &sz);
assert (rc == 0);
assert (monitor == listening_sock_monitor);
rc = zmq_bind (rep, "tcp://127.0.0.1:5560");
assert (rc == 0);
void *req = zmq_socket (ctx, ZMQ_REQ);
assert (req);
monitor = connecting_sock_monitor;
rc = zmq_setsockopt (req, ZMQ_MONITOR, *(void **)&monitor, sizeof (void *));
assert (rc == 0);
rc = zmq_connect (req, "tcp://127.0.0.1:5560");
assert (rc == 0);
@ -151,5 +114,12 @@ int main (int argc, char *argv [])
zmq_sleep (1);
zmq_term (ctx);
// We expect to at least observe these events
assert (events & ZMQ_EVENT_LISTENING);
assert (events & ZMQ_EVENT_ACCEPTED);
assert (events & ZMQ_EVENT_CONNECTED);
assert (events & ZMQ_EVENT_CLOSED);
return 0 ;
}