// Tests for zmq::monitor_t: construction, move semantics, event delivery via
// check_event(), aborting a blocked monitor, and (draft API only) driving a
// monitor from a zmq::active_poller_t.
#include "testutil.hpp"

#ifdef ZMQ_CPP11
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>

// Monitor that counts the "connected" events it is notified about.
class mock_monitor_t : public zmq::monitor_t
{
  public:
    // Invoked by the monitor for each connected event; only the counters are
    // updated, the event payload and address are ignored.
    void on_event_connected(const zmq_event_t &, const char *) ZMQ_OVERRIDE
    {
        ++connected;
        ++total;
    }

    int total{0};     // number of events of any handled kind seen so far
    int connected{0}; // number of connected events seen so far
};

#endif

// A default-constructed monitor must be destructible without ever being
// initialised.
TEST_CASE("monitor create destroy", "[monitor]")
{
    zmq::monitor_t monitor;
}

#if defined(ZMQ_CPP11)
// Move construction from both an uninitialised and an initialised monitor.
TEST_CASE("monitor move construct", "[monitor]")
{
    zmq::context_t ctx;
    zmq::socket_t sock(ctx, ZMQ_DEALER);
    SECTION("move ctor empty")
    {
        zmq::monitor_t monitor1;
        zmq::monitor_t monitor2 = std::move(monitor1);
    }
    SECTION("move ctor init")
    {
        zmq::monitor_t monitor1;
        monitor1.init(sock, "inproc://monitor-client");
        zmq::monitor_t monitor2 = std::move(monitor1);
    }
}

// Move assignment for every combination of initialised / uninitialised
// source and target.
TEST_CASE("monitor move assign", "[monitor]")
{
    zmq::context_t ctx;
    zmq::socket_t sock(ctx, ZMQ_DEALER);
    SECTION("move assign empty")
    {
        zmq::monitor_t monitor1;
        zmq::monitor_t monitor2;
        monitor1 = std::move(monitor2);
    }
    SECTION("move assign init")
    {
        zmq::monitor_t monitor1;
        monitor1.init(sock, "inproc://monitor-client");
        zmq::monitor_t monitor2;
        monitor2 = std::move(monitor1);
    }
    SECTION("move assign init both")
    {
        zmq::monitor_t monitor1;
        monitor1.init(sock, "inproc://monitor-client");
        zmq::monitor_t monitor2;
        zmq::socket_t sock2(ctx, ZMQ_DEALER);
        monitor2.init(sock2, "inproc://monitor-client2");
        monitor2 = std::move(monitor1);
    }
}

// After the client connects, exactly one connected event must be reported.
TEST_CASE("monitor init event count", "[monitor]")
{
    common_server_client_setup s{false};
    mock_monitor_t monitor;

    const int expected_event_count = 1;
    monitor.init(s.client, "inproc://foo");

    // Nothing has happened on the socket yet, so no event may be pending.
    CHECK_FALSE(monitor.check_event(0));
    s.init();

    // Drain events until the expected number arrived or a poll times out.
    while (monitor.check_event(1000) && monitor.total < expected_event_count) {
    }
    CHECK(monitor.connected == 1);
    CHECK(monitor.total == expected_event_count);
}

// A thread blocked in check_event(-1) must be released by abort().
TEST_CASE("monitor init abort", "[monitor]")
{
    // Monitor that additionally runs a user callback on every connected event.
    class mock_monitor : public mock_monitor_t
    {
      public:
        explicit mock_monitor(std::function<void()> handler) :
            handle_connected{std::move(handler)}
        {
        }

        void on_event_connected(const zmq_event_t &e, const char *m) ZMQ_OVERRIDE
        {
            mock_monitor_t::on_event_connected(e, m);
            handle_connected();
        }

        std::function<void()> handle_connected;
    };

    common_server_client_setup s(false);

    std::mutex mutex;
    std::condition_variable cond_var;
    bool done{false};

    // The callback runs on the polling thread; it signals the test thread.
    mock_monitor monitor([&]() {
        std::lock_guard<std::mutex> lock(mutex);
        done = true;
        cond_var.notify_one();
    });
    monitor.init(s.client, "inproc://foo");

    // Poll without timeout until check_event() reports failure.
    auto thread = std::thread([&monitor] {
        while (monitor.check_event(-1)) {
        }
    });

    s.init();
    {
        std::unique_lock<std::mutex> lock(mutex);
        CHECK(cond_var.wait_for(lock, std::chrono::seconds(1),
                                [&done] { return done; }));
    }
    CHECK(monitor.connected == 1);
    monitor.abort();
    thread.join();
}

// A monitor must be attachable to a socket that received its handle through
// move assignment.
TEST_CASE("monitor from move assigned socket", "[monitor]")
{
    zmq::context_t ctx;
    zmq::socket_t sock;
    sock = std::move([&ctx] {
        zmq::socket_t created(ctx, ZMQ_DEALER);
        return created;
    }());
    zmq::monitor_t monitor1;
    monitor1.init(sock, "inproc://monitor-client");
    // On failure, this test might hang indefinitely instead of immediately
    // failing
}
#endif

#if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11)                        \
  && !defined(ZMQ_CPP11_PARTIAL) && defined(ZMQ_HAVE_POLLER)
#include "zmq_addon.hpp"

// Monitor events and regular socket traffic are both dispatched through a
// single active poller.
TEST_CASE("poll monitor events using active poller", "[monitor]")
{
    // define necessary class for test
    class test_monitor : public zmq::monitor_t
    {
      public:
        // Forwards to zmq::monitor_t::init with the same arguments.
        void init(zmq::socket_t &socket,
                  const char *const addr_,
                  int events = ZMQ_EVENT_ALL)
        {
            zmq::monitor_t::init(socket, addr_, events);
        }

        // Registers the monitor socket with the poller; each pollin
        // notification is handed to process_event().
        void addToPoller(zmq::active_poller_t &inActivePoller)
        {
            inActivePoller.add(monitor_socket(), zmq::event_flags::pollin,
                               [this](zmq::event_flags ef) {
                                   // NOTE(review): assumes process_event takes
                                   // a short -- confirm against zmq.hpp
                                   process_event(static_cast<short>(ef));
                               });
        }

        void on_event_accepted(const zmq_event_t &, const char *) override
        {
            clientAccepted++;
        }

        void on_event_disconnected(const zmq_event_t &,
                                   const char *const) override
        {
            clientDisconnected++;
        }

        int clientAccepted = 0;
        int clientDisconnected = 0;
    };

    //Arrange
    int messageCounter = 0;
    const char monitorAddress[] = "inproc://monitor-server";

    // Registers a data socket with the poller; every pollin notification
    // triggers one non-blocking receive and bumps messageCounter.
    auto addToPoller = [&](zmq::socket_t &socket, zmq::active_poller_t &poller) {
        poller.add(socket, zmq::event_flags::pollin, [&](zmq::event_flags) {
            zmq::message_t msg;
            auto result = socket.recv(msg, zmq::recv_flags::dontwait);
            (void) result; // only the notification itself is counted
            messageCounter++;
        });
    };

    common_server_client_setup sockets(false);

    test_monitor monitor;
    monitor.init(sockets.server, monitorAddress,
                 ZMQ_EVENT_ACCEPTED | ZMQ_EVENT_DISCONNECTED);

    zmq::active_poller_t poller;
    monitor.addToPoller(poller);
    addToPoller(sockets.server, poller);

    sockets.init();
    sockets.client.send(zmq::message_t(0), zmq::send_flags::dontwait);
    CHECK(monitor.clientAccepted == 0);
    CHECK(monitor.clientDisconnected == 0);

    //Act
    // Poll in 50 ms slices (at most 100 of them) until the accept is seen.
    for (int i = 0; i < 100; i++) {
        poller.wait(std::chrono::milliseconds(50));
        if (monitor.clientAccepted > 0) {
            break;
        }
    }
    CHECK(monitor.clientAccepted == 1);
    CHECK(monitor.clientDisconnected == 0);

    sockets.client.close();
    // Same bounded wait, this time for the disconnect notification.
    for (int i = 0; i < 100; i++) {
        poller.wait(std::chrono::milliseconds(50));
        if (monitor.clientDisconnected > 0) {
            break;
        }
    }
    sockets.server.close();

    // Assert
    CHECK(messageCounter == 1);
    CHECK(monitor.clientAccepted == 1);
    CHECK(monitor.clientDisconnected == 1);
}
#endif