Problem: Type-safety of poller_t and active_poller_t can be improved (#318)

Problem: Type-safety of poller_t and active_poller_t can be improved
This commit is contained in:
Gudmundur Adalsteinsson 2019-05-15 06:35:12 +00:00 committed by Simon Giesecke
parent 6f0fb2a3ea
commit 0458f7d16c
4 changed files with 194 additions and 126 deletions

View File

@ -47,7 +47,7 @@ TEST_CASE("move construct non empty", "[active_poller]")
zmq::socket_t socket{context, zmq::socket_type::router}; zmq::socket_t socket{context, zmq::socket_type::router};
zmq::active_poller_t a; zmq::active_poller_t a;
a.add(socket, ZMQ_POLLIN, [](short) {}); a.add(socket, zmq::event_flags::pollin, [](zmq::event_flags) {});
CHECK_FALSE(a.empty()); CHECK_FALSE(a.empty());
CHECK(1u == a.size()); CHECK(1u == a.size());
zmq::active_poller_t b = std::move(a); zmq::active_poller_t b = std::move(a);
@ -63,7 +63,7 @@ TEST_CASE("move assign non empty", "[active_poller]")
zmq::socket_t socket{context, zmq::socket_type::router}; zmq::socket_t socket{context, zmq::socket_type::router};
zmq::active_poller_t a; zmq::active_poller_t a;
a.add(socket, ZMQ_POLLIN, [](short) {}); a.add(socket, zmq::event_flags::pollin, [](zmq::event_flags) {});
CHECK_FALSE(a.empty()); CHECK_FALSE(a.empty());
CHECK(1u == a.size()); CHECK(1u == a.size());
zmq::active_poller_t b; zmq::active_poller_t b;
@ -79,8 +79,8 @@ TEST_CASE("add handler", "[active_poller]")
zmq::context_t context; zmq::context_t context;
zmq::socket_t socket{context, zmq::socket_type::router}; zmq::socket_t socket{context, zmq::socket_type::router};
zmq::active_poller_t active_poller; zmq::active_poller_t active_poller;
zmq::active_poller_t::handler_t handler; zmq::active_poller_t::handler_type handler;
CHECK_NOTHROW(active_poller.add(socket, ZMQ_POLLIN, handler)); CHECK_NOTHROW(active_poller.add(socket, zmq::event_flags::pollin, handler));
} }
#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 3, 0) #if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 3, 0)
@ -90,9 +90,9 @@ TEST_CASE("add handler invalid events type", "[active_poller]")
zmq::context_t context; zmq::context_t context;
zmq::socket_t socket{context, zmq::socket_type::router}; zmq::socket_t socket{context, zmq::socket_type::router};
zmq::active_poller_t active_poller; zmq::active_poller_t active_poller;
zmq::active_poller_t::handler_t handler; zmq::active_poller_t::handler_type handler;
short invalid_events_type = 2 << 10; short invalid_events_type = 2 << 10;
CHECK_THROWS_AS(active_poller.add(socket, invalid_events_type, handler), const zmq::error_t&); CHECK_THROWS_AS(active_poller.add(socket, static_cast<zmq::event_flags>(invalid_events_type), handler), const zmq::error_t&);
CHECK(active_poller.empty()); CHECK(active_poller.empty());
CHECK(0u == active_poller.size()); CHECK(0u == active_poller.size());
} }
@ -103,10 +103,10 @@ TEST_CASE("add handler twice throws", "[active_poller]")
zmq::context_t context; zmq::context_t context;
zmq::socket_t socket{context, zmq::socket_type::router}; zmq::socket_t socket{context, zmq::socket_type::router};
zmq::active_poller_t active_poller; zmq::active_poller_t active_poller;
zmq::active_poller_t::handler_t handler; zmq::active_poller_t::handler_type handler;
active_poller.add(socket, ZMQ_POLLIN, handler); active_poller.add(socket, zmq::event_flags::pollin, handler);
/// \todo the actual error code should be checked /// \todo the actual error code should be checked
CHECK_THROWS_AS(active_poller.add(socket, ZMQ_POLLIN, handler), const zmq::error_t&); CHECK_THROWS_AS(active_poller.add(socket, zmq::event_flags::pollin, handler), const zmq::error_t&);
} }
TEST_CASE("wait with no handlers throws", "[active_poller]") TEST_CASE("wait with no handlers throws", "[active_poller]")
@ -130,7 +130,7 @@ TEST_CASE("remove registered empty", "[active_poller]")
zmq::context_t context; zmq::context_t context;
zmq::socket_t socket{context, zmq::socket_type::router}; zmq::socket_t socket{context, zmq::socket_type::router};
zmq::active_poller_t active_poller; zmq::active_poller_t active_poller;
active_poller.add(socket, ZMQ_POLLIN, zmq::active_poller_t::handler_t{}); active_poller.add(socket, zmq::event_flags::pollin, zmq::active_poller_t::handler_type{});
CHECK_NOTHROW(active_poller.remove(socket)); CHECK_NOTHROW(active_poller.remove(socket));
} }
@ -139,7 +139,7 @@ TEST_CASE("remove registered non empty", "[active_poller]")
zmq::context_t context; zmq::context_t context;
zmq::socket_t socket{context, zmq::socket_type::router}; zmq::socket_t socket{context, zmq::socket_type::router};
zmq::active_poller_t active_poller; zmq::active_poller_t active_poller;
active_poller.add(socket, ZMQ_POLLIN, [](short) {}); active_poller.add(socket, zmq::event_flags::pollin, [](zmq::event_flags) {});
CHECK_NOTHROW(active_poller.remove(socket)); CHECK_NOTHROW(active_poller.remove(socket));
} }
@ -147,9 +147,9 @@ namespace
{ {
struct server_client_setup : common_server_client_setup struct server_client_setup : common_server_client_setup
{ {
zmq::active_poller_t::handler_t handler = [&](short e) { events = e; }; zmq::active_poller_t::handler_type handler = [&](zmq::event_flags e) { events = e; };
short events = 0; zmq::event_flags events = zmq::event_flags::none;
}; };
} }
@ -161,11 +161,11 @@ TEST_CASE("poll basic", "[active_poller]")
zmq::active_poller_t active_poller; zmq::active_poller_t active_poller;
bool message_received = false; bool message_received = false;
zmq::active_poller_t::handler_t handler = [&message_received](short events) { zmq::active_poller_t::handler_type handler = [&message_received](zmq::event_flags events) {
CHECK(0 != (events & ZMQ_POLLIN)); CHECK(zmq::event_flags::none != (events & zmq::event_flags::pollin));
message_received = true; message_received = true;
}; };
CHECK_NOTHROW(active_poller.add(s.server, ZMQ_POLLIN, handler)); CHECK_NOTHROW(active_poller.add(s.server, zmq::event_flags::pollin, handler));
CHECK(1 == active_poller.wait(std::chrono::milliseconds{-1})); CHECK(1 == active_poller.wait(std::chrono::milliseconds{-1}));
CHECK(message_received); CHECK(message_received);
} }
@ -180,33 +180,33 @@ TEST_CASE("client server", "[active_poller]")
// Setup active_poller // Setup active_poller
zmq::active_poller_t active_poller; zmq::active_poller_t active_poller;
short events; zmq::event_flags events;
zmq::active_poller_t::handler_t handler = [&](short e) { zmq::active_poller_t::handler_type handler = [&](zmq::event_flags e) {
if (0 != (e & ZMQ_POLLIN)) { if (zmq::event_flags::none != (e & zmq::event_flags::pollin)) {
zmq::message_t zmq_msg; zmq::message_t zmq_msg;
CHECK_NOTHROW(s.server.recv(zmq_msg)); // get message CHECK_NOTHROW(s.server.recv(zmq_msg)); // get message
std::string recv_msg(zmq_msg.data<char>(), zmq_msg.size()); std::string recv_msg(zmq_msg.data<char>(), zmq_msg.size());
CHECK(send_msg == recv_msg); CHECK(send_msg == recv_msg);
} else if (0 != (e & ~ZMQ_POLLOUT)) { } else if (zmq::event_flags::none != (e & ~zmq::event_flags::pollout)) {
INFO("Unexpected event type " << events); INFO("Unexpected event type " << static_cast<short>(events));
REQUIRE(false); REQUIRE(false);
} }
events = e; events = e;
}; };
CHECK_NOTHROW(active_poller.add(s.server, ZMQ_POLLIN, handler)); CHECK_NOTHROW(active_poller.add(s.server, zmq::event_flags::pollin, handler));
// client sends message // client sends message
CHECK_NOTHROW(s.client.send(zmq::message_t{send_msg}, zmq::send_flags::none)); CHECK_NOTHROW(s.client.send(zmq::message_t{send_msg}, zmq::send_flags::none));
CHECK(1 == active_poller.wait(std::chrono::milliseconds{-1})); CHECK(1 == active_poller.wait(std::chrono::milliseconds{-1}));
CHECK(events == ZMQ_POLLIN); CHECK(events == zmq::event_flags::pollin);
// Re-add server socket with pollout flag // Re-add server socket with pollout flag
CHECK_NOTHROW(active_poller.remove(s.server)); CHECK_NOTHROW(active_poller.remove(s.server));
CHECK_NOTHROW(active_poller.add(s.server, ZMQ_POLLIN | ZMQ_POLLOUT, handler)); CHECK_NOTHROW(active_poller.add(s.server, zmq::event_flags::pollin | zmq::event_flags::pollout, handler));
CHECK(1 == active_poller.wait(std::chrono::milliseconds{-1})); CHECK(1 == active_poller.wait(std::chrono::milliseconds{-1}));
CHECK(events == ZMQ_POLLOUT); CHECK(events == zmq::event_flags::pollout);
} }
TEST_CASE("add invalid socket throws", "[active_poller]") TEST_CASE("add invalid socket throws", "[active_poller]")
@ -215,7 +215,7 @@ TEST_CASE("add invalid socket throws", "[active_poller]")
zmq::active_poller_t active_poller; zmq::active_poller_t active_poller;
zmq::socket_t a{context, zmq::socket_type::router}; zmq::socket_t a{context, zmq::socket_type::router};
zmq::socket_t b{std::move(a)}; zmq::socket_t b{std::move(a)};
CHECK_THROWS_AS(active_poller.add(a, ZMQ_POLLIN, zmq::active_poller_t::handler_t{}), CHECK_THROWS_AS(active_poller.add(a, zmq::event_flags::pollin, zmq::active_poller_t::handler_type{}),
const zmq::error_t&); const zmq::error_t&);
} }
@ -225,7 +225,7 @@ TEST_CASE("remove invalid socket throws", "[active_poller]")
zmq::socket_t socket{context, zmq::socket_type::router}; zmq::socket_t socket{context, zmq::socket_type::router};
zmq::active_poller_t active_poller; zmq::active_poller_t active_poller;
CHECK_NOTHROW( CHECK_NOTHROW(
active_poller.add(socket, ZMQ_POLLIN, zmq::active_poller_t::handler_t{})); active_poller.add(socket, zmq::event_flags::pollin, zmq::active_poller_t::handler_type{}));
CHECK(1u == active_poller.size()); CHECK(1u == active_poller.size());
std::vector<zmq::socket_t> sockets; std::vector<zmq::socket_t> sockets;
sockets.emplace_back(std::move(socket)); sockets.emplace_back(std::move(socket));
@ -238,8 +238,8 @@ TEST_CASE("wait on added empty handler", "[active_poller]")
server_client_setup s; server_client_setup s;
CHECK_NOTHROW(s.client.send(zmq::message_t{"Hi"}, zmq::send_flags::none)); CHECK_NOTHROW(s.client.send(zmq::message_t{"Hi"}, zmq::send_flags::none));
zmq::active_poller_t active_poller; zmq::active_poller_t active_poller;
zmq::active_poller_t::handler_t handler; zmq::active_poller_t::handler_type handler;
CHECK_NOTHROW(active_poller.add(s.server, ZMQ_POLLIN, handler)); CHECK_NOTHROW(active_poller.add(s.server, zmq::event_flags::pollin, handler));
CHECK_NOTHROW(active_poller.wait(std::chrono::milliseconds{-1})); CHECK_NOTHROW(active_poller.wait(std::chrono::milliseconds{-1}));
} }
@ -248,7 +248,7 @@ TEST_CASE("modify empty throws", "[active_poller]")
zmq::context_t context; zmq::context_t context;
zmq::socket_t socket{context, zmq::socket_type::push}; zmq::socket_t socket{context, zmq::socket_type::push};
zmq::active_poller_t active_poller; zmq::active_poller_t active_poller;
CHECK_THROWS_AS(active_poller.modify(socket, ZMQ_POLLIN), const zmq::error_t&); CHECK_THROWS_AS(active_poller.modify(socket, zmq::event_flags::pollin), const zmq::error_t&);
} }
TEST_CASE("modify invalid socket throws", "[active_poller]") TEST_CASE("modify invalid socket throws", "[active_poller]")
@ -257,7 +257,7 @@ TEST_CASE("modify invalid socket throws", "[active_poller]")
zmq::socket_t a{context, zmq::socket_type::push}; zmq::socket_t a{context, zmq::socket_type::push};
zmq::socket_t b{std::move(a)}; zmq::socket_t b{std::move(a)};
zmq::active_poller_t active_poller; zmq::active_poller_t active_poller;
CHECK_THROWS_AS(active_poller.modify(a, ZMQ_POLLIN), const zmq::error_t&); CHECK_THROWS_AS(active_poller.modify(a, zmq::event_flags::pollin), const zmq::error_t&);
} }
TEST_CASE("modify not added throws", "[active_poller]") TEST_CASE("modify not added throws", "[active_poller]")
@ -267,8 +267,8 @@ TEST_CASE("modify not added throws", "[active_poller]")
zmq::socket_t b{context, zmq::socket_type::push}; zmq::socket_t b{context, zmq::socket_type::push};
zmq::active_poller_t active_poller; zmq::active_poller_t active_poller;
CHECK_NOTHROW( CHECK_NOTHROW(
active_poller.add(a, ZMQ_POLLIN, zmq::active_poller_t::handler_t{})); active_poller.add(a, zmq::event_flags::pollin, zmq::active_poller_t::handler_type{}));
CHECK_THROWS_AS(active_poller.modify(b, ZMQ_POLLIN), const zmq::error_t&); CHECK_THROWS_AS(active_poller.modify(b, zmq::event_flags::pollin), const zmq::error_t&);
} }
TEST_CASE("modify simple", "[active_poller]") TEST_CASE("modify simple", "[active_poller]")
@ -277,8 +277,8 @@ TEST_CASE("modify simple", "[active_poller]")
zmq::socket_t a{context, zmq::socket_type::push}; zmq::socket_t a{context, zmq::socket_type::push};
zmq::active_poller_t active_poller; zmq::active_poller_t active_poller;
CHECK_NOTHROW( CHECK_NOTHROW(
active_poller.add(a, ZMQ_POLLIN, zmq::active_poller_t::handler_t{})); active_poller.add(a, zmq::event_flags::pollin, zmq::active_poller_t::handler_type{}));
CHECK_NOTHROW(active_poller.modify(a, ZMQ_POLLIN | ZMQ_POLLOUT)); CHECK_NOTHROW(active_poller.modify(a, zmq::event_flags::pollin | zmq::event_flags::pollout));
} }
TEST_CASE("poll client server", "[active_poller]") TEST_CASE("poll client server", "[active_poller]")
@ -288,19 +288,19 @@ TEST_CASE("poll client server", "[active_poller]")
// Setup active_poller // Setup active_poller
zmq::active_poller_t active_poller; zmq::active_poller_t active_poller;
CHECK_NOTHROW(active_poller.add(s.server, ZMQ_POLLIN, s.handler)); CHECK_NOTHROW(active_poller.add(s.server, zmq::event_flags::pollin, s.handler));
// client sends message // client sends message
CHECK_NOTHROW(s.client.send(zmq::message_t{"Hi"}, zmq::send_flags::none)); CHECK_NOTHROW(s.client.send(zmq::message_t{"Hi"}, zmq::send_flags::none));
// wait for message and verify events // wait for message and verify events
CHECK_NOTHROW(active_poller.wait(std::chrono::milliseconds{500})); CHECK_NOTHROW(active_poller.wait(std::chrono::milliseconds{500}));
CHECK(s.events == ZMQ_POLLIN); CHECK(s.events == zmq::event_flags::pollin);
// Modify server socket with pollout flag // Modify server socket with pollout flag
CHECK_NOTHROW(active_poller.modify(s.server, ZMQ_POLLIN | ZMQ_POLLOUT)); CHECK_NOTHROW(active_poller.modify(s.server, zmq::event_flags::pollin | zmq::event_flags::pollout));
CHECK(1 == active_poller.wait(std::chrono::milliseconds{500})); CHECK(1 == active_poller.wait(std::chrono::milliseconds{500}));
CHECK(s.events == (ZMQ_POLLIN | ZMQ_POLLOUT)); CHECK(s.events == (zmq::event_flags::pollin | zmq::event_flags::pollout));
} }
TEST_CASE("wait one return", "[active_poller]") TEST_CASE("wait one return", "[active_poller]")
@ -313,7 +313,7 @@ TEST_CASE("wait one return", "[active_poller]")
// Setup active_poller // Setup active_poller
zmq::active_poller_t active_poller; zmq::active_poller_t active_poller;
CHECK_NOTHROW( CHECK_NOTHROW(
active_poller.add(s.server, ZMQ_POLLIN, [&count](short) { ++count; })); active_poller.add(s.server, zmq::event_flags::pollin, [&count](zmq::event_flags) { ++count; }));
// client sends message // client sends message
CHECK_NOTHROW(s.client.send(zmq::message_t{"Hi"}, zmq::send_flags::none)); CHECK_NOTHROW(s.client.send(zmq::message_t{"Hi"}, zmq::send_flags::none));
@ -328,8 +328,8 @@ TEST_CASE("wait on move constructed active_poller", "[active_poller]")
server_client_setup s; server_client_setup s;
CHECK_NOTHROW(s.client.send(zmq::message_t{"Hi"}, zmq::send_flags::none)); CHECK_NOTHROW(s.client.send(zmq::message_t{"Hi"}, zmq::send_flags::none));
zmq::active_poller_t a; zmq::active_poller_t a;
zmq::active_poller_t::handler_t handler; zmq::active_poller_t::handler_type handler;
CHECK_NOTHROW(a.add(s.server, ZMQ_POLLIN, handler)); CHECK_NOTHROW(a.add(s.server, zmq::event_flags::pollin, handler));
zmq::active_poller_t b{std::move(a)}; zmq::active_poller_t b{std::move(a)};
CHECK(1u == b.size()); CHECK(1u == b.size());
/// \todo the actual error code should be checked /// \todo the actual error code should be checked
@ -342,8 +342,8 @@ TEST_CASE("wait on move assigned active_poller", "[active_poller]")
server_client_setup s; server_client_setup s;
CHECK_NOTHROW(s.client.send(zmq::message_t{"Hi"}, zmq::send_flags::none)); CHECK_NOTHROW(s.client.send(zmq::message_t{"Hi"}, zmq::send_flags::none));
zmq::active_poller_t a; zmq::active_poller_t a;
zmq::active_poller_t::handler_t handler; zmq::active_poller_t::handler_type handler;
CHECK_NOTHROW(a.add(s.server, ZMQ_POLLIN, handler)); CHECK_NOTHROW(a.add(s.server, zmq::event_flags::pollin, handler));
zmq::active_poller_t b; zmq::active_poller_t b;
b = {std::move(a)}; b = {std::move(a)};
CHECK(1u == b.size()); CHECK(1u == b.size());
@ -359,7 +359,7 @@ TEST_CASE("received on move constructed active_poller", "[active_poller]")
int count = 0; int count = 0;
// Setup active_poller a // Setup active_poller a
zmq::active_poller_t a; zmq::active_poller_t a;
CHECK_NOTHROW(a.add(s.server, ZMQ_POLLIN, [&count](short) { ++count; })); CHECK_NOTHROW(a.add(s.server, zmq::event_flags::pollin, [&count](zmq::event_flags) { ++count; }));
// client sends message // client sends message
CHECK_NOTHROW(s.client.send(zmq::message_t{"Hi"}, zmq::send_flags::none)); CHECK_NOTHROW(s.client.send(zmq::message_t{"Hi"}, zmq::send_flags::none));
// wait for message and verify it is received // wait for message and verify it is received
@ -389,8 +389,8 @@ TEST_CASE("remove from handler", "[active_poller]")
int count = 0; int count = 0;
for (size_t i = 0; i < ITER_NO; ++i) { for (size_t i = 0; i < ITER_NO; ++i) {
CHECK_NOTHROW( CHECK_NOTHROW(
active_poller.add(setup_list[i].server, ZMQ_POLLIN, [&, i](short events) { active_poller.add(setup_list[i].server, zmq::event_flags::pollin, [&, i](zmq::event_flags events) {
CHECK(events == ZMQ_POLLIN); CHECK(events == zmq::event_flags::pollin);
active_poller.remove(setup_list[ITER_NO - i - 1].server); active_poller.remove(setup_list[ITER_NO - i - 1].server);
CHECK((ITER_NO - i - 1) == active_poller.size()); CHECK((ITER_NO - i - 1) == active_poller.size());
})); }));

View File

@ -1,25 +1,33 @@
#include "testutil.hpp" #include "testutil.hpp"
#if defined(ZMQ_CPP11) && defined(ZMQ_BUILD_DRAFT_API) #if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER)
#include <array> #include <array>
#include <memory> #include <memory>
#if (__cplusplus >= 201703L) #ifdef ZMQ_CPP17
static_assert(std::is_nothrow_swappable<zmq::poller_t<>>::value, static_assert(std::is_nothrow_swappable_v<zmq::poller_t<>>);
"poller_t should be nothrow swappable");
#endif #endif
static_assert(sizeof(zmq_poller_event_t) == sizeof(zmq::poller_event<>), "");
TEST_CASE("poller create destroy", "[poller]") static_assert(sizeof(zmq_poller_event_t) == sizeof(zmq::poller_event<zmq::no_user_data>), "");
{ static_assert(sizeof(zmq_poller_event_t) == sizeof(zmq::poller_event<int>), "");
zmq::poller_t<> poller; static_assert(alignof(zmq_poller_event_t) == alignof(zmq::poller_event<>), "");
} static_assert(alignof(zmq_poller_event_t) == alignof(zmq::poller_event<int>), "");
static_assert(!std::is_copy_constructible<zmq::poller_t<>>::value, static_assert(!std::is_copy_constructible<zmq::poller_t<>>::value,
"poller_t should not be copy-constructible"); "poller_t should not be copy-constructible");
static_assert(!std::is_copy_assignable<zmq::poller_t<>>::value, static_assert(!std::is_copy_assignable<zmq::poller_t<>>::value,
"poller_t should not be copy-assignable"); "poller_t should not be copy-assignable");
TEST_CASE("poller create destroy", "[poller]")
{
zmq::poller_t<> a;
#ifdef ZMQ_CPP17 // CTAD
zmq::poller_t b;
zmq::poller_event e;
#endif
}
TEST_CASE("poller move construct empty", "[poller]") TEST_CASE("poller move construct empty", "[poller]")
{ {
zmq::poller_t<> a; zmq::poller_t<> a;
@ -47,7 +55,7 @@ TEST_CASE("poller move construct non empty", "[poller]")
zmq::socket_t socket{context, zmq::socket_type::router}; zmq::socket_t socket{context, zmq::socket_type::router};
zmq::poller_t<> a; zmq::poller_t<> a;
a.add(socket, ZMQ_POLLIN, nullptr); a.add(socket, zmq::event_flags::pollin);
zmq::poller_t<> b = std::move(a); zmq::poller_t<> b = std::move(a);
} }
@ -57,7 +65,7 @@ TEST_CASE("poller move assign non empty", "[poller]")
zmq::socket_t socket{context, zmq::socket_type::router}; zmq::socket_t socket{context, zmq::socket_type::router};
zmq::poller_t<> a; zmq::poller_t<> a;
a.add(socket, ZMQ_POLLIN, nullptr); a.add(socket, zmq::event_flags::pollin);
zmq::poller_t<> b; zmq::poller_t<> b;
b = std::move(a); b = std::move(a);
} }
@ -66,17 +74,17 @@ TEST_CASE("poller add nullptr", "[poller]")
{ {
zmq::context_t context; zmq::context_t context;
zmq::socket_t socket{context, zmq::socket_type::router}; zmq::socket_t socket{context, zmq::socket_type::router};
zmq::poller_t<> poller; zmq::poller_t<void> poller;
CHECK_NOTHROW(poller.add(socket, ZMQ_POLLIN, nullptr)); CHECK_NOTHROW(poller.add(socket, zmq::event_flags::pollin, nullptr));
} }
TEST_CASE("poller add non nullptr", "[poller]") TEST_CASE("poller add non nullptr", "[poller]")
{ {
zmq::context_t context; zmq::context_t context;
zmq::socket_t socket{context, zmq::socket_type::router}; zmq::socket_t socket{context, zmq::socket_type::router};
zmq::poller_t<> poller; zmq::poller_t<int> poller;
int i; int i;
CHECK_NOTHROW(poller.add(socket, ZMQ_POLLIN, &i)); CHECK_NOTHROW(poller.add(socket, zmq::event_flags::pollin, &i));
} }
#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 3, 0) #if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 3, 0)
@ -87,7 +95,7 @@ TEST_CASE("poller add handler invalid events type", "[poller]")
zmq::socket_t socket{context, zmq::socket_type::router}; zmq::socket_t socket{context, zmq::socket_type::router};
zmq::poller_t<> poller; zmq::poller_t<> poller;
short invalid_events_type = 2 << 10; short invalid_events_type = 2 << 10;
CHECK_THROWS_AS(poller.add(socket, invalid_events_type, nullptr), const zmq::error_t&); CHECK_THROWS_AS(poller.add(socket, static_cast<zmq::event_flags>(invalid_events_type)), const zmq::error_t&);
} }
#endif #endif
@ -96,15 +104,15 @@ TEST_CASE("poller add handler twice throws", "[poller]")
zmq::context_t context; zmq::context_t context;
zmq::socket_t socket{context, zmq::socket_type::router}; zmq::socket_t socket{context, zmq::socket_type::router};
zmq::poller_t<> poller; zmq::poller_t<> poller;
poller.add(socket, ZMQ_POLLIN, nullptr); poller.add(socket, zmq::event_flags::pollin);
/// \todo the actual error code should be checked /// \todo the actual error code should be checked
CHECK_THROWS_AS(poller.add(socket, ZMQ_POLLIN, nullptr), const zmq::error_t&); CHECK_THROWS_AS(poller.add(socket, zmq::event_flags::pollin), const zmq::error_t&);
} }
TEST_CASE("poller wait with no handlers throws", "[poller]") TEST_CASE("poller wait with no handlers throws", "[poller]")
{ {
zmq::poller_t<> poller; zmq::poller_t<> poller;
std::vector<zmq_poller_event_t> events; std::vector<zmq::poller_event<>> events;
/// \todo the actual error code should be checked /// \todo the actual error code should be checked
CHECK_THROWS_AS(poller.wait_all(events, std::chrono::milliseconds{10}), CHECK_THROWS_AS(poller.wait_all(events, std::chrono::milliseconds{10}),
const zmq::error_t&); const zmq::error_t&);
@ -124,7 +132,7 @@ TEST_CASE("poller remove registered empty", "[poller]")
zmq::context_t context; zmq::context_t context;
zmq::socket_t socket{context, zmq::socket_type::router}; zmq::socket_t socket{context, zmq::socket_type::router};
zmq::poller_t<> poller; zmq::poller_t<> poller;
poller.add(socket, ZMQ_POLLIN, nullptr); poller.add(socket, zmq::event_flags::pollin);
CHECK_NOTHROW(poller.remove(socket)); CHECK_NOTHROW(poller.remove(socket));
} }
@ -132,9 +140,9 @@ TEST_CASE("poller remove registered non empty", "[poller]")
{ {
zmq::context_t context; zmq::context_t context;
zmq::socket_t socket{context, zmq::socket_type::router}; zmq::socket_t socket{context, zmq::socket_type::router};
zmq::poller_t<> poller; zmq::poller_t<int> poller;
int empty{}; int empty{};
poller.add(socket, ZMQ_POLLIN, &empty); poller.add(socket, zmq::event_flags::pollin, &empty);
CHECK_NOTHROW(poller.remove(socket)); CHECK_NOTHROW(poller.remove(socket));
} }
@ -145,9 +153,9 @@ TEST_CASE("poller poll basic", "[poller]")
CHECK_NOTHROW(s.client.send(zmq::message_t{"Hi"}, zmq::send_flags::none)); CHECK_NOTHROW(s.client.send(zmq::message_t{"Hi"}, zmq::send_flags::none));
zmq::poller_t<int> poller; zmq::poller_t<int> poller;
std::vector<zmq_poller_event_t> events{1}; std::vector<zmq::poller_event<int>> events{1};
int i = 0; int i = 0;
CHECK_NOTHROW(poller.add(s.server, ZMQ_POLLIN, &i)); CHECK_NOTHROW(poller.add(s.server, zmq::event_flags::pollin, &i));
CHECK(1 == poller.wait_all(events, std::chrono::milliseconds{-1})); CHECK(1 == poller.wait_all(events, std::chrono::milliseconds{-1}));
CHECK(s.server == events[0].socket); CHECK(s.server == events[0].socket);
CHECK(&i == events[0].user_data); CHECK(&i == events[0].user_data);
@ -159,7 +167,7 @@ TEST_CASE("poller add invalid socket throws", "[poller]")
zmq::poller_t<> poller; zmq::poller_t<> poller;
zmq::socket_t a{context, zmq::socket_type::router}; zmq::socket_t a{context, zmq::socket_type::router};
zmq::socket_t b{std::move(a)}; zmq::socket_t b{std::move(a)};
CHECK_THROWS_AS(poller.add(a, ZMQ_POLLIN, nullptr), const zmq::error_t&); CHECK_THROWS_AS(poller.add(a, zmq::event_flags::pollin), const zmq::error_t&);
} }
TEST_CASE("poller remove invalid socket throws", "[poller]") TEST_CASE("poller remove invalid socket throws", "[poller]")
@ -167,7 +175,7 @@ TEST_CASE("poller remove invalid socket throws", "[poller]")
zmq::context_t context; zmq::context_t context;
zmq::socket_t socket{context, zmq::socket_type::router}; zmq::socket_t socket{context, zmq::socket_type::router};
zmq::poller_t<> poller; zmq::poller_t<> poller;
CHECK_NOTHROW(poller.add(socket, ZMQ_POLLIN, nullptr)); CHECK_NOTHROW(poller.add(socket, zmq::event_flags::pollin));
std::vector<zmq::socket_t> sockets; std::vector<zmq::socket_t> sockets;
sockets.emplace_back(std::move(socket)); sockets.emplace_back(std::move(socket));
CHECK_THROWS_AS(poller.remove(socket), const zmq::error_t&); CHECK_THROWS_AS(poller.remove(socket), const zmq::error_t&);
@ -179,7 +187,7 @@ TEST_CASE("poller modify empty throws", "[poller]")
zmq::context_t context; zmq::context_t context;
zmq::socket_t socket{context, zmq::socket_type::push}; zmq::socket_t socket{context, zmq::socket_type::push};
zmq::poller_t<> poller; zmq::poller_t<> poller;
CHECK_THROWS_AS(poller.modify(socket, ZMQ_POLLIN), const zmq::error_t&); CHECK_THROWS_AS(poller.modify(socket, zmq::event_flags::pollin), const zmq::error_t&);
} }
TEST_CASE("poller modify invalid socket throws", "[poller]") TEST_CASE("poller modify invalid socket throws", "[poller]")
@ -188,7 +196,7 @@ TEST_CASE("poller modify invalid socket throws", "[poller]")
zmq::socket_t a{context, zmq::socket_type::push}; zmq::socket_t a{context, zmq::socket_type::push};
zmq::socket_t b{std::move(a)}; zmq::socket_t b{std::move(a)};
zmq::poller_t<> poller; zmq::poller_t<> poller;
CHECK_THROWS_AS(poller.modify(a, ZMQ_POLLIN), const zmq::error_t&); CHECK_THROWS_AS(poller.modify(a, zmq::event_flags::pollin), const zmq::error_t&);
} }
TEST_CASE("poller modify not added throws", "[poller]") TEST_CASE("poller modify not added throws", "[poller]")
@ -197,8 +205,8 @@ TEST_CASE("poller modify not added throws", "[poller]")
zmq::socket_t a{context, zmq::socket_type::push}; zmq::socket_t a{context, zmq::socket_type::push};
zmq::socket_t b{context, zmq::socket_type::push}; zmq::socket_t b{context, zmq::socket_type::push};
zmq::poller_t<> poller; zmq::poller_t<> poller;
CHECK_NOTHROW(poller.add(a, ZMQ_POLLIN, nullptr)); CHECK_NOTHROW(poller.add(a, zmq::event_flags::pollin));
CHECK_THROWS_AS(poller.modify(b, ZMQ_POLLIN), const zmq::error_t&); CHECK_THROWS_AS(poller.modify(b, zmq::event_flags::pollin), const zmq::error_t&);
} }
TEST_CASE("poller modify simple", "[poller]") TEST_CASE("poller modify simple", "[poller]")
@ -206,8 +214,8 @@ TEST_CASE("poller modify simple", "[poller]")
zmq::context_t context; zmq::context_t context;
zmq::socket_t a{context, zmq::socket_type::push}; zmq::socket_t a{context, zmq::socket_type::push};
zmq::poller_t<> poller; zmq::poller_t<> poller;
CHECK_NOTHROW(poller.add(a, ZMQ_POLLIN, nullptr)); CHECK_NOTHROW(poller.add(a, zmq::event_flags::pollin));
CHECK_NOTHROW(poller.modify(a, ZMQ_POLLIN | ZMQ_POLLOUT)); CHECK_NOTHROW(poller.modify(a, zmq::event_flags::pollin | zmq::event_flags::pollout));
} }
TEST_CASE("poller poll client server", "[poller]") TEST_CASE("poller poll client server", "[poller]")
@ -216,21 +224,21 @@ TEST_CASE("poller poll client server", "[poller]")
common_server_client_setup s; common_server_client_setup s;
// Setup poller // Setup poller
zmq::poller_t<> poller; zmq::poller_t<zmq::socket_t> poller;
CHECK_NOTHROW(poller.add(s.server, ZMQ_POLLIN, s.server)); CHECK_NOTHROW(poller.add(s.server, zmq::event_flags::pollin, &s.server));
// client sends message // client sends message
CHECK_NOTHROW(s.client.send(zmq::message_t{"Hi"}, zmq::send_flags::none)); CHECK_NOTHROW(s.client.send(zmq::message_t{"Hi"}, zmq::send_flags::none));
// wait for message and verify events // wait for message and verify events
std::vector<zmq_poller_event_t> events(1); std::vector<zmq::poller_event<zmq::socket_t>> events(1);
CHECK(1 == poller.wait_all(events, std::chrono::milliseconds{500})); CHECK(1 == poller.wait_all(events, std::chrono::milliseconds{500}));
CHECK(ZMQ_POLLIN == events[0].events); CHECK(zmq::event_flags::pollin == events[0].events);
// Modify server socket with pollout flag // Modify server socket with pollout flag
CHECK_NOTHROW(poller.modify(s.server, ZMQ_POLLIN | ZMQ_POLLOUT)); CHECK_NOTHROW(poller.modify(s.server, zmq::event_flags::pollin | zmq::event_flags::pollout));
CHECK(1 == poller.wait_all(events, std::chrono::milliseconds{500})); CHECK(1 == poller.wait_all(events, std::chrono::milliseconds{500}));
CHECK((ZMQ_POLLIN | ZMQ_POLLOUT) == events[0].events); CHECK((zmq::event_flags::pollin | zmq::event_flags::pollout) == events[0].events);
} }
TEST_CASE("poller wait one return", "[poller]") TEST_CASE("poller wait one return", "[poller]")
@ -240,13 +248,13 @@ TEST_CASE("poller wait one return", "[poller]")
// Setup poller // Setup poller
zmq::poller_t<> poller; zmq::poller_t<> poller;
CHECK_NOTHROW(poller.add(s.server, ZMQ_POLLIN, nullptr)); CHECK_NOTHROW(poller.add(s.server, zmq::event_flags::pollin));
// client sends message // client sends message
CHECK_NOTHROW(s.client.send(zmq::message_t{"Hi"}, zmq::send_flags::none)); CHECK_NOTHROW(s.client.send(zmq::message_t{"Hi"}, zmq::send_flags::none));
// wait for message and verify events // wait for message and verify events
std::vector<zmq_poller_event_t> events(1); std::vector<zmq::poller_event<>> events(1);
CHECK(1 == poller.wait_all(events, std::chrono::milliseconds{500})); CHECK(1 == poller.wait_all(events, std::chrono::milliseconds{500}));
} }
@ -255,9 +263,9 @@ TEST_CASE("poller wait on move constructed", "[poller]")
common_server_client_setup s; common_server_client_setup s;
CHECK_NOTHROW(s.client.send(zmq::message_t{"Hi"}, zmq::send_flags::none)); CHECK_NOTHROW(s.client.send(zmq::message_t{"Hi"}, zmq::send_flags::none));
zmq::poller_t<> a; zmq::poller_t<> a;
CHECK_NOTHROW(a.add(s.server, ZMQ_POLLIN, nullptr)); CHECK_NOTHROW(a.add(s.server, zmq::event_flags::pollin));
zmq::poller_t<> b{std::move(a)}; zmq::poller_t<> b{std::move(a)};
std::vector<zmq_poller_event_t> events(1); std::vector<zmq::poller_event<>> events(1);
/// \todo the actual error code should be checked /// \todo the actual error code should be checked
CHECK_THROWS_AS(a.wait_all(events, std::chrono::milliseconds{10}), const zmq::error_t&); CHECK_THROWS_AS(a.wait_all(events, std::chrono::milliseconds{10}), const zmq::error_t&);
CHECK(1 == b.wait_all(events, std::chrono::milliseconds{-1})); CHECK(1 == b.wait_all(events, std::chrono::milliseconds{-1}));
@ -268,11 +276,11 @@ TEST_CASE("poller wait on move assigned", "[poller]")
common_server_client_setup s; common_server_client_setup s;
CHECK_NOTHROW(s.client.send(zmq::message_t{"Hi"}, zmq::send_flags::none)); CHECK_NOTHROW(s.client.send(zmq::message_t{"Hi"}, zmq::send_flags::none));
zmq::poller_t<> a; zmq::poller_t<> a;
CHECK_NOTHROW(a.add(s.server, ZMQ_POLLIN, nullptr)); CHECK_NOTHROW(a.add(s.server, zmq::event_flags::pollin));
zmq::poller_t<> b; zmq::poller_t<> b;
b = {std::move(a)}; b = {std::move(a)};
/// \todo the TEST_CASE error code should be checked /// \todo the TEST_CASE error code should be checked
std::vector<zmq_poller_event_t> events(1); std::vector<zmq::poller_event<>> events(1);
CHECK_THROWS_AS(a.wait_all(events, std::chrono::milliseconds{10}), const zmq::error_t&); CHECK_THROWS_AS(a.wait_all(events, std::chrono::milliseconds{10}), const zmq::error_t&);
CHECK(1 == b.wait_all(events, std::chrono::milliseconds{-1})); CHECK(1 == b.wait_all(events, std::chrono::milliseconds{-1}));
} }
@ -289,7 +297,7 @@ TEST_CASE("poller remove from handler", "[poller]")
// Setup poller // Setup poller
zmq::poller_t<> poller; zmq::poller_t<> poller;
for (size_t i = 0; i < ITER_NO; ++i) { for (size_t i = 0; i < ITER_NO; ++i) {
CHECK_NOTHROW(poller.add(setup_list[i].server, ZMQ_POLLIN, nullptr)); CHECK_NOTHROW(poller.add(setup_list[i].server, zmq::event_flags::pollin));
} }
// Clients send messages // Clients send messages
for (auto &s : setup_list) { for (auto &s : setup_list) {
@ -303,7 +311,7 @@ TEST_CASE("poller remove from handler", "[poller]")
} }
// Fire all handlers in one wait // Fire all handlers in one wait
std::vector<zmq_poller_event_t> events(ITER_NO); std::vector<zmq::poller_event<>> events(ITER_NO);
CHECK(ITER_NO == poller.wait_all(events, std::chrono::milliseconds{-1})); CHECK(ITER_NO == poller.wait_all(events, std::chrono::milliseconds{-1}));
} }

97
zmq.hpp
View File

@ -1823,18 +1823,65 @@ class monitor_t
}; };
#if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER) #if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER)
template<typename T = void> class poller_t
// polling events
enum class event_flags : short
{
none = 0,
pollin = ZMQ_POLLIN,
pollout = ZMQ_POLLOUT,
pollerr = ZMQ_POLLERR,
pollpri = ZMQ_POLLPRI
};
constexpr event_flags operator|(event_flags a, event_flags b) noexcept
{
return static_cast<event_flags>(static_cast<short>(a) | static_cast<short>(b));
}
constexpr event_flags operator&(event_flags a, event_flags b) noexcept
{
return static_cast<event_flags>(static_cast<short>(a) & static_cast<short>(b));
}
constexpr event_flags operator~(event_flags a) noexcept
{
return static_cast<event_flags>(~static_cast<short>(a));
}
struct no_user_data;
// layout compatible with zmq_poller_event_t
template<class T = no_user_data>
struct poller_event
{
socket_ref socket;
#ifdef _WIN32
SOCKET fd;
#else
int fd;
#endif
T *user_data;
event_flags events;
};
template<typename T = no_user_data> class poller_t
{ {
public: public:
using event_type = poller_event<T>;
poller_t() = default; poller_t() = default;
void add(zmq::socket_ref socket, short events, T *user_data) template<
typename Dummy = void,
typename =
typename std::enable_if<!std::is_same<T, no_user_data>::value, Dummy>::type>
void add(zmq::socket_ref socket, event_flags events, T *user_data)
{ {
if (0 add_impl(socket, events, user_data);
!= zmq_poller_add(poller_ptr.get(), socket.handle(), }
user_data, events)) {
throw error_t(); void add(zmq::socket_ref socket, event_flags events)
} {
add_impl(socket, events, nullptr);
} }
void remove(zmq::socket_ref socket) void remove(zmq::socket_ref socket)
@ -1844,21 +1891,23 @@ template<typename T = void> class poller_t
} }
} }
void modify(zmq::socket_ref socket, short events) void modify(zmq::socket_ref socket, event_flags events)
{ {
if (0 if (0
!= zmq_poller_modify(poller_ptr.get(), socket.handle(), != zmq_poller_modify(poller_ptr.get(), socket.handle(),
events)) { static_cast<short>(events))) {
throw error_t(); throw error_t();
} }
} }
size_t wait_all(std::vector<zmq_poller_event_t> &poller_events, size_t wait_all(std::vector<event_type> &poller_events,
const std::chrono::milliseconds timeout) const std::chrono::milliseconds timeout)
{ {
int rc = zmq_poller_wait_all(poller_ptr.get(), poller_events.data(), int rc = zmq_poller_wait_all(
static_cast<int>(poller_events.size()), poller_ptr.get(),
static_cast<long>(timeout.count())); reinterpret_cast<zmq_poller_event_t *>(poller_events.data()),
static_cast<int>(poller_events.size()),
static_cast<long>(timeout.count()));
if (rc > 0) if (rc > 0)
return static_cast<size_t>(rc); return static_cast<size_t>(rc);
@ -1873,18 +1922,30 @@ template<typename T = void> class poller_t
} }
private: private:
std::unique_ptr<void, void(*)(void *)> poller_ptr{ struct destroy_poller_t
{
void operator()(void *ptr) noexcept
{
int rc = zmq_poller_destroy(&ptr);
ZMQ_ASSERT(rc == 0);
}
};
std::unique_ptr<void, destroy_poller_t> poller_ptr{
[]() { []() {
auto poller_new = zmq_poller_new(); auto poller_new = zmq_poller_new();
if (poller_new) if (poller_new)
return poller_new; return poller_new;
throw error_t(); throw error_t();
}(), &destroy_poller}; }()};
static void destroy_poller(void *ptr) void add_impl(zmq::socket_ref socket, event_flags events, T *user_data)
{ {
int rc = zmq_poller_destroy(&ptr); if (0
ZMQ_ASSERT(rc == 0); != zmq_poller_add(poller_ptr.get(), socket.handle(),
user_data, static_cast<short>(events))) {
throw error_t();
}
} }
}; };
#endif // defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER) #endif // defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER)

