Merge pull request #22 from ricnewton/monitor_updates

Move monitoring functionality out of socket_t and into monitor_t
This commit is contained in:
Andrey Sibiryov 2013-07-16 06:10:47 -07:00
commit f43e83e34e

206
zmq.hpp
View File

@ -30,6 +30,7 @@
#include <algorithm>
#include <cassert>
#include <cstring>
#include <string>
#include <exception>
#include <string>
@ -56,6 +57,10 @@
#define ZMQ_DELETED_FUNCTION
#endif
#if ZMQ_VERSION >= ZMQ_MAKE_VERSION(3, 3, 0)
#define ZMQ_NEW_MONITOR_EVENT_LAYOUT
#endif
// In order to prevent unused variable warnings when building in non-debug
// mode use this macro to make assertions.
#ifndef NDEBUG
@ -302,24 +307,9 @@ namespace zmq
void operator = (const context_t&);
};
class monitor_t
{
public:
virtual void on_event_connected(const char *addr_) = 0;
virtual void on_event_connect_delayed(const char *addr_) = 0;
virtual void on_event_connect_retried(const char *addr_) = 0;
virtual void on_event_listening(const char *addr_) = 0;
virtual void on_event_bind_failed(const char *addr_) = 0;
virtual void on_event_accepted(const char *addr_) = 0;
virtual void on_event_accept_failed(const char *addr_) = 0;
virtual void on_event_closed(const char *addr_) = 0;
virtual void on_event_close_failed(const char *addr_) = 0;
virtual void on_event_disconnected(const char *addr_) = 0;
virtual void on_event_unknown(int event) = 0;
};
class socket_t
{
friend class monitor_t;
public:
inline socket_t (context_t &context_, int type_)
@ -378,14 +368,6 @@ namespace zmq
throw error_t ();
}
inline void init_monitor(const char *addr_, int events)
{
int rc = zmq_socket_monitor(ptr, addr_, events);
if (rc != 0)
throw error_t ();
monaddr = std::string(addr_);
}
inline void bind (const char *addr_)
{
int rc = zmq_bind (ptr, addr_);
@ -418,67 +400,7 @@ namespace zmq
{
return(ptr != NULL);
}
inline void monitor (monitor_t* mon)
{
zmq_event_t event;
int rc;
assert(mon);
void *s = zmq_socket (ctxptr, ZMQ_PAIR);
assert (s);
rc = zmq_connect (s, monaddr.c_str());
assert (rc == 0);
while (true) {
zmq_msg_t msg;
zmq_msg_init (&msg);
rc = zmq_recvmsg (s, &msg, 0);
if (rc == -1 && zmq_errno() == ETERM) break;
assert (rc != -1);
memcpy (&event, zmq_msg_data (&msg), sizeof (event));
switch (event.event) {
case ZMQ_EVENT_CONNECTED:
mon->on_event_connected(event.data.connected.addr);
break;
case ZMQ_EVENT_CONNECT_DELAYED:
mon->on_event_connect_delayed(event.data.connect_delayed.addr);
break;
case ZMQ_EVENT_CONNECT_RETRIED:
mon->on_event_connect_retried(event.data.connect_retried.addr);
break;
case ZMQ_EVENT_LISTENING:
mon->on_event_listening(event.data.listening.addr);
break;
case ZMQ_EVENT_BIND_FAILED:
mon->on_event_bind_failed(event.data.bind_failed.addr);
break;
case ZMQ_EVENT_ACCEPTED:
mon->on_event_accepted(event.data.accepted.addr);
break;
case ZMQ_EVENT_ACCEPT_FAILED:
mon->on_event_accept_failed(event.data.accept_failed.addr);
break;
case ZMQ_EVENT_CLOSED:
mon->on_event_closed(event.data.closed.addr);
break;
case ZMQ_EVENT_CLOSE_FAILED:
mon->on_event_close_failed(event.data.close_failed.addr);
break;
case ZMQ_EVENT_DISCONNECTED:
mon->on_event_disconnected(event.data.disconnected.addr);
break;
default:
mon->on_event_unknown(event.event);
break;
}
zmq_msg_close (&msg);
}
zmq_close (s);
}
inline size_t send (const void *buf_, size_t len_, int flags_ = 0)
{
int nbytes = zmq_send (ptr, buf_, len_, flags_);
@ -520,7 +442,6 @@ namespace zmq
}
private:
std::string monaddr;
void *ptr;
void *ctxptr;
@ -528,6 +449,119 @@ namespace zmq
void operator = (const socket_t&) ZMQ_DELETED_FUNCTION;
};
class monitor_t
{
public:
monitor_t() : socketPtr(NULL) {}
virtual ~monitor_t() {}
void monitor(socket_t &socket, const char *addr_, int events = ZMQ_EVENT_ALL)
{
int rc = zmq_socket_monitor(socket.ptr, addr_, events);
if (rc != 0)
throw error_t ();
socketPtr = socket.ptr;
void *s = zmq_socket (socket.ctxptr, ZMQ_PAIR);
assert (s);
rc = zmq_connect (s, addr_);
assert (rc == 0);
on_monitor_started();
while (true) {
zmq_msg_t eventMsg;
zmq_msg_init (&eventMsg);
rc = zmq_recvmsg (s, &eventMsg, 0);
if (rc == -1 && zmq_errno() == ETERM)
break;
assert (rc != -1);
zmq_event_t* event = static_cast<zmq_event_t*>(zmq_msg_data (&eventMsg));
#ifdef ZMQ_NEW_MONITOR_EVENT_LAYOUT
zmq_msg_t addrMsg;
zmq_msg_init (&addrMsg);
rc = zmq_recvmsg (s, &addrMsg, 0);
if (rc == -1 && zmq_errno() == ETERM)
break;
assert (rc != -1);
const char* str = static_cast<const char*>(zmq_msg_data (&addrMsg));
std::string address(str, str + zmq_msg_size(&addrMsg));
zmq_msg_close (&addrMsg);
#else
// Bit of a hack, but all events in the zmq_event_t union have the same layout so this will work for all event types.
std::string address = event->data.connected.addr;
#endif
#ifdef ZMQ_EVENT_MONITOR_STOPPED
if (event->event == ZMQ_EVENT_MONITOR_STOPPED)
break;
#endif
switch (event->event) {
case ZMQ_EVENT_CONNECTED:
on_event_connected(*event, address.c_str());
break;
case ZMQ_EVENT_CONNECT_DELAYED:
on_event_connect_delayed(*event, address.c_str());
break;
case ZMQ_EVENT_CONNECT_RETRIED:
on_event_connect_retried(*event, address.c_str());
break;
case ZMQ_EVENT_LISTENING:
on_event_listening(*event, address.c_str());
break;
case ZMQ_EVENT_BIND_FAILED:
on_event_bind_failed(*event, address.c_str());
break;
case ZMQ_EVENT_ACCEPTED:
on_event_accepted(*event, address.c_str());
break;
case ZMQ_EVENT_ACCEPT_FAILED:
on_event_accept_failed(*event, address.c_str());
break;
case ZMQ_EVENT_CLOSED:
on_event_closed(*event, address.c_str());
break;
case ZMQ_EVENT_CLOSE_FAILED:
on_event_close_failed(*event, address.c_str());
break;
case ZMQ_EVENT_DISCONNECTED:
on_event_disconnected(*event, address.c_str());
break;
default:
on_event_unknown(*event, address.c_str());
break;
}
zmq_msg_close (&eventMsg);
}
zmq_close (s);
socketPtr = NULL;
}
#ifdef ZMQ_EVENT_MONITOR_STOPPED
void abort()
{
if (socketPtr)
zmq_socket_monitor(socketPtr, NULL, 0);
}
#endif
virtual void on_monitor_started() {}
virtual void on_event_connected(const zmq_event_t &event_, const char* addr_) {}
virtual void on_event_connect_delayed(const zmq_event_t &event_, const char* addr_) {}
virtual void on_event_connect_retried(const zmq_event_t &event_, const char* addr_) {}
virtual void on_event_listening(const zmq_event_t &event_, const char* addr_) {}
virtual void on_event_bind_failed(const zmq_event_t &event_, const char* addr_) {}
virtual void on_event_accepted(const zmq_event_t &event_, const char* addr_) {}
virtual void on_event_accept_failed(const zmq_event_t &event_, const char* addr_) {}
virtual void on_event_closed(const zmq_event_t &event_, const char* addr_) {}
virtual void on_event_close_failed(const zmq_event_t &event_, const char* addr_) {}
virtual void on_event_disconnected(const zmq_event_t &event_, const char* addr_) {}
virtual void on_event_unknown(const zmq_event_t &event_, const char* addr_) {}
private:
void* socketPtr;
};
}
#endif