diff --git a/zmq.hpp b/zmq.hpp index 5049f16..afd2418 100644 --- a/zmq.hpp +++ b/zmq.hpp @@ -278,33 +278,34 @@ namespace zmq class monitor_t { - public: - virtual void on_event_connected() = 0; - virtual void on_event_connect_delayed() = 0; - virtual void on_event_connect_retried() = 0; - virtual void on_event_listening() = 0; - virtual void on_event_bind_failed() = 0; - virtual void on_event_accepted() = 0; - virtual void on_event_accept_failed() = 0; - virtual void on_event_closed() = 0; - virtual void on_event_close_failed() = 0; - virtual void on_event_disconnected() = 0; - virtual void on_event_unknown(int event) = 0; + public: + virtual void on_event_connected(const char *addr_) = 0; + virtual void on_event_connect_delayed(const char *addr_) = 0; + virtual void on_event_connect_retried(const char *addr_) = 0; + virtual void on_event_listening(const char *addr_) = 0; + virtual void on_event_bind_failed(const char *addr_) = 0; + virtual void on_event_accepted(const char *addr_) = 0; + virtual void on_event_accept_failed(const char *addr_) = 0; + virtual void on_event_closed(const char *addr_) = 0; + virtual void on_event_close_failed(const char *addr_) = 0; + virtual void on_event_disconnected(const char *addr_) = 0; + virtual void on_event_unknown(int event) = 0; }; class socket_t { public: - inline socket_t (context_t &context_, int type_) : is_monitored(false) + inline socket_t (context_t &context_, int type_) { + ctxptr = context_.ptr; ptr = zmq_socket (context_.ptr, type_); if (ptr == NULL) throw error_t (); } #ifdef ZMQ_HAS_RVALUE_REFS - inline socket_t(socket_t&& rhs) : ptr(rhs.ptr), is_monitored(false) + inline socket_t(socket_t&& rhs) : ptr(rhs.ptr) { rhs.ptr = NULL; } @@ -351,13 +352,12 @@ namespace zmq throw error_t (); } - inlinde void init_monitor(const char *addr_) + inline void init_monitor(const char *addr_, int events) { - std::string myaddr = std::string(addr_); - std::string monaddr = "inproc://monitor/" + myaddr; - rc = zmq_socket_monitor (ptr, monaddr.c_str(), ZMQ_EVENT_ALL); + int rc = zmq_socket_monitor(ptr, addr_, events); if (rc != 0) throw error_t (); + monaddr = std::string(addr_); } inline void bind (const char *addr_) @@ -365,10 +365,6 @@ namespace zmq int rc = zmq_bind (ptr, addr_); if (rc != 0) throw error_t (); - if (is_monitored) - { - init_monitor() - } } inline void connect (const char *addr_) @@ -376,10 +372,6 @@ namespace zmq int rc = zmq_connect (ptr, addr_); if (rc != 0) throw error_t (); - if (is_monitored) - { - init_monitor() - } } inline bool connected() @@ -387,68 +379,64 @@ namespace zmq return(ptr != NULL); } - inline void monitor (void* ctx, monitor_t* mon) + inline void monitor (monitor_t* mon) { - zmq_event_t event; - int rc; + zmq_event_t event; + int rc; - assert(mon); + assert(mon); - void *s = zmq_socket (ctx, ZMQ_PAIR); - assert (s); + void *s = zmq_socket (ctxptr, ZMQ_PAIR); + assert (s); - const char* addr = myaddr.c_str(); + rc = zmq_connect (s, monaddr.c_str()); + assert (rc == 0); + while (true) { + zmq_msg_t msg; + zmq_msg_init (&msg); + rc = zmq_recvmsg (s, &msg, 0); + if (rc == -1 && zmq_errno() == ETERM) break; + assert (rc != -1); + memcpy (&event, zmq_msg_data (&msg), sizeof (event)); - rc = zmq_connect (s, monaddr.c_str()); - assert (rc == 0); - while (true) { - zmq_msg_t msg; - zmq_msg_init (&msg); - rc = zmq_recvmsg (s, &msg, 0); - if (rc == -1 && zmq_errno() == ETERM) break; - assert (rc != -1); - memcpy (&event, zmq_msg_data (&msg), sizeof (event)); - - switch (event.event) { - case ZMQ_EVENT_CONNECTED: - mon->on_event_connected(event.connected.addr); - break; - case ZMQ_EVENT_CONNECT_DELAYED: - mon->on_event_connect_delayed(); - break; - case ZMQ_EVENT_CONNECT_RETRIED: - mon->on_event_connect_retried(); - break; - case ZMQ_EVENT_LISTENING: - mon->on_event_listening(); - break; - case ZMQ_EVENT_BIND_FAILED: - mon->on_event_bind_failed(); - break; - case ZMQ_EVENT_ACCEPTED: - mon->on_event_accepted(); - break; - case ZMQ_EVENT_ACCEPT_FAILED: - mon->on_event_accept_failed(); - break; - case ZMQ_EVENT_CLOSED: - mon->on_event_closed(); - break; - case ZMQ_EVENT_CLOSE_FAILED: - mon->on_event_close_failed(); - break; - case ZMQ_EVENT_DISCONNECTED: - mon->on_event_disconnected(); - break; - default: - mon->on_event_unknown(event.event); - break; - } - zmq_msg_close (&msg); - } - zmq_close (s); - if (rc != 0) - throw error_t (); + switch (event.event) { + case ZMQ_EVENT_CONNECTED: + mon->on_event_connected(event.data.connected.addr); + break; + case ZMQ_EVENT_CONNECT_DELAYED: + mon->on_event_connect_delayed(event.data.connect_delayed.addr); + break; + case ZMQ_EVENT_CONNECT_RETRIED: + mon->on_event_connect_retried(event.data.connect_retried.addr); + break; + case ZMQ_EVENT_LISTENING: + mon->on_event_listening(event.data.listening.addr); + break; + case ZMQ_EVENT_BIND_FAILED: + mon->on_event_bind_failed(event.data.bind_failed.addr); + break; + case ZMQ_EVENT_ACCEPTED: + mon->on_event_accepted(event.data.accepted.addr); + break; + case ZMQ_EVENT_ACCEPT_FAILED: + mon->on_event_accept_failed(event.data.accept_failed.addr); + break; + case ZMQ_EVENT_CLOSED: + mon->on_event_closed(event.data.closed.addr); + break; + case ZMQ_EVENT_CLOSE_FAILED: + mon->on_event_close_failed(event.data.close_failed.addr); + break; + case ZMQ_EVENT_DISCONNECTED: + mon->on_event_disconnected(event.data.disconnected.addr); + break; + default: + mon->on_event_unknown(event.event); + break; + } + zmq_msg_close (&msg); + } + zmq_close (s); } inline size_t send (const void *buf_, size_t len_, int flags_ = 0) @@ -491,17 +479,10 @@ namespace zmq throw error_t (); } - inline void enable_monitoring() - { - is_monitored = true; - } - private: - bool is_monitored; + std::string monaddr; void *ptr; - monitor_t *mon; - std::string monaddr; - std::string myaddr; + void *ctxptr; socket_t (const socket_t&) ZMQ_DELETED_FUNCTION; void operator = (const socket_t&) ZMQ_DELETED_FUNCTION;