Merge pull request #221 from kurdybacha/poller-size

Problem: poller move operations not complete
This commit is contained in:
Simon Giesecke 2018-05-09 09:05:48 +02:00 committed by GitHub
commit 2aba0bb3ac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 115 additions and 39 deletions

View File

@ -9,6 +9,7 @@
TEST(poller, create_destroy)
{
zmq::poller_t poller;
ASSERT_TRUE(poller.empty ());
}
static_assert(!std::is_copy_constructible<zmq::poller_t>::value, "poller_t should not be copy-constructible");
@ -16,19 +17,26 @@ static_assert(!std::is_copy_assignable<zmq::poller_t>::value, "poller_t should n
TEST(poller, move_construct_empty)
{
std::unique_ptr<zmq::poller_t> a{new zmq::poller_t};
zmq::poller_t b = std::move(*a);
std::unique_ptr<zmq::poller_t> a {new zmq::poller_t};
ASSERT_TRUE(a->empty ());
zmq::poller_t b = std::move (*a);
ASSERT_TRUE(b.empty ());
ASSERT_EQ(0u, a->size ());
ASSERT_EQ(0u, b.size ());
a.reset ();
}
TEST(poller, move_assign_empty)
{
std::unique_ptr<zmq::poller_t> a{new zmq::poller_t};
ASSERT_TRUE(a->empty());
zmq::poller_t b;
ASSERT_TRUE(b.empty());
b = std::move(*a);
ASSERT_EQ(0u, a->size ());
ASSERT_EQ(0u, b.size ());
ASSERT_TRUE(a->empty());
ASSERT_TRUE(b.empty());
a.reset ();
}
@ -39,8 +47,13 @@ TEST(poller, move_construct_non_empty)
std::unique_ptr<zmq::poller_t> a{new zmq::poller_t};
a->add(socket, ZMQ_POLLIN, [](short) {});
zmq::poller_t b = std::move(*a);
ASSERT_FALSE(a->empty ());
ASSERT_EQ(1u, a->size ());
zmq::poller_t b = std::move (*a);
ASSERT_TRUE(a->empty ());
ASSERT_EQ(0u, a->size ());
ASSERT_FALSE(b.empty ());
ASSERT_EQ(1u, b.size ());
a.reset ();
}
@ -51,10 +64,14 @@ TEST(poller, move_assign_non_empty)
std::unique_ptr<zmq::poller_t> a{new zmq::poller_t};
a->add(socket, ZMQ_POLLIN, [](short) {});
ASSERT_FALSE(a->empty());
ASSERT_EQ(1u, a->size ());
zmq::poller_t b;
b = std::move(*a);
ASSERT_TRUE(a->empty ());
ASSERT_EQ(0u, a->size ());
ASSERT_FALSE(b.empty ());
ASSERT_EQ(1u, b.size ());
a.reset ();
}
@ -75,6 +92,8 @@ TEST(poller, add_handler_invalid_events_type)
zmq::poller_t::handler_t handler;
short invalid_events_type = 2 << 10;
ASSERT_NO_THROW(poller.add(socket, invalid_events_type, handler));
ASSERT_FALSE(poller.empty ());
ASSERT_EQ(1u, poller.size ());
}
TEST(poller, add_handler_twice_throws)
@ -229,7 +248,7 @@ TEST(poller, client_server)
ASSERT_EQ(events, ZMQ_POLLOUT);
}
TEST(poller, poller_add_invalid_socket_throws)
TEST(poller, add_invalid_socket_throws)
{
zmq::context_t context;
zmq::poller_t poller;
@ -239,15 +258,17 @@ TEST(poller, poller_add_invalid_socket_throws)
zmq::error_t);
}
TEST(poller, poller_remove_invalid_socket_throws)
TEST(poller, remove_invalid_socket_throws)
{
zmq::context_t context;
zmq::socket_t socket {context, zmq::socket_type::router};
zmq::poller_t poller;
ASSERT_NO_THROW (poller.add (socket, ZMQ_POLLIN, zmq::poller_t::handler_t {}));
ASSERT_EQ (1u, poller.size ());
std::vector<zmq::socket_t> sockets;
sockets.emplace_back (std::move (socket));
ASSERT_THROW (poller.remove (socket), zmq::error_t);
ASSERT_EQ (1u, poller.size ());
}
TEST(poller, wait_on_added_empty_handler)
@ -340,6 +361,59 @@ TEST(poller, wait_one_return)
ASSERT_EQ(count, result);
}
TEST(poller, wait_on_move_constructed_poller)
{
server_client_setup s;
ASSERT_NO_THROW (s.client.send ("Hi"));
zmq::poller_t a;
zmq::poller_t::handler_t handler;
ASSERT_NO_THROW (a.add (s.server, ZMQ_POLLIN, handler));
ASSERT_EQ(1u, a.size ());
zmq::poller_t b {std::move (a)};
ASSERT_EQ(1u, b.size ());
ASSERT_NO_THROW (b.wait (std::chrono::milliseconds {-1}));
}
TEST(poller, wait_on_move_assign_poller)
{
server_client_setup s;
ASSERT_NO_THROW (s.client.send ("Hi"));
zmq::poller_t a;
zmq::poller_t::handler_t handler;
ASSERT_NO_THROW (a.add (s.server, ZMQ_POLLIN, handler));
ASSERT_EQ(1u, a.size ());
zmq::poller_t b;
ASSERT_EQ(0u, b.size ());
b = {std::move (a)};
ASSERT_EQ(1u, b.size ());
ASSERT_NO_THROW (b.wait (std::chrono::milliseconds {-1}));
}
TEST(poller, received_on_move_construced_poller)
{
// Setup server and client
server_client_setup s;
int count = 0;
// Setup poller a
zmq::poller_t a;
ASSERT_NO_THROW(a.add(s.server, ZMQ_POLLIN, [&count](short) {
++count;
}));
// client sends message
ASSERT_NO_THROW(s.client.send("Hi"));
// wait for message and verify it is received
a.wait(std::chrono::milliseconds{500});
ASSERT_EQ(1u, count);
// Move construct poller b
zmq::poller_t b{std::move(a)};
// client sends message again
ASSERT_NO_THROW(s.client.send("Hi"));
// wait for message and verify it is received
b.wait(std::chrono::milliseconds{500});
ASSERT_EQ(2u, count);
}
TEST(poller, remove_from_handler)
{
constexpr auto ITER_NO = 10;
@ -354,9 +428,11 @@ TEST(poller, remove_from_handler)
for (auto i = 0; i < ITER_NO; ++i) {
ASSERT_NO_THROW(poller.add(setup_list[i].server, ZMQ_POLLIN, [&,i](short events) {
ASSERT_EQ(events, ZMQ_POLLIN);
poller.remove(setup_list[ITER_NO - i -1].server);
poller.remove(setup_list[ITER_NO-i-1].server);
ASSERT_EQ(ITER_NO-i-1, poller.size());
}));
}
ASSERT_EQ(ITER_NO, poller.size());
// Clients send messages
for (auto & s : setup_list) {
ASSERT_NO_THROW(s.client.send("Hi"));

54
zmq.hpp
View File

@ -1018,36 +1018,19 @@ namespace zmq
class poller_t
{
public:
poller_t () : poller_ptr (zmq_poller_new ())
poller_t ()
{
if (!poller_ptr)
throw error_t ();
}
~poller_t ()
{
if (poller_ptr)
{
int rc = zmq_poller_destroy (&poller_ptr);
assert(rc == 0);
}
}
~poller_t () = default;
poller_t(const poller_t&) = delete;
poller_t &operator=(const poller_t&) = delete;
poller_t(poller_t&& src)
: poller_ptr(src.poller_ptr)
, poller_events(std::move (src.poller_events))
{
src.poller_ptr = NULL;
}
poller_t &operator=(poller_t&& src)
{
poller_ptr = src.poller_ptr;
poller_events = std::move (src.poller_events);
src.poller_ptr = NULL;
return *this;
}
poller_t(poller_t&& src) = default;
poller_t &operator=(poller_t&& src) = default;
using handler_t = std::function<void(short)>;
@ -1056,7 +1039,7 @@ namespace zmq
auto it = std::end (handlers);
auto inserted = false;
std::tie(it, inserted) = handlers.emplace (socket.ptr, std::make_shared<handler_t> (std::move (handler)));
if (0 == zmq_poller_add (poller_ptr, socket.ptr, inserted && *(it->second) ? it->second.get() : nullptr, events)) {
if (0 == zmq_poller_add (poller_ptr.get (), socket.ptr, inserted && *(it->second) ? it->second.get() : nullptr, events)) {
need_rebuild = true;
return;
}
@ -1068,7 +1051,7 @@ namespace zmq
void remove (zmq::socket_t &socket)
{
if (0 == zmq_poller_remove (poller_ptr, socket.ptr)) {
if (0 == zmq_poller_remove (poller_ptr.get (), socket.ptr)) {
handlers.erase (socket.ptr);
need_rebuild = true;
return;
@ -1078,7 +1061,7 @@ namespace zmq
void modify (zmq::socket_t &socket, short events)
{
if (0 != zmq_poller_modify (poller_ptr, socket.ptr, events))
if (0 != zmq_poller_modify (poller_ptr.get (), socket.ptr, events))
throw error_t ();
}
@ -1095,7 +1078,7 @@ namespace zmq
}
need_rebuild = false;
}
int rc = zmq_poller_wait_all (poller_ptr, poller_events.data (),
int rc = zmq_poller_wait_all (poller_ptr.get (), poller_events.data (),
static_cast<int> (poller_events.size ()),
static_cast<long>(timeout.count ()));
if (rc > 0) {
@ -1116,8 +1099,25 @@ namespace zmq
throw error_t ();
}
bool empty () const
{
return handlers.empty ();
}
size_t size () const
{
return handlers.size ();
}
private:
void *poller_ptr {nullptr};
std::unique_ptr<void, std::function<void(void*)>> poller_ptr
{
zmq_poller_new (),
[](void *ptr) {
int rc = zmq_poller_destroy (&ptr);
ZMQ_ASSERT (rc == 0);
}
};
bool need_rebuild {false};
std::unordered_map<void*, std::shared_ptr<handler_t>> handlers {};
std::vector<zmq_poller_event_t> poller_events {};