mirror of
https://github.com/zeromq/cppzmq.git
synced 2025-03-02 20:30:14 +01:00
Problem: monitor_t::monitor function is blocking
This does not allaw to use monitor_t without a thread. What is often OK but sometimes not. Solution: keep existing interface but add a non blocking alternative.
This commit is contained in:
parent
b0e6d4bacd
commit
d4da63fed6
223
zmq.hpp
223
zmq.hpp
@ -703,8 +703,32 @@ namespace zmq
|
||||
class monitor_t
|
||||
{
|
||||
public:
|
||||
monitor_t() : socketPtr(NULL) {}
|
||||
virtual ~monitor_t() {}
|
||||
monitor_t() : socketPtr(NULL), monitor_socket{NULL} {}
|
||||
|
||||
virtual ~monitor_t()
|
||||
{
|
||||
if (socketPtr)
|
||||
zmq_socket_monitor(socketPtr, NULL, 0);
|
||||
|
||||
if (monitor_socket)
|
||||
zmq_close (monitor_socket);
|
||||
|
||||
}
|
||||
|
||||
|
||||
#ifdef ZMQ_HAS_RVALUE_REFS
|
||||
monitor_t(monitor_t&& rhs) ZMQ_NOTHROW :
|
||||
socketPtr(rhs.socketPtr),
|
||||
monitor_socket(rhs.monitor_socket)
|
||||
{
|
||||
rhs.socketPtr = NULL;
|
||||
rhs.monitor_socket = NULL;
|
||||
}
|
||||
|
||||
socket_t& operator=(socket_t&& rhs) ZMQ_DELETED_FUNCTION ;
|
||||
#endif
|
||||
|
||||
|
||||
|
||||
void monitor(socket_t &socket, std::string const& addr, int events = ZMQ_EVENT_ALL)
|
||||
{
|
||||
@ -712,104 +736,146 @@ namespace zmq
|
||||
}
|
||||
|
||||
void monitor(socket_t &socket, const char *addr_, int events = ZMQ_EVENT_ALL)
|
||||
{
|
||||
init (socket, addr_, events) ;
|
||||
while(true)
|
||||
{
|
||||
check_event(-1) ;
|
||||
}
|
||||
}
|
||||
|
||||
void init(socket_t &socket, std::string const& addr, int events = ZMQ_EVENT_ALL)
|
||||
{
|
||||
init(socket, addr.c_str(), events);
|
||||
}
|
||||
|
||||
void init(socket_t &socket, const char *addr_, int events = ZMQ_EVENT_ALL)
|
||||
{
|
||||
int rc = zmq_socket_monitor(socket.ptr, addr_, events);
|
||||
if (rc != 0)
|
||||
throw error_t ();
|
||||
|
||||
socketPtr = socket.ptr;
|
||||
void *s = zmq_socket (socket.ctxptr, ZMQ_PAIR);
|
||||
assert (s);
|
||||
monitor_socket = zmq_socket (socket.ctxptr, ZMQ_PAIR);
|
||||
assert (monitor_socket);
|
||||
|
||||
rc = zmq_connect (s, addr_);
|
||||
rc = zmq_connect (monitor_socket, addr_);
|
||||
assert (rc == 0);
|
||||
|
||||
on_monitor_started();
|
||||
}
|
||||
|
||||
while (true) {
|
||||
zmq_msg_t eventMsg;
|
||||
zmq_msg_init (&eventMsg);
|
||||
rc = zmq_msg_recv (&eventMsg, s, 0);
|
||||
bool check_event(int timeout = 0)
|
||||
{
|
||||
assert (monitor_socket);
|
||||
|
||||
zmq_msg_t eventMsg;
|
||||
zmq_msg_init (&eventMsg);
|
||||
|
||||
zmq::pollitem_t items [] = {
|
||||
{ monitor_socket, 0, ZMQ_POLLIN, 0 },
|
||||
};
|
||||
|
||||
zmq::poll (&items [0], 1, timeout);
|
||||
|
||||
if (items [0].revents & ZMQ_POLLIN)
|
||||
{
|
||||
int rc = zmq_msg_recv (&eventMsg, monitor_socket, 0);
|
||||
if (rc == -1 && zmq_errno() == ETERM)
|
||||
break;
|
||||
return false;
|
||||
assert (rc != -1);
|
||||
|
||||
}
|
||||
else
|
||||
{
|
||||
zmq_msg_close (&eventMsg);
|
||||
return false;
|
||||
}
|
||||
|
||||
#if ZMQ_VERSION_MAJOR >= 4
|
||||
const char* data = static_cast<const char*>(zmq_msg_data(&eventMsg));
|
||||
zmq_event_t msgEvent;
|
||||
memcpy(&msgEvent.event, data, sizeof(uint16_t)); data += sizeof(uint16_t);
|
||||
memcpy(&msgEvent.value, data, sizeof(int32_t));
|
||||
zmq_event_t* event = &msgEvent;
|
||||
const char* data = static_cast<const char*>(zmq_msg_data(&eventMsg));
|
||||
zmq_event_t msgEvent;
|
||||
memcpy(&msgEvent.event, data, sizeof(uint16_t)); data += sizeof(uint16_t);
|
||||
memcpy(&msgEvent.value, data, sizeof(int32_t));
|
||||
zmq_event_t* event = &msgEvent;
|
||||
#else
|
||||
zmq_event_t* event = static_cast<zmq_event_t*>(zmq_msg_data(&eventMsg));
|
||||
zmq_event_t* event = static_cast<zmq_event_t*>(zmq_msg_data(&eventMsg));
|
||||
#endif
|
||||
|
||||
#ifdef ZMQ_NEW_MONITOR_EVENT_LAYOUT
|
||||
zmq_msg_t addrMsg;
|
||||
zmq_msg_init (&addrMsg);
|
||||
rc = zmq_msg_recv (&addrMsg, s, 0);
|
||||
if (rc == -1 && zmq_errno() == ETERM)
|
||||
break;
|
||||
assert (rc != -1);
|
||||
const char* str = static_cast<const char*>(zmq_msg_data (&addrMsg));
|
||||
std::string address(str, str + zmq_msg_size(&addrMsg));
|
||||
zmq_msg_close (&addrMsg);
|
||||
zmq_msg_t addrMsg;
|
||||
zmq_msg_init (&addrMsg);
|
||||
int rc = zmq_msg_recv (&addrMsg, monitor_socket, 0);
|
||||
if (rc == -1 && zmq_errno() == ETERM)
|
||||
{
|
||||
zmq_msg_close (&eventMsg);
|
||||
return false;
|
||||
}
|
||||
|
||||
assert (rc != -1);
|
||||
const char* str = static_cast<const char*>(zmq_msg_data (&addrMsg));
|
||||
std::string address(str, str + zmq_msg_size(&addrMsg));
|
||||
zmq_msg_close (&addrMsg);
|
||||
#else
|
||||
// Bit of a hack, but all events in the zmq_event_t union have the same layout so this will work for all event types.
|
||||
std::string address = event->data.connected.addr;
|
||||
// Bit of a hack, but all events in the zmq_event_t union have the same layout so this will work for all event types.
|
||||
std::string address = event->data.connected.addr;
|
||||
#endif
|
||||
|
||||
#ifdef ZMQ_EVENT_MONITOR_STOPPED
|
||||
if (event->event == ZMQ_EVENT_MONITOR_STOPPED)
|
||||
break;
|
||||
if (event->event == ZMQ_EVENT_MONITOR_STOPPED)
|
||||
{
|
||||
zmq_msg_close (&eventMsg);
|
||||
return true;
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
switch (event->event) {
|
||||
case ZMQ_EVENT_CONNECTED:
|
||||
on_event_connected(*event, address.c_str());
|
||||
break;
|
||||
case ZMQ_EVENT_CONNECT_DELAYED:
|
||||
on_event_connect_delayed(*event, address.c_str());
|
||||
break;
|
||||
case ZMQ_EVENT_CONNECT_RETRIED:
|
||||
on_event_connect_retried(*event, address.c_str());
|
||||
break;
|
||||
case ZMQ_EVENT_LISTENING:
|
||||
on_event_listening(*event, address.c_str());
|
||||
break;
|
||||
case ZMQ_EVENT_BIND_FAILED:
|
||||
on_event_bind_failed(*event, address.c_str());
|
||||
break;
|
||||
case ZMQ_EVENT_ACCEPTED:
|
||||
on_event_accepted(*event, address.c_str());
|
||||
break;
|
||||
case ZMQ_EVENT_ACCEPT_FAILED:
|
||||
on_event_accept_failed(*event, address.c_str());
|
||||
break;
|
||||
case ZMQ_EVENT_CLOSED:
|
||||
on_event_closed(*event, address.c_str());
|
||||
break;
|
||||
case ZMQ_EVENT_CLOSE_FAILED:
|
||||
on_event_close_failed(*event, address.c_str());
|
||||
break;
|
||||
case ZMQ_EVENT_DISCONNECTED:
|
||||
on_event_disconnected(*event, address.c_str());
|
||||
break;
|
||||
switch (event->event) {
|
||||
case ZMQ_EVENT_CONNECTED:
|
||||
on_event_connected(*event, address.c_str());
|
||||
break;
|
||||
case ZMQ_EVENT_CONNECT_DELAYED:
|
||||
on_event_connect_delayed(*event, address.c_str());
|
||||
break;
|
||||
case ZMQ_EVENT_CONNECT_RETRIED:
|
||||
on_event_connect_retried(*event, address.c_str());
|
||||
break;
|
||||
case ZMQ_EVENT_LISTENING:
|
||||
on_event_listening(*event, address.c_str());
|
||||
break;
|
||||
case ZMQ_EVENT_BIND_FAILED:
|
||||
on_event_bind_failed(*event, address.c_str());
|
||||
break;
|
||||
case ZMQ_EVENT_ACCEPTED:
|
||||
on_event_accepted(*event, address.c_str());
|
||||
break;
|
||||
case ZMQ_EVENT_ACCEPT_FAILED:
|
||||
on_event_accept_failed(*event, address.c_str());
|
||||
break;
|
||||
case ZMQ_EVENT_CLOSED:
|
||||
on_event_closed(*event, address.c_str());
|
||||
break;
|
||||
case ZMQ_EVENT_CLOSE_FAILED:
|
||||
on_event_close_failed(*event, address.c_str());
|
||||
break;
|
||||
case ZMQ_EVENT_DISCONNECTED:
|
||||
on_event_disconnected(*event, address.c_str());
|
||||
break;
|
||||
#ifdef ZMQ_BUILD_DRAFT_API
|
||||
case ZMQ_EVENT_HANDSHAKE_FAILED:
|
||||
on_event_handshake_failed(*event, address.c_str());
|
||||
break;
|
||||
case ZMQ_EVENT_HANDSHAKE_SUCCEED:
|
||||
on_event_handshake_succeed(*event, address.c_str());
|
||||
break;
|
||||
case ZMQ_EVENT_HANDSHAKE_FAILED:
|
||||
on_event_handshake_failed(*event, address.c_str());
|
||||
break;
|
||||
case ZMQ_EVENT_HANDSHAKE_SUCCEED:
|
||||
on_event_handshake_succeed(*event, address.c_str());
|
||||
break;
|
||||
#endif
|
||||
default:
|
||||
on_event_unknown(*event, address.c_str());
|
||||
break;
|
||||
}
|
||||
zmq_msg_close (&eventMsg);
|
||||
default:
|
||||
on_event_unknown(*event, address.c_str());
|
||||
break;
|
||||
}
|
||||
zmq_close (s);
|
||||
socketPtr = NULL;
|
||||
zmq_msg_close (&eventMsg);
|
||||
|
||||
return true ;
|
||||
}
|
||||
|
||||
#ifdef ZMQ_EVENT_MONITOR_STOPPED
|
||||
@ -817,6 +883,12 @@ namespace zmq
|
||||
{
|
||||
if (socketPtr)
|
||||
zmq_socket_monitor(socketPtr, NULL, 0);
|
||||
|
||||
if (monitor_socket)
|
||||
zmq_close (monitor_socket);
|
||||
|
||||
socketPtr = NULL;
|
||||
monitor_socket = NULL;
|
||||
}
|
||||
#endif
|
||||
virtual void on_monitor_started() {}
|
||||
@ -834,7 +906,12 @@ namespace zmq
|
||||
virtual void on_event_handshake_succeed(const zmq_event_t &event_, const char* addr_) { (void) event_; (void) addr_; }
|
||||
virtual void on_event_unknown(const zmq_event_t &event_, const char* addr_) { (void)event_; (void)addr_; }
|
||||
private:
|
||||
|
||||
monitor_t (const monitor_t&) ZMQ_DELETED_FUNCTION;
|
||||
void operator = (const monitor_t&) ZMQ_DELETED_FUNCTION;
|
||||
|
||||
void* socketPtr;
|
||||
void *monitor_socket ;
|
||||
};
|
||||
|
||||
#if defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER)
|
||||
|
Loading…
x
Reference in New Issue
Block a user