View File

@ -382,15 +382,15 @@ class active_poller_t
active_poller_t(active_poller_t &&src) = default; active_poller_t(active_poller_t &&src) = default;
active_poller_t &operator=(active_poller_t &&src) = default; active_poller_t &operator=(active_poller_t &&src) = default;
using handler_t = std::function<void(short)>; using handler_type = std::function<void(event_flags)>;
void add(zmq::socket_ref socket, short events, handler_t handler) void add(zmq::socket_ref socket, event_flags events, handler_type handler)
{ {
auto it = decltype(handlers)::iterator{}; auto it = decltype(handlers)::iterator{};
auto inserted = bool{}; auto inserted = bool{};
std::tie(it, inserted) = std::tie(it, inserted) =
handlers.emplace(socket, handlers.emplace(socket,
std::make_shared<handler_t>(std::move(handler))); std::make_shared<handler_type>(std::move(handler)));
try { try {
base_poller.add(socket, events, base_poller.add(socket, events,
inserted && *(it->second) ? it->second.get() : nullptr); inserted && *(it->second) ? it->second.get() : nullptr);
@ -412,7 +412,7 @@ class active_poller_t
need_rebuild = true; need_rebuild = true;
} }
void modify(zmq::socket_ref socket, short events) void modify(zmq::socket_ref socket, event_flags events)
{ {
base_poller.modify(socket, events); base_poller.modify(socket, events);
} }
@ -429,26 +429,25 @@ class active_poller_t
need_rebuild = false; need_rebuild = false;
} }
const auto count = base_poller.wait_all(poller_events, timeout); const auto count = base_poller.wait_all(poller_events, timeout);
std::for_each(poller_events.begin(), poller_events.begin() + count, std::for_each(poller_events.begin(), poller_events.begin() + static_cast<ptrdiff_t>(count),
[](zmq_poller_event_t &event) { [](decltype(base_poller)::event_type &event) {
if (event.user_data != NULL) if (event.user_data != nullptr)
(*reinterpret_cast<handler_t *>(event.user_data))( (*event.user_data)(event.events);
event.events);
}); });
return count; return count;
} }
bool empty() const { return handlers.empty(); } ZMQ_NODISCARD bool empty() const noexcept { return handlers.empty(); }
size_t size() const { return handlers.size(); } size_t size() const noexcept { return handlers.size(); }
private: private:
bool need_rebuild{false}; bool need_rebuild{false};
poller_t<handler_t> base_poller{}; poller_t<handler_type> base_poller{};
std::unordered_map<socket_ref, std::shared_ptr<handler_t>> handlers{}; std::unordered_map<socket_ref, std::shared_ptr<handler_type>> handlers{};
std::vector<zmq_poller_event_t> poller_events{}; std::vector<decltype(base_poller)::event_type> poller_events{};
std::vector<std::shared_ptr<handler_t>> poller_handlers{}; std::vector<std::shared_ptr<handler_type>> poller_handlers{};
}; // class active_poller_t }; // class active_poller_t
#endif // defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER) #endif // defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER)