From 0458f7d16cd8fb52e207803e67168950a445e91b Mon Sep 17 00:00:00 2001 From: Gudmundur Adalsteinsson Date: Wed, 15 May 2019 06:35:12 +0000 Subject: [PATCH] Problem: Type-safety of poller_t and active_poller_t can be improved (#318) Problem: Type-safety of poller_t and active_poller_t can be improved --- tests/active_poller.cpp | 94 ++++++++++++++++++------------------- tests/poller.cpp | 100 ++++++++++++++++++++++------------------ zmq.hpp | 97 ++++++++++++++++++++++++++++++-------- zmq_addon.hpp | 29 ++++++------ 4 files changed, 194 insertions(+), 126 deletions(-) diff --git a/tests/active_poller.cpp b/tests/active_poller.cpp index f7db4a2..081eeab 100644 --- a/tests/active_poller.cpp +++ b/tests/active_poller.cpp @@ -47,7 +47,7 @@ TEST_CASE("move construct non empty", "[active_poller]") zmq::socket_t socket{context, zmq::socket_type::router}; zmq::active_poller_t a; - a.add(socket, ZMQ_POLLIN, [](short) {}); + a.add(socket, zmq::event_flags::pollin, [](zmq::event_flags) {}); CHECK_FALSE(a.empty()); CHECK(1u == a.size()); zmq::active_poller_t b = std::move(a); @@ -63,7 +63,7 @@ TEST_CASE("move assign non empty", "[active_poller]") zmq::socket_t socket{context, zmq::socket_type::router}; zmq::active_poller_t a; - a.add(socket, ZMQ_POLLIN, [](short) {}); + a.add(socket, zmq::event_flags::pollin, [](zmq::event_flags) {}); CHECK_FALSE(a.empty()); CHECK(1u == a.size()); zmq::active_poller_t b; @@ -79,8 +79,8 @@ TEST_CASE("add handler", "[active_poller]") zmq::context_t context; zmq::socket_t socket{context, zmq::socket_type::router}; zmq::active_poller_t active_poller; - zmq::active_poller_t::handler_t handler; - CHECK_NOTHROW(active_poller.add(socket, ZMQ_POLLIN, handler)); + zmq::active_poller_t::handler_type handler; + CHECK_NOTHROW(active_poller.add(socket, zmq::event_flags::pollin, handler)); } #if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 3, 0) @@ -90,9 +90,9 @@ TEST_CASE("add handler invalid events type", "[active_poller]") zmq::context_t context; zmq::socket_t socket{context, zmq::socket_type::router}; zmq::active_poller_t active_poller; - zmq::active_poller_t::handler_t handler; + zmq::active_poller_t::handler_type handler; short invalid_events_type = 2 << 10; - CHECK_THROWS_AS(active_poller.add(socket, invalid_events_type, handler), const zmq::error_t&); + CHECK_THROWS_AS(active_poller.add(socket, static_cast(invalid_events_type), handler), const zmq::error_t&); CHECK(active_poller.empty()); CHECK(0u == active_poller.size()); } @@ -103,10 +103,10 @@ TEST_CASE("add handler twice throws", "[active_poller]") zmq::context_t context; zmq::socket_t socket{context, zmq::socket_type::router}; zmq::active_poller_t active_poller; - zmq::active_poller_t::handler_t handler; - active_poller.add(socket, ZMQ_POLLIN, handler); + zmq::active_poller_t::handler_type handler; + active_poller.add(socket, zmq::event_flags::pollin, handler); /// \todo the actual error code should be checked - CHECK_THROWS_AS(active_poller.add(socket, ZMQ_POLLIN, handler), const zmq::error_t&); + CHECK_THROWS_AS(active_poller.add(socket, zmq::event_flags::pollin, handler), const zmq::error_t&); } TEST_CASE("wait with no handlers throws", "[active_poller]") @@ -130,7 +130,7 @@ TEST_CASE("remove registered empty", "[active_poller]") zmq::context_t context; zmq::socket_t socket{context, zmq::socket_type::router}; zmq::active_poller_t active_poller; - active_poller.add(socket, ZMQ_POLLIN, zmq::active_poller_t::handler_t{}); + active_poller.add(socket, zmq::event_flags::pollin, zmq::active_poller_t::handler_type{}); CHECK_NOTHROW(active_poller.remove(socket)); } @@ -139,7 +139,7 @@ TEST_CASE("remove registered non empty", "[active_poller]") zmq::context_t context; zmq::socket_t socket{context, zmq::socket_type::router}; zmq::active_poller_t active_poller; - active_poller.add(socket, ZMQ_POLLIN, [](short) {}); + active_poller.add(socket, zmq::event_flags::pollin, [](zmq::event_flags) {}); CHECK_NOTHROW(active_poller.remove(socket)); } @@ -147,9 +147,9 @@ namespace { struct server_client_setup : common_server_client_setup { - zmq::active_poller_t::handler_t handler = [&](short e) { events = e; }; + zmq::active_poller_t::handler_type handler = [&](zmq::event_flags e) { events = e; }; - short events = 0; + zmq::event_flags events = zmq::event_flags::none; }; } @@ -161,11 +161,11 @@ TEST_CASE("poll basic", "[active_poller]") zmq::active_poller_t active_poller; bool message_received = false; - zmq::active_poller_t::handler_t handler = [&message_received](short events) { - CHECK(0 != (events & ZMQ_POLLIN)); + zmq::active_poller_t::handler_type handler = [&message_received](zmq::event_flags events) { + CHECK(zmq::event_flags::none != (events & zmq::event_flags::pollin)); message_received = true; }; - CHECK_NOTHROW(active_poller.add(s.server, ZMQ_POLLIN, handler)); + CHECK_NOTHROW(active_poller.add(s.server, zmq::event_flags::pollin, handler)); CHECK(1 == active_poller.wait(std::chrono::milliseconds{-1})); CHECK(message_received); } @@ -180,33 +180,33 @@ TEST_CASE("client server", "[active_poller]") // Setup active_poller zmq::active_poller_t active_poller; - short events; - zmq::active_poller_t::handler_t handler = [&](short e) { - if (0 != (e & ZMQ_POLLIN)) { + zmq::event_flags events; + zmq::active_poller_t::handler_type handler = [&](zmq::event_flags e) { + if (zmq::event_flags::none != (e & zmq::event_flags::pollin)) { zmq::message_t zmq_msg; CHECK_NOTHROW(s.server.recv(zmq_msg)); // get message std::string recv_msg(zmq_msg.data(), zmq_msg.size()); CHECK(send_msg == recv_msg); - } else if (0 != (e & ~ZMQ_POLLOUT)) { - INFO("Unexpected event type " << events); + } else if (zmq::event_flags::none != (e & ~zmq::event_flags::pollout)) { + INFO("Unexpected event type " << static_cast(events)); REQUIRE(false); } events = e; }; - CHECK_NOTHROW(active_poller.add(s.server, ZMQ_POLLIN, handler)); + CHECK_NOTHROW(active_poller.add(s.server, zmq::event_flags::pollin, handler)); // client sends message CHECK_NOTHROW(s.client.send(zmq::message_t{send_msg}, zmq::send_flags::none)); CHECK(1 == active_poller.wait(std::chrono::milliseconds{-1})); - CHECK(events == ZMQ_POLLIN); + CHECK(events == zmq::event_flags::pollin); // Re-add server socket with pollout flag CHECK_NOTHROW(active_poller.remove(s.server)); - CHECK_NOTHROW(active_poller.add(s.server, ZMQ_POLLIN | ZMQ_POLLOUT, handler)); + CHECK_NOTHROW(active_poller.add(s.server, zmq::event_flags::pollin | zmq::event_flags::pollout, handler)); CHECK(1 == active_poller.wait(std::chrono::milliseconds{-1})); - CHECK(events == ZMQ_POLLOUT); + CHECK(events == zmq::event_flags::pollout); } TEST_CASE("add invalid socket throws", "[active_poller]") @@ -215,7 +215,7 @@ TEST_CASE("add invalid socket throws", "[active_poller]") zmq::active_poller_t active_poller; zmq::socket_t a{context, zmq::socket_type::router}; zmq::socket_t b{std::move(a)}; - CHECK_THROWS_AS(active_poller.add(a, ZMQ_POLLIN, zmq::active_poller_t::handler_t{}), + CHECK_THROWS_AS(active_poller.add(a, zmq::event_flags::pollin, zmq::active_poller_t::handler_type{}), const zmq::error_t&); } @@ -225,7 +225,7 @@ TEST_CASE("remove invalid socket throws", "[active_poller]") zmq::socket_t socket{context, zmq::socket_type::router}; zmq::active_poller_t active_poller; CHECK_NOTHROW( - active_poller.add(socket, ZMQ_POLLIN, zmq::active_poller_t::handler_t{})); + active_poller.add(socket, zmq::event_flags::pollin, zmq::active_poller_t::handler_type{})); CHECK(1u == active_poller.size()); std::vector sockets; sockets.emplace_back(std::move(socket)); @@ -238,8 +238,8 @@ TEST_CASE("wait on added empty handler", "[active_poller]") server_client_setup s; CHECK_NOTHROW(s.client.send(zmq::message_t{"Hi"}, zmq::send_flags::none)); zmq::active_poller_t active_poller; - zmq::active_poller_t::handler_t handler; - CHECK_NOTHROW(active_poller.add(s.server, ZMQ_POLLIN, handler)); + zmq::active_poller_t::handler_type handler; + CHECK_NOTHROW(active_poller.add(s.server, zmq::event_flags::pollin, handler)); CHECK_NOTHROW(active_poller.wait(std::chrono::milliseconds{-1})); } @@ -248,7 +248,7 @@ TEST_CASE("modify empty throws", "[active_poller]") zmq::context_t context; zmq::socket_t socket{context, zmq::socket_type::push}; zmq::active_poller_t active_poller; - CHECK_THROWS_AS(active_poller.modify(socket, ZMQ_POLLIN), const zmq::error_t&); + CHECK_THROWS_AS(active_poller.modify(socket, zmq::event_flags::pollin), const zmq::error_t&); } TEST_CASE("modify invalid socket throws", "[active_poller]") @@ -257,7 +257,7 @@ TEST_CASE("modify invalid socket throws", "[active_poller]") zmq::socket_t a{context, zmq::socket_type::push}; zmq::socket_t b{std::move(a)}; zmq::active_poller_t active_poller; - CHECK_THROWS_AS(active_poller.modify(a, ZMQ_POLLIN), const zmq::error_t&); + CHECK_THROWS_AS(active_poller.modify(a, zmq::event_flags::pollin), const zmq::error_t&); } TEST_CASE("modify not added throws", "[active_poller]") @@ -267,8 +267,8 @@ TEST_CASE("modify not added throws", "[active_poller]") zmq::socket_t b{context, zmq::socket_type::push}; zmq::active_poller_t active_poller; CHECK_NOTHROW( - active_poller.add(a, ZMQ_POLLIN, zmq::active_poller_t::handler_t{})); - CHECK_THROWS_AS(active_poller.modify(b, ZMQ_POLLIN), const zmq::error_t&); + active_poller.add(a, zmq::event_flags::pollin, zmq::active_poller_t::handler_type{})); + CHECK_THROWS_AS(active_poller.modify(b, zmq::event_flags::pollin), const zmq::error_t&); } TEST_CASE("modify simple", "[active_poller]") @@ -277,8 +277,8 @@ TEST_CASE("modify simple", "[active_poller]") zmq::socket_t a{context, zmq::socket_type::push}; zmq::active_poller_t active_poller; CHECK_NOTHROW( - active_poller.add(a, ZMQ_POLLIN, zmq::active_poller_t::handler_t{})); - CHECK_NOTHROW(active_poller.modify(a, ZMQ_POLLIN | ZMQ_POLLOUT)); + active_poller.add(a, zmq::event_flags::pollin, zmq::active_poller_t::handler_type{})); + CHECK_NOTHROW(active_poller.modify(a, zmq::event_flags::pollin | zmq::event_flags::pollout)); } TEST_CASE("poll client server", "[active_poller]") @@ -288,19 +288,19 @@ TEST_CASE("poll client server", "[active_poller]") // Setup active_poller zmq::active_poller_t active_poller; - CHECK_NOTHROW(active_poller.add(s.server, ZMQ_POLLIN, s.handler)); + CHECK_NOTHROW(active_poller.add(s.server, zmq::event_flags::pollin, s.handler)); // client sends message CHECK_NOTHROW(s.client.send(zmq::message_t{"Hi"}, zmq::send_flags::none)); // wait for message and verify events CHECK_NOTHROW(active_poller.wait(std::chrono::milliseconds{500})); - CHECK(s.events == ZMQ_POLLIN); + CHECK(s.events == zmq::event_flags::pollin); // Modify server socket with pollout flag - CHECK_NOTHROW(active_poller.modify(s.server, ZMQ_POLLIN | ZMQ_POLLOUT)); + CHECK_NOTHROW(active_poller.modify(s.server, zmq::event_flags::pollin | zmq::event_flags::pollout)); CHECK(1 == active_poller.wait(std::chrono::milliseconds{500})); - CHECK(s.events == (ZMQ_POLLIN | ZMQ_POLLOUT)); + CHECK(s.events == (zmq::event_flags::pollin | zmq::event_flags::pollout)); } TEST_CASE("wait one return", "[active_poller]") @@ -313,7 +313,7 @@ TEST_CASE("wait one return", "[active_poller]") // Setup active_poller zmq::active_poller_t active_poller; CHECK_NOTHROW( - active_poller.add(s.server, ZMQ_POLLIN, [&count](short) { ++count; })); + active_poller.add(s.server, zmq::event_flags::pollin, [&count](zmq::event_flags) { ++count; })); // client sends message CHECK_NOTHROW(s.client.send(zmq::message_t{"Hi"}, zmq::send_flags::none)); @@ -328,8 +328,8 @@ TEST_CASE("wait on move constructed active_poller", "[active_poller]") server_client_setup s; CHECK_NOTHROW(s.client.send(zmq::message_t{"Hi"}, zmq::send_flags::none)); zmq::active_poller_t a; - zmq::active_poller_t::handler_t handler; - CHECK_NOTHROW(a.add(s.server, ZMQ_POLLIN, handler)); + zmq::active_poller_t::handler_type handler; + CHECK_NOTHROW(a.add(s.server, zmq::event_flags::pollin, handler)); zmq::active_poller_t b{std::move(a)}; CHECK(1u == b.size()); /// \todo the actual error code should be checked @@ -342,8 +342,8 @@ TEST_CASE("wait on move assigned active_poller", "[active_poller]") server_client_setup s; CHECK_NOTHROW(s.client.send(zmq::message_t{"Hi"}, zmq::send_flags::none)); zmq::active_poller_t a; - zmq::active_poller_t::handler_t handler; - CHECK_NOTHROW(a.add(s.server, ZMQ_POLLIN, handler)); + zmq::active_poller_t::handler_type handler; + CHECK_NOTHROW(a.add(s.server, zmq::event_flags::pollin, handler)); zmq::active_poller_t b; b = {std::move(a)}; CHECK(1u == b.size()); @@ -359,7 +359,7 @@ TEST_CASE("received on move constructed active_poller", "[active_poller]") int count = 0; // Setup active_poller a zmq::active_poller_t a; - CHECK_NOTHROW(a.add(s.server, ZMQ_POLLIN, [&count](short) { ++count; })); + CHECK_NOTHROW(a.add(s.server, zmq::event_flags::pollin, [&count](zmq::event_flags) { ++count; })); // client sends message CHECK_NOTHROW(s.client.send(zmq::message_t{"Hi"}, zmq::send_flags::none)); // wait for message and verify it is received @@ -389,8 +389,8 @@ TEST_CASE("remove from handler", "[active_poller]") int count = 0; for (size_t i = 0; i < ITER_NO; ++i) { CHECK_NOTHROW( - active_poller.add(setup_list[i].server, ZMQ_POLLIN, [&, i](short events) { - CHECK(events == ZMQ_POLLIN); + active_poller.add(setup_list[i].server, zmq::event_flags::pollin, [&, i](zmq::event_flags events) { + CHECK(events == zmq::event_flags::pollin); active_poller.remove(setup_list[ITER_NO - i - 1].server); CHECK((ITER_NO - i - 1) == active_poller.size()); })); diff --git a/tests/poller.cpp b/tests/poller.cpp index dfe422b..efb14f5 100644 --- a/tests/poller.cpp +++ b/tests/poller.cpp @@ -1,25 +1,33 @@ #include "testutil.hpp" -#if defined(ZMQ_CPP11) && defined(ZMQ_BUILD_DRAFT_API) +#if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER) #include #include -#if (__cplusplus >= 201703L) -static_assert(std::is_nothrow_swappable>::value, - "poller_t should be nothrow swappable"); +#ifdef ZMQ_CPP17 +static_assert(std::is_nothrow_swappable_v>); #endif - -TEST_CASE("poller create destroy", "[poller]") -{ - zmq::poller_t<> poller; -} +static_assert(sizeof(zmq_poller_event_t) == sizeof(zmq::poller_event<>), ""); +static_assert(sizeof(zmq_poller_event_t) == sizeof(zmq::poller_event), ""); +static_assert(sizeof(zmq_poller_event_t) == sizeof(zmq::poller_event), ""); +static_assert(alignof(zmq_poller_event_t) == alignof(zmq::poller_event<>), ""); +static_assert(alignof(zmq_poller_event_t) == alignof(zmq::poller_event), ""); static_assert(!std::is_copy_constructible>::value, "poller_t should not be copy-constructible"); static_assert(!std::is_copy_assignable>::value, "poller_t should not be copy-assignable"); +TEST_CASE("poller create destroy", "[poller]") +{ + zmq::poller_t<> a; +#ifdef ZMQ_CPP17 // CTAD + zmq::poller_t b; + zmq::poller_event e; +#endif +} + TEST_CASE("poller move construct empty", "[poller]") { zmq::poller_t<> a; @@ -47,7 +55,7 @@ TEST_CASE("poller move construct non empty", "[poller]") zmq::socket_t socket{context, zmq::socket_type::router}; zmq::poller_t<> a; - a.add(socket, ZMQ_POLLIN, nullptr); + a.add(socket, zmq::event_flags::pollin); zmq::poller_t<> b = std::move(a); } @@ -57,7 +65,7 @@ TEST_CASE("poller move assign non empty", "[poller]") zmq::socket_t socket{context, zmq::socket_type::router}; zmq::poller_t<> a; - a.add(socket, ZMQ_POLLIN, nullptr); + a.add(socket, zmq::event_flags::pollin); zmq::poller_t<> b; b = std::move(a); } @@ -66,17 +74,17 @@ TEST_CASE("poller add nullptr", "[poller]") { zmq::context_t context; zmq::socket_t socket{context, zmq::socket_type::router}; - zmq::poller_t<> poller; - CHECK_NOTHROW(poller.add(socket, ZMQ_POLLIN, nullptr)); + zmq::poller_t poller; + CHECK_NOTHROW(poller.add(socket, zmq::event_flags::pollin, nullptr)); } TEST_CASE("poller add non nullptr", "[poller]") { zmq::context_t context; zmq::socket_t socket{context, zmq::socket_type::router}; - zmq::poller_t<> poller; + zmq::poller_t poller; int i; - CHECK_NOTHROW(poller.add(socket, ZMQ_POLLIN, &i)); + CHECK_NOTHROW(poller.add(socket, zmq::event_flags::pollin, &i)); } #if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 3, 0) @@ -87,7 +95,7 @@ TEST_CASE("poller add handler invalid events type", "[poller]") zmq::socket_t socket{context, zmq::socket_type::router}; zmq::poller_t<> poller; short invalid_events_type = 2 << 10; - CHECK_THROWS_AS(poller.add(socket, invalid_events_type, nullptr), const zmq::error_t&); + CHECK_THROWS_AS(poller.add(socket, static_cast(invalid_events_type)), const zmq::error_t&); } #endif @@ -96,15 +104,15 @@ TEST_CASE("poller add handler twice throws", "[poller]") zmq::context_t context; zmq::socket_t socket{context, zmq::socket_type::router}; zmq::poller_t<> poller; - poller.add(socket, ZMQ_POLLIN, nullptr); + poller.add(socket, zmq::event_flags::pollin); /// \todo the actual error code should be checked - CHECK_THROWS_AS(poller.add(socket, ZMQ_POLLIN, nullptr), const zmq::error_t&); + CHECK_THROWS_AS(poller.add(socket, zmq::event_flags::pollin), const zmq::error_t&); } TEST_CASE("poller wait with no handlers throws", "[poller]") { zmq::poller_t<> poller; - std::vector events; + std::vector> events; /// \todo the actual error code should be checked CHECK_THROWS_AS(poller.wait_all(events, std::chrono::milliseconds{10}), const zmq::error_t&); @@ -124,7 +132,7 @@ TEST_CASE("poller remove registered empty", "[poller]") zmq::context_t context; zmq::socket_t socket{context, zmq::socket_type::router}; zmq::poller_t<> poller; - poller.add(socket, ZMQ_POLLIN, nullptr); + poller.add(socket, zmq::event_flags::pollin); CHECK_NOTHROW(poller.remove(socket)); } @@ -132,9 +140,9 @@ TEST_CASE("poller remove registered non empty", "[poller]") { zmq::context_t context; zmq::socket_t socket{context, zmq::socket_type::router}; - zmq::poller_t<> poller; + zmq::poller_t poller; int empty{}; - poller.add(socket, ZMQ_POLLIN, &empty); + poller.add(socket, zmq::event_flags::pollin, &empty); CHECK_NOTHROW(poller.remove(socket)); } @@ -145,9 +153,9 @@ TEST_CASE("poller poll basic", "[poller]") CHECK_NOTHROW(s.client.send(zmq::message_t{"Hi"}, zmq::send_flags::none)); zmq::poller_t poller; - std::vector events{1}; + std::vector> events{1}; int i = 0; - CHECK_NOTHROW(poller.add(s.server, ZMQ_POLLIN, &i)); + CHECK_NOTHROW(poller.add(s.server, zmq::event_flags::pollin, &i)); CHECK(1 == poller.wait_all(events, std::chrono::milliseconds{-1})); CHECK(s.server == events[0].socket); CHECK(&i == events[0].user_data); @@ -159,7 +167,7 @@ TEST_CASE("poller add invalid socket throws", "[poller]") zmq::poller_t<> poller; zmq::socket_t a{context, zmq::socket_type::router}; zmq::socket_t b{std::move(a)}; - CHECK_THROWS_AS(poller.add(a, ZMQ_POLLIN, nullptr), const zmq::error_t&); + CHECK_THROWS_AS(poller.add(a, zmq::event_flags::pollin), const zmq::error_t&); } TEST_CASE("poller remove invalid socket throws", "[poller]") @@ -167,7 +175,7 @@ TEST_CASE("poller remove invalid socket throws", "[poller]") zmq::context_t context; zmq::socket_t socket{context, zmq::socket_type::router}; zmq::poller_t<> poller; - CHECK_NOTHROW(poller.add(socket, ZMQ_POLLIN, nullptr)); + CHECK_NOTHROW(poller.add(socket, zmq::event_flags::pollin)); std::vector sockets; sockets.emplace_back(std::move(socket)); CHECK_THROWS_AS(poller.remove(socket), const zmq::error_t&); @@ -179,7 +187,7 @@ TEST_CASE("poller modify empty throws", "[poller]") zmq::context_t context; zmq::socket_t socket{context, zmq::socket_type::push}; zmq::poller_t<> poller; - CHECK_THROWS_AS(poller.modify(socket, ZMQ_POLLIN), const zmq::error_t&); + CHECK_THROWS_AS(poller.modify(socket, zmq::event_flags::pollin), const zmq::error_t&); } TEST_CASE("poller modify invalid socket throws", "[poller]") @@ -188,7 +196,7 @@ TEST_CASE("poller modify invalid socket throws", "[poller]") zmq::socket_t a{context, zmq::socket_type::push}; zmq::socket_t b{std::move(a)}; zmq::poller_t<> poller; - CHECK_THROWS_AS(poller.modify(a, ZMQ_POLLIN), const zmq::error_t&); + CHECK_THROWS_AS(poller.modify(a, zmq::event_flags::pollin), const zmq::error_t&); } TEST_CASE("poller modify not added throws", "[poller]") @@ -197,8 +205,8 @@ TEST_CASE("poller modify not added throws", "[poller]") zmq::socket_t a{context, zmq::socket_type::push}; zmq::socket_t b{context, zmq::socket_type::push}; zmq::poller_t<> poller; - CHECK_NOTHROW(poller.add(a, ZMQ_POLLIN, nullptr)); - CHECK_THROWS_AS(poller.modify(b, ZMQ_POLLIN), const zmq::error_t&); + CHECK_NOTHROW(poller.add(a, zmq::event_flags::pollin)); + CHECK_THROWS_AS(poller.modify(b, zmq::event_flags::pollin), const zmq::error_t&); } TEST_CASE("poller modify simple", "[poller]") @@ -206,8 +214,8 @@ TEST_CASE("poller modify simple", "[poller]") zmq::context_t context; zmq::socket_t a{context, zmq::socket_type::push}; zmq::poller_t<> poller; - CHECK_NOTHROW(poller.add(a, ZMQ_POLLIN, nullptr)); - CHECK_NOTHROW(poller.modify(a, ZMQ_POLLIN | ZMQ_POLLOUT)); + CHECK_NOTHROW(poller.add(a, zmq::event_flags::pollin)); + CHECK_NOTHROW(poller.modify(a, zmq::event_flags::pollin | zmq::event_flags::pollout)); } TEST_CASE("poller poll client server", "[poller]") @@ -216,21 +224,21 @@ TEST_CASE("poller poll client server", "[poller]") common_server_client_setup s; // Setup poller - zmq::poller_t<> poller; - CHECK_NOTHROW(poller.add(s.server, ZMQ_POLLIN, s.server)); + zmq::poller_t poller; + CHECK_NOTHROW(poller.add(s.server, zmq::event_flags::pollin, &s.server)); // client sends message CHECK_NOTHROW(s.client.send(zmq::message_t{"Hi"}, zmq::send_flags::none)); // wait for message and verify events - std::vector events(1); + std::vector> events(1); CHECK(1 == poller.wait_all(events, std::chrono::milliseconds{500})); - CHECK(ZMQ_POLLIN == events[0].events); + CHECK(zmq::event_flags::pollin == events[0].events); // Modify server socket with pollout flag - CHECK_NOTHROW(poller.modify(s.server, ZMQ_POLLIN | ZMQ_POLLOUT)); + CHECK_NOTHROW(poller.modify(s.server, zmq::event_flags::pollin | zmq::event_flags::pollout)); CHECK(1 == poller.wait_all(events, std::chrono::milliseconds{500})); - CHECK((ZMQ_POLLIN | ZMQ_POLLOUT) == events[0].events); + CHECK((zmq::event_flags::pollin | zmq::event_flags::pollout) == events[0].events); } TEST_CASE("poller wait one return", "[poller]") @@ -240,13 +248,13 @@ TEST_CASE("poller wait one return", "[poller]") // Setup poller zmq::poller_t<> poller; - CHECK_NOTHROW(poller.add(s.server, ZMQ_POLLIN, nullptr)); + CHECK_NOTHROW(poller.add(s.server, zmq::event_flags::pollin)); // client sends message CHECK_NOTHROW(s.client.send(zmq::message_t{"Hi"}, zmq::send_flags::none)); // wait for message and verify events - std::vector events(1); + std::vector> events(1); CHECK(1 == poller.wait_all(events, std::chrono::milliseconds{500})); } @@ -255,9 +263,9 @@ TEST_CASE("poller wait on move constructed", "[poller]") common_server_client_setup s; CHECK_NOTHROW(s.client.send(zmq::message_t{"Hi"}, zmq::send_flags::none)); zmq::poller_t<> a; - CHECK_NOTHROW(a.add(s.server, ZMQ_POLLIN, nullptr)); + CHECK_NOTHROW(a.add(s.server, zmq::event_flags::pollin)); zmq::poller_t<> b{std::move(a)}; - std::vector events(1); + std::vector> events(1); /// \todo the actual error code should be checked CHECK_THROWS_AS(a.wait_all(events, std::chrono::milliseconds{10}), const zmq::error_t&); CHECK(1 == b.wait_all(events, std::chrono::milliseconds{-1})); @@ -268,11 +276,11 @@ TEST_CASE("poller wait on move assigned", "[poller]") common_server_client_setup s; CHECK_NOTHROW(s.client.send(zmq::message_t{"Hi"}, zmq::send_flags::none)); zmq::poller_t<> a; - CHECK_NOTHROW(a.add(s.server, ZMQ_POLLIN, nullptr)); + CHECK_NOTHROW(a.add(s.server, zmq::event_flags::pollin)); zmq::poller_t<> b; b = {std::move(a)}; /// \todo the TEST_CASE error code should be checked - std::vector events(1); + std::vector> events(1); CHECK_THROWS_AS(a.wait_all(events, std::chrono::milliseconds{10}), const zmq::error_t&); CHECK(1 == b.wait_all(events, std::chrono::milliseconds{-1})); } @@ -289,7 +297,7 @@ TEST_CASE("poller remove from handler", "[poller]") // Setup poller zmq::poller_t<> poller; for (size_t i = 0; i < ITER_NO; ++i) { - CHECK_NOTHROW(poller.add(setup_list[i].server, ZMQ_POLLIN, nullptr)); + CHECK_NOTHROW(poller.add(setup_list[i].server, zmq::event_flags::pollin)); } // Clients send messages for (auto &s : setup_list) { @@ -303,7 +311,7 @@ TEST_CASE("poller remove from handler", "[poller]") } // Fire all handlers in one wait - std::vector events(ITER_NO); + std::vector> events(ITER_NO); CHECK(ITER_NO == poller.wait_all(events, std::chrono::milliseconds{-1})); } diff --git a/zmq.hpp b/zmq.hpp index 1ab9b70..58c688f 100644 --- a/zmq.hpp +++ b/zmq.hpp @@ -1823,18 +1823,65 @@ class monitor_t }; #if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER) -template class poller_t + +// polling events +enum class event_flags : short +{ + none = 0, + pollin = ZMQ_POLLIN, + pollout = ZMQ_POLLOUT, + pollerr = ZMQ_POLLERR, + pollpri = ZMQ_POLLPRI +}; + +constexpr event_flags operator|(event_flags a, event_flags b) noexcept +{ + return static_cast(static_cast(a) | static_cast(b)); +} +constexpr event_flags operator&(event_flags a, event_flags b) noexcept +{ + return static_cast(static_cast(a) & static_cast(b)); +} +constexpr event_flags operator~(event_flags a) noexcept +{ + return static_cast(~static_cast(a)); +} + +struct no_user_data; + +// layout compatible with zmq_poller_event_t +template +struct poller_event +{ + socket_ref socket; +#ifdef _WIN32 + SOCKET fd; +#else + int fd; +#endif + T *user_data; + event_flags events; +}; + +template class poller_t { public: + using event_type = poller_event; + poller_t() = default; - void add(zmq::socket_ref socket, short events, T *user_data) + template< + typename Dummy = void, + typename = + typename std::enable_if::value, Dummy>::type> + void add(zmq::socket_ref socket, event_flags events, T *user_data) { - if (0 - != zmq_poller_add(poller_ptr.get(), socket.handle(), - user_data, events)) { - throw error_t(); - } + add_impl(socket, events, user_data); + } + + void add(zmq::socket_ref socket, event_flags events) + { + add_impl(socket, events, nullptr); } void remove(zmq::socket_ref socket) @@ -1844,21 +1891,23 @@ template class poller_t } } - void modify(zmq::socket_ref socket, short events) + void modify(zmq::socket_ref socket, event_flags events) { if (0 != zmq_poller_modify(poller_ptr.get(), socket.handle(), - events)) { + static_cast(events))) { throw error_t(); } } - size_t wait_all(std::vector &poller_events, + size_t wait_all(std::vector &poller_events, const std::chrono::milliseconds timeout) { - int rc = zmq_poller_wait_all(poller_ptr.get(), poller_events.data(), - static_cast(poller_events.size()), - static_cast(timeout.count())); + int rc = zmq_poller_wait_all( + poller_ptr.get(), + reinterpret_cast(poller_events.data()), + static_cast(poller_events.size()), + static_cast(timeout.count())); if (rc > 0) return static_cast(rc); @@ -1873,18 +1922,30 @@ template class poller_t } private: - std::unique_ptr poller_ptr{ + struct destroy_poller_t + { + void operator()(void *ptr) noexcept + { + int rc = zmq_poller_destroy(&ptr); + ZMQ_ASSERT(rc == 0); + } + }; + + std::unique_ptr poller_ptr{ []() { auto poller_new = zmq_poller_new(); if (poller_new) return poller_new; throw error_t(); - }(), &destroy_poller}; + }()}; - static void destroy_poller(void *ptr) + void add_impl(zmq::socket_ref socket, event_flags events, T *user_data) { - int rc = zmq_poller_destroy(&ptr); - ZMQ_ASSERT(rc == 0); + if (0 + != zmq_poller_add(poller_ptr.get(), socket.handle(), + user_data, static_cast(events))) { + throw error_t(); + } } }; #endif // defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER) diff --git a/zmq_addon.hpp b/zmq_addon.hpp index 9d62c7a..322cd9d 100644 --- a/zmq_addon.hpp +++ b/zmq_addon.hpp @@ -382,15 +382,15 @@ class active_poller_t active_poller_t(active_poller_t &&src) = default; active_poller_t &operator=(active_poller_t &&src) = default; - using handler_t = std::function; + using handler_type = std::function; - void add(zmq::socket_ref socket, short events, handler_t handler) + void add(zmq::socket_ref socket, event_flags events, handler_type handler) { auto it = decltype(handlers)::iterator{}; auto inserted = bool{}; std::tie(it, inserted) = handlers.emplace(socket, - std::make_shared(std::move(handler))); + std::make_shared(std::move(handler))); try { base_poller.add(socket, events, inserted && *(it->second) ? it->second.get() : nullptr); @@ -412,7 +412,7 @@ class active_poller_t need_rebuild = true; } - void modify(zmq::socket_ref socket, short events) + void modify(zmq::socket_ref socket, event_flags events) { base_poller.modify(socket, events); } @@ -429,26 +429,25 @@ class active_poller_t need_rebuild = false; } const auto count = base_poller.wait_all(poller_events, timeout); - std::for_each(poller_events.begin(), poller_events.begin() + count, - [](zmq_poller_event_t &event) { - if (event.user_data != NULL) - (*reinterpret_cast(event.user_data))( - event.events); + std::for_each(poller_events.begin(), poller_events.begin() + static_cast(count), + [](decltype(base_poller)::event_type &event) { + if (event.user_data != nullptr) + (*event.user_data)(event.events); }); return count; } - bool empty() const { return handlers.empty(); } + ZMQ_NODISCARD bool empty() const noexcept { return handlers.empty(); } - size_t size() const { return handlers.size(); } + size_t size() const noexcept { return handlers.size(); } private: bool need_rebuild{false}; - poller_t base_poller{}; - std::unordered_map> handlers{}; - std::vector poller_events{}; - std::vector> poller_handlers{}; + poller_t base_poller{}; + std::unordered_map> handlers{}; + std::vector poller_events{}; + std::vector> poller_handlers{}; }; // class active_poller_t #endif // defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER)