mirror of
https://github.com/zeromq/cppzmq.git
synced 2024-12-13 10:52:57 +01:00
Merge pull request #612 from dietelTiMaMi/feature/ExposeMonitorSocketForActivePoller
Feature/expose monitor socket for active poller
This commit is contained in:
commit
85a14afe30
@ -8,7 +8,7 @@
|
||||
|
||||
class mock_monitor_t : public zmq::monitor_t
|
||||
{
|
||||
public:
|
||||
public:
|
||||
|
||||
void on_event_connected(const zmq_event_t &, const char *) ZMQ_OVERRIDE
|
||||
{
|
||||
@ -89,7 +89,7 @@ TEST_CASE("monitor init abort", "[monitor]")
|
||||
{
|
||||
class mock_monitor : public mock_monitor_t
|
||||
{
|
||||
public:
|
||||
public:
|
||||
mock_monitor(std::function<void(void)> handle_connected) :
|
||||
handle_connected{std::move(handle_connected)}
|
||||
{
|
||||
@ -128,7 +128,7 @@ TEST_CASE("monitor init abort", "[monitor]")
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(mutex);
|
||||
CHECK(cond_var.wait_for(lock, std::chrono::seconds(1),
|
||||
[&done] { return done; }));
|
||||
[&done] { return done; }));
|
||||
}
|
||||
CHECK(monitor.connected == 1);
|
||||
monitor.abort();
|
||||
@ -150,3 +150,95 @@ TEST_CASE("monitor from move assigned socket", "[monitor]")
|
||||
// failing
|
||||
}
|
||||
#endif
|
||||
|
||||
#if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) \
|
||||
&& !defined(ZMQ_CPP11_PARTIAL) && defined(ZMQ_HAVE_POLLER)
|
||||
#include "zmq_addon.hpp"
|
||||
|
||||
TEST_CASE("poll monitor events using active poller", "[monitor]")
|
||||
{
|
||||
// define necessary class for test
|
||||
class test_monitor : public zmq::monitor_t
|
||||
{
|
||||
public:
|
||||
void init(zmq::socket_t &socket,
|
||||
const char *const addr_,
|
||||
int events = ZMQ_EVENT_ALL)
|
||||
{
|
||||
zmq::monitor_t::init(socket, addr_, events);
|
||||
}
|
||||
|
||||
void addToPoller(zmq::active_poller_t &inActivePoller)
|
||||
{
|
||||
inActivePoller.add(
|
||||
monitor_socket(), zmq::event_flags::pollin,
|
||||
[&](zmq::event_flags ef) { process_event(static_cast<short>(ef)); });
|
||||
}
|
||||
|
||||
void on_event_accepted(const zmq_event_t &event_, const char *addr_) override
|
||||
{
|
||||
clientAccepted++;
|
||||
}
|
||||
void on_event_disconnected(const zmq_event_t &event,
|
||||
const char *const addr) override
|
||||
{
|
||||
clientDisconnected++;
|
||||
}
|
||||
|
||||
int clientAccepted = 0;
|
||||
int clientDisconnected = 0;
|
||||
};
|
||||
|
||||
//Arrange
|
||||
int messageCounter = 0;
|
||||
const char monitorAddress[] = "inproc://monitor-server";
|
||||
|
||||
auto addToPoller = [&](zmq::socket_t &socket, zmq::active_poller_t &poller) {
|
||||
poller.add(socket, zmq::event_flags::pollin, [&](zmq::event_flags ef) {
|
||||
zmq::message_t msg;
|
||||
auto result = socket.recv(msg, zmq::recv_flags::dontwait);
|
||||
messageCounter++;
|
||||
});
|
||||
};
|
||||
|
||||
common_server_client_setup sockets(false);
|
||||
|
||||
test_monitor monitor;
|
||||
monitor.init(sockets.server, monitorAddress,
|
||||
ZMQ_EVENT_ACCEPTED | ZMQ_EVENT_DISCONNECTED);
|
||||
|
||||
zmq::active_poller_t poller;
|
||||
monitor.addToPoller(poller);
|
||||
addToPoller(sockets.server, poller);
|
||||
|
||||
sockets.init();
|
||||
sockets.client.send(zmq::message_t(0), zmq::send_flags::dontwait);
|
||||
CHECK(monitor.clientAccepted == 0);
|
||||
CHECK(monitor.clientDisconnected == 0);
|
||||
|
||||
//Act
|
||||
for (int i = 0; i < 100; i++) {
|
||||
poller.wait(std::chrono::milliseconds(50));
|
||||
if (monitor.clientAccepted > 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
CHECK(monitor.clientAccepted == 1);
|
||||
CHECK(monitor.clientDisconnected == 0);
|
||||
|
||||
sockets.client.close();
|
||||
|
||||
for (int i = 0; i < 100; i++) {
|
||||
poller.wait(std::chrono::milliseconds(50));
|
||||
if (monitor.clientDisconnected > 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
sockets.server.close();
|
||||
|
||||
// Assert
|
||||
CHECK(messageCounter == 1);
|
||||
CHECK(monitor.clientAccepted == 1);
|
||||
CHECK(monitor.clientDisconnected == 1);
|
||||
}
|
||||
#endif
|
||||
|
212
zmq.hpp
212
zmq.hpp
@ -2362,8 +2362,6 @@ class monitor_t
|
||||
{
|
||||
assert(_monitor_socket);
|
||||
|
||||
zmq::message_t eventMsg;
|
||||
|
||||
zmq::pollitem_t items[] = {
|
||||
{_monitor_socket.handle(), 0, ZMQ_POLLIN, 0},
|
||||
};
|
||||
@ -2374,106 +2372,7 @@ class monitor_t
|
||||
zmq::poll(&items[0], 1, timeout);
|
||||
#endif
|
||||
|
||||
if (items[0].revents & ZMQ_POLLIN) {
|
||||
int rc = zmq_msg_recv(eventMsg.handle(), _monitor_socket.handle(), 0);
|
||||
if (rc == -1 && zmq_errno() == ETERM)
|
||||
return false;
|
||||
assert(rc != -1);
|
||||
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
|
||||
#if ZMQ_VERSION_MAJOR >= 4
|
||||
const char *data = static_cast<const char *>(eventMsg.data());
|
||||
zmq_event_t msgEvent;
|
||||
memcpy(&msgEvent.event, data, sizeof(uint16_t));
|
||||
data += sizeof(uint16_t);
|
||||
memcpy(&msgEvent.value, data, sizeof(int32_t));
|
||||
zmq_event_t *event = &msgEvent;
|
||||
#else
|
||||
zmq_event_t *event = static_cast<zmq_event_t *>(eventMsg.data());
|
||||
#endif
|
||||
|
||||
#ifdef ZMQ_NEW_MONITOR_EVENT_LAYOUT
|
||||
zmq::message_t addrMsg;
|
||||
int rc = zmq_msg_recv(addrMsg.handle(), _monitor_socket.handle(), 0);
|
||||
if (rc == -1 && zmq_errno() == ETERM) {
|
||||
return false;
|
||||
}
|
||||
|
||||
assert(rc != -1);
|
||||
std::string address = addrMsg.to_string();
|
||||
#else
|
||||
// Bit of a hack, but all events in the zmq_event_t union have the same layout so this will work for all event types.
|
||||
std::string address = event->data.connected.addr;
|
||||
#endif
|
||||
|
||||
#ifdef ZMQ_EVENT_MONITOR_STOPPED
|
||||
if (event->event == ZMQ_EVENT_MONITOR_STOPPED) {
|
||||
return false;
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
switch (event->event) {
|
||||
case ZMQ_EVENT_CONNECTED:
|
||||
on_event_connected(*event, address.c_str());
|
||||
break;
|
||||
case ZMQ_EVENT_CONNECT_DELAYED:
|
||||
on_event_connect_delayed(*event, address.c_str());
|
||||
break;
|
||||
case ZMQ_EVENT_CONNECT_RETRIED:
|
||||
on_event_connect_retried(*event, address.c_str());
|
||||
break;
|
||||
case ZMQ_EVENT_LISTENING:
|
||||
on_event_listening(*event, address.c_str());
|
||||
break;
|
||||
case ZMQ_EVENT_BIND_FAILED:
|
||||
on_event_bind_failed(*event, address.c_str());
|
||||
break;
|
||||
case ZMQ_EVENT_ACCEPTED:
|
||||
on_event_accepted(*event, address.c_str());
|
||||
break;
|
||||
case ZMQ_EVENT_ACCEPT_FAILED:
|
||||
on_event_accept_failed(*event, address.c_str());
|
||||
break;
|
||||
case ZMQ_EVENT_CLOSED:
|
||||
on_event_closed(*event, address.c_str());
|
||||
break;
|
||||
case ZMQ_EVENT_CLOSE_FAILED:
|
||||
on_event_close_failed(*event, address.c_str());
|
||||
break;
|
||||
case ZMQ_EVENT_DISCONNECTED:
|
||||
on_event_disconnected(*event, address.c_str());
|
||||
break;
|
||||
#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 3, 0) || (defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 3))
|
||||
case ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL:
|
||||
on_event_handshake_failed_no_detail(*event, address.c_str());
|
||||
break;
|
||||
case ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL:
|
||||
on_event_handshake_failed_protocol(*event, address.c_str());
|
||||
break;
|
||||
case ZMQ_EVENT_HANDSHAKE_FAILED_AUTH:
|
||||
on_event_handshake_failed_auth(*event, address.c_str());
|
||||
break;
|
||||
case ZMQ_EVENT_HANDSHAKE_SUCCEEDED:
|
||||
on_event_handshake_succeeded(*event, address.c_str());
|
||||
break;
|
||||
#elif defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 1)
|
||||
case ZMQ_EVENT_HANDSHAKE_FAILED:
|
||||
on_event_handshake_failed(*event, address.c_str());
|
||||
break;
|
||||
case ZMQ_EVENT_HANDSHAKE_SUCCEED:
|
||||
on_event_handshake_succeed(*event, address.c_str());
|
||||
break;
|
||||
#endif
|
||||
default:
|
||||
on_event_unknown(*event, address.c_str());
|
||||
break;
|
||||
}
|
||||
|
||||
return true;
|
||||
return process_event(items[0].revents);
|
||||
}
|
||||
|
||||
#ifdef ZMQ_EVENT_MONITOR_STOPPED
|
||||
@ -2583,6 +2482,115 @@ class monitor_t
|
||||
(void) addr_;
|
||||
}
|
||||
|
||||
protected:
|
||||
bool process_event(short events)
|
||||
{
|
||||
zmq::message_t eventMsg;
|
||||
|
||||
if (events & ZMQ_POLLIN) {
|
||||
int rc = zmq_msg_recv(eventMsg.handle(), _monitor_socket.handle(), 0);
|
||||
if (rc == -1 && zmq_errno() == ETERM)
|
||||
return false;
|
||||
assert(rc != -1);
|
||||
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
|
||||
#if ZMQ_VERSION_MAJOR >= 4
|
||||
const char *data = static_cast<const char *>(eventMsg.data());
|
||||
zmq_event_t msgEvent;
|
||||
memcpy(&msgEvent.event, data, sizeof(uint16_t));
|
||||
data += sizeof(uint16_t);
|
||||
memcpy(&msgEvent.value, data, sizeof(int32_t));
|
||||
zmq_event_t *event = &msgEvent;
|
||||
#else
|
||||
zmq_event_t *event = static_cast<zmq_event_t *>(eventMsg.data());
|
||||
#endif
|
||||
|
||||
#ifdef ZMQ_NEW_MONITOR_EVENT_LAYOUT
|
||||
zmq::message_t addrMsg;
|
||||
int rc = zmq_msg_recv(addrMsg.handle(), _monitor_socket.handle(), 0);
|
||||
if (rc == -1 && zmq_errno() == ETERM) {
|
||||
return false;
|
||||
}
|
||||
|
||||
assert(rc != -1);
|
||||
std::string address = addrMsg.to_string();
|
||||
#else
|
||||
// Bit of a hack, but all events in the zmq_event_t union have the same layout so this will work for all event types.
|
||||
std::string address = event->data.connected.addr;
|
||||
#endif
|
||||
|
||||
#ifdef ZMQ_EVENT_MONITOR_STOPPED
|
||||
if (event->event == ZMQ_EVENT_MONITOR_STOPPED) {
|
||||
return false;
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
switch (event->event) {
|
||||
case ZMQ_EVENT_CONNECTED:
|
||||
on_event_connected(*event, address.c_str());
|
||||
break;
|
||||
case ZMQ_EVENT_CONNECT_DELAYED:
|
||||
on_event_connect_delayed(*event, address.c_str());
|
||||
break;
|
||||
case ZMQ_EVENT_CONNECT_RETRIED:
|
||||
on_event_connect_retried(*event, address.c_str());
|
||||
break;
|
||||
case ZMQ_EVENT_LISTENING:
|
||||
on_event_listening(*event, address.c_str());
|
||||
break;
|
||||
case ZMQ_EVENT_BIND_FAILED:
|
||||
on_event_bind_failed(*event, address.c_str());
|
||||
break;
|
||||
case ZMQ_EVENT_ACCEPTED:
|
||||
on_event_accepted(*event, address.c_str());
|
||||
break;
|
||||
case ZMQ_EVENT_ACCEPT_FAILED:
|
||||
on_event_accept_failed(*event, address.c_str());
|
||||
break;
|
||||
case ZMQ_EVENT_CLOSED:
|
||||
on_event_closed(*event, address.c_str());
|
||||
break;
|
||||
case ZMQ_EVENT_CLOSE_FAILED:
|
||||
on_event_close_failed(*event, address.c_str());
|
||||
break;
|
||||
case ZMQ_EVENT_DISCONNECTED:
|
||||
on_event_disconnected(*event, address.c_str());
|
||||
break;
|
||||
#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 3, 0) || (defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 3))
|
||||
case ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL:
|
||||
on_event_handshake_failed_no_detail(*event, address.c_str());
|
||||
break;
|
||||
case ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL:
|
||||
on_event_handshake_failed_protocol(*event, address.c_str());
|
||||
break;
|
||||
case ZMQ_EVENT_HANDSHAKE_FAILED_AUTH:
|
||||
on_event_handshake_failed_auth(*event, address.c_str());
|
||||
break;
|
||||
case ZMQ_EVENT_HANDSHAKE_SUCCEEDED:
|
||||
on_event_handshake_succeeded(*event, address.c_str());
|
||||
break;
|
||||
#elif defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 1)
|
||||
case ZMQ_EVENT_HANDSHAKE_FAILED:
|
||||
on_event_handshake_failed(*event, address.c_str());
|
||||
break;
|
||||
case ZMQ_EVENT_HANDSHAKE_SUCCEED:
|
||||
on_event_handshake_succeed(*event, address.c_str());
|
||||
break;
|
||||
#endif
|
||||
default:
|
||||
on_event_unknown(*event, address.c_str());
|
||||
break;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
socket_ref monitor_socket() {return _monitor_socket;}
|
||||
|
||||
private:
|
||||
monitor_t(const monitor_t &) ZMQ_DELETED_FUNCTION;
|
||||
void operator=(const monitor_t &) ZMQ_DELETED_FUNCTION;
|
||||
|
Loading…
Reference in New Issue
Block a user