From d4da63fed64316e07cea7462e72375de393122dc Mon Sep 17 00:00:00 2001 From: a4z Date: Fri, 14 Jul 2017 16:03:04 +0200 Subject: [PATCH] Problem: monitor_t::monitor function is blocking This does not allaw to use monitor_t without a thread. What is often OK but sometimes not. Solution: keep existing interface but add a non blocking alternative. --- zmq.hpp | 223 +++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 150 insertions(+), 73 deletions(-) diff --git a/zmq.hpp b/zmq.hpp index 3a43228..a838716 100644 --- a/zmq.hpp +++ b/zmq.hpp @@ -703,8 +703,32 @@ namespace zmq class monitor_t { public: - monitor_t() : socketPtr(NULL) {} - virtual ~monitor_t() {} + monitor_t() : socketPtr(NULL), monitor_socket{NULL} {} + + virtual ~monitor_t() + { + if (socketPtr) + zmq_socket_monitor(socketPtr, NULL, 0); + + if (monitor_socket) + zmq_close (monitor_socket); + + } + + +#ifdef ZMQ_HAS_RVALUE_REFS + monitor_t(monitor_t&& rhs) ZMQ_NOTHROW : + socketPtr(rhs.socketPtr), + monitor_socket(rhs.monitor_socket) + { + rhs.socketPtr = NULL; + rhs.monitor_socket = NULL; + } + + socket_t& operator=(socket_t&& rhs) ZMQ_DELETED_FUNCTION ; +#endif + + void monitor(socket_t &socket, std::string const& addr, int events = ZMQ_EVENT_ALL) { @@ -712,104 +736,146 @@ namespace zmq } void monitor(socket_t &socket, const char *addr_, int events = ZMQ_EVENT_ALL) + { + init (socket, addr_, events) ; + while(true) + { + check_event(-1) ; + } + } + + void init(socket_t &socket, std::string const& addr, int events = ZMQ_EVENT_ALL) + { + init(socket, addr.c_str(), events); + } + + void init(socket_t &socket, const char *addr_, int events = ZMQ_EVENT_ALL) { int rc = zmq_socket_monitor(socket.ptr, addr_, events); if (rc != 0) throw error_t (); socketPtr = socket.ptr; - void *s = zmq_socket (socket.ctxptr, ZMQ_PAIR); - assert (s); + monitor_socket = zmq_socket (socket.ctxptr, ZMQ_PAIR); + assert (monitor_socket); - rc = zmq_connect (s, addr_); + rc = zmq_connect (monitor_socket, addr_); assert (rc == 0); on_monitor_started(); + } - while (true) { - zmq_msg_t eventMsg; - zmq_msg_init (&eventMsg); - rc = zmq_msg_recv (&eventMsg, s, 0); + bool check_event(int timeout = 0) + { + assert (monitor_socket); + + zmq_msg_t eventMsg; + zmq_msg_init (&eventMsg); + + zmq::pollitem_t items [] = { + { monitor_socket, 0, ZMQ_POLLIN, 0 }, + }; + + zmq::poll (&items [0], 1, timeout); + + if (items [0].revents & ZMQ_POLLIN) + { + int rc = zmq_msg_recv (&eventMsg, monitor_socket, 0); if (rc == -1 && zmq_errno() == ETERM) - break; + return false; assert (rc != -1); + + } + else + { + zmq_msg_close (&eventMsg); + return false; + } + #if ZMQ_VERSION_MAJOR >= 4 - const char* data = static_cast(zmq_msg_data(&eventMsg)); - zmq_event_t msgEvent; - memcpy(&msgEvent.event, data, sizeof(uint16_t)); data += sizeof(uint16_t); - memcpy(&msgEvent.value, data, sizeof(int32_t)); - zmq_event_t* event = &msgEvent; + const char* data = static_cast(zmq_msg_data(&eventMsg)); + zmq_event_t msgEvent; + memcpy(&msgEvent.event, data, sizeof(uint16_t)); data += sizeof(uint16_t); + memcpy(&msgEvent.value, data, sizeof(int32_t)); + zmq_event_t* event = &msgEvent; #else - zmq_event_t* event = static_cast(zmq_msg_data(&eventMsg)); + zmq_event_t* event = static_cast(zmq_msg_data(&eventMsg)); #endif #ifdef ZMQ_NEW_MONITOR_EVENT_LAYOUT - zmq_msg_t addrMsg; - zmq_msg_init (&addrMsg); - rc = zmq_msg_recv (&addrMsg, s, 0); - if (rc == -1 && zmq_errno() == ETERM) - break; - assert (rc != -1); - const char* str = static_cast(zmq_msg_data (&addrMsg)); - std::string address(str, str + zmq_msg_size(&addrMsg)); - zmq_msg_close (&addrMsg); + zmq_msg_t addrMsg; + zmq_msg_init (&addrMsg); + int rc = zmq_msg_recv (&addrMsg, monitor_socket, 0); + if (rc == -1 && zmq_errno() == ETERM) + { + zmq_msg_close (&eventMsg); + return false; + } + + assert (rc != -1); + const char* str = static_cast(zmq_msg_data (&addrMsg)); + std::string address(str, str + zmq_msg_size(&addrMsg)); + zmq_msg_close (&addrMsg); #else - // Bit of a hack, but all events in the zmq_event_t union have the same layout so this will work for all event types. - std::string address = event->data.connected.addr; + // Bit of a hack, but all events in the zmq_event_t union have the same layout so this will work for all event types. + std::string address = event->data.connected.addr; #endif #ifdef ZMQ_EVENT_MONITOR_STOPPED - if (event->event == ZMQ_EVENT_MONITOR_STOPPED) - break; + if (event->event == ZMQ_EVENT_MONITOR_STOPPED) + { + zmq_msg_close (&eventMsg); + return true; + } + #endif - switch (event->event) { - case ZMQ_EVENT_CONNECTED: - on_event_connected(*event, address.c_str()); - break; - case ZMQ_EVENT_CONNECT_DELAYED: - on_event_connect_delayed(*event, address.c_str()); - break; - case ZMQ_EVENT_CONNECT_RETRIED: - on_event_connect_retried(*event, address.c_str()); - break; - case ZMQ_EVENT_LISTENING: - on_event_listening(*event, address.c_str()); - break; - case ZMQ_EVENT_BIND_FAILED: - on_event_bind_failed(*event, address.c_str()); - break; - case ZMQ_EVENT_ACCEPTED: - on_event_accepted(*event, address.c_str()); - break; - case ZMQ_EVENT_ACCEPT_FAILED: - on_event_accept_failed(*event, address.c_str()); - break; - case ZMQ_EVENT_CLOSED: - on_event_closed(*event, address.c_str()); - break; - case ZMQ_EVENT_CLOSE_FAILED: - on_event_close_failed(*event, address.c_str()); - break; - case ZMQ_EVENT_DISCONNECTED: - on_event_disconnected(*event, address.c_str()); - break; + switch (event->event) { + case ZMQ_EVENT_CONNECTED: + on_event_connected(*event, address.c_str()); + break; + case ZMQ_EVENT_CONNECT_DELAYED: + on_event_connect_delayed(*event, address.c_str()); + break; + case ZMQ_EVENT_CONNECT_RETRIED: + on_event_connect_retried(*event, address.c_str()); + break; + case ZMQ_EVENT_LISTENING: + on_event_listening(*event, address.c_str()); + break; + case ZMQ_EVENT_BIND_FAILED: + on_event_bind_failed(*event, address.c_str()); + break; + case ZMQ_EVENT_ACCEPTED: + on_event_accepted(*event, address.c_str()); + break; + case ZMQ_EVENT_ACCEPT_FAILED: + on_event_accept_failed(*event, address.c_str()); + break; + case ZMQ_EVENT_CLOSED: + on_event_closed(*event, address.c_str()); + break; + case ZMQ_EVENT_CLOSE_FAILED: + on_event_close_failed(*event, address.c_str()); + break; + case ZMQ_EVENT_DISCONNECTED: + on_event_disconnected(*event, address.c_str()); + break; #ifdef ZMQ_BUILD_DRAFT_API - case ZMQ_EVENT_HANDSHAKE_FAILED: - on_event_handshake_failed(*event, address.c_str()); - break; - case ZMQ_EVENT_HANDSHAKE_SUCCEED: - on_event_handshake_succeed(*event, address.c_str()); - break; + case ZMQ_EVENT_HANDSHAKE_FAILED: + on_event_handshake_failed(*event, address.c_str()); + break; + case ZMQ_EVENT_HANDSHAKE_SUCCEED: + on_event_handshake_succeed(*event, address.c_str()); + break; #endif - default: - on_event_unknown(*event, address.c_str()); - break; - } - zmq_msg_close (&eventMsg); + default: + on_event_unknown(*event, address.c_str()); + break; } - zmq_close (s); - socketPtr = NULL; + zmq_msg_close (&eventMsg); + + return true ; } #ifdef ZMQ_EVENT_MONITOR_STOPPED @@ -817,6 +883,12 @@ namespace zmq { if (socketPtr) zmq_socket_monitor(socketPtr, NULL, 0); + + if (monitor_socket) + zmq_close (monitor_socket); + + socketPtr = NULL; + monitor_socket = NULL; } #endif virtual void on_monitor_started() {} @@ -834,7 +906,12 @@ namespace zmq virtual void on_event_handshake_succeed(const zmq_event_t &event_, const char* addr_) { (void) event_; (void) addr_; } virtual void on_event_unknown(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; } private: + + monitor_t (const monitor_t&) ZMQ_DELETED_FUNCTION; + void operator = (const monitor_t&) ZMQ_DELETED_FUNCTION; + void* socketPtr; + void *monitor_socket ; }; #if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER)