Merge pull request #3383 from ZMQers/queue_monitor

Problem: cannot monitor state of queues at runtime
This commit is contained in:
Simon Giesecke 2019-02-11 10:49:39 +01:00 committed by GitHub
commit 119a258504
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 541 additions and 79 deletions

View File

@ -11,6 +11,7 @@ MAN3 = zmq_bind.3 zmq_unbind.3 zmq_connect.3 zmq_disconnect.3 zmq_close.3 \
zmq_msg_get.3 zmq_msg_set.3 zmq_msg_more.3 zmq_msg_gets.3 \
zmq_getsockopt.3 zmq_setsockopt.3 \
zmq_socket.3 zmq_socket_monitor.3 zmq_poll.3 \
zmq_socket_monitor_versioned.3 \
zmq_errno.3 zmq_strerror.3 zmq_version.3 \
zmq_sendmsg.3 zmq_recvmsg.3 \
zmq_proxy.3 zmq_proxy_steerable.3 \

View File

@ -1 +0,0 @@
zmq_poller.txt

View File

@ -1 +0,0 @@
zmq_poller.txt

View File

@ -1 +0,0 @@
zmq_poller.txt

View File

@ -1 +0,0 @@
zmq_poller.txt

View File

@ -1 +0,0 @@
zmq_poller.txt

View File

@ -1 +0,0 @@
zmq_poller.txt

View File

@ -1 +0,0 @@
zmq_poller.txt

View File

@ -1 +0,0 @@
zmq_poller.txt

View File

@ -1 +0,0 @@
zmq_poller.txt

View File

@ -1,5 +1,5 @@
zmq_socket_monitor_versioned(3)
=====================
===============================
NAME
@ -12,6 +12,8 @@ SYNOPSIS
--------
*int zmq_socket_monitor_versioned (void '*socket', char '*endpoint', uint64_t 'events', int 'event_version');*
*int zmq_socket_monitor_pipes_stats (void '*socket');*
DESCRIPTION
-----------
@ -41,17 +43,23 @@ Each event is sent in multiple frames. The first frame contains an event
number (64 bits). The number and content of further frames depend on this
event number.
For all currently defined event types, the second frame contains an event
value (64 bits) that provides additional data according to the event number.
The third and fourth frames contain strings that specifies the affected
connection or endpoint. The third frame contains a string denoting the local
endpoint, while the fourth frame contains a string denoting the remote endpoint.
Unless it is specified differently, the second frame contains the number of
value frames that will follow it as a 64 bits integer. The third frame to N-th
frames contain an event value (64 bits) that provides additional data according
to the event number. Each event type might have a different number of values.
The second-to-last and last frames contain strings that specifies the affected
connection or endpoint. The former frame contains a string denoting the local
endpoint, while the latter frame contains a string denoting the remote endpoint.
Either of these may be empty, depending on the event type and whether the
connection uses a bound or connected local endpoint.
Note that the format of the second and further frames, and also the number of
frames, may be different for events added in the future.
The _zmq_socket_monitor_pipes_stats()_ method triggers an event of type
ZMQ_EVENT_PIPES_STATS for each connected peer of the monitored socket.
NOTE: _zmq_socket_monitor_pipes_stats()_ is in DRAFT state.
----
Monitoring events are only generated by some transports: At the moment these
are SOCKS, TCP, IPC, and TIPC. Note that it is not an error to call
@ -62,7 +70,7 @@ to multiple endpoints using different transports.
----
Supported events
Supported events (v1)
----------------
ZMQ_EVENT_CONNECTED
@ -167,17 +175,35 @@ The ZMTP security mechanism handshake failed due to an authentication failure.
The event value is the status code returned by the ZAP handler (i.e. 300,
400 or 500).
----
Supported events (v2)
----------------
ZMQ_EVENT_PIPE_STATS
~~~~~~~~~~~~~~~~~~~~
This event provides two values, the number of messages in each of the two
queues associated with the returned endpoint (respectively egress and ingress).
This event only triggers after calling the function
_zmq_socket_monitor_pipes_stats()_.
NOTE: this measurement is asynchronous, so by the time the message is received
the internal state might have already changed.
NOTE: when the monitored socket and the monitor are not used in a poll, the
event might not be delivered until an API has been called on the monitored
socket, like zmq_getsockopt for example (the option is irrelevant).
NOTE: in DRAFT state, not yet available in stable releases.
RETURN VALUE
------------
The _zmq_socket_monitor()_ function returns a value of 0 or greater if
successful. Otherwise it returns `-1` and sets 'errno' to one of the values
defined below.
The _zmq_socket_monitor()_ and _zmq_socket_monitor_pipes_stats()_ functions
return a value of 0 or greater if successful. Otherwise they return `-1` and
set 'errno' to one of the values defined below.
ERRORS
------
ERRORS - _zmq_socket_monitor()_
-------------------------------
*ETERM*::
The 0MQ 'context' associated with the specified 'socket' was terminated.
@ -188,42 +214,92 @@ sockets are required to use the inproc:// transport.
*EINVAL*::
The monitor 'endpoint' supplied does not exist.
ERRORS - _zmq_socket_monitor_pipes_stats()_
-------------------------------------------
*ENOTSOCK*::
The 'socket' parameter was not a valid 0MQ socket.
*EINVAL*::
The socket did not have monitoring enabled.
*EAGAIN*::
The monitored socket did not have any connections to monitor yet.
EXAMPLE
-------
.Monitoring client and server sockets
----
// Read one event off the monitor socket; return value and address
// Read one event off the monitor socket; return values and addresses
// by reference, if not null, and event number by value. Returns -1
// in case of error.
static int
get_monitor_event (void *monitor, int *value, char **address)
static uint64_t
get_monitor_event (void *monitor, uint64_t *value, char **local_address, char **remote_address)
{
// First frame in message contains event number and value
// First frame in message contains event number
zmq_msg_t msg;
zmq_msg_init (&msg);
if (zmq_msg_recv (&msg, monitor, 0) == -1)
return -1; // Interrupted, presumably
assert (zmq_msg_more (&msg));
uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
uint16_t event = *(uint16_t *) (data);
if (value)
*value = *(uint32_t *) (data + 2);
uint64_t event;
memcpy (&event, zmq_msg_data (&msg), sizeof (event));
zmq_msg_close (&msg);
// Second frame in message contains event address
// Second frame in message contains the number of values
zmq_msg_init (&msg);
if (zmq_msg_recv (&msg, monitor, 0) == -1)
return -1; // Interrupted, presumably
assert (zmq_msg_more (&msg));
uint64_t value_count;
memcpy (&value_count, zmq_msg_data (&msg), sizeof (value_count));
zmq_msg_close (&msg);
for (uint64_t i = 0; i < value_count; ++i) {
// Subsequent frames in message contain event values
zmq_msg_init (&msg);
if (zmq_msg_recv (&msg, monitor, 0) == -1)
return -1; // Interrupted, presumably
assert (zmq_msg_more (&msg));
if (value_ && value_ + i)
memcpy (value_ + i, zmq_msg_data (&msg), sizeof (*value_));
zmq_msg_close (&msg);
}
// Second-to-last frame in message contains local address
zmq_msg_init (&msg);
if (zmq_msg_recv (&msg, monitor, 0) == -1)
return -1; // Interrupted, presumably
assert (zmq_msg_more (&msg));
if (local_address_) {
uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
size_t size = zmq_msg_size (&msg);
*local_address_ = (char *) malloc (size + 1);
memcpy (*local_address_, data, size);
(*local_address_)[size] = 0;
}
zmq_msg_close (&msg);
// Last frame in message contains remote address
zmq_msg_init (&msg);
if (zmq_msg_recv (&msg, monitor, 0) == -1)
return -1; // Interrupted, presumably
assert (!zmq_msg_more (&msg));
if (address) {
if (remote_address_) {
uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
size_t size = zmq_msg_size (&msg);
*address = (char *) malloc (size + 1);
memcpy (*address, data, size);
(*address)[size] = 0;
*remote_address_ = (char *) malloc (size + 1);
memcpy (*remote_address_, data, size);
(*remote_address_)[size] = 0;
}
zmq_msg_close (&msg);
return event;
}
@ -239,14 +315,14 @@ int main (void)
assert (server);
// Socket monitoring only works over inproc://
int rc = zmq_socket_monitor (client, "tcp://127.0.0.1:9999", 0);
int rc = zmq_socket_monitor_versioned (client, "tcp://127.0.0.1:9999", 0, 2);
assert (rc == -1);
assert (zmq_errno () == EPROTONOSUPPORT);
// Monitor all events on client and server sockets
rc = zmq_socket_monitor (client, "inproc://monitor-client", ZMQ_EVENT_ALL);
rc = zmq_socket_monitor_versioned (client, "inproc://monitor-client", ZMQ_EVENT_ALL, 2);
assert (rc == 0);
rc = zmq_socket_monitor (server, "inproc://monitor-server", ZMQ_EVENT_ALL);
rc = zmq_socket_monitor_versioned (server, "inproc://monitor-server", ZMQ_EVENT_ALL, 2);
assert (rc == 0);
// Create two sockets for collecting monitor events

View File

@ -726,16 +726,20 @@ ZMQ_EXPORT int zmq_socket_get_peer_state (void *socket,
const void *routing_id,
size_t routing_id_size);
/* DRAFT Socket monitoring events */
#define ZMQ_EVENT_PIPES_STATS 0x10000
#define ZMQ_CURRENT_EVENT_VERSION 1
#define ZMQ_CURRENT_EVENT_VERSION_DRAFT 2
#define ZMQ_EVENT_ALL_V1 ZMQ_EVENT_ALL
#define ZMQ_EVENT_ALL_V2 ZMQ_EVENT_ALL_V1
#define ZMQ_EVENT_ALL_V2 ZMQ_EVENT_ALL_V1 | ZMQ_EVENT_PIPES_STATS
ZMQ_EXPORT int zmq_socket_monitor_versioned (void *s_,
const char *addr_,
uint64_t events_,
int event_version_);
ZMQ_EXPORT int zmq_socket_monitor_pipes_stats (void *s);
#endif // ZMQ_BUILD_DRAFT_API

View File

@ -32,6 +32,7 @@
#include <string>
#include "stdint.hpp"
#include "endpoint.hpp"
namespace zmq
{
@ -73,6 +74,8 @@ __declspec(align (64))
reap,
reaped,
inproc_connected,
pipe_peer_stats,
pipe_stats_publish,
done
} type;
@ -186,6 +189,23 @@ __declspec(align (64))
{
} reaped;
// Send application-side pipe count and ask to send monitor event
struct
{
uint64_t queue_count;
zmq::own_t *socket_base;
endpoint_uri_pair_t *endpoint_pair;
} pipe_peer_stats;
// Collate application thread and I/O thread pipe counts and endpoints
// and send as event
struct
{
uint64_t outbound_queue_count;
uint64_t inbound_queue_count;
endpoint_uri_pair_t *endpoint_pair;
} pipe_stats_publish;
// Sent by reaper thread to the term thread when all the sockets
// are successfully deallocated.
struct

View File

@ -107,6 +107,19 @@ void zmq::object_t::process_command (command_t &cmd_)
process_hiccup (cmd_.args.hiccup.pipe);
break;
case command_t::pipe_peer_stats:
process_pipe_peer_stats (cmd_.args.pipe_peer_stats.queue_count,
cmd_.args.pipe_peer_stats.socket_base,
cmd_.args.pipe_peer_stats.endpoint_pair);
break;
case command_t::pipe_stats_publish:
process_pipe_stats_publish (
cmd_.args.pipe_stats_publish.outbound_queue_count,
cmd_.args.pipe_stats_publish.inbound_queue_count,
cmd_.args.pipe_stats_publish.endpoint_pair);
break;
case command_t::pipe_term:
process_pipe_term ();
break;
@ -285,6 +298,35 @@ void zmq::object_t::send_hiccup (pipe_t *destination_, void *pipe_)
send_command (cmd);
}
void zmq::object_t::send_pipe_peer_stats (pipe_t *destination_,
uint64_t queue_count_,
own_t *socket_base_,
endpoint_uri_pair_t *endpoint_pair_)
{
command_t cmd;
cmd.destination = destination_;
cmd.type = command_t::pipe_peer_stats;
cmd.args.pipe_peer_stats.queue_count = queue_count_;
cmd.args.pipe_peer_stats.socket_base = socket_base_;
cmd.args.pipe_peer_stats.endpoint_pair = endpoint_pair_;
send_command (cmd);
}
void zmq::object_t::send_pipe_stats_publish (
own_t *destination_,
uint64_t outbound_queue_count_,
uint64_t inbound_queue_count_,
endpoint_uri_pair_t *endpoint_pair_)
{
command_t cmd;
cmd.destination = destination_;
cmd.type = command_t::pipe_stats_publish;
cmd.args.pipe_stats_publish.outbound_queue_count = outbound_queue_count_;
cmd.args.pipe_stats_publish.inbound_queue_count = inbound_queue_count_;
cmd.args.pipe_stats_publish.endpoint_pair = endpoint_pair_;
send_command (cmd);
}
void zmq::object_t::send_pipe_term (pipe_t *destination_)
{
command_t cmd;
@ -422,6 +464,20 @@ void zmq::object_t::process_hiccup (void *)
zmq_assert (false);
}
void zmq::object_t::process_pipe_peer_stats (uint64_t,
own_t *,
endpoint_uri_pair_t *)
{
zmq_assert (false);
}
void zmq::object_t::process_pipe_stats_publish (uint64_t,
uint64_t,
endpoint_uri_pair_t *)
{
zmq_assert (false);
}
void zmq::object_t::process_pipe_term ()
{
zmq_assert (false);

View File

@ -32,6 +32,7 @@
#include <string>
#include "stdint.hpp"
#include "endpoint.hpp"
namespace zmq
{
@ -96,6 +97,14 @@ class object_t
void send_activate_read (zmq::pipe_t *destination_);
void send_activate_write (zmq::pipe_t *destination_, uint64_t msgs_read_);
void send_hiccup (zmq::pipe_t *destination_, void *pipe_);
void send_pipe_peer_stats (zmq::pipe_t *destination_,
uint64_t queue_count_,
zmq::own_t *socket_base,
endpoint_uri_pair_t *endpoint_pair_);
void send_pipe_stats_publish (zmq::own_t *destination_,
uint64_t outbound_queue_count_,
uint64_t inbound_queue_count_,
endpoint_uri_pair_t *endpoint_pair_);
void send_pipe_term (zmq::pipe_t *destination_);
void send_pipe_term_ack (zmq::pipe_t *destination_);
void send_pipe_hwm (zmq::pipe_t *destination_, int inhwm_, int outhwm_);
@ -117,6 +126,13 @@ class object_t
virtual void process_activate_read ();
virtual void process_activate_write (uint64_t msgs_read_);
virtual void process_hiccup (void *pipe_);
virtual void process_pipe_peer_stats (uint64_t queue_count_,
zmq::own_t *socket_base_,
endpoint_uri_pair_t *endpoint_pair_);
virtual void
process_pipe_stats_publish (uint64_t outbound_queue_count_,
uint64_t inbound_queue_count_,
endpoint_uri_pair_t *endpoint_pair_);
virtual void process_pipe_term ();
virtual void process_pipe_term_ack ();
virtual void process_pipe_hwm (int inhwm_, int outhwm_);

View File

@ -563,3 +563,19 @@ const zmq::endpoint_uri_pair_t &zmq::pipe_t::get_endpoint_pair () const
{
return _endpoint_pair;
}
void zmq::pipe_t::send_stats_to_peer (own_t *socket_base_)
{
endpoint_uri_pair_t *ep =
new (std::nothrow) endpoint_uri_pair_t (_endpoint_pair);
send_pipe_peer_stats (_peer, _msgs_written - _peers_msgs_read, socket_base_,
ep);
}
void zmq::pipe_t::process_pipe_peer_stats (uint64_t queue_count_,
own_t *socket_base_,
endpoint_uri_pair_t *endpoint_pair_)
{
send_pipe_stats_publish (socket_base_, queue_count_,
_msgs_written - _peers_msgs_read, endpoint_pair_);
}

View File

@ -145,6 +145,8 @@ class pipe_t : public object_t,
void set_endpoint_pair (endpoint_uri_pair_t endpoint_pair_);
const endpoint_uri_pair_t &get_endpoint_pair () const;
void send_stats_to_peer (own_t *socket_base_);
private:
// Type of the underlying lock-free pipe.
typedef ypipe_base_t<msg_t> upipe_t;
@ -153,6 +155,9 @@ class pipe_t : public object_t,
void process_activate_read ();
void process_activate_write (uint64_t msgs_read_);
void process_hiccup (void *pipe_);
void process_pipe_peer_stats (uint64_t queue_count_,
own_t *socket_base_,
endpoint_uri_pair_t *endpoint_pair_);
void process_pipe_term ();
void process_pipe_term_ack ();
void process_pipe_hwm (int inhwm_, int outhwm_);

View File

@ -409,6 +409,11 @@ void zmq::session_base_t::process_attach (i_engine *engine_)
zmq_assert (!_pipe);
_pipe = pipes[0];
// The endpoints strings are not set on bind, set them here so that
// events can use them.
pipes[0]->set_endpoint_pair (engine_->get_endpoint ());
pipes[1]->set_endpoint_pair (engine_->get_endpoint ());
// Ask socket to plug into the remote end of the pipe.
send_bind (_socket, pipes[1]);
}

View File

@ -1421,6 +1421,45 @@ void zmq::socket_base_t::process_term_endpoint (std::string *endpoint_)
delete endpoint_;
}
void zmq::socket_base_t::process_pipe_stats_publish (
uint64_t outbound_queue_count_,
uint64_t inbound_queue_count_,
endpoint_uri_pair_t *endpoint_pair_)
{
uint64_t values[2] = {outbound_queue_count_, inbound_queue_count_};
event (*endpoint_pair_, values, 2, ZMQ_EVENT_PIPES_STATS);
delete endpoint_pair_;
}
/*
* There are 2 pipes per connection, and the inbound one _must_ be queried from
* the I/O thread. So ask the outbound pipe, in the application thread, to send
* a message (pipe_peer_stats) to its peer. The message will carry the outbound
* pipe stats and endpoint, and the reference to the socket object.
* The inbound pipe on the I/O thread will then add its own stats and endpoint,
* and write back a message to the socket object (pipe_stats_publish) which
* will raise an event with the data.
*/
int zmq::socket_base_t::query_pipes_stats ()
{
{
scoped_lock_t lock (_monitor_sync);
if (!(_monitor_events & ZMQ_EVENT_PIPES_STATS)) {
errno = EINVAL;
return -1;
}
}
if (_pipes.size () == 0) {
errno = EAGAIN;
return -1;
}
for (pipes_t::size_type i = 0; i != _pipes.size (); ++i) {
_pipes[i]->send_stats_to_peer (this);
}
return 0;
}
void zmq::socket_base_t::update_pipe_options (int option_)
{
if (option_ == ZMQ_SNDHWM || option_ == ZMQ_RCVHWM) {
@ -1658,101 +1697,117 @@ int zmq::socket_base_t::monitor (const char *endpoint_,
void zmq::socket_base_t::event_connected (
const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
{
event (endpoint_uri_pair_, fd_, ZMQ_EVENT_CONNECTED);
uint64_t values[1] = {(uint64_t) fd_};
event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_CONNECTED);
}
void zmq::socket_base_t::event_connect_delayed (
const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
{
event (endpoint_uri_pair_, err_, ZMQ_EVENT_CONNECT_DELAYED);
uint64_t values[1] = {(uint64_t) err_};
event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_CONNECT_DELAYED);
}
void zmq::socket_base_t::event_connect_retried (
const endpoint_uri_pair_t &endpoint_uri_pair_, int interval_)
{
event (endpoint_uri_pair_, interval_, ZMQ_EVENT_CONNECT_RETRIED);
uint64_t values[1] = {(uint64_t) interval_};
event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_CONNECT_RETRIED);
}
void zmq::socket_base_t::event_listening (
const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
{
event (endpoint_uri_pair_, fd_, ZMQ_EVENT_LISTENING);
uint64_t values[1] = {(uint64_t) fd_};
event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_LISTENING);
}
void zmq::socket_base_t::event_bind_failed (
const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
{
event (endpoint_uri_pair_, err_, ZMQ_EVENT_BIND_FAILED);
uint64_t values[1] = {(uint64_t) err_};
event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_BIND_FAILED);
}
void zmq::socket_base_t::event_accepted (
const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
{
event (endpoint_uri_pair_, fd_, ZMQ_EVENT_ACCEPTED);
uint64_t values[1] = {(uint64_t) fd_};
event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_ACCEPTED);
}
void zmq::socket_base_t::event_accept_failed (
const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
{
event (endpoint_uri_pair_, err_, ZMQ_EVENT_ACCEPT_FAILED);
uint64_t values[1] = {(uint64_t) err_};
event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_ACCEPT_FAILED);
}
void zmq::socket_base_t::event_closed (
const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
{
event (endpoint_uri_pair_, fd_, ZMQ_EVENT_CLOSED);
uint64_t values[1] = {(uint64_t) fd_};
event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_CLOSED);
}
void zmq::socket_base_t::event_close_failed (
const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
{
event (endpoint_uri_pair_, err_, ZMQ_EVENT_CLOSE_FAILED);
uint64_t values[1] = {(uint64_t) err_};
event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_CLOSE_FAILED);
}
void zmq::socket_base_t::event_disconnected (
const endpoint_uri_pair_t &endpoint_uri_pair_, zmq::fd_t fd_)
{
event (endpoint_uri_pair_, fd_, ZMQ_EVENT_DISCONNECTED);
uint64_t values[1] = {(uint64_t) fd_};
event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_DISCONNECTED);
}
void zmq::socket_base_t::event_handshake_failed_no_detail (
const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
{
event (endpoint_uri_pair_, err_, ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL);
uint64_t values[1] = {(uint64_t) err_};
event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL);
}
void zmq::socket_base_t::event_handshake_failed_protocol (
const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
{
event (endpoint_uri_pair_, err_, ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL);
uint64_t values[1] = {(uint64_t) err_};
event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL);
}
void zmq::socket_base_t::event_handshake_failed_auth (
const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
{
event (endpoint_uri_pair_, err_, ZMQ_EVENT_HANDSHAKE_FAILED_AUTH);
uint64_t values[1] = {(uint64_t) err_};
event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_HANDSHAKE_FAILED_AUTH);
}
void zmq::socket_base_t::event_handshake_succeeded (
const endpoint_uri_pair_t &endpoint_uri_pair_, int err_)
{
event (endpoint_uri_pair_, err_, ZMQ_EVENT_HANDSHAKE_SUCCEEDED);
uint64_t values[1] = {(uint64_t) err_};
event (endpoint_uri_pair_, values, 1, ZMQ_EVENT_HANDSHAKE_SUCCEEDED);
}
void zmq::socket_base_t::event (const endpoint_uri_pair_t &endpoint_uri_pair_,
uint64_t value_,
uint64_t values_[],
uint64_t values_count_,
uint64_t type_)
{
scoped_lock_t lock (_monitor_sync);
if (_monitor_events & type_) {
monitor_event (type_, value_, endpoint_uri_pair_);
monitor_event (type_, values_, values_count_, endpoint_uri_pair_);
}
}
// Send a monitor event
void zmq::socket_base_t::monitor_event (
uint64_t event_,
uint64_t value_,
uint64_t values_[],
uint64_t values_count_,
const endpoint_uri_pair_t &endpoint_uri_pair_) const
{
// this is a private method which is only called from
@ -1765,11 +1820,14 @@ void zmq::socket_base_t::monitor_event (
case 1: {
// The API should not allow to activate unsupported events
zmq_assert (event_ <= std::numeric_limits<uint16_t>::max ());
zmq_assert (value_ <= std::numeric_limits<uint32_t>::max ());
// v1 only allows one value
zmq_assert (values_count_ == 1);
zmq_assert (values_[0]
<= std::numeric_limits<uint32_t>::max ());
// Send event and value in first frame
const uint16_t event = static_cast<uint16_t> (event_);
const uint32_t value = static_cast<uint32_t> (value_);
const uint32_t value = static_cast<uint32_t> (values_[0]);
zmq_msg_init_size (&msg, sizeof (event) + sizeof (value));
uint8_t *data = static_cast<uint8_t *> (zmq_msg_data (&msg));
// Avoid dereferencing uint32_t on unaligned address
@ -1788,22 +1846,31 @@ void zmq::socket_base_t::monitor_event (
} break;
case 2: {
// Send event in first frame (64bit unsigned)
zmq_msg_init_size (&msg, sizeof event_);
memcpy (zmq_msg_data (&msg), &event_, sizeof event_);
zmq_msg_init_size (&msg, sizeof (event_));
memcpy (zmq_msg_data (&msg), &event_, sizeof (event_));
zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE);
// Send value in second frame (64bit unsigned)
zmq_msg_init_size (&msg, sizeof value_);
memcpy (zmq_msg_data (&msg), &value_, sizeof value_);
// Send number of values that will follow in second frame
zmq_msg_init_size (&msg, sizeof (values_count_));
memcpy (zmq_msg_data (&msg), &values_count_,
sizeof (values_count_));
zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE);
// Send local endpoint URI in third frame (string)
// Send values in third-Nth frames (64bit unsigned)
for (uint64_t i = 0; i < values_count_; ++i) {
zmq_msg_init_size (&msg, sizeof (values_[i]));
memcpy (zmq_msg_data (&msg), &values_[i],
sizeof (values_[i]));
zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE);
}
// Send local endpoint URI in second-to-last frame (string)
zmq_msg_init_size (&msg, endpoint_uri_pair_.local.size ());
memcpy (zmq_msg_data (&msg), endpoint_uri_pair_.local.c_str (),
endpoint_uri_pair_.local.size ());
zmq_msg_send (&msg, _monitor_socket, ZMQ_SNDMORE);
// Send remote endpoint URI in fourth frame (string)
// Send remote endpoint URI in last frame (string)
zmq_msg_init_size (&msg, endpoint_uri_pair_.remote.size ());
memcpy (zmq_msg_data (&msg), endpoint_uri_pair_.remote.c_str (),
endpoint_uri_pair_.remote.size ());
@ -1820,9 +1887,11 @@ void zmq::socket_base_t::stop_monitor (bool send_monitor_stopped_event_)
if (_monitor_socket) {
if ((_monitor_events & ZMQ_EVENT_MONITOR_STOPPED)
&& send_monitor_stopped_event_)
monitor_event (ZMQ_EVENT_MONITOR_STOPPED, 0,
&& send_monitor_stopped_event_) {
uint64_t values[1] = {0};
monitor_event (ZMQ_EVENT_MONITOR_STOPPED, values, 1,
endpoint_uri_pair_t ());
}
zmq_close (_monitor_socket);
_monitor_socket = NULL;
_monitor_events = 0;

View File

@ -157,6 +157,11 @@ class socket_base_t : public own_t,
virtual int get_peer_state (const void *routing_id_,
size_t routing_id_size_) const;
// Request for pipes statistics - will generate a ZMQ_EVENT_PIPES_STATS
// after gathering the data asynchronously. Requires event monitoring to
// be enabled.
int query_pipes_stats ();
protected:
socket_base_t (zmq::ctx_t *parent_,
uint32_t tid_,
@ -200,12 +205,14 @@ class socket_base_t : public own_t,
private:
// test if event should be sent and then dispatch it
void event (const endpoint_uri_pair_t &endpoint_uri_pair_,
uint64_t value_,
uint64_t values_[],
uint64_t values_count_,
uint64_t type_);
// Socket event data dispatch
void monitor_event (uint64_t event_,
uint64_t value_,
uint64_t values_[],
uint64_t values_count_,
const endpoint_uri_pair_t &endpoint_uri_pair_) const;
// Monitor socket cleanup
@ -276,6 +283,9 @@ class socket_base_t : public own_t,
// Handlers for incoming commands.
void process_stop ();
void process_bind (zmq::pipe_t *pipe_);
void process_pipe_stats_publish (uint64_t outbound_queue_count_,
uint64_t inbound_queue_count_,
endpoint_uri_pair_t *endpoint_pair_);
void process_term (int linger_);
void process_term_endpoint (std::string *endpoint_);

View File

@ -1452,3 +1452,11 @@ int zmq_has (const char *capability_)
// Whatever the application asked for, we don't have
return false;
}
int zmq_socket_monitor_pipes_stats (void *s_)
{
zmq::socket_base_t *s = as_socket_base_t (s_);
if (!s)
return -1;
return s->query_pipes_stats ();
}

View File

@ -123,10 +123,20 @@ int zmq_socket_get_peer_state (void *socket_,
const void *routing_id_,
size_t routing_id_size_);
/* DRAFT Socket monitoring events */
#define ZMQ_EVENT_PIPES_STATS 0x10000
#define ZMQ_CURRENT_EVENT_VERSION 1
#define ZMQ_CURRENT_EVENT_VERSION_DRAFT 2
#define ZMQ_EVENT_ALL_V1 ZMQ_EVENT_ALL
#define ZMQ_EVENT_ALL_V2 ZMQ_EVENT_ALL_V1 | ZMQ_EVENT_PIPES_STATS
int zmq_socket_monitor_versioned (void *s_,
const char *addr_,
uint64_t events_,
int event_version_);
int zmq_socket_monitor_pipes_stats (void *s_);
#endif // ZMQ_BUILD_DRAFT_API

View File

@ -50,6 +50,13 @@ void test_monitor_invalid_protocol_fails ()
TEST_ASSERT_FAILURE_ERRNO (
EPROTONOSUPPORT, zmq_socket_monitor (client, "tcp://127.0.0.1:*", 0));
#ifdef ZMQ_EVENT_PIPES_STATS
// Stats command needs to be called on a valid socket with monitoring
// enabled
TEST_ASSERT_FAILURE_ERRNO (ENOTSOCK, zmq_socket_monitor_pipes_stats (NULL));
TEST_ASSERT_FAILURE_ERRNO (EINVAL, zmq_socket_monitor_pipes_stats (client));
#endif
test_context_socket_close_zero_linger (client);
}
@ -94,7 +101,12 @@ void test_monitor_basic ()
event = get_monitor_event (client_mon, NULL, NULL);
assert (event == ZMQ_EVENT_CONNECTED);
expect_monitor_event (client_mon, ZMQ_EVENT_HANDSHAKE_SUCCEEDED);
expect_monitor_event (client_mon, ZMQ_EVENT_MONITOR_STOPPED);
event = get_monitor_event (client_mon, NULL, NULL);
if (event == ZMQ_EVENT_DISCONNECTED) {
expect_monitor_event (client_mon, ZMQ_EVENT_CONNECT_RETRIED);
expect_monitor_event (client_mon, ZMQ_EVENT_MONITOR_STOPPED);
} else
TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_MONITOR_STOPPED, event);
// This is the flow of server events
expect_monitor_event (server_mon, ZMQ_EVENT_LISTENING);
@ -116,7 +128,9 @@ void test_monitor_basic ()
test_context_socket_close_zero_linger (server_mon);
}
#ifdef ZMQ_BUILD_DRAFT_API
#if (defined ZMQ_CURRENT_EVENT_VERSION && ZMQ_CURRENT_EVENT_VERSION >= 2) \
|| (defined ZMQ_CURRENT_EVENT_VERSION \
&& ZMQ_CURRENT_EVENT_VERSION_DRAFT >= 2)
void test_monitor_versioned_basic (bind_function_t bind_function_,
const char *expected_prefix_)
{
@ -180,7 +194,13 @@ void test_monitor_versioned_basic (bind_function_t bind_function_,
expect_monitor_event_v2 (client_mon, ZMQ_EVENT_HANDSHAKE_SUCCEEDED,
client_local_address, client_remote_address);
expect_monitor_event_v2 (client_mon, ZMQ_EVENT_MONITOR_STOPPED, "", "");
event = get_monitor_event_v2 (client_mon, NULL, NULL, NULL);
if (event == ZMQ_EVENT_DISCONNECTED) {
expect_monitor_event_v2 (client_mon, ZMQ_EVENT_CONNECT_RETRIED,
client_local_address, client_remote_address);
expect_monitor_event_v2 (client_mon, ZMQ_EVENT_MONITOR_STOPPED, "", "");
} else
TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_MONITOR_STOPPED, event);
// This is the flow of server events
expect_monitor_event_v2 (server_mon, ZMQ_EVENT_LISTENING,
@ -230,6 +250,133 @@ void test_monitor_versioned_basic_tipc ()
static const char prefix[] = "tipc://";
test_monitor_versioned_basic (bind_loopback_tipc, prefix);
}
#ifdef ZMQ_EVENT_PIPES_STATS
void test_monitor_versioned_stats (bind_function_t bind_function_,
const char *expected_prefix_)
{
char server_endpoint[MAX_SOCKET_STRING];
const int pulls_count = 4;
void *pulls[pulls_count];
// We'll monitor these two sockets
void *push = test_context_socket (ZMQ_PUSH);
TEST_ASSERT_SUCCESS_ERRNO (zmq_socket_monitor_versioned (
push, "inproc://monitor-push", ZMQ_EVENT_PIPES_STATS, 2));
// Should fail if there are no pipes to monitor
TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_socket_monitor_pipes_stats (push));
void *push_mon = test_context_socket (ZMQ_PAIR);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (push_mon, "inproc://monitor-push"));
// Set lower HWM - queues will be filled so we should see it in the stats
int send_hwm = 500;
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (push, ZMQ_SNDHWM, &send_hwm, sizeof (send_hwm)));
// Set very low TCP buffers so that messages cannot be stored in-flight
const int tcp_buffer_size = 4096;
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
push, ZMQ_SNDBUF, &tcp_buffer_size, sizeof (tcp_buffer_size)));
bind_function_ (push, server_endpoint, sizeof (server_endpoint));
int ipv6_;
size_t ipv6_size_ = sizeof (ipv6_);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (push, ZMQ_IPV6, &ipv6_, &ipv6_size_));
for (int i = 0; i < pulls_count; ++i) {
pulls[i] = test_context_socket (ZMQ_PULL);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (pulls[i], ZMQ_IPV6, &ipv6_, sizeof (int)));
int timeout_ms = 10;
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
pulls[i], ZMQ_RCVTIMEO, &timeout_ms, sizeof (timeout_ms)));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (pulls[i], ZMQ_RCVHWM, &send_hwm, sizeof (send_hwm)));
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
pulls[i], ZMQ_RCVBUF, &tcp_buffer_size, sizeof (tcp_buffer_size)));
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (pulls[i], server_endpoint));
}
// Send until we block
int send_count = 0;
// Saturate the TCP buffers too
char data[tcp_buffer_size * 2];
memset (data, 0, sizeof (data));
// Saturate all pipes - send + receive - on all connections
while (send_count < send_hwm * 2 * pulls_count) {
TEST_ASSERT_EQUAL_INT (sizeof (data),
zmq_send (push, data, sizeof (data), 0));
++send_count;
}
// Drain one of the pulls - doesn't matter how many messages, at least one
send_count = send_count / 4;
do {
zmq_recv (pulls[0], data, sizeof (data), 0);
--send_count;
} while (send_count > 0);
// To kick the application thread, do a dummy getsockopt - users here
// should use the monitor and the other sockets in a poll.
unsigned long int dummy;
size_t dummy_size = sizeof (dummy);
msleep (SETTLE_TIME);
// Note that the pipe stats on the sender will not get updated until the
// receiver has processed at least lwm ((hwm + 1) / 2) messages AND until
// the application thread has ran through the mailbox, as the update is
// delivered via a message (send_activate_write)
zmq_getsockopt (push, ZMQ_EVENTS, &dummy, &dummy_size);
// Ask for stats and check that they match
zmq_socket_monitor_pipes_stats (push);
msleep (SETTLE_TIME);
zmq_getsockopt (push, ZMQ_EVENTS, &dummy, &dummy_size);
for (int i = 0; i < pulls_count; ++i) {
char *push_local_address = NULL;
char *push_remote_address = NULL;
uint64_t queue_stat[2];
int64_t event = get_monitor_event_v2 (
push_mon, queue_stat, &push_local_address, &push_remote_address);
TEST_ASSERT_EQUAL_STRING (server_endpoint, push_local_address);
TEST_ASSERT_EQUAL_STRING_LEN (expected_prefix_, push_remote_address,
strlen (expected_prefix_));
TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_PIPES_STATS, event);
TEST_ASSERT_EQUAL_INT (i == 0 ? 0 : send_hwm, queue_stat[0]);
TEST_ASSERT_EQUAL_INT (0, queue_stat[1]);
free (push_local_address);
free (push_remote_address);
}
// Close client and server
test_context_socket_close_zero_linger (push_mon);
test_context_socket_close_zero_linger (push);
for (int i = 0; i < pulls_count; ++i)
test_context_socket_close_zero_linger (pulls[i]);
}
void test_monitor_versioned_stats_tcp_ipv4 ()
{
static const char prefix[] = "tcp://127.0.0.1:";
test_monitor_versioned_stats (bind_loopback_ipv4, prefix);
}
void test_monitor_versioned_stats_tcp_ipv6 ()
{
static const char prefix[] = "tcp://[::1]:";
test_monitor_versioned_stats (bind_loopback_ipv6, prefix);
}
void test_monitor_versioned_stats_ipc ()
{
static const char prefix[] = "ipc://";
test_monitor_versioned_stats (bind_loopback_ipc, prefix);
}
#endif // ZMQ_EVENT_PIPES_STATS
#endif
int main ()
@ -240,11 +387,18 @@ int main ()
RUN_TEST (test_monitor_invalid_protocol_fails);
RUN_TEST (test_monitor_basic);
#ifdef ZMQ_BUILD_DRAFT_API
#if (defined ZMQ_CURRENT_EVENT_VERSION && ZMQ_CURRENT_EVENT_VERSION >= 2) \
|| (defined ZMQ_CURRENT_EVENT_VERSION \
&& ZMQ_CURRENT_EVENT_VERSION_DRAFT >= 2)
RUN_TEST (test_monitor_versioned_basic_tcp_ipv4);
RUN_TEST (test_monitor_versioned_basic_tcp_ipv6);
RUN_TEST (test_monitor_versioned_basic_ipc);
RUN_TEST (test_monitor_versioned_basic_tipc);
#ifdef ZMQ_EVENT_PIPES_STATS
RUN_TEST (test_monitor_versioned_stats_tcp_ipv4);
RUN_TEST (test_monitor_versioned_stats_tcp_ipv6);
RUN_TEST (test_monitor_versioned_stats_ipc);
#endif
#endif
return UNITY_END ();

View File

@ -190,7 +190,9 @@ int expect_monitor_event_multiple (void *server_mon_,
return count_of_expected_events;
}
#ifdef ZMQ_BUILD_DRAFT_API
#if (defined ZMQ_CURRENT_EVENT_VERSION && ZMQ_CURRENT_EVENT_VERSION >= 2) \
|| (defined ZMQ_CURRENT_EVENT_VERSION \
&& ZMQ_CURRENT_EVENT_VERSION_DRAFT >= 2)
static int64_t get_monitor_event_internal_v2 (void *monitor_,
uint64_t *value_,
char **local_address_,
@ -208,9 +210,10 @@ static int64_t get_monitor_event_internal_v2 (void *monitor_,
assert (sizeof (uint64_t) == zmq_msg_size (&msg));
uint64_t event;
memcpy (&event, zmq_msg_data (&msg), sizeof event);
memcpy (&event, zmq_msg_data (&msg), sizeof (event));
zmq_msg_close (&msg);
// Second frame in message contains event value
// Second frame in message contains the number of values
zmq_msg_init (&msg);
if (zmq_msg_recv (&msg, monitor_, recv_flag_) == -1) {
assert (errno == EAGAIN);
@ -219,10 +222,26 @@ static int64_t get_monitor_event_internal_v2 (void *monitor_,
assert (zmq_msg_more (&msg));
assert (sizeof (uint64_t) == zmq_msg_size (&msg));
if (value_)
memcpy (value_, zmq_msg_data (&msg), sizeof *value_);
uint64_t value_count;
memcpy (&value_count, zmq_msg_data (&msg), sizeof (value_count));
zmq_msg_close (&msg);
// Third frame in message contains local address
for (uint64_t i = 0; i < value_count; ++i) {
// Subsequent frames in message contain event values
zmq_msg_init (&msg);
if (zmq_msg_recv (&msg, monitor_, recv_flag_) == -1) {
assert (errno == EAGAIN);
return -1; // timed out or no message available
}
assert (zmq_msg_more (&msg));
assert (sizeof (uint64_t) == zmq_msg_size (&msg));
if (value_ && value_ + i)
memcpy (value_ + i, zmq_msg_data (&msg), sizeof (*value_));
zmq_msg_close (&msg);
}
// Second-to-last frame in message contains local address
zmq_msg_init (&msg);
int res = zmq_msg_recv (&msg, monitor_, recv_flag_) == -1;
assert (res != -1);
@ -235,8 +254,9 @@ static int64_t get_monitor_event_internal_v2 (void *monitor_,
memcpy (*local_address_, data, size);
(*local_address_)[size] = 0;
}
zmq_msg_close (&msg);
// Fourth and last frame in message contains remote address
// Last frame in message contains remote address
zmq_msg_init (&msg);
res = zmq_msg_recv (&msg, monitor_, recv_flag_) == -1;
assert (res != -1);
@ -249,6 +269,7 @@ static int64_t get_monitor_event_internal_v2 (void *monitor_,
memcpy (*remote_address_, data, size);
(*remote_address_)[size] = 0;
}
zmq_msg_close (&msg);
return event;
}