From 48a1e637b6225fec266cb94ccdcf9ada59fa83fe Mon Sep 17 00:00:00 2001 From: sigiesec Date: Tue, 29 Aug 2017 11:51:01 +0200 Subject: [PATCH] Problem: zmq_socket_get_peer_state is not implemented Solution: add initial implementation --- src/router.cpp | 21 +++++++++++++++++++++ src/router.hpp | 1 + src/socket_base.cpp | 8 ++++++++ src/socket_base.hpp | 6 +++++- src/zmq.cpp | 9 ++++++--- 5 files changed, 41 insertions(+), 4 deletions(-) diff --git a/src/router.cpp b/src/router.cpp index 0e28c2ce..e9e71636 100644 --- a/src/router.cpp +++ b/src/router.cpp @@ -443,6 +443,27 @@ zmq::blob_t zmq::router_t::get_credential () const return fq.get_credential (); } +int zmq::router_t::get_peer_state (const void *identity, + size_t identity_size) const +{ + int res = 0; + + blob_t identity_blob ((unsigned char *) identity, identity_size); + outpipes_t::const_iterator it = outpipes.find (identity_blob); + if (it == outpipes.end ()) { + errno = EHOSTUNREACH; + return -1; + } + + const outpipe_t &outpipe = it->second; + if (outpipe.pipe->check_hwm ()) + res |= ZMQ_POLLOUT; + + /** \todo does it make any sense to check the inpipe as well? */ + + return res; +} + bool zmq::router_t::identify_peer (pipe_t *pipe_) { msg_t msg; diff --git a/src/router.hpp b/src/router.hpp index f693000e..f991aa67 100644 --- a/src/router.hpp +++ b/src/router.hpp @@ -64,6 +64,7 @@ namespace zmq void xread_activated (zmq::pipe_t *pipe_); void xwrite_activated (zmq::pipe_t *pipe_); void xpipe_terminated (zmq::pipe_t *pipe_); + int get_peer_state (const void *identity, size_t identity_size) const; protected: diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 44f904fd..60a22e52 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -221,6 +221,14 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_, bool } } +int zmq::socket_base_t::get_peer_state (const void *identity, + size_t identity_size) const +{ + // Only ROUTER sockets support this + errno = ENOTSUP; + return -1; +} + zmq::socket_base_t::~socket_base_t () { if (mailbox) diff --git a/src/socket_base.hpp b/src/socket_base.hpp index a8b100e1..b54af708 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -138,6 +138,11 @@ namespace zmq void event_handshake_failed_auth(const std::string &addr_, int err_); void event_handshake_succeeded(const std::string &addr_, int err_); + // Query the state of a specific peer. The default implementation + // always returns an ENOTSUP error. + virtual int get_peer_state (const void *identity, + size_t identity_size) const; + protected: socket_base_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_, bool thread_safe_ = false); @@ -180,7 +185,6 @@ namespace zmq // Delay actual destruction of the socket. void process_destroy (); - // Next assigned name on a zmq_connect() call used by ROUTER and STREAM socket types std::string connect_rid; diff --git a/src/zmq.cpp b/src/zmq.cpp index 23a877c0..e39bfa65 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -1358,12 +1358,15 @@ int zmq_poller_wait_all (void *poller_, zmq_poller_event_t *events_, int n_event // Peer-specific state -int zmq_socket_get_peer_state (void *socket, +int zmq_socket_get_peer_state (void *s_, const void *identity, size_t identity_size) { - errno = ENOTSUP; - return -1; + zmq::socket_base_t *s = as_socket_base_t (s_); + if (!s) + return -1; + + return s->get_peer_state (identity, identity_size); } // Timers