diff --git a/src/radio.cpp b/src/radio.cpp index e22fbdfa..ed1ff4c1 100644 --- a/src/radio.cpp +++ b/src/radio.cpp @@ -37,7 +37,8 @@ #include "msg.hpp" zmq::radio_t::radio_t (class ctx_t *parent_, uint32_t tid_, int sid_) : - socket_base_t (parent_, tid_, sid_, true) + socket_base_t (parent_, tid_, sid_, true), + lossy (true) { options.type = ZMQ_RADIO; } @@ -99,6 +100,22 @@ void zmq::radio_t::xwrite_activated (pipe_t *pipe_) { dist.activated (pipe_); } +int zmq::radio_t::xsetsockopt (int option_, + const void *optval_, + size_t optvallen_) +{ + if (optvallen_ != sizeof (int) || *static_cast (optval_) < 0) { + errno = EINVAL; + return -1; + } + if (option_ == ZMQ_XPUB_NODROP) + lossy = (*static_cast (optval_) == 0); + else { + errno = EINVAL; + return -1; + } + return 0; +} void zmq::radio_t::xpipe_terminated (pipe_t *pipe_) { @@ -141,7 +158,13 @@ int zmq::radio_t::xsend (msg_t *msg_) ++it) dist.match (*it); - int rc = dist.send_to_matching (msg_); + int rc = -1; + if (lossy || dist.check_hwm ()) { + if (dist.send_to_matching (msg_) == 0) { + rc = 0; // Yay, sent successfully + } + } else + errno = EAGAIN; return rc; } diff --git a/src/radio.hpp b/src/radio.hpp index a5d80ab6..da8a4de8 100644 --- a/src/radio.hpp +++ b/src/radio.hpp @@ -61,6 +61,7 @@ class radio_t : public socket_base_t bool xhas_in (); void xread_activated (zmq::pipe_t *pipe_); void xwrite_activated (zmq::pipe_t *pipe_); + int xsetsockopt (int option_, const void *optval_, size_t optvallen_); void xpipe_terminated (zmq::pipe_t *pipe_); private: @@ -75,6 +76,9 @@ class radio_t : public socket_base_t // Distributor of messages holding the list of outbound pipes. dist_t dist; + // Drop messages if HWM reached, otherwise return with EAGAIN + bool lossy; + radio_t (const radio_t &); const radio_t &operator= (const radio_t &); };