diff --git a/tests/poller.cpp b/tests/poller.cpp index dfb0776..6297523 100644 --- a/tests/poller.cpp +++ b/tests/poller.cpp @@ -9,6 +9,7 @@ TEST(poller, create_destroy) { zmq::poller_t poller; + ASSERT_TRUE(poller.empty ()); } static_assert(!std::is_copy_constructible::value, "poller_t should not be copy-constructible"); @@ -16,19 +17,26 @@ static_assert(!std::is_copy_assignable::value, "poller_t should n TEST(poller, move_construct_empty) { - std::unique_ptr a{new zmq::poller_t}; - zmq::poller_t b = std::move(*a); - + std::unique_ptr a {new zmq::poller_t}; + ASSERT_TRUE(a->empty ()); + zmq::poller_t b = std::move (*a); + ASSERT_TRUE(b.empty ()); + ASSERT_EQ(0u, a->size ()); + ASSERT_EQ(0u, b.size ()); a.reset (); } TEST(poller, move_assign_empty) { std::unique_ptr a{new zmq::poller_t}; + ASSERT_TRUE(a->empty()); zmq::poller_t b; - + ASSERT_TRUE(b.empty()); b = std::move(*a); - + ASSERT_EQ(0u, a->size ()); + ASSERT_EQ(0u, b.size ()); + ASSERT_TRUE(a->empty()); + ASSERT_TRUE(b.empty()); a.reset (); } @@ -39,8 +47,13 @@ TEST(poller, move_construct_non_empty) std::unique_ptr a{new zmq::poller_t}; a->add(socket, ZMQ_POLLIN, [](short) {}); - zmq::poller_t b = std::move(*a); - + ASSERT_FALSE(a->empty ()); + ASSERT_EQ(1u, a->size ()); + zmq::poller_t b = std::move (*a); + ASSERT_TRUE(a->empty ()); + ASSERT_EQ(0u, a->size ()); + ASSERT_FALSE(b.empty ()); + ASSERT_EQ(1u, b.size ()); a.reset (); } @@ -51,10 +64,14 @@ TEST(poller, move_assign_non_empty) std::unique_ptr a{new zmq::poller_t}; a->add(socket, ZMQ_POLLIN, [](short) {}); + ASSERT_FALSE(a->empty()); + ASSERT_EQ(1u, a->size ()); zmq::poller_t b; - b = std::move(*a); - + ASSERT_TRUE(a->empty ()); + ASSERT_EQ(0u, a->size ()); + ASSERT_FALSE(b.empty ()); + ASSERT_EQ(1u, b.size ()); a.reset (); } @@ -75,6 +92,8 @@ TEST(poller, add_handler_invalid_events_type) zmq::poller_t::handler_t handler; short invalid_events_type = 2 << 10; ASSERT_NO_THROW(poller.add(socket, invalid_events_type, handler)); + ASSERT_FALSE(poller.empty ()); + ASSERT_EQ(1u, poller.size ()); } TEST(poller, add_handler_twice_throws) @@ -229,7 +248,7 @@ TEST(poller, client_server) ASSERT_EQ(events, ZMQ_POLLOUT); } -TEST(poller, poller_add_invalid_socket_throws) +TEST(poller, add_invalid_socket_throws) { zmq::context_t context; zmq::poller_t poller; @@ -239,15 +258,17 @@ TEST(poller, poller_add_invalid_socket_throws) zmq::error_t); } -TEST(poller, poller_remove_invalid_socket_throws) +TEST(poller, remove_invalid_socket_throws) { zmq::context_t context; zmq::socket_t socket {context, zmq::socket_type::router}; zmq::poller_t poller; ASSERT_NO_THROW (poller.add (socket, ZMQ_POLLIN, zmq::poller_t::handler_t {})); + ASSERT_EQ (1u, poller.size ()); std::vector sockets; sockets.emplace_back (std::move (socket)); ASSERT_THROW (poller.remove (socket), zmq::error_t); + ASSERT_EQ (1u, poller.size ()); } TEST(poller, wait_on_added_empty_handler) @@ -340,6 +361,59 @@ TEST(poller, wait_one_return) ASSERT_EQ(count, result); } +TEST(poller, wait_on_move_constructed_poller) +{ + server_client_setup s; + ASSERT_NO_THROW (s.client.send ("Hi")); + zmq::poller_t a; + zmq::poller_t::handler_t handler; + ASSERT_NO_THROW (a.add (s.server, ZMQ_POLLIN, handler)); + ASSERT_EQ(1u, a.size ()); + zmq::poller_t b {std::move (a)}; + ASSERT_EQ(1u, b.size ()); + ASSERT_NO_THROW (b.wait (std::chrono::milliseconds {-1})); +} + +TEST(poller, wait_on_move_assign_poller) +{ + server_client_setup s; + ASSERT_NO_THROW (s.client.send ("Hi")); + zmq::poller_t a; + zmq::poller_t::handler_t handler; + ASSERT_NO_THROW (a.add (s.server, ZMQ_POLLIN, handler)); + ASSERT_EQ(1u, a.size ()); + zmq::poller_t b; + ASSERT_EQ(0u, b.size ()); + b = {std::move (a)}; + ASSERT_EQ(1u, b.size ()); + ASSERT_NO_THROW (b.wait (std::chrono::milliseconds {-1})); +} + +TEST(poller, received_on_move_construced_poller) +{ + // Setup server and client + server_client_setup s; + int count = 0; + // Setup poller a + zmq::poller_t a; + ASSERT_NO_THROW(a.add(s.server, ZMQ_POLLIN, [&count](short) { + ++count; + })); + // client sends message + ASSERT_NO_THROW(s.client.send("Hi")); + // wait for message and verify it is received + a.wait(std::chrono::milliseconds{500}); + ASSERT_EQ(1u, count); + // Move construct poller b + zmq::poller_t b{std::move(a)}; + // client sends message again + ASSERT_NO_THROW(s.client.send("Hi")); + // wait for message and verify it is received + b.wait(std::chrono::milliseconds{500}); + ASSERT_EQ(2u, count); +} + + TEST(poller, remove_from_handler) { constexpr auto ITER_NO = 10; @@ -354,9 +428,11 @@ TEST(poller, remove_from_handler) for (auto i = 0; i < ITER_NO; ++i) { ASSERT_NO_THROW(poller.add(setup_list[i].server, ZMQ_POLLIN, [&,i](short events) { ASSERT_EQ(events, ZMQ_POLLIN); - poller.remove(setup_list[ITER_NO - i -1].server); + poller.remove(setup_list[ITER_NO-i-1].server); + ASSERT_EQ(ITER_NO-i-1, poller.size()); })); } + ASSERT_EQ(ITER_NO, poller.size()); // Clients send messages for (auto & s : setup_list) { ASSERT_NO_THROW(s.client.send("Hi")); diff --git a/zmq.hpp b/zmq.hpp index f2efacd..846385a 100644 --- a/zmq.hpp +++ b/zmq.hpp @@ -1018,36 +1018,19 @@ namespace zmq class poller_t { public: - poller_t () : poller_ptr (zmq_poller_new ()) + poller_t () { if (!poller_ptr) throw error_t (); } - ~poller_t () - { - if (poller_ptr) - { - int rc = zmq_poller_destroy (&poller_ptr); - assert(rc == 0); - } - } + ~poller_t () = default; poller_t(const poller_t&) = delete; poller_t &operator=(const poller_t&) = delete; - poller_t(poller_t&& src) - : poller_ptr(src.poller_ptr) - , poller_events(std::move (src.poller_events)) - { - src.poller_ptr = NULL; - } - poller_t &operator=(poller_t&& src) - { - poller_ptr = src.poller_ptr; - poller_events = std::move (src.poller_events); - src.poller_ptr = NULL; - return *this; - } + + poller_t(poller_t&& src) = default; + poller_t &operator=(poller_t&& src) = default; using handler_t = std::function; @@ -1056,7 +1039,7 @@ namespace zmq auto it = std::end (handlers); auto inserted = false; std::tie(it, inserted) = handlers.emplace (socket.ptr, std::make_shared (std::move (handler))); - if (0 == zmq_poller_add (poller_ptr, socket.ptr, inserted && *(it->second) ? it->second.get() : nullptr, events)) { + if (0 == zmq_poller_add (poller_ptr.get (), socket.ptr, inserted && *(it->second) ? it->second.get() : nullptr, events)) { need_rebuild = true; return; } @@ -1068,7 +1051,7 @@ namespace zmq void remove (zmq::socket_t &socket) { - if (0 == zmq_poller_remove (poller_ptr, socket.ptr)) { + if (0 == zmq_poller_remove (poller_ptr.get (), socket.ptr)) { handlers.erase (socket.ptr); need_rebuild = true; return; @@ -1078,7 +1061,7 @@ namespace zmq void modify (zmq::socket_t &socket, short events) { - if (0 != zmq_poller_modify (poller_ptr, socket.ptr, events)) + if (0 != zmq_poller_modify (poller_ptr.get (), socket.ptr, events)) throw error_t (); } @@ -1095,7 +1078,7 @@ namespace zmq } need_rebuild = false; } - int rc = zmq_poller_wait_all (poller_ptr, poller_events.data (), + int rc = zmq_poller_wait_all (poller_ptr.get (), poller_events.data (), static_cast (poller_events.size ()), static_cast(timeout.count ())); if (rc > 0) { @@ -1116,8 +1099,25 @@ namespace zmq throw error_t (); } + bool empty () const + { + return handlers.empty (); + } + + size_t size () const + { + return handlers.size (); + } + private: - void *poller_ptr {nullptr}; + std::unique_ptr> poller_ptr + { + zmq_poller_new (), + [](void *ptr) { + int rc = zmq_poller_destroy (&ptr); + ZMQ_ASSERT (rc == 0); + } + }; bool need_rebuild {false}; std::unordered_map> handlers {}; std::vector poller_events {};