From b1dacd8d0f9e700c4ff4763a07aba7a91ee2c3eb Mon Sep 17 00:00:00 2001 From: Montoya Edu Date: Tue, 29 Jan 2013 19:11:33 +0100 Subject: [PATCH] Update zmq.hpp work in progress... added monitor_t class added monitor() method --- zmq.hpp | 93 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 93 insertions(+) diff --git a/zmq.hpp b/zmq.hpp index 485a3da..e25359b 100644 --- a/zmq.hpp +++ b/zmq.hpp @@ -276,6 +276,22 @@ namespace zmq void operator = (const context_t&); }; + class monitor_t + { + public: + virtual void on_event_connected() = 0; + virtual void on_event_connect_delayed() = 0; + virtual void on_event_connect_retried() = 0; + virtual void on_event_listening() = 0; + virtual void on_event_bind_failed() = 0; + virtual void on_event_accepted() = 0; + virtual void on_event_accept_failed() = 0; + virtual void on_event_closed() = 0; + virtual void on_event_close_failed() = 0; + virtual void on_event_disconnected() = 0; + virtual void on_event_unknown(event.event) = 0; + }; + class socket_t { public: @@ -338,6 +354,11 @@ namespace zmq inline void bind (const char *addr_) { int rc = zmq_bind (ptr, addr_); + if (rc != 0) + throw error_t (); + myaddr = std::string(addr_); + monaddr = "inproc://monitor/" + myaddr; + rc = zmq_socket_monitor (ptr, monaddr.c_str(), ZMQ_EVENT_ALL); if (rc != 0) throw error_t (); } @@ -345,6 +366,11 @@ namespace zmq inline void connect (const char *addr_) { int rc = zmq_connect (ptr, addr_); + if (rc != 0) + throw error_t (); + myaddr = std::string(addr_); + monaddr = "inproc://monitor/" + myaddr; + rc = zmq_socket_monitor (ptr, monaddr.c_str(), ZMQ_EVENT_ALL); if (rc != 0) throw error_t (); } @@ -353,6 +379,70 @@ namespace zmq { return(ptr != NULL); } + + inline void monitor (void* ctx, monitor_t* mon) + { + zmq_event_t event; + int rc; + + assert(mon); + + void *s = zmq_socket (ctx, ZMQ_PAIR); + assert (s); + + const char* addr = myaddr.c_str(); + + rc = zmq_connect (s, monaddr.c_str()); + assert (rc == 0); + while (true) { + zmq_msg_t msg; + zmq_msg_init (&msg); + rc = zmq_recvmsg (s, &msg, 0); + if (rc == -1 && zmq_errno() == ETERM) break; + assert (rc != -1); + memcpy (&event, zmq_msg_data (&msg), sizeof (event)); + + switch (event.event) { + case ZMQ_EVENT_CONNECTED: + mon->on_event_connected(); + break; + case ZMQ_EVENT_CONNECT_DELAYED: + mon->on_event_connect_delayed(); + break; + case ZMQ_EVENT_CONNECT_RETRIED: + mon->on_event_connect_retried(); + break; + case ZMQ_EVENT_LISTENING: + mon->on_event_listening(); + break; + case ZMQ_EVENT_BIND_FAILED: + mon->on_event_bind_failed(); + break; + case ZMQ_EVENT_ACCEPTED: + mon->on_event_accepted(); + break; + case ZMQ_EVENT_ACCEPT_FAILED: + mon->on_event_accept_failed(); + break; + case ZMQ_EVENT_CLOSED: + mon->on_event_closed(); + break; + case ZMQ_EVENT_CLOSE_FAILED: + mon->on_event_close_failed(); + break; + case ZMQ_EVENT_DISCONNECTED: + mon->on_event_disconnected(); + break; + default: + mon->on_event_unknown(event.event); + break; + } + zmq_msg_close (&msg); + } + zmq_close (s); + if (rc != 0) + throw error_t (); + } inline size_t send (const void *buf_, size_t len_, int flags_ = 0) { @@ -397,6 +487,9 @@ namespace zmq private: void *ptr; + monitor_t *mon; + std::string monaddr; + std::string myaddr; socket_t (const socket_t&) ZMQ_DELETED_FUNCTION; void operator = (const socket_t&) ZMQ_DELETED_FUNCTION;