Merge pull request #340 from methodmissing/events

Expose a ZMQ_MONITOR socket option to register a callback for notification of state changes in socket state ( stream engine, tcp and ipc transport only )
This commit is contained in:
Pieter Hintjens 2012-05-11 15:18:50 -07:00
commit 89d5054e59
26 changed files with 461 additions and 13 deletions

2
.gitignore vendored
View File

@ -20,6 +20,8 @@ autom4te.cache
.* .*
*~ *~
.*~ .*~
tests/test_term_endpoint
tests/test_monitor
tests/test_last_endpoint tests/test_last_endpoint
tests/test_pair_inproc tests/test_pair_inproc
tests/test_pair_ipc tests/test_pair_ipc

View File

@ -76,6 +76,7 @@ Thijs Terlouw <thijsterlouw@gmail.com>
Toralf Wittner <toralf.wittner@gmail.com> Toralf Wittner <toralf.wittner@gmail.com>
Tore Halvorsen <tore.halvorsen@gmail.com> Tore Halvorsen <tore.halvorsen@gmail.com>
Vitaly Mayatskikh <v.mayatskih@gmail.com> Vitaly Mayatskikh <v.mayatskih@gmail.com>
Lourens Naudé <lourens@methodmissing.com>
Credits Credits
======= =======

2
NEWS
View File

@ -57,6 +57,8 @@ Building
New functionality New functionality
----------------- -----------------
* ZMQ_MONITOR socket option registers a callback / event sink for changes in socket state.
* POSIX-compliant zmq_send and zmq_recv introduced (uses raw buffer * POSIX-compliant zmq_send and zmq_recv introduced (uses raw buffer
instead of message object). instead of message object).

View File

@ -455,6 +455,16 @@ Option value unit:: -1,>0
Default value:: -1 (leave to OS default) Default value:: -1 (leave to OS default)
Applicable socket types:: all, when using TCP transports. Applicable socket types:: all, when using TCP transports.
ZMQ_MONITOR: Registers a callback for socket state changes
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Registers a callback function / event sink for changes in underlying socket state.
The default value of `NULL` means no monitor callback function.
[horizontal]
Option value type:: zmq_monitor_fn
Option value unit:: N/A
Default value:: no callback function
Applicable socket types:: all
RETURN VALUE RETURN VALUE
------------ ------------

View File

@ -431,12 +431,51 @@ Default value:: no filters (allow from all)
Applicable socket types:: all listening sockets, when using TCP transports. Applicable socket types:: all listening sockets, when using TCP transports.
ZMQ_MONITOR: Registers a callback for socket state changes
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Registers a callback function / event sink for changes in underlying socket state.
Expected signature is `void (zmq_monitor_fn) (void *s, int event, zmq_event_data_t *data)`
To remove the callback function call `zmq_setsockopt(socket, ZMQ_MONITOR, NULL, 0)`
The default value of `NULL` means no monitor callback function.
Supported events are :
* 'ZMQ_EVENT_CONNECTED' - connection established
* 'ZMQ_EVENT_CONNECT_DELAYED' - connection could not be established synchronously, it's being polled
* 'ZMQ_EVENT_CONNECT_RETRIED' - asynchronous connect / reconnection attempt
* 'ZMQ_EVENT_LISTENING' - socket bound to an address, ready to accept connections
* 'ZMQ_EVENT_BIND_FAILED' - socket couldn't bind to an address
* 'ZMQ_EVENT_ACCEPTED' - connection accepted to bound interface
* 'ZMQ_EVENT_ACCEPT_FAILED' - could not accept client connection
* 'ZMQ_EVENT_CLOSED' - connection closed
* 'ZMQ_EVENT_CLOSE_FAILED' - connection couldn't be closed
* 'ZMQ_EVENT_DISCONNECTED' - broken session
See `zmq_event_data_t` and `ZMQ_EVENT_*` constants in zmq.h for event specific data (third argument to callback).
Please note that both events and their context data aren't stable contracts. The 'ZMQ_MONITOR' socket option is
intended for monitoring infrastructure / operations concerns only - NOT BUSINESS LOGIC. An event is a representation
of something that happened - you cannot change the past, but only react to them. The implementation also only concerned
with a single session. No state of peers, other sessions etc. are tracked - this will only pollute internals and is the
responsibility of application authors to either implement or correlate in another datastore. Monitor events are exceptional
conditions and are thus not directly in the messaging critical path. However, still be careful with what you're doing in the
callback function as severe latency there will block the socket's application thread.
Only tcp and ipc specific transport events are supported in this initial implementation.
[horizontal]
Option value type:: zmq_monitor_fn
Option value unit:: N/A
Default value:: no callback function
Applicable socket types:: all
RETURN VALUE RETURN VALUE
------------ ------------
The _zmq_setsockopt()_ function shall return zero if successful. Otherwise it The _zmq_setsockopt()_ function shall return zero if successful. Otherwise it
shall return `-1` and set 'errno' to one of the values defined below. shall return `-1` and set 'errno' to one of the values defined below.
ERRORS ERRORS
------ ------
*EINVAL*:: *EINVAL*::

