mirror of
https://github.com/zeromq/cppzmq.git
synced 2024-12-12 10:33:52 +01:00
Rework monitor_t.
This commit is contained in:
parent
c36e3065d2
commit
b34444d273
163
zmq.hpp
163
zmq.hpp
@ -285,24 +285,9 @@ namespace zmq
|
||||
void operator = (const context_t&);
|
||||
};
|
||||
|
||||
class monitor_t
|
||||
{
|
||||
public:
|
||||
virtual void on_event_connected(const char *addr_) = 0;
|
||||
virtual void on_event_connect_delayed(const char *addr_) = 0;
|
||||
virtual void on_event_connect_retried(const char *addr_) = 0;
|
||||
virtual void on_event_listening(const char *addr_) = 0;
|
||||
virtual void on_event_bind_failed(const char *addr_) = 0;
|
||||
virtual void on_event_accepted(const char *addr_) = 0;
|
||||
virtual void on_event_accept_failed(const char *addr_) = 0;
|
||||
virtual void on_event_closed(const char *addr_) = 0;
|
||||
virtual void on_event_close_failed(const char *addr_) = 0;
|
||||
virtual void on_event_disconnected(const char *addr_) = 0;
|
||||
virtual void on_event_unknown(int event) = 0;
|
||||
};
|
||||
|
||||
class socket_t
|
||||
{
|
||||
friend class monitor_t;
|
||||
public:
|
||||
|
||||
inline socket_t (context_t &context_, int type_)
|
||||
@ -361,14 +346,6 @@ namespace zmq
|
||||
throw error_t ();
|
||||
}
|
||||
|
||||
inline void init_monitor(const char *addr_, int events)
|
||||
{
|
||||
int rc = zmq_socket_monitor(ptr, addr_, events);
|
||||
if (rc != 0)
|
||||
throw error_t ();
|
||||
monaddr = std::string(addr_);
|
||||
}
|
||||
|
||||
inline void bind (const char *addr_)
|
||||
{
|
||||
int rc = zmq_bind (ptr, addr_);
|
||||
@ -402,66 +379,6 @@ namespace zmq
|
||||
return(ptr != NULL);
|
||||
}
|
||||
|
||||
inline void monitor (monitor_t* mon)
|
||||
{
|
||||
zmq_event_t event;
|
||||
int rc;
|
||||
|
||||
assert(mon);
|
||||
|
||||
void *s = zmq_socket (ctxptr, ZMQ_PAIR);
|
||||
assert (s);
|
||||
|
||||
rc = zmq_connect (s, monaddr.c_str());
|
||||
assert (rc == 0);
|
||||
while (true) {
|
||||
zmq_msg_t msg;
|
||||
zmq_msg_init (&msg);
|
||||
rc = zmq_recvmsg (s, &msg, 0);
|
||||
if (rc == -1 && zmq_errno() == ETERM) break;
|
||||
assert (rc != -1);
|
||||
memcpy (&event, zmq_msg_data (&msg), sizeof (event));
|
||||
|
||||
switch (event.event) {
|
||||
case ZMQ_EVENT_CONNECTED:
|
||||
mon->on_event_connected(event.data.connected.addr);
|
||||
break;
|
||||
case ZMQ_EVENT_CONNECT_DELAYED:
|
||||
mon->on_event_connect_delayed(event.data.connect_delayed.addr);
|
||||
break;
|
||||
case ZMQ_EVENT_CONNECT_RETRIED:
|
||||
mon->on_event_connect_retried(event.data.connect_retried.addr);
|
||||
break;
|
||||
case ZMQ_EVENT_LISTENING:
|
||||
mon->on_event_listening(event.data.listening.addr);
|
||||
break;
|
||||
case ZMQ_EVENT_BIND_FAILED:
|
||||
mon->on_event_bind_failed(event.data.bind_failed.addr);
|
||||
break;
|
||||
case ZMQ_EVENT_ACCEPTED:
|
||||
mon->on_event_accepted(event.data.accepted.addr);
|
||||
break;
|
||||
case ZMQ_EVENT_ACCEPT_FAILED:
|
||||
mon->on_event_accept_failed(event.data.accept_failed.addr);
|
||||
break;
|
||||
case ZMQ_EVENT_CLOSED:
|
||||
mon->on_event_closed(event.data.closed.addr);
|
||||
break;
|
||||
case ZMQ_EVENT_CLOSE_FAILED:
|
||||
mon->on_event_close_failed(event.data.close_failed.addr);
|
||||
break;
|
||||
case ZMQ_EVENT_DISCONNECTED:
|
||||
mon->on_event_disconnected(event.data.disconnected.addr);
|
||||
break;
|
||||
default:
|
||||
mon->on_event_unknown(event.event);
|
||||
break;
|
||||
}
|
||||
zmq_msg_close (&msg);
|
||||
}
|
||||
zmq_close (s);
|
||||
}
|
||||
|
||||
inline size_t send (const void *buf_, size_t len_, int flags_ = 0)
|
||||
{
|
||||
int nbytes = zmq_send (ptr, buf_, len_, flags_);
|
||||
@ -503,7 +420,6 @@ namespace zmq
|
||||
}
|
||||
|
||||
private:
|
||||
std::string monaddr;
|
||||
void *ptr;
|
||||
void *ctxptr;
|
||||
|
||||
@ -511,6 +427,83 @@ namespace zmq
|
||||
void operator = (const socket_t&) ZMQ_DELETED_FUNCTION;
|
||||
};
|
||||
|
||||
class monitor_t
|
||||
{
|
||||
public:
|
||||
virtual ~monitor_t() {}
|
||||
|
||||
void monitor(socket_t& socket, const char *addr_, int events = ZMQ_EVENT_ALL)
|
||||
{
|
||||
int rc = zmq_socket_monitor(socket.ptr, addr_, events);
|
||||
if (rc != 0)
|
||||
throw error_t ();
|
||||
|
||||
zmq_event_t event;
|
||||
void *s = zmq_socket (socket.ctxptr, ZMQ_PAIR);
|
||||
assert (s);
|
||||
|
||||
rc = zmq_connect (s, addr_);
|
||||
assert (rc == 0);
|
||||
while (true) {
|
||||
zmq_msg_t msg;
|
||||
zmq_msg_init (&msg);
|
||||
rc = zmq_recvmsg (s, &msg, 0);
|
||||
if (rc == -1 && zmq_errno() == ETERM) break;
|
||||
assert (rc != -1);
|
||||
memcpy (&event, zmq_msg_data (&msg), sizeof (event));
|
||||
|
||||
switch (event.event) {
|
||||
case ZMQ_EVENT_CONNECTED:
|
||||
on_event_connected(event.data.connected.addr);
|
||||
break;
|
||||
case ZMQ_EVENT_CONNECT_DELAYED:
|
||||
on_event_connect_delayed(event.data.connect_delayed.addr);
|
||||
break;
|
||||
case ZMQ_EVENT_CONNECT_RETRIED:
|
||||
on_event_connect_retried(event.data.connect_retried.addr);
|
||||
break;
|
||||
case ZMQ_EVENT_LISTENING:
|
||||
on_event_listening(event.data.listening.addr);
|
||||
break;
|
||||
case ZMQ_EVENT_BIND_FAILED:
|
||||
on_event_bind_failed(event.data.bind_failed.addr);
|
||||
break;
|
||||
case ZMQ_EVENT_ACCEPTED:
|
||||
on_event_accepted(event.data.accepted.addr);
|
||||
break;
|
||||
case ZMQ_EVENT_ACCEPT_FAILED:
|
||||
on_event_accept_failed(event.data.accept_failed.addr);
|
||||
break;
|
||||
case ZMQ_EVENT_CLOSED:
|
||||
on_event_closed(event.data.closed.addr);
|
||||
break;
|
||||
case ZMQ_EVENT_CLOSE_FAILED:
|
||||
on_event_close_failed(event.data.close_failed.addr);
|
||||
break;
|
||||
case ZMQ_EVENT_DISCONNECTED:
|
||||
on_event_disconnected(event.data.disconnected.addr);
|
||||
break;
|
||||
default:
|
||||
on_event_unknown(event.event);
|
||||
break;
|
||||
}
|
||||
zmq_msg_close (&msg);
|
||||
}
|
||||
zmq_close (s);
|
||||
}
|
||||
|
||||
virtual void on_event_connected(const char *addr_) {}
|
||||
virtual void on_event_connect_delayed(const char *addr_) {}
|
||||
virtual void on_event_connect_retried(const char *addr_) {}
|
||||
virtual void on_event_listening(const char *addr_) {}
|
||||
virtual void on_event_bind_failed(const char *addr_) {}
|
||||
virtual void on_event_accepted(const char *addr_) {}
|
||||
virtual void on_event_accept_failed(const char *addr_) {}
|
||||
virtual void on_event_closed(const char *addr_) {}
|
||||
virtual void on_event_close_failed(const char *addr_) {}
|
||||
virtual void on_event_disconnected(const char *addr_) {}
|
||||
virtual void on_event_unknown(int event) {}
|
||||
};
|
||||
}
|
||||
|
||||
#endif
|
||||
|
Loading…
Reference in New Issue
Block a user