diff --git a/src/dish.cpp b/src/dish.cpp index 45023d3f..2bd60f52 100644 --- a/src/dish.cpp +++ b/src/dish.cpp @@ -97,8 +97,7 @@ int zmq::dish_t::xjoin (const char* group_) return -1; } - subscriptions_t::iterator it = - std::find (subscriptions.begin (), subscriptions.end (), group); + subscriptions_t::iterator it = subscriptions.find (group); // User cannot join same group twice if (it != subscriptions.end ()) { @@ -106,7 +105,7 @@ int zmq::dish_t::xjoin (const char* group_) return -1; } - subscriptions.push_back (group); + subscriptions.insert (group); msg_t msg; int rc = msg.init_join (); @@ -185,15 +184,21 @@ int zmq::dish_t::xrecv (msg_t *msg_) return 0; } - // Get a message using fair queueing algorithm. - int rc = fq.recv (msg_); + while (true) { - // If there's no message available, return immediately. - // The same when error occurs. - if (rc != 0) - return -1; + // Get a message using fair queueing algorithm. + int rc = fq.recv (msg_); - return 0; + // If there's no message available, return immediately. + // The same when error occurs. + if (rc != 0) + return -1; + + // Filtering non matching messages + subscriptions_t::iterator it = subscriptions.find (std::string(msg_->group ())); + if (it != subscriptions.end ()) + return 0; + } } bool zmq::dish_t::xhas_in () @@ -203,18 +208,24 @@ bool zmq::dish_t::xhas_in () if (has_message) return true; - // Get a message using fair queueing algorithm. - int rc = fq.recv (&message); + while (true) { + // Get a message using fair queueing algorithm. + int rc = fq.recv (&message); - // If there's no message available, return immediately. - // The same when error occurs. - if (rc != 0) { - errno_assert (errno == EAGAIN); - return false; + // If there's no message available, return immediately. + // The same when error occurs. + if (rc != 0) { + errno_assert (errno == EAGAIN); + return false; + } + + // Filtering non matching messages + subscriptions_t::iterator it = subscriptions.find (std::string(message.group ())); + if (it != subscriptions.end ()) { + has_message = true; + return true; + } } - - has_message = true; - return true; } zmq::blob_t zmq::dish_t::get_credential () const diff --git a/src/dish.hpp b/src/dish.hpp index 7759a462..bb805ab9 100644 --- a/src/dish.hpp +++ b/src/dish.hpp @@ -81,7 +81,7 @@ namespace zmq dist_t dist; // The repository of subscriptions. - typedef std::vector subscriptions_t; + typedef std::set subscriptions_t; subscriptions_t subscriptions; // If true, 'message' contains a matching message to return on the diff --git a/tests/test_radio_dish.cpp b/tests/test_radio_dish.cpp index 5cc93c33..560d330d 100644 --- a/tests/test_radio_dish.cpp +++ b/tests/test_radio_dish.cpp @@ -60,7 +60,7 @@ int msg_recv_cmp (zmq_msg_t *msg_, void *s_, const char* group_, const char* bod if (recv_rc == -1) return -1; - if (strcmp (zmq_msg_group (msg_), group_) != 0) + if (strcmp (zmq_msg_group (msg_), group_) != 0) { zmq_msg_close (msg_); return -1;