diff --git a/zmq.hpp b/zmq.hpp index 049daaf..93314de 100644 --- a/zmq.hpp +++ b/zmq.hpp @@ -2374,112 +2374,6 @@ class monitor_t return process_event(items[0].revents); } - - bool process_event(short events) - { - zmq::message_t eventMsg; - - if (events & ZMQ_POLLIN) { - int rc = zmq_msg_recv(eventMsg.handle(), _monitor_socket.handle(), 0); - if (rc == -1 && zmq_errno() == ETERM) - return false; - assert(rc != -1); - - } else { - return false; - } - -#if ZMQ_VERSION_MAJOR >= 4 - const char *data = static_cast(eventMsg.data()); - zmq_event_t msgEvent; - memcpy(&msgEvent.event, data, sizeof(uint16_t)); - data += sizeof(uint16_t); - memcpy(&msgEvent.value, data, sizeof(int32_t)); - zmq_event_t *event = &msgEvent; -#else - zmq_event_t *event = static_cast(eventMsg.data()); -#endif - -#ifdef ZMQ_NEW_MONITOR_EVENT_LAYOUT - zmq::message_t addrMsg; - int rc = zmq_msg_recv(addrMsg.handle(), _monitor_socket.handle(), 0); - if (rc == -1 && zmq_errno() == ETERM) { - return false; - } - - assert(rc != -1); - std::string address = addrMsg.to_string(); -#else - // Bit of a hack, but all events in the zmq_event_t union have the same layout so this will work for all event types. - std::string address = event->data.connected.addr; -#endif - -#ifdef ZMQ_EVENT_MONITOR_STOPPED - if (event->event == ZMQ_EVENT_MONITOR_STOPPED) { - return false; - } - -#endif - - switch (event->event) { - case ZMQ_EVENT_CONNECTED: - on_event_connected(*event, address.c_str()); - break; - case ZMQ_EVENT_CONNECT_DELAYED: - on_event_connect_delayed(*event, address.c_str()); - break; - case ZMQ_EVENT_CONNECT_RETRIED: - on_event_connect_retried(*event, address.c_str()); - break; - case ZMQ_EVENT_LISTENING: - on_event_listening(*event, address.c_str()); - break; - case ZMQ_EVENT_BIND_FAILED: - on_event_bind_failed(*event, address.c_str()); - break; - case ZMQ_EVENT_ACCEPTED: - on_event_accepted(*event, address.c_str()); - break; - case ZMQ_EVENT_ACCEPT_FAILED: - on_event_accept_failed(*event, address.c_str()); - break; - case ZMQ_EVENT_CLOSED: - on_event_closed(*event, address.c_str()); - break; - case ZMQ_EVENT_CLOSE_FAILED: - on_event_close_failed(*event, address.c_str()); - break; - case ZMQ_EVENT_DISCONNECTED: - on_event_disconnected(*event, address.c_str()); - break; -#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 3, 0) || (defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 3)) - case ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL: - on_event_handshake_failed_no_detail(*event, address.c_str()); - break; - case ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL: - on_event_handshake_failed_protocol(*event, address.c_str()); - break; - case ZMQ_EVENT_HANDSHAKE_FAILED_AUTH: - on_event_handshake_failed_auth(*event, address.c_str()); - break; - case ZMQ_EVENT_HANDSHAKE_SUCCEEDED: - on_event_handshake_succeeded(*event, address.c_str()); - break; -#elif defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 1) - case ZMQ_EVENT_HANDSHAKE_FAILED: - on_event_handshake_failed(*event, address.c_str()); - break; - case ZMQ_EVENT_HANDSHAKE_SUCCEED: - on_event_handshake_succeed(*event, address.c_str()); - break; -#endif - default: - on_event_unknown(*event, address.c_str()); - break; - } - - return true; - } #ifdef ZMQ_EVENT_MONITOR_STOPPED void abort() @@ -2588,12 +2482,120 @@ class monitor_t (void) addr_; } + protected: + bool process_event(short events) + { + zmq::message_t eventMsg; + + if (events & ZMQ_POLLIN) { + int rc = zmq_msg_recv(eventMsg.handle(), _monitor_socket.handle(), 0); + if (rc == -1 && zmq_errno() == ETERM) + return false; + assert(rc != -1); + + } else { + return false; + } + +#if ZMQ_VERSION_MAJOR >= 4 + const char *data = static_cast(eventMsg.data()); + zmq_event_t msgEvent; + memcpy(&msgEvent.event, data, sizeof(uint16_t)); + data += sizeof(uint16_t); + memcpy(&msgEvent.value, data, sizeof(int32_t)); + zmq_event_t *event = &msgEvent; +#else + zmq_event_t *event = static_cast(eventMsg.data()); +#endif + +#ifdef ZMQ_NEW_MONITOR_EVENT_LAYOUT + zmq::message_t addrMsg; + int rc = zmq_msg_recv(addrMsg.handle(), _monitor_socket.handle(), 0); + if (rc == -1 && zmq_errno() == ETERM) { + return false; + } + + assert(rc != -1); + std::string address = addrMsg.to_string(); +#else + // Bit of a hack, but all events in the zmq_event_t union have the same layout so this will work for all event types. + std::string address = event->data.connected.addr; +#endif + +#ifdef ZMQ_EVENT_MONITOR_STOPPED + if (event->event == ZMQ_EVENT_MONITOR_STOPPED) { + return false; + } + +#endif + + switch (event->event) { + case ZMQ_EVENT_CONNECTED: + on_event_connected(*event, address.c_str()); + break; + case ZMQ_EVENT_CONNECT_DELAYED: + on_event_connect_delayed(*event, address.c_str()); + break; + case ZMQ_EVENT_CONNECT_RETRIED: + on_event_connect_retried(*event, address.c_str()); + break; + case ZMQ_EVENT_LISTENING: + on_event_listening(*event, address.c_str()); + break; + case ZMQ_EVENT_BIND_FAILED: + on_event_bind_failed(*event, address.c_str()); + break; + case ZMQ_EVENT_ACCEPTED: + on_event_accepted(*event, address.c_str()); + break; + case ZMQ_EVENT_ACCEPT_FAILED: + on_event_accept_failed(*event, address.c_str()); + break; + case ZMQ_EVENT_CLOSED: + on_event_closed(*event, address.c_str()); + break; + case ZMQ_EVENT_CLOSE_FAILED: + on_event_close_failed(*event, address.c_str()); + break; + case ZMQ_EVENT_DISCONNECTED: + on_event_disconnected(*event, address.c_str()); + break; +#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 3, 0) || (defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 3)) + case ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL: + on_event_handshake_failed_no_detail(*event, address.c_str()); + break; + case ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL: + on_event_handshake_failed_protocol(*event, address.c_str()); + break; + case ZMQ_EVENT_HANDSHAKE_FAILED_AUTH: + on_event_handshake_failed_auth(*event, address.c_str()); + break; + case ZMQ_EVENT_HANDSHAKE_SUCCEEDED: + on_event_handshake_succeeded(*event, address.c_str()); + break; +#elif defined(ZMQ_BUILD_DRAFT_API) && ZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 2, 1) + case ZMQ_EVENT_HANDSHAKE_FAILED: + on_event_handshake_failed(*event, address.c_str()); + break; + case ZMQ_EVENT_HANDSHAKE_SUCCEED: + on_event_handshake_succeed(*event, address.c_str()); + break; +#endif + default: + on_event_unknown(*event, address.c_str()); + break; + } + + return true; + } + + socket_t _monitor_socket; + private: monitor_t(const monitor_t &) ZMQ_DELETED_FUNCTION; void operator=(const monitor_t &) ZMQ_DELETED_FUNCTION; socket_ref _socket; - socket_t _monitor_socket; void close() ZMQ_NOTHROW {