syntax & bug fixing

modified monitor_t interface
This commit is contained in:
H. Eduardo Montoya Sánchez 2013-01-30 17:30:05 +01:00
parent bcdf163fa8
commit f30f92b392

165
zmq.hpp
View File

@ -278,33 +278,34 @@ namespace zmq
class monitor_t class monitor_t
{ {
public: public:
virtual void on_event_connected() = 0; virtual void on_event_connected(const char *addr_) = 0;
virtual void on_event_connect_delayed() = 0; virtual void on_event_connect_delayed(const char *addr_) = 0;
virtual void on_event_connect_retried() = 0; virtual void on_event_connect_retried(const char *addr_) = 0;
virtual void on_event_listening() = 0; virtual void on_event_listening(const char *addr_) = 0;
virtual void on_event_bind_failed() = 0; virtual void on_event_bind_failed(const char *addr_) = 0;
virtual void on_event_accepted() = 0; virtual void on_event_accepted(const char *addr_) = 0;
virtual void on_event_accept_failed() = 0; virtual void on_event_accept_failed(const char *addr_) = 0;
virtual void on_event_closed() = 0; virtual void on_event_closed(const char *addr_) = 0;
virtual void on_event_close_failed() = 0; virtual void on_event_close_failed(const char *addr_) = 0;
virtual void on_event_disconnected() = 0; virtual void on_event_disconnected(const char *addr_) = 0;
virtual void on_event_unknown(int event) = 0; virtual void on_event_unknown(int event) = 0;
}; };
class socket_t class socket_t
{ {
public: public:
inline socket_t (context_t &context_, int type_) : is_monitored(false) inline socket_t (context_t &context_, int type_)
{ {
ctxptr = context_.ptr;
ptr = zmq_socket (context_.ptr, type_); ptr = zmq_socket (context_.ptr, type_);
if (ptr == NULL) if (ptr == NULL)
throw error_t (); throw error_t ();
} }
#ifdef ZMQ_HAS_RVALUE_REFS #ifdef ZMQ_HAS_RVALUE_REFS
inline socket_t(socket_t&& rhs) : ptr(rhs.ptr), is_monitored(false) inline socket_t(socket_t&& rhs) : ptr(rhs.ptr)
{ {
rhs.ptr = NULL; rhs.ptr = NULL;
} }
@ -351,13 +352,12 @@ namespace zmq
throw error_t (); throw error_t ();
} }
inlinde void init_monitor(const char *addr_) inline void init_monitor(const char *addr_, int events)
{ {
std::string myaddr = std::string(addr_); int rc = zmq_socket_monitor(ptr, addr_, events);
std::string monaddr = "inproc://monitor/" + myaddr;
rc = zmq_socket_monitor (ptr, monaddr.c_str(), ZMQ_EVENT_ALL);
if (rc != 0) if (rc != 0)
throw error_t (); throw error_t ();
monaddr = std::string(addr_);
} }
inline void bind (const char *addr_) inline void bind (const char *addr_)
@ -365,10 +365,6 @@ namespace zmq
int rc = zmq_bind (ptr, addr_); int rc = zmq_bind (ptr, addr_);
if (rc != 0) if (rc != 0)
throw error_t (); throw error_t ();
if (is_monitored)
{
init_monitor()
}
} }
inline void connect (const char *addr_) inline void connect (const char *addr_)
@ -376,10 +372,6 @@ namespace zmq
int rc = zmq_connect (ptr, addr_); int rc = zmq_connect (ptr, addr_);
if (rc != 0) if (rc != 0)
throw error_t (); throw error_t ();
if (is_monitored)
{
init_monitor()
}
} }
inline bool connected() inline bool connected()
@ -387,68 +379,64 @@ namespace zmq
return(ptr != NULL); return(ptr != NULL);
} }
inline void monitor (void* ctx, monitor_t* mon) inline void monitor (monitor_t* mon)
{ {
zmq_event_t event; zmq_event_t event;
int rc; int rc;
assert(mon); assert(mon);
void *s = zmq_socket (ctx, ZMQ_PAIR); void *s = zmq_socket (ctxptr, ZMQ_PAIR);
assert (s); assert (s);
const char* addr = myaddr.c_str(); rc = zmq_connect (s, monaddr.c_str());
assert (rc == 0);
while (true) {
zmq_msg_t msg;
zmq_msg_init (&msg);
rc = zmq_recvmsg (s, &msg, 0);
if (rc == -1 && zmq_errno() == ETERM) break;
assert (rc != -1);
memcpy (&event, zmq_msg_data (&msg), sizeof (event));
rc = zmq_connect (s, monaddr.c_str()); switch (event.event) {
assert (rc == 0); case ZMQ_EVENT_CONNECTED:
while (true) { mon->on_event_connected(event.data.connected.addr);
zmq_msg_t msg; break;
zmq_msg_init (&msg); case ZMQ_EVENT_CONNECT_DELAYED:
rc = zmq_recvmsg (s, &msg, 0); mon->on_event_connect_delayed(event.data.connect_delayed.addr);
if (rc == -1 && zmq_errno() == ETERM) break; break;
assert (rc != -1); case ZMQ_EVENT_CONNECT_RETRIED:
memcpy (&event, zmq_msg_data (&msg), sizeof (event)); mon->on_event_connect_retried(event.data.connect_retried.addr);
break;
switch (event.event) { case ZMQ_EVENT_LISTENING:
case ZMQ_EVENT_CONNECTED: mon->on_event_listening(event.data.listening.addr);
mon->on_event_connected(event.connected.addr); break;
break; case ZMQ_EVENT_BIND_FAILED:
case ZMQ_EVENT_CONNECT_DELAYED: mon->on_event_bind_failed(event.data.bind_failed.addr);
mon->on_event_connect_delayed(); break;
break; case ZMQ_EVENT_ACCEPTED:
case ZMQ_EVENT_CONNECT_RETRIED: mon->on_event_accepted(event.data.accepted.addr);
mon->on_event_connect_retried(); break;
break; case ZMQ_EVENT_ACCEPT_FAILED:
case ZMQ_EVENT_LISTENING: mon->on_event_accept_failed(event.data.accept_failed.addr);
mon->on_event_listening(); break;
break; case ZMQ_EVENT_CLOSED:
case ZMQ_EVENT_BIND_FAILED: mon->on_event_closed(event.data.closed.addr);
mon->on_event_bind_failed(); break;
break; case ZMQ_EVENT_CLOSE_FAILED:
case ZMQ_EVENT_ACCEPTED: mon->on_event_close_failed(event.data.close_failed.addr);
mon->on_event_accepted(); break;
break; case ZMQ_EVENT_DISCONNECTED:
case ZMQ_EVENT_ACCEPT_FAILED: mon->on_event_disconnected(event.data.disconnected.addr);
mon->on_event_accept_failed(); break;
break; default:
case ZMQ_EVENT_CLOSED: mon->on_event_unknown(event.event);
mon->on_event_closed(); break;
break; }
case ZMQ_EVENT_CLOSE_FAILED: zmq_msg_close (&msg);
mon->on_event_close_failed(); }
break; zmq_close (s);
case ZMQ_EVENT_DISCONNECTED:
mon->on_event_disconnected();
break;
default:
mon->on_event_unknown(event.event);
break;
}
zmq_msg_close (&msg);
}
zmq_close (s);
if (rc != 0)
throw error_t ();
} }
inline size_t send (const void *buf_, size_t len_, int flags_ = 0) inline size_t send (const void *buf_, size_t len_, int flags_ = 0)
@ -491,17 +479,10 @@ namespace zmq
throw error_t (); throw error_t ();
} }
inline void enable_monitoring()
{
is_monitored = true;
}
private: private:
bool is_monitored; std::string monaddr;
void *ptr; void *ptr;
monitor_t *mon; void *ctxptr;
std::string monaddr;
std::string myaddr;
socket_t (const socket_t&) ZMQ_DELETED_FUNCTION; socket_t (const socket_t&) ZMQ_DELETED_FUNCTION;
void operator = (const socket_t&) ZMQ_DELETED_FUNCTION; void operator = (const socket_t&) ZMQ_DELETED_FUNCTION;