View File

@ -227,7 +227,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval);
#define ZMQ_TCP_KEEPALIVE_IDLE 36 #define ZMQ_TCP_KEEPALIVE_IDLE 36
#define ZMQ_TCP_KEEPALIVE_INTVL 37 #define ZMQ_TCP_KEEPALIVE_INTVL 37
#define ZMQ_TCP_ACCEPT_FILTER 38 #define ZMQ_TCP_ACCEPT_FILTER 38
#define ZMQ_MONITOR 39
/* Message options */ /* Message options */
#define ZMQ_MORE 1 #define ZMQ_MORE 1
@ -236,6 +236,72 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval);
#define ZMQ_DONTWAIT 1 #define ZMQ_DONTWAIT 1
#define ZMQ_SNDMORE 2 #define ZMQ_SNDMORE 2
/******************************************************************************/
/* 0MQ socket events and monitoring */
/******************************************************************************/
/* Socket transport events (tcp and ipc only) */
#define ZMQ_EVENT_CONNECTED 1
#define ZMQ_EVENT_CONNECT_DELAYED 2
#define ZMQ_EVENT_CONNECT_RETRIED 3
#define ZMQ_EVENT_LISTENING 4
#define ZMQ_EVENT_BIND_FAILED 5
#define ZMQ_EVENT_ACCEPTED 6
#define ZMQ_EVENT_ACCEPT_FAILED 7
#define ZMQ_EVENT_CLOSED 8
#define ZMQ_EVENT_CLOSE_FAILED 9
#define ZMQ_EVENT_DISCONNECTED 10
/* Socket event data (union member per event) */
typedef union {
struct {
char *addr;
int fd;
} connected;
struct {
char *addr;
int err;
} connect_delayed;
struct {
char *addr;
int interval;
} connect_retried;
struct {
char *addr;
int fd;
} listening;
struct {
char *addr;
int err;
} bind_failed;
struct {
char *addr;
int fd;
} accepted;
struct {
char *addr;
int err;
} accept_failed;
struct {
char *addr;
int fd;
} closed;
struct {
char *addr;
int err;
} close_failed;
struct {
char *addr;
int fd;
} disconnected;
} zmq_event_data_t;
/* Callback template for socket state changes */
typedef void (zmq_monitor_fn) (void *s, int event, zmq_event_data_t *data);
ZMQ_EXPORT void *zmq_socket (void *, int type); ZMQ_EXPORT void *zmq_socket (void *, int type);
ZMQ_EXPORT int zmq_close (void *s); ZMQ_EXPORT int zmq_close (void *s);
ZMQ_EXPORT int zmq_setsockopt (void *s, int option, const void *optval, ZMQ_EXPORT int zmq_setsockopt (void *s, int option, const void *optval,

View File

@ -52,7 +52,7 @@ zmq::address_t::~address_t ()
#endif #endif
} }
int zmq::address_t::to_string (std::string &addr_) int zmq::address_t::to_string (std::string &addr_) const
{ {
if (protocol == "tcp") { if (protocol == "tcp") {
if (resolved.tcp_addr) { if (resolved.tcp_addr) {

View File

@ -45,7 +45,7 @@ namespace zmq
#endif #endif
} resolved; } resolved;
int to_string (std::string &addr_); int to_string (std::string &addr_) const;
}; };
} }

View File

@ -33,6 +33,7 @@
#include "ip.hpp" #include "ip.hpp"
#include "address.hpp" #include "address.hpp"
#include "ipc_address.hpp" #include "ipc_address.hpp"
#include "session_base.hpp"
#include <unistd.h> #include <unistd.h>
#include <sys/types.h> #include <sys/types.h>
@ -53,6 +54,7 @@ zmq::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_,
{ {
zmq_assert (addr); zmq_assert (addr);
zmq_assert (addr->protocol == "ipc"); zmq_assert (addr->protocol == "ipc");
addr->to_string (endpoint);
} }
zmq::ipc_connecter_t::~ipc_connecter_t () zmq::ipc_connecter_t::~ipc_connecter_t ()
@ -105,6 +107,8 @@ void zmq::ipc_connecter_t::out_event ()
// Shut the connecter down. // Shut the connecter down.
terminate (); terminate ();
session->monitor_event (ZMQ_EVENT_CONNECTED, endpoint.c_str(), fd);
} }
void zmq::ipc_connecter_t::timer_event (int id_) void zmq::ipc_connecter_t::timer_event (int id_)
@ -132,6 +136,7 @@ void zmq::ipc_connecter_t::start_connecting ()
handle = add_fd (s); handle = add_fd (s);
handle_valid = true; handle_valid = true;
set_pollout (handle); set_pollout (handle);
session->monitor_event (ZMQ_EVENT_CONNECT_DELAYED, endpoint.c_str(), zmq_errno());
return; return;
} }
@ -143,7 +148,9 @@ void zmq::ipc_connecter_t::start_connecting ()
void zmq::ipc_connecter_t::add_reconnect_timer() void zmq::ipc_connecter_t::add_reconnect_timer()
{ {
add_timer (get_new_reconnect_ivl(), reconnect_timer_id); int rc_ivl = get_new_reconnect_ivl();
add_timer (rc_ivl, reconnect_timer_id);
session->monitor_event (ZMQ_EVENT_CONNECT_RETRIED, endpoint.c_str(), rc_ivl);
} }
int zmq::ipc_connecter_t::get_new_reconnect_ivl () int zmq::ipc_connecter_t::get_new_reconnect_ivl ()
@ -202,8 +209,11 @@ int zmq::ipc_connecter_t::close ()
{ {
zmq_assert (s != retired_fd); zmq_assert (s != retired_fd);
int rc = ::close (s); int rc = ::close (s);
if (rc != 0) if (rc != 0) {
session->monitor_event (ZMQ_EVENT_CLOSE_FAILED, endpoint.c_str(), zmq_errno());
return -1; return -1;
}
session->monitor_event (ZMQ_EVENT_CLOSED, endpoint.c_str(), s);
s = retired_fd; s = retired_fd;
return 0; return 0;
} }

View File

@ -106,6 +106,9 @@ namespace zmq
// Current reconnect ivl, updated for backoff strategy // Current reconnect ivl, updated for backoff strategy
int current_reconnect_ivl; int current_reconnect_ivl;
// String representation of endpoint to connect to
std::string endpoint;
ipc_connecter_t (const ipc_connecter_t&); ipc_connecter_t (const ipc_connecter_t&);
const ipc_connecter_t &operator = (const ipc_connecter_t&); const ipc_connecter_t &operator = (const ipc_connecter_t&);
}; };

View File

@ -33,6 +33,7 @@
#include "config.hpp" #include "config.hpp"
#include "err.hpp" #include "err.hpp"
#include "ip.hpp" #include "ip.hpp"
#include "socket_base.hpp"
#include <unistd.h> #include <unistd.h>
#include <sys/socket.h> #include <sys/socket.h>
@ -75,8 +76,10 @@ void zmq::ipc_listener_t::in_event ()
// If connection was reset by the peer in the meantime, just ignore it. // If connection was reset by the peer in the meantime, just ignore it.
// TODO: Handle specific errors like ENFILE/EMFILE etc. // TODO: Handle specific errors like ENFILE/EMFILE etc.
if (fd == retired_fd) if (fd == retired_fd) {
socket->monitor_event (ZMQ_EVENT_ACCEPT_FAILED, endpoint.c_str(), zmq_errno());
return; return;
}
// Create the engine object for this connection. // Create the engine object for this connection.
stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options); stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options);
@ -94,6 +97,7 @@ void zmq::ipc_listener_t::in_event ()
session->inc_seqnum (); session->inc_seqnum ();
launch_child (session); launch_child (session);
send_attach (session, engine, false); send_attach (session, engine, false);
socket->monitor_event (ZMQ_EVENT_ACCEPTED, endpoint.c_str(), fd);
} }
int zmq::ipc_listener_t::get_address (std::string &addr_) int zmq::ipc_listener_t::get_address (std::string &addr_)
@ -133,6 +137,8 @@ int zmq::ipc_listener_t::set_address (const char *addr_)
if (s == -1) if (s == -1)
return -1; return -1;
address.to_string (endpoint);
// Bind the socket to the file path. // Bind the socket to the file path.
rc = bind (s, address.addr (), address.addrlen ()); rc = bind (s, address.addr (), address.addrlen ());
if (rc != 0) if (rc != 0)
@ -146,6 +152,7 @@ int zmq::ipc_listener_t::set_address (const char *addr_)
if (rc != 0) if (rc != 0)
return -1; return -1;
socket->monitor_event (ZMQ_EVENT_LISTENING, addr_, s);
return 0; return 0;
} }
@ -153,18 +160,23 @@ int zmq::ipc_listener_t::close ()
{ {
zmq_assert (s != retired_fd); zmq_assert (s != retired_fd);
int rc = ::close (s); int rc = ::close (s);
if (rc != 0) if (rc != 0) {
socket->monitor_event (ZMQ_EVENT_CLOSE_FAILED, endpoint.c_str(), zmq_errno());
return -1; return -1;
s = retired_fd; }
// If there's an underlying UNIX domain socket, get rid of the file it // If there's an underlying UNIX domain socket, get rid of the file it
// is associated with. // is associated with.
if (has_file && !filename.empty ()) { if (has_file && !filename.empty ()) {
rc = ::unlink(filename.c_str ()); rc = ::unlink(filename.c_str ());
if (rc != 0) if (rc != 0) {
socket->monitor_event (ZMQ_EVENT_CLOSE_FAILED, endpoint.c_str(), zmq_errno());
return -1; return -1;
}
} }
socket->monitor_event (ZMQ_EVENT_CLOSED, endpoint.c_str(), s);
s = retired_fd;
return 0; return 0;
} }

View File

@ -84,6 +84,9 @@ namespace zmq
// Socket the listerner belongs to. // Socket the listerner belongs to.
zmq::socket_base_t *socket; zmq::socket_base_t *socket;
// String representation of endpoint to bind to
std::string endpoint;
ipc_listener_t (const ipc_listener_t&); ipc_listener_t (const ipc_listener_t&);
const ipc_listener_t &operator = (const ipc_listener_t&); const ipc_listener_t &operator = (const ipc_listener_t&);
}; };

View File

@ -53,6 +53,7 @@ zmq::options_t::options_t () :
tcp_keepalive_cnt (-1), tcp_keepalive_cnt (-1),
tcp_keepalive_idle (-1), tcp_keepalive_idle (-1),
tcp_keepalive_intvl (-1), tcp_keepalive_intvl (-1),
monitor (NULL),
socket_id (0) socket_id (0)
{ {
} }
@ -313,6 +314,20 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
return 0; return 0;
} }
} }
case ZMQ_MONITOR:
{
if (optvallen_ == 0 && optval_ == NULL) {
monitor = NULL;
return 0;
}
if (optvallen_ != sizeof (void *)) {
errno = EINVAL;
return -1;
}
monitor = ((zmq_monitor_fn*) optval_);
return 0;
}
} }
errno = EINVAL; errno = EINVAL;
return -1; return -1;
@ -530,6 +545,14 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
*optvallen_ = last_endpoint.size()+1; *optvallen_ = last_endpoint.size()+1;
return 0; return 0;
case ZMQ_MONITOR:
if (*optvallen_ < sizeof (void *)) {
errno = EINVAL;
return -1;
}
((zmq_monitor_fn*) optval_) = monitor;
*optvallen_ = sizeof (void *);
return 0;
} }
errno = EINVAL; errno = EINVAL;

View File

@ -29,6 +29,7 @@
#include "stddef.h" #include "stddef.h"
#include "stdint.hpp" #include "stdint.hpp"
#include "tcp_address.hpp" #include "tcp_address.hpp"
#include "../include/zmq.h"
namespace zmq namespace zmq
{ {
@ -124,6 +125,9 @@ namespace zmq
typedef std::vector <tcp_address_mask_t> tcp_accept_filters_t; typedef std::vector <tcp_address_mask_t> tcp_accept_filters_t;
tcp_accept_filters_t tcp_accept_filters; tcp_accept_filters_t tcp_accept_filters;
// Connection and exceptional state callback function
zmq_monitor_fn *monitor;
// ID of the socket. // ID of the socket.
int socket_id; int socket_id;
}; };

View File

@ -255,6 +255,21 @@ void zmq::session_base_t::hiccuped (pipe_t *pipe_)
zmq_assert (false); zmq_assert (false);
} }
int zmq::session_base_t::get_address (std::string &addr_)
{
if (addr)
return addr->to_string (addr_);
return -1;
}
void zmq::session_base_t::monitor_event (int event_, ...)
{
va_list args;
va_start (args, event_);
socket->monitor_event (event_, args);
va_end (args);
}
void zmq::session_base_t::process_plug () void zmq::session_base_t::process_plug ()
{ {
if (connect) if (connect)

View File

@ -65,6 +65,9 @@ namespace zmq
void hiccuped (zmq::pipe_t *pipe_); void hiccuped (zmq::pipe_t *pipe_);
void terminated (zmq::pipe_t *pipe_); void terminated (zmq::pipe_t *pipe_);
int get_address (std::string &addr_);
void monitor_event (int event_, ...);
protected: protected:
session_base_t (zmq::io_thread_t *io_thread_, bool connect_, session_base_t (zmq::io_thread_t *io_thread_, bool connect_,

View File

@ -351,6 +351,7 @@ int zmq::socket_base_t::bind (const char *addr_)
int rc = listener->set_address (address.c_str ()); int rc = listener->set_address (address.c_str ());
if (rc != 0) { if (rc != 0) {
delete listener; delete listener;
monitor_event (ZMQ_EVENT_BIND_FAILED, addr_, zmq_errno());
return -1; return -1;
} }
@ -369,6 +370,7 @@ int zmq::socket_base_t::bind (const char *addr_)
int rc = listener->set_address (address.c_str ()); int rc = listener->set_address (address.c_str ());
if (rc != 0) { if (rc != 0) {
delete listener; delete listener;
monitor_event (ZMQ_EVENT_BIND_FAILED, addr_, zmq_errno());
return -1; return -1;
} }
@ -976,3 +978,59 @@ void zmq::socket_base_t::extract_flags (msg_t *msg_)
// Remove MORE flag. // Remove MORE flag.
rcvmore = msg_->flags () & msg_t::more ? true : false; rcvmore = msg_->flags () & msg_t::more ? true : false;
} }
void zmq::socket_base_t::monitor_event (int event_, ...)
{
if (options.monitor != NULL) {
va_list args;
zmq_event_data_t data;
memset(&data, 0, sizeof (zmq_event_data_t));
va_start (args, event_);
switch (event_) {
case ZMQ_EVENT_CONNECTED:
data.connected.addr = va_arg (args, char*);
data.connected.fd = va_arg (args, int);
break;
case ZMQ_EVENT_CONNECT_DELAYED:
data.connect_delayed.addr = va_arg (args, char*);
data.connect_delayed.err = va_arg (args, int);
break;
case ZMQ_EVENT_CONNECT_RETRIED:
data.connect_retried.addr = va_arg (args, char*);
data.connect_retried.interval = va_arg (args, int);
break;
case ZMQ_EVENT_LISTENING:
data.listening.addr = va_arg (args, char*);
data.listening.fd = va_arg (args, int);
break;
case ZMQ_EVENT_BIND_FAILED:
data.bind_failed.addr = va_arg (args, char*);
data.bind_failed.err = va_arg (args, int);
break;
case ZMQ_EVENT_ACCEPTED:
data.accepted.addr = va_arg (args, char*);
data.accepted.fd = va_arg (args, int);
break;
case ZMQ_EVENT_ACCEPT_FAILED:
data.accept_failed.addr = va_arg (args, char*);
data.accept_failed.err = va_arg (args, int);
break;
case ZMQ_EVENT_CLOSED:
data.closed.addr = va_arg (args, char*);
data.closed.fd = va_arg (args, int);
break;
case ZMQ_EVENT_CLOSE_FAILED:
data.close_failed.addr = va_arg (args, char*);
data.close_failed.err = va_arg (args, int);
break;
case ZMQ_EVENT_DISCONNECTED:
data.disconnected.addr = va_arg (args, char*);
data.disconnected.fd = va_arg (args, int);
break;
default:
zmq_assert (false);
}
options.monitor ((void *)this, event_, &data);
va_end (args);
}
}

View File

@ -100,6 +100,9 @@ namespace zmq
void terminated (pipe_t *pipe_); void terminated (pipe_t *pipe_);
void lock(); void lock();
void unlock(); void unlock();
void monitor_event (int event_, ...);
protected: protected:
socket_base_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_); socket_base_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);

View File

@ -118,6 +118,8 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
decoder.set_session (session_); decoder.set_session (session_);
session = session_; session = session_;
session->get_address (endpoint);
// Connect to I/O threads poller object. // Connect to I/O threads poller object.
io_object_t::plug (io_thread_); io_object_t::plug (io_thread_);
handle = add_fd (s); handle = add_fd (s);
@ -144,6 +146,7 @@ void zmq::stream_engine_t::unplug ()
decoder.set_session (NULL); decoder.set_session (NULL);
leftover_session = session; leftover_session = session;
session = NULL; session = NULL;
endpoint.clear();
} }
void zmq::stream_engine_t::terminate () void zmq::stream_engine_t::terminate ()
@ -291,6 +294,7 @@ void zmq::stream_engine_t::activate_in ()
void zmq::stream_engine_t::error () void zmq::stream_engine_t::error ()
{ {
zmq_assert (session); zmq_assert (session);
session->monitor_event (ZMQ_EVENT_DISCONNECTED, endpoint.c_str(), s);
session->detach (); session->detach ();
unplug (); unplug ();
delete this; delete this;

View File

@ -30,6 +30,7 @@
#include "encoder.hpp" #include "encoder.hpp"
#include "decoder.hpp" #include "decoder.hpp"
#include "options.hpp" #include "options.hpp"
#include "../include/zmq.h"
namespace zmq namespace zmq
{ {
@ -97,6 +98,9 @@ namespace zmq
options_t options; options_t options;
// String representation of endpoint
std::string endpoint;
bool plugged; bool plugged;
stream_engine_t (const stream_engine_t&); stream_engine_t (const stream_engine_t&);

View File

@ -31,6 +31,7 @@
#include "ip.hpp" #include "ip.hpp"
#include "address.hpp" #include "address.hpp"
#include "tcp_address.hpp" #include "tcp_address.hpp"
#include "session_base.hpp"
#if defined ZMQ_HAVE_WINDOWS #if defined ZMQ_HAVE_WINDOWS
#include "windows.hpp" #include "windows.hpp"
@ -62,6 +63,7 @@ zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_,
{ {
zmq_assert (addr); zmq_assert (addr);
zmq_assert (addr->protocol == "tcp"); zmq_assert (addr->protocol == "tcp");
addr->to_string (endpoint);
} }
zmq::tcp_connecter_t::~tcp_connecter_t () zmq::tcp_connecter_t::~tcp_connecter_t ()
@ -117,6 +119,8 @@ void zmq::tcp_connecter_t::out_event ()
// Shut the connecter down. // Shut the connecter down.
terminate (); terminate ();
session->monitor_event (ZMQ_EVENT_CONNECTED, endpoint.c_str(), fd);
} }
void zmq::tcp_connecter_t::timer_event (int id_) void zmq::tcp_connecter_t::timer_event (int id_)
@ -144,6 +148,7 @@ void zmq::tcp_connecter_t::start_connecting ()
handle = add_fd (s); handle = add_fd (s);
handle_valid = true; handle_valid = true;
set_pollout (handle); set_pollout (handle);
session->monitor_event (ZMQ_EVENT_CONNECT_DELAYED, endpoint.c_str(), zmq_errno());
return; return;
} }
@ -155,7 +160,9 @@ void zmq::tcp_connecter_t::start_connecting ()
void zmq::tcp_connecter_t::add_reconnect_timer() void zmq::tcp_connecter_t::add_reconnect_timer()
{ {
add_timer (get_new_reconnect_ivl(), reconnect_timer_id); int rc_ivl = get_new_reconnect_ivl();
add_timer (rc_ivl, reconnect_timer_id);
session->monitor_event (ZMQ_EVENT_CONNECT_RETRIED, endpoint.c_str(), rc_ivl);
} }
int zmq::tcp_connecter_t::get_new_reconnect_ivl () int zmq::tcp_connecter_t::get_new_reconnect_ivl ()
@ -278,10 +285,15 @@ void zmq::tcp_connecter_t::close ()
zmq_assert (s != retired_fd); zmq_assert (s != retired_fd);
#ifdef ZMQ_HAVE_WINDOWS #ifdef ZMQ_HAVE_WINDOWS
int rc = closesocket (s); int rc = closesocket (s);
if (unlikely (rc != SOCKET_ERROR))
session->monitor_event (ZMQ_EVENT_CLOSE_FAILED, endpoint.c_str(), zmq_errno());
wsa_assert (rc != SOCKET_ERROR); wsa_assert (rc != SOCKET_ERROR);
#else #else
int rc = ::close (s); int rc = ::close (s);
if (unlikely (rc == 0))
session->monitor_event (ZMQ_EVENT_CLOSE_FAILED, endpoint.c_str(), zmq_errno());
errno_assert (rc == 0); errno_assert (rc == 0);
#endif #endif
session->monitor_event (ZMQ_EVENT_CLOSED, endpoint.c_str(), s);
s = retired_fd; s = retired_fd;
} }

View File

@ -26,6 +26,7 @@
#include "own.hpp" #include "own.hpp"
#include "stdint.hpp" #include "stdint.hpp"
#include "io_object.hpp" #include "io_object.hpp"
#include "../include/zmq.h"
namespace zmq namespace zmq
{ {
@ -103,6 +104,9 @@ namespace zmq
// Current reconnect ivl, updated for backoff strategy // Current reconnect ivl, updated for backoff strategy
int current_reconnect_ivl; int current_reconnect_ivl;
// String representation of endpoint to connect to
std::string endpoint;
tcp_connecter_t (const tcp_connecter_t&); tcp_connecter_t (const tcp_connecter_t&);
const tcp_connecter_t &operator = (const tcp_connecter_t&); const tcp_connecter_t &operator = (const tcp_connecter_t&);
}; };

View File

@ -31,6 +31,7 @@
#include "config.hpp" #include "config.hpp"
#include "err.hpp" #include "err.hpp"
#include "ip.hpp" #include "ip.hpp"
#include "socket_base.hpp"
#ifdef ZMQ_HAVE_WINDOWS #ifdef ZMQ_HAVE_WINDOWS
#include "windows.hpp" #include "windows.hpp"
@ -83,8 +84,10 @@ void zmq::tcp_listener_t::in_event ()
// If connection was reset by the peer in the meantime, just ignore it. // If connection was reset by the peer in the meantime, just ignore it.
// TODO: Handle specific errors like ENFILE/EMFILE etc. // TODO: Handle specific errors like ENFILE/EMFILE etc.
if (fd == retired_fd) if (fd == retired_fd) {
socket->monitor_event (ZMQ_EVENT_ACCEPT_FAILED, endpoint.c_str(), zmq_errno());
return; return;
}
tune_tcp_socket (fd); tune_tcp_socket (fd);
tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt, options.tcp_keepalive_idle, options.tcp_keepalive_intvl); tune_tcp_keepalives (fd, options.tcp_keepalive, options.tcp_keepalive_cnt, options.tcp_keepalive_idle, options.tcp_keepalive_intvl);
@ -105,6 +108,7 @@ void zmq::tcp_listener_t::in_event ()
session->inc_seqnum (); session->inc_seqnum ();
launch_child (session); launch_child (session);
send_attach (session, engine, false); send_attach (session, engine, false);
socket->monitor_event (ZMQ_EVENT_ACCEPTED, endpoint.c_str(), fd);
} }
void zmq::tcp_listener_t::close () void zmq::tcp_listener_t::close ()
@ -112,11 +116,16 @@ void zmq::tcp_listener_t::close ()
zmq_assert (s != retired_fd); zmq_assert (s != retired_fd);
#ifdef ZMQ_HAVE_WINDOWS #ifdef ZMQ_HAVE_WINDOWS
int rc = closesocket (s); int rc = closesocket (s);
if (unlikely (rc != SOCKET_ERROR))
socket->monitor_event (ZMQ_EVENT_CLOSE_FAILED, endpoint.c_str(), zmq_errno());
wsa_assert (rc != SOCKET_ERROR); wsa_assert (rc != SOCKET_ERROR);
#else #else
int rc = ::close (s); int rc = ::close (s);
if (unlikely (rc == 0))
socket->monitor_event (ZMQ_EVENT_CLOSE_FAILED, endpoint.c_str(), zmq_errno());
errno_assert (rc == 0); errno_assert (rc == 0);
#endif #endif
socket->monitor_event (ZMQ_EVENT_CLOSED, endpoint.c_str(), s);
s = retired_fd; s = retired_fd;
} }
@ -188,6 +197,8 @@ int zmq::tcp_listener_t::set_address (const char *addr_)
errno_assert (rc == 0); errno_assert (rc == 0);
#endif #endif
address.to_string (endpoint);
// Bind the socket to the network interface and port. // Bind the socket to the network interface and port.
rc = bind (s, address.addr (), address.addrlen ()); rc = bind (s, address.addr (), address.addrlen ());
#ifdef ZMQ_HAVE_WINDOWS #ifdef ZMQ_HAVE_WINDOWS
@ -212,6 +223,7 @@ int zmq::tcp_listener_t::set_address (const char *addr_)
return -1; return -1;
#endif #endif
socket->monitor_event (ZMQ_EVENT_LISTENING, addr_, s);
return 0; return 0;
} }

View File

@ -27,6 +27,7 @@
#include "stdint.hpp" #include "stdint.hpp"
#include "io_object.hpp" #include "io_object.hpp"
#include "tcp_address.hpp" #include "tcp_address.hpp"
#include "../include/zmq.h"
namespace zmq namespace zmq
{ {
@ -78,6 +79,9 @@ namespace zmq
// Socket the listerner belongs to. // Socket the listerner belongs to.
zmq::socket_base_t *socket; zmq::socket_base_t *socket;
// String representation of endpoint to bind to
std::string endpoint;
tcp_listener_t (const tcp_listener_t&); tcp_listener_t (const tcp_listener_t&);
const tcp_listener_t &operator = (const tcp_listener_t&); const tcp_listener_t &operator = (const tcp_listener_t&);
}; };

View File

@ -14,7 +14,8 @@ noinst_PROGRAMS = test_pair_inproc \
test_msg_flags \ test_msg_flags \
test_connect_resolve \ test_connect_resolve \
test_last_endpoint \ test_last_endpoint \
test_term_endpoint test_term_endpoint \
test_monitor
if !ON_MINGW if !ON_MINGW
noinst_PROGRAMS += test_shutdown_stress \ noinst_PROGRAMS += test_shutdown_stress \
@ -35,6 +36,7 @@ test_msg_flags_SOURCES = test_msg_flags.cpp
test_connect_resolve_SOURCES = test_connect_resolve.cpp test_connect_resolve_SOURCES = test_connect_resolve.cpp
test_last_endpoint_SOURCES = test_last_endpoint.cpp test_last_endpoint_SOURCES = test_last_endpoint.cpp
test_term_endpoint_SOURCES = test_term_endpoint.cpp test_term_endpoint_SOURCES = test_term_endpoint.cpp
test_monitor_SOURCES = test_monitor.cpp
if !ON_MINGW if !ON_MINGW
test_shutdown_stress_SOURCES = test_shutdown_stress.cpp test_shutdown_stress_SOURCES = test_shutdown_stress.cpp

152
tests/test_monitor.cpp Normal file
View File

@ -0,0 +1,152 @@
/*
Copyright (c) 2007-2012 iMatix Corporation
Copyright (c) 2011 250bpm s.r.o.
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <string.h>
#include "../include/zmq.h"
#include "../include/zmq_utils.h"
void listening_sock_monitor (void *s, int event_, zmq_event_data_t *data_)
{
const char *addr = "tcp://127.0.0.1:5560";
// Only some of the exceptional events could fire
switch (event_) {
case ZMQ_EVENT_LISTENING:
assert (data_->listening.fd > 0);
assert (memcmp (data_->listening.addr, addr, 22));
break;
case ZMQ_EVENT_ACCEPTED:
assert (data_->accepted.fd > 0);
assert (memcmp (data_->accepted.addr, addr, 22));
break;
case ZMQ_EVENT_CLOSE_FAILED:
assert (data_->close_failed.err != 0);
assert (memcmp (data_->close_failed.addr, addr, 22));
break;
case ZMQ_EVENT_CLOSED:
assert (data_->closed.fd != 0);
assert (memcmp (data_->closed.addr, addr, 22));
break;
case ZMQ_EVENT_DISCONNECTED:
assert (data_->disconnected.fd != 0);
assert (memcmp (data_->disconnected.addr, addr, 22));
break;
default:
// out of band / unexpected event
assert (0);
}
}
void connecting_sock_monitor (void *s, int event_, zmq_event_data_t *data_)
{
const char *addr = "tcp://127.0.0.1:5560";
// Only some of the exceptional events could fire
switch (event_) {
case ZMQ_EVENT_CONNECTED:
assert (data_->connected.fd > 0);
assert (memcmp (data_->connected.addr, addr, 22));
break;
case ZMQ_EVENT_CONNECT_DELAYED:
assert (data_->connect_delayed.err != 0);
assert (memcmp (data_->connect_delayed.addr, addr, 22));
break;
case ZMQ_EVENT_CLOSE_FAILED:
assert (data_->close_failed.err != 0);
assert (memcmp (data_->close_failed.addr, addr, 22));
break;
case ZMQ_EVENT_CLOSED:
assert (data_->closed.fd != 0);
assert (memcmp (data_->closed.addr, addr, 22));
break;
case ZMQ_EVENT_DISCONNECTED:
assert (data_->disconnected.fd != 0);
assert (memcmp (data_->disconnected.addr, addr, 22));
break;
default:
// out of band / unexpected event
assert (0);
}
}
int main (int argc, char *argv [])
{
int rc;
// Create the infrastructure
void *ctx = zmq_init (1);
assert (ctx);
void *rep = zmq_socket (ctx, ZMQ_REP);
assert (rep);
// Expects failure - invalid size
rc = zmq_setsockopt (rep, ZMQ_MONITOR, (void *)listening_sock_monitor, 20);
assert (rc == -1);
assert (errno == EINVAL);
rc = zmq_setsockopt (rep, ZMQ_MONITOR, (void *)listening_sock_monitor, sizeof (void *));
assert (rc == 0);
void *monitor;
size_t sz = sizeof (void *);
rc = zmq_getsockopt (rep, ZMQ_MONITOR, monitor, &sz);
assert (rc == 0);
assert (monitor == listening_sock_monitor);
// Remove socket monitor callback
rc = zmq_setsockopt (rep, ZMQ_MONITOR, NULL, 0);
assert (rc == 0);
rc = zmq_getsockopt (rep, ZMQ_MONITOR, monitor, &sz);
assert (rc == 0);
assert (monitor == listening_sock_monitor);
rc = zmq_bind (rep, "tcp://127.0.0.1:5560");
assert (rc == 0);
void *req = zmq_socket (ctx, ZMQ_REQ);
assert (req);
rc = zmq_setsockopt (req, ZMQ_MONITOR, (void *)connecting_sock_monitor, sizeof (void *));
assert (rc == 0);
rc = zmq_connect (req, "tcp://127.0.0.1:5560");
assert (rc == 0);
// Allow a window for socket events as connect can be async
zmq_sleep (1);
// Deallocate the infrastructure.
rc = zmq_close (req);
assert (rc == 0);
// Allow for closed or disconnected events to bubble up
zmq_sleep (1);
rc = zmq_close (rep);
assert (rc == 0);
zmq_sleep (1);
zmq_term (ctx);
return 0 ;
}