diff --git a/src/dish.cpp b/src/dish.cpp index 38725efd..4d6ea976 100644 --- a/src/dish.cpp +++ b/src/dish.cpp @@ -184,6 +184,11 @@ int zmq::dish_t::xrecv (msg_t *msg_) return 0; } + return xxrecv (msg_); +} + +int zmq::dish_t::xxrecv (msg_t *msg_) +{ do { // Get a message using fair queueing algorithm. const int rc = _fq.recv (msg_); @@ -207,19 +212,11 @@ bool zmq::dish_t::xhas_in () if (_has_message) return true; - do { - // Get a message using fair queueing algorithm. - const int rc = _fq.recv (&_message); - - // If there's no message available, return immediately. - // The same when error occurs. - if (rc != 0) { - errno_assert (errno == EAGAIN); - return false; - } - - // Filter out non matching messages - } while (0 == _subscriptions.count (std::string (_message.group ()))); + const int rc = xxrecv (&_message); + if (rc != 0) { + errno_assert (errno == EAGAIN); + return false; + } // Matching message found _has_message = true; diff --git a/src/dish.hpp b/src/dish.hpp index caf1d079..a1bc90db 100644 --- a/src/dish.hpp +++ b/src/dish.hpp @@ -68,6 +68,8 @@ class dish_t : public socket_base_t int xleave (const char *group_); private: + int xxrecv (zmq::msg_t *msg_); + // Send subscriptions to a pipe void send_subscriptions (pipe_t *pipe_);