Update zmq.hpp

work in progress...
added monitor_t class
added monitor() method
This commit is contained in:
Montoya Edu 2013-01-29 19:11:33 +01:00
parent 127c255d65
commit b1dacd8d0f

93
zmq.hpp
View File

@ -276,6 +276,22 @@ namespace zmq
void operator = (const context_t&);
};
class monitor_t
{
public:
virtual void on_event_connected() = 0;
virtual void on_event_connect_delayed() = 0;
virtual void on_event_connect_retried() = 0;
virtual void on_event_listening() = 0;
virtual void on_event_bind_failed() = 0;
virtual void on_event_accepted() = 0;
virtual void on_event_accept_failed() = 0;
virtual void on_event_closed() = 0;
virtual void on_event_close_failed() = 0;
virtual void on_event_disconnected() = 0;
virtual void on_event_unknown(event.event) = 0;
};
class socket_t
{
public:
@ -338,6 +354,11 @@ namespace zmq
inline void bind (const char *addr_)
{
int rc = zmq_bind (ptr, addr_);
if (rc != 0)
throw error_t ();
myaddr = std::string(addr_);
monaddr = "inproc://monitor/" + myaddr;
rc = zmq_socket_monitor (ptr, monaddr.c_str(), ZMQ_EVENT_ALL);
if (rc != 0)
throw error_t ();
}
@ -345,6 +366,11 @@ namespace zmq
inline void connect (const char *addr_)
{
int rc = zmq_connect (ptr, addr_);
if (rc != 0)
throw error_t ();
myaddr = std::string(addr_);
monaddr = "inproc://monitor/" + myaddr;
rc = zmq_socket_monitor (ptr, monaddr.c_str(), ZMQ_EVENT_ALL);
if (rc != 0)
throw error_t ();
}
@ -353,6 +379,70 @@ namespace zmq
{
return(ptr != NULL);
}
inline void monitor (void* ctx, monitor_t* mon)
{
zmq_event_t event;
int rc;
assert(mon);
void *s = zmq_socket (ctx, ZMQ_PAIR);
assert (s);
const char* addr = myaddr.c_str();
rc = zmq_connect (s, monaddr.c_str());
assert (rc == 0);
while (true) {
zmq_msg_t msg;
zmq_msg_init (&msg);
rc = zmq_recvmsg (s, &msg, 0);
if (rc == -1 && zmq_errno() == ETERM) break;
assert (rc != -1);
memcpy (&event, zmq_msg_data (&msg), sizeof (event));
switch (event.event) {
case ZMQ_EVENT_CONNECTED:
mon->on_event_connected();
break;
case ZMQ_EVENT_CONNECT_DELAYED:
mon->on_event_connect_delayed();
break;
case ZMQ_EVENT_CONNECT_RETRIED:
mon->on_event_connect_retried();
break;
case ZMQ_EVENT_LISTENING:
mon->on_event_listening();
break;
case ZMQ_EVENT_BIND_FAILED:
mon->on_event_bind_failed();
break;
case ZMQ_EVENT_ACCEPTED:
mon->on_event_accepted();
break;
case ZMQ_EVENT_ACCEPT_FAILED:
mon->on_event_accept_failed();
break;
case ZMQ_EVENT_CLOSED:
mon->on_event_closed();
break;
case ZMQ_EVENT_CLOSE_FAILED:
mon->on_event_close_failed();
break;
case ZMQ_EVENT_DISCONNECTED:
mon->on_event_disconnected();
break;
default:
mon->on_event_unknown(event.event);
break;
}
zmq_msg_close (&msg);
}
zmq_close (s);
if (rc != 0)
throw error_t ();
}
inline size_t send (const void *buf_, size_t len_, int flags_ = 0)
{
@ -397,6 +487,9 @@ namespace zmq
private:
void *ptr;
monitor_t *mon;
std::string monaddr;
std::string myaddr;
socket_t (const socket_t&) ZMQ_DELETED_FUNCTION;
void operator = (const socket_t&) ZMQ_DELETED_FUNCTION;