mirror of
https://github.com/zeromq/cppzmq.git
synced 2025-04-01 09:24:53 +02:00
Problem: poller's handler not aware of event type. (#200)
* Problem: Poller's handler not aware of event type. It was possible to register a handler for more than one event types but impossible to distinguish which event is being handled. Now events are passed to a handler. Besides that some other changes: * new test covering changes in the poller * modified existing tests to cover changes in the poller * defined handler_t in poller_t scope for more convinient use and simpler code * helper loopback binder to be re-used in tests * Problem: CMake build fails on Windows Issue #199 It seems that with GCC on Linux <array> is implicitly included by one of stl includes already in zmq.hpp but it breaks on Windows with Visual Studio. Adding explicit include for array. Can not verify right now but this change is a good practice so creating a pull request. * Poller: array include not between C++11 guards
This commit is contained in:
parent
65475cb603
commit
1975818171
@ -3,6 +3,8 @@
|
||||
|
||||
#if defined(ZMQ_CPP11) && defined(ZMQ_BUILD_DRAFT_API)
|
||||
|
||||
#include <array>
|
||||
|
||||
TEST(poller, create_destroy)
|
||||
{
|
||||
zmq::poller_t poller;
|
||||
@ -26,7 +28,7 @@ TEST(poller, add_handler)
|
||||
zmq::context_t context;
|
||||
zmq::socket_t socket{context, zmq::socket_type::router};
|
||||
zmq::poller_t poller;
|
||||
std::function<void()> handler = [](){};
|
||||
zmq::poller_t::handler_t handler;
|
||||
ASSERT_NO_THROW(poller.add(socket, ZMQ_POLLIN, handler));
|
||||
}
|
||||
|
||||
@ -35,7 +37,7 @@ TEST(poller, add_handler_invalid_events_type)
|
||||
zmq::context_t context;
|
||||
zmq::socket_t socket{context, zmq::socket_type::router};
|
||||
zmq::poller_t poller;
|
||||
std::function<void()> handler = [](){};
|
||||
zmq::poller_t::handler_t handler;
|
||||
short invalid_events_type = 2 << 10;
|
||||
ASSERT_NO_THROW(poller.add(socket, invalid_events_type, handler));
|
||||
}
|
||||
@ -45,7 +47,7 @@ TEST(poller, add_handler_twice_throws)
|
||||
zmq::context_t context;
|
||||
zmq::socket_t socket{context, zmq::socket_type::router};
|
||||
zmq::poller_t poller;
|
||||
std::function<void()> handler = [](){};
|
||||
zmq::poller_t::handler_t handler;
|
||||
poller.add(socket, ZMQ_POLLIN, handler);
|
||||
ASSERT_THROW(poller.add(socket, ZMQ_POLLIN, handler), zmq::error_t);
|
||||
}
|
||||
@ -69,26 +71,46 @@ TEST(poller, remove_registered)
|
||||
zmq::context_t context;
|
||||
zmq::socket_t socket{context, zmq::socket_type::router};
|
||||
zmq::poller_t poller;
|
||||
std::function<void()> handler = [](){};
|
||||
zmq::poller_t::handler_t handler;
|
||||
poller.add(socket, ZMQ_POLLIN, handler);
|
||||
ASSERT_NO_THROW(poller.remove(socket));
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
||||
class loopback_ip4_binder
|
||||
{
|
||||
public:
|
||||
loopback_ip4_binder(zmq::socket_t &socket) { bind(socket); }
|
||||
std::string endpoint() { return endpoint_; }
|
||||
private:
|
||||
// Helper function used in constructor
|
||||
// as Gtest allows only void returning functions
|
||||
// and constructor/destructor are not.
|
||||
void bind(zmq::socket_t &socket)
|
||||
{
|
||||
ASSERT_NO_THROW(socket.bind("tcp://127.0.0.1:*"));
|
||||
std::array<char, 100> endpoint{};
|
||||
size_t endpoint_size = endpoint.size();
|
||||
ASSERT_NO_THROW(socket.getsockopt(ZMQ_LAST_ENDPOINT, endpoint.data(),
|
||||
&endpoint_size));
|
||||
ASSERT_TRUE(endpoint_size < endpoint.size());
|
||||
endpoint_ = std::string{endpoint.data()};
|
||||
}
|
||||
std::string endpoint_;
|
||||
};
|
||||
|
||||
} //namespace
|
||||
|
||||
TEST(poller, poll_basic)
|
||||
{
|
||||
zmq::context_t context;
|
||||
|
||||
zmq::socket_t vent{context, zmq::socket_type::push};
|
||||
ASSERT_NO_THROW(vent.bind("tcp://127.0.0.1:*"));
|
||||
auto endpoint = loopback_ip4_binder(vent).endpoint();
|
||||
|
||||
zmq::socket_t sink{context, zmq::socket_type::pull};
|
||||
// TODO: this should be simpler...
|
||||
std::array<char, 100> endpoint{};
|
||||
size_t endpoint_size = endpoint.size();
|
||||
ASSERT_NO_THROW(vent.getsockopt(ZMQ_LAST_ENDPOINT, endpoint.data(),
|
||||
&endpoint_size));
|
||||
ASSERT_TRUE(endpoint_size < endpoint.size());
|
||||
ASSERT_NO_THROW(sink.connect(endpoint.data()));
|
||||
ASSERT_NO_THROW(sink.connect(endpoint));
|
||||
|
||||
std::string message = "H";
|
||||
|
||||
@ -97,7 +119,8 @@ TEST(poller, poll_basic)
|
||||
|
||||
zmq::poller_t poller;
|
||||
bool message_received = false;
|
||||
std::function<void()> handler = [&message_received]() {
|
||||
zmq::poller_t::handler_t handler = [&message_received](short events) {
|
||||
ASSERT_TRUE(0 != (events & ZMQ_POLLIN));
|
||||
message_received = true;
|
||||
};
|
||||
ASSERT_NO_THROW(poller.add(sink, ZMQ_POLLIN, handler));
|
||||
@ -105,4 +128,50 @@ TEST(poller, poll_basic)
|
||||
ASSERT_TRUE(message_received);
|
||||
}
|
||||
|
||||
TEST(poller, client_server)
|
||||
{
|
||||
zmq::context_t context;
|
||||
std::string send_msg = "Hi";
|
||||
|
||||
// Setup server
|
||||
zmq::socket_t server{context, zmq::socket_type::router};
|
||||
auto endpoint = loopback_ip4_binder(server).endpoint();
|
||||
|
||||
// Setup poller
|
||||
zmq::poller_t poller;
|
||||
bool got_pollin = false;
|
||||
bool got_pollout = false;
|
||||
zmq::poller_t::handler_t handler = [&](short events) {
|
||||
if (0 != (events & ZMQ_POLLIN)) {
|
||||
zmq::message_t zmq_msg;
|
||||
ASSERT_NO_THROW(server.recv(&zmq_msg)); // skip msg id
|
||||
ASSERT_NO_THROW(server.recv(&zmq_msg)); // get message
|
||||
std::string recv_msg(zmq_msg.data<char>(),
|
||||
zmq_msg.size());
|
||||
ASSERT_EQ(send_msg, recv_msg);
|
||||
got_pollin = true;
|
||||
} else if (0 != (events & ZMQ_POLLOUT)) {
|
||||
got_pollout = true;
|
||||
} else {
|
||||
ASSERT_TRUE(false) << "Unexpected event type " << events;
|
||||
}
|
||||
};
|
||||
ASSERT_NO_THROW(poller.add(server, ZMQ_POLLIN, handler));
|
||||
|
||||
// Setup client and send message
|
||||
zmq::socket_t client{context, zmq::socket_type::dealer};
|
||||
ASSERT_NO_THROW(client.connect(endpoint));
|
||||
ASSERT_NO_THROW(client.send(std::begin(send_msg), std::end(send_msg)));
|
||||
|
||||
ASSERT_NO_THROW(poller.wait(std::chrono::milliseconds{-1}));
|
||||
ASSERT_TRUE(got_pollin);
|
||||
ASSERT_FALSE(got_pollout);
|
||||
|
||||
// Re-add server socket with pollout flag
|
||||
ASSERT_NO_THROW(poller.remove(server));
|
||||
ASSERT_NO_THROW(poller.add(server, ZMQ_POLLIN | ZMQ_POLLOUT, handler));
|
||||
ASSERT_NO_THROW(poller.wait(std::chrono::milliseconds{-1}));
|
||||
ASSERT_TRUE(got_pollout);
|
||||
}
|
||||
|
||||
#endif
|
||||
|
6
zmq.hpp
6
zmq.hpp
@ -1020,7 +1020,9 @@ namespace zmq
|
||||
return *this;
|
||||
}
|
||||
|
||||
void add (zmq::socket_t &socket, short events, std::function<void(void)> &handler)
|
||||
using handler_t = std::function<void(short)>;
|
||||
|
||||
void add (zmq::socket_t &socket, short events, handler_t &handler)
|
||||
{
|
||||
if (0 == zmq_poller_add (poller_ptr, socket.ptr, handler ? &handler : NULL, events)) {
|
||||
poller_events.emplace_back (zmq_poller_event_t ());
|
||||
@ -1044,7 +1046,7 @@ namespace zmq
|
||||
if (rc >= 0) {
|
||||
std::for_each (poller_events.begin (), poller_events.begin () + rc, [](zmq_poller_event_t& event) {
|
||||
if (event.user_data != NULL)
|
||||
(*reinterpret_cast<std::function<void(void)>*> (event.user_data)) ();
|
||||
(*reinterpret_cast<handler_t*> (event.user_data)) (event.events);
|
||||
});
|
||||
return true;
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user