diff --git a/tests/monitor.cpp b/tests/monitor.cpp index 09d5381..8637cc9 100644 --- a/tests/monitor.cpp +++ b/tests/monitor.cpp @@ -8,7 +8,7 @@ class mock_monitor_t : public zmq::monitor_t { -public: + public: void on_event_connected(const zmq_event_t &, const char *) ZMQ_OVERRIDE { @@ -89,7 +89,7 @@ TEST_CASE("monitor init abort", "[monitor]") { class mock_monitor : public mock_monitor_t { - public: + public: mock_monitor(std::function handle_connected) : handle_connected{std::move(handle_connected)} { @@ -128,7 +128,7 @@ TEST_CASE("monitor init abort", "[monitor]") { std::unique_lock lock(mutex); CHECK(cond_var.wait_for(lock, std::chrono::seconds(1), - [&done] { return done; })); + [&done] { return done; })); } CHECK(monitor.connected == 1); monitor.abort(); @@ -150,3 +150,90 @@ TEST_CASE("monitor from move assigned socket", "[monitor]") // failing } #endif + +#if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) \ + && !defined(ZMQ_CPP11_PARTIAL) && defined(ZMQ_HAVE_POLLER) +#include "zmq_addon.hpp" +using namespace std::literals::chrono_literals; + +TEST_CASE("poll monitor events using active poller", "[monitor]") +{ + // define necessary class for test + class test_monitor : public zmq::monitor_t + { + public: + void init(zmq::socket_t &socket, + const char *const addr_, + int events = ZMQ_EVENT_ALL) + { + zmq::monitor_t::init(socket, addr_, events); + } + + void addToPoller(zmq::active_poller_t &inActivePoller) + { + inActivePoller.add( + _monitor_socket, zmq::event_flags::pollin, + [&](zmq::event_flags ef) { process_event(static_cast(ef)); }); + } + + void on_event_accepted(const zmq_event_t &event_, + const char *addr_) override + { + clientAccepted++; + } + void on_event_disconnected(const zmq_event_t &event, + const char *const addr) override + { + clientDisconnected++; + } + + int clientAccepted = 0; + int clientDisconnected = 0; + }; + + //Arrange + int messageCounter = 0; + const char monitorAddress[] = "inproc://monitor-server"; + + auto addToPoller = [&](zmq::socket_t &socket, zmq::active_poller_t &poller) { + poller.add(socket, zmq::event_flags::pollin, [&](zmq::event_flags ef) { + zmq::message_t msg; + auto result = socket.recv(msg, zmq::recv_flags::dontwait); + messageCounter++; + }); + }; + + common_server_client_setup sockets(false); + + test_monitor monitor; + monitor.init(sockets.server, monitorAddress); + + zmq::active_poller_t poller; + monitor.addToPoller(poller); + addToPoller(sockets.server, poller); + + sockets.init(); + sockets.client.send(zmq::message_t(0), zmq::send_flags::dontwait); + CHECK(monitor.clientAccepted == 0); + CHECK(monitor.clientDisconnected == 0); + + //Act + for (int i = 0; i < 10; i++) { + poller.wait(10ms); + } + CHECK(monitor.clientAccepted == 1); + CHECK(monitor.clientDisconnected == 0); + + sockets.client.close(); + + for (int i = 0; i < 10; i++) { + poller.wait(10ms); + } + sockets.server.close(); + + // Assert + CHECK(messageCounter == 1); + CHECK(monitor.clientAccepted == 1); + CHECK(monitor.clientDisconnected == 1); +} +#endif