Rework monitor to support both new (3.3 and above) and old ways montior events work.

This commit is contained in:
Richard Newton 2013-06-28 17:32:49 +01:00
parent dfea887ecb
commit 938e4ab07b

74
zmq.hpp
View File

@ -30,6 +30,7 @@
#include <algorithm> #include <algorithm>
#include <cassert> #include <cassert>
#include <cstring> #include <cstring>
#include <string>
#include <exception> #include <exception>
// Detect whether the compiler supports C++11 rvalue references. // Detect whether the compiler supports C++11 rvalue references.
@ -55,6 +56,10 @@
#define ZMQ_DELETED_FUNCTION #define ZMQ_DELETED_FUNCTION
#endif #endif
#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(3, 3, 0)
#define ZMQ_NEW_MONITOR_EVENT_LAYOUT
#endif
// In order to prevent unused variable warnings when building in non-debug // In order to prevent unused variable warnings when building in non-debug
// mode use this macro to make assertions. // mode use this macro to make assertions.
#ifndef NDEBUG #ifndef NDEBUG
@ -444,66 +449,79 @@ namespace zmq
rc = zmq_connect (s, addr_); rc = zmq_connect (s, addr_);
assert (rc == 0); assert (rc == 0);
while (true) { while (true) {
zmq_msg_t msg; zmq_msg_t eventMsg;
zmq_msg_init (&msg); zmq_msg_init (&eventMsg);
rc = zmq_recvmsg (s, &msg, 0); rc = zmq_recvmsg (s, &eventMsg, 0);
if (rc == -1 && zmq_errno() == ETERM) if (rc == -1 && zmq_errno() == ETERM)
break; break;
assert (rc != -1); assert (rc != -1);
zmq_event_t* event = static_cast<zmq_event_t*>(zmq_msg_data (&eventMsg));
zmq_event_t* event = static_cast<zmq_event_t*>(zmq_msg_data (&msg)); #ifdef ZMQ_NEW_MONITOR_EVENT_LAYOUT
zmq_msg_t addrMsg;
zmq_msg_init (&addrMsg);
rc = zmq_recvmsg (s, &addrMsg, 0);
if (rc == -1 && zmq_errno() == ETERM)
break;
assert (rc != -1);
const char* str = static_cast<const char*>(zmq_msg_data (&addrMsg));
std::string address(str, str + zmq_msg_size(&addrMsg));
zmq_msg_close (&addrMsg);
#else
// Bit of a hack, but all events in the zmq_event_t union have the same layout so this will work for all event types.
std::string address = event->data.connected.addr;
#endif
switch (event->event) { switch (event->event) {
case ZMQ_EVENT_CONNECTED: case ZMQ_EVENT_CONNECTED:
on_event_connected(*event); on_event_connected(*event, address.c_str());
break; break;
case ZMQ_EVENT_CONNECT_DELAYED: case ZMQ_EVENT_CONNECT_DELAYED:
on_event_connect_delayed(*event); on_event_connect_delayed(*event, address.c_str());
break; break;
case ZMQ_EVENT_CONNECT_RETRIED: case ZMQ_EVENT_CONNECT_RETRIED:
on_event_connect_retried(*event); on_event_connect_retried(*event, address.c_str());
break; break;
case ZMQ_EVENT_LISTENING: case ZMQ_EVENT_LISTENING:
on_event_listening(*event); on_event_listening(*event, address.c_str());
break; break;
case ZMQ_EVENT_BIND_FAILED: case ZMQ_EVENT_BIND_FAILED:
on_event_bind_failed(*event); on_event_bind_failed(*event, address.c_str());
break; break;
case ZMQ_EVENT_ACCEPTED: case ZMQ_EVENT_ACCEPTED:
on_event_accepted(*event); on_event_accepted(*event, address.c_str());
break; break;
case ZMQ_EVENT_ACCEPT_FAILED: case ZMQ_EVENT_ACCEPT_FAILED:
on_event_accept_failed(*event); on_event_accept_failed(*event, address.c_str());
break; break;
case ZMQ_EVENT_CLOSED: case ZMQ_EVENT_CLOSED:
on_event_closed(*event); on_event_closed(*event, address.c_str());
break; break;
case ZMQ_EVENT_CLOSE_FAILED: case ZMQ_EVENT_CLOSE_FAILED:
on_event_close_failed(*event); on_event_close_failed(*event, address.c_str());
break; break;
case ZMQ_EVENT_DISCONNECTED: case ZMQ_EVENT_DISCONNECTED:
on_event_disconnected(*event); on_event_disconnected(*event, address.c_str());
break; break;
default: default:
on_event_unknown(*event); on_event_unknown(*event, address.c_str());
break; break;
} }
zmq_msg_close (&msg); zmq_msg_close (&eventMsg);
} }
zmq_close (s); zmq_close (s);
} }
virtual void on_event_connected(const zmq_event_t &event_) {} virtual void on_event_connected(const zmq_event_t &event_, const char* addr_) {}
virtual void on_event_connect_delayed(const zmq_event_t &event_) {} virtual void on_event_connect_delayed(const zmq_event_t &event_, const char* addr_) {}
virtual void on_event_connect_retried(const zmq_event_t &event_) {} virtual void on_event_connect_retried(const zmq_event_t &event_, const char* addr_) {}
virtual void on_event_listening(const zmq_event_t &event_) {} virtual void on_event_listening(const zmq_event_t &event_, const char* addr_) {}
virtual void on_event_bind_failed(const zmq_event_t &event_) {} virtual void on_event_bind_failed(const zmq_event_t &event_, const char* addr_) {}
virtual void on_event_accepted(const zmq_event_t &event_) {} virtual void on_event_accepted(const zmq_event_t &event_, const char* addr_) {}
virtual void on_event_accept_failed(const zmq_event_t &event_) {} virtual void on_event_accept_failed(const zmq_event_t &event_, const char* addr_) {}
virtual void on_event_closed(const zmq_event_t &event_) {} virtual void on_event_closed(const zmq_event_t &event_, const char* addr_) {}
virtual void on_event_close_failed(const zmq_event_t &event_) {} virtual void on_event_close_failed(const zmq_event_t &event_, const char* addr_) {}
virtual void on_event_disconnected(const zmq_event_t &event_) {} virtual void on_event_disconnected(const zmq_event_t &event_, const char* addr_) {}
virtual void on_event_unknown(const zmq_event_t &event_) {} virtual void on_event_unknown(const zmq_event_t &event_, const char* addr_) {}
}; };
} }