diff --git a/src/ctx.cpp b/src/ctx.cpp index a2802372..69dd8c22 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -448,10 +448,7 @@ int zmq::thread_ctx_t::set (int option_, int optval_) _thread_affinity_cpus.insert (optval_); } else if (option_ == ZMQ_THREAD_AFFINITY_CPU_REMOVE && optval_ >= 0) { scoped_lock_t locker (_opt_sync); - std::set::iterator it = _thread_affinity_cpus.find (optval_); - if (it != _thread_affinity_cpus.end ()) { - _thread_affinity_cpus.erase (it); - } else { + if (0 == _thread_affinity_cpus.erase (optval_)) { errno = EINVAL; rc = -1; } @@ -531,15 +528,16 @@ void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_) { scoped_lock_t locker (_endpoints_sync); - endpoints_t::iterator it = _endpoints.begin (); - while (it != _endpoints.end ()) { - if (it->second.socket == socket_) { - endpoints_t::iterator to_erase = it; + for (endpoints_t::iterator it = _endpoints.begin (); + it != _endpoints.end ();) { + if (it->second.socket == socket_) +#if __cplusplus >= 201103L + it = _endpoints.erase (it); +#else + _endpoints.erase (it++); +#endif + else ++it; - _endpoints.erase (to_erase); - continue; - } - ++it; } } diff --git a/src/dish.cpp b/src/dish.cpp index 72611dff..4d6ea976 100644 --- a/src/dish.cpp +++ b/src/dish.cpp @@ -137,16 +137,11 @@ int zmq::dish_t::xleave (const char *group_) return -1; } - subscriptions_t::iterator it = - std::find (_subscriptions.begin (), _subscriptions.end (), group); - - if (it == _subscriptions.end ()) { + if (0 == _subscriptions.erase (group)) { errno = EINVAL; return -1; } - _subscriptions.erase (it); - msg_t msg; int rc = msg.init_leave (); errno_assert (rc == 0); @@ -183,27 +178,31 @@ int zmq::dish_t::xrecv (msg_t *msg_) // If there's already a message prepared by a previous call to zmq_poll, // return it straight ahead. if (_has_message) { - int rc = msg_->move (_message); + const int rc = msg_->move (_message); errno_assert (rc == 0); _has_message = false; return 0; } - while (true) { + return xxrecv (msg_); +} + +int zmq::dish_t::xxrecv (msg_t *msg_) +{ + do { // Get a message using fair queueing algorithm. - int rc = _fq.recv (msg_); + const int rc = _fq.recv (msg_); // If there's no message available, return immediately. // The same when error occurs. if (rc != 0) return -1; - // Filtering non matching messages - subscriptions_t::iterator it = - _subscriptions.find (std::string (msg_->group ())); - if (it != _subscriptions.end ()) - return 0; - } + // Skip non matching messages + } while (0 == _subscriptions.count (std::string (msg_->group ()))); + + // Found a matching message + return 0; } bool zmq::dish_t::xhas_in () @@ -213,25 +212,15 @@ bool zmq::dish_t::xhas_in () if (_has_message) return true; - while (true) { - // Get a message using fair queueing algorithm. - int rc = _fq.recv (&_message); - - // If there's no message available, return immediately. - // The same when error occurs. - if (rc != 0) { - errno_assert (errno == EAGAIN); - return false; - } - - // Filtering non matching messages - subscriptions_t::iterator it = - _subscriptions.find (std::string (_message.group ())); - if (it != _subscriptions.end ()) { - _has_message = true; - return true; - } + const int rc = xxrecv (&_message); + if (rc != 0) { + errno_assert (errno == EAGAIN); + return false; } + + // Matching message found + _has_message = true; + return true; } const zmq::blob_t &zmq::dish_t::get_credential () const diff --git a/src/dish.hpp b/src/dish.hpp index caf1d079..a1bc90db 100644 --- a/src/dish.hpp +++ b/src/dish.hpp @@ -68,6 +68,8 @@ class dish_t : public socket_base_t int xleave (const char *group_); private: + int xxrecv (zmq::msg_t *msg_); + // Send subscriptions to a pipe void send_subscriptions (pipe_t *pipe_); diff --git a/src/ip_resolver.cpp b/src/ip_resolver.cpp index 7aaf0103..26203319 100644 --- a/src/ip_resolver.cpp +++ b/src/ip_resolver.cpp @@ -604,8 +604,7 @@ int zmq::ip_resolver_t::resolve_nic_name (ip_addr_t *ip_addr_, const char *nic_) const int max_attempts = 10; int iterations = 0; - IP_ADAPTER_ADDRESSES *addresses = NULL; - IP_ADAPTER_ADDRESSES *current_addresses = NULL; + IP_ADAPTER_ADDRESSES *addresses; unsigned long out_buf_len = sizeof (IP_ADAPTER_ADDRESSES); do { @@ -627,27 +626,25 @@ int zmq::ip_resolver_t::resolve_nic_name (ip_addr_t *ip_addr_, const char *nic_) } while ((rc == ERROR_BUFFER_OVERFLOW) && (iterations < max_attempts)); if (rc == 0) { - current_addresses = addresses; - while (current_addresses) { + for (const IP_ADAPTER_ADDRESSES *current_addresses = addresses; + current_addresses; current_addresses = current_addresses->Next) { char *if_name = NULL; char *if_friendly_name = NULL; - int str_rc1, str_rc2; - str_rc1 = get_interface_name (current_addresses->IfIndex, &if_name); - str_rc2 = wchar_to_utf8 (current_addresses->FriendlyName, - &if_friendly_name); + const int str_rc1 = + get_interface_name (current_addresses->IfIndex, &if_name); + const int str_rc2 = wchar_to_utf8 (current_addresses->FriendlyName, + &if_friendly_name); // Find a network adapter by its "name" or "friendly name" if (((str_rc1 == 0) && (!strcmp (nic_, if_name))) || ((str_rc2 == 0) && (!strcmp (nic_, if_friendly_name)))) { // Iterate over all unicast addresses bound to the current network interface - IP_ADAPTER_UNICAST_ADDRESS *unicast_address = - current_addresses->FirstUnicastAddress; - IP_ADAPTER_UNICAST_ADDRESS *current_unicast_address = - unicast_address; - - while (current_unicast_address) { - ADDRESS_FAMILY family = + for (const IP_ADAPTER_UNICAST_ADDRESS *current_unicast_address = + current_addresses->FirstUnicastAddress; + current_unicast_address; + current_unicast_address = current_unicast_address->Next) { + const ADDRESS_FAMILY family = current_unicast_address->Address.lpSockaddr->sa_family; if (family == (_options.ipv6 () ? AF_INET6 : AF_INET)) { @@ -658,8 +655,6 @@ int zmq::ip_resolver_t::resolve_nic_name (ip_addr_t *ip_addr_, const char *nic_) found = true; break; } - - current_unicast_address = current_unicast_address->Next; } if (found) @@ -670,8 +665,6 @@ int zmq::ip_resolver_t::resolve_nic_name (ip_addr_t *ip_addr_, const char *nic_) free (if_name); if (str_rc2 == 0) free (if_friendly_name); - - current_addresses = current_addresses->Next; } free (addresses); diff --git a/src/mailbox_safe.cpp b/src/mailbox_safe.cpp index 189a7716..624e8543 100644 --- a/src/mailbox_safe.cpp +++ b/src/mailbox_safe.cpp @@ -32,6 +32,8 @@ #include "clock.hpp" #include "err.hpp" +#include + zmq::mailbox_safe_t::mailbox_safe_t (mutex_t *sync_) : _sync (sync_) { // Get the pipe into passive state. That way, if the users starts by @@ -58,13 +60,9 @@ void zmq::mailbox_safe_t::add_signaler (signaler_t *signaler_) void zmq::mailbox_safe_t::remove_signaler (signaler_t *signaler_) { - std::vector::iterator it = _signalers.begin (); - // TODO: make a copy of array and signal outside the lock - for (; it != _signalers.end (); ++it) { - if (*it == signaler_) - break; - } + std::vector::iterator it = + std::find (_signalers.begin (), _signalers.end (), signaler_); if (it != _signalers.end ()) _signalers.erase (it); diff --git a/src/own.cpp b/src/own.cpp index 9874a41e..e7f3c6cd 100644 --- a/src/own.cpp +++ b/src/own.cpp @@ -102,15 +102,12 @@ void zmq::own_t::process_term_req (own_t *object_) if (_terminating) return; - // If I/O object is well and alive let's ask it to terminate. - owned_t::iterator it = std::find (_owned.begin (), _owned.end (), object_); - // If not found, we assume that termination request was already sent to // the object so we can safely ignore the request. - if (it == _owned.end ()) + if (0 == _owned.erase (object_)) return; - _owned.erase (it); + // If I/O object is well and alive let's ask it to terminate. register_term_acks (1); // Note that this object is the root of the (partial shutdown) thus, its diff --git a/src/poller_base.cpp b/src/poller_base.cpp index 913456fa..ebcc6997 100644 --- a/src/poller_base.cpp +++ b/src/poller_base.cpp @@ -82,29 +82,32 @@ uint64_t zmq::poller_base_t::execute_timers () return 0; // Get the current time. - uint64_t current = _clock.now_ms (); + const uint64_t current = _clock.now_ms (); // Execute the timers that are already due. - timers_t::iterator it = _timers.begin (); - while (it != _timers.end ()) { + const timers_t::iterator begin = _timers.begin (); + const timers_t::iterator end = _timers.end (); + uint64_t res = 0; + timers_t::iterator it = begin; + for (; it != end; ++it) { // If we have to wait to execute the item, same will be true about // all the following items (multimap is sorted). Thus we can stop - // checking the subsequent timers and return the time to wait for - // the next timer (at least 1ms). - if (it->first > current) - return it->first - current; + // checking the subsequent timers. + if (it->first > current) { + res = it->first - current; + break; + } // Trigger the timer. it->second.sink->timer_event (it->second.id); - - // Remove it from the list of active timers. - timers_t::iterator o = it; - ++it; - _timers.erase (o); } - // There are no more timers. - return 0; + // Remove them from the list of active timers. + _timers.erase (begin, it); + + // Return the time to wait for the next timer (at least 1ms), or 0, if + // there are no more timers. + return res; } zmq::worker_poller_base_t::worker_poller_base_t (const thread_ctx_t &ctx_) : diff --git a/src/radio.cpp b/src/radio.cpp index e0a78b62..e31e0dc2 100644 --- a/src/radio.cpp +++ b/src/radio.cpp @@ -122,12 +122,14 @@ int zmq::radio_t::xsetsockopt (int option_, void zmq::radio_t::xpipe_terminated (pipe_t *pipe_) { - // NOTE: erase invalidates an iterator, and that's why it's not incrementing in post-loop - // read-after-free caught by Valgrind, see https://github.com/zeromq/libzmq/pull/1771 for (subscriptions_t::iterator it = _subscriptions.begin (); it != _subscriptions.end ();) { if (it->second == pipe_) { +#if __cplusplus >= 201103L + it = _subscriptions.erase (it); +#else _subscriptions.erase (it++); +#endif } else { ++it; } diff --git a/src/router.cpp b/src/router.cpp index 627a989e..03d117cd 100644 --- a/src/router.cpp +++ b/src/router.cpp @@ -150,10 +150,7 @@ int zmq::router_t::xsetsockopt (int option_, void zmq::router_t::xpipe_terminated (pipe_t *pipe_) { - std::set::iterator it = _anonymous_pipes.find (pipe_); - if (it != _anonymous_pipes.end ()) - _anonymous_pipes.erase (it); - else { + if (0 == _anonymous_pipes.erase (pipe_)) { erase_out_pipe (pipe_); _fq.pipe_terminated (pipe_); pipe_->rollback (); diff --git a/src/signaler.cpp b/src/signaler.cpp index b6b765b6..a80debd4 100644 --- a/src/signaler.cpp +++ b/src/signaler.cpp @@ -187,15 +187,10 @@ void zmq::signaler_t::send () errno_assert (sz == sizeof (inc)); #elif defined ZMQ_HAVE_WINDOWS unsigned char dummy = 0; - while (true) { - int nbytes = - ::send (_w, reinterpret_cast (&dummy), sizeof (dummy), 0); - wsa_assert (nbytes != SOCKET_ERROR); - if (unlikely (nbytes == SOCKET_ERROR)) - continue; - zmq_assert (nbytes == sizeof (dummy)); - break; - } + const int nbytes = + ::send (_w, reinterpret_cast (&dummy), sizeof (dummy), 0); + wsa_assert (nbytes != SOCKET_ERROR); + zmq_assert (nbytes == sizeof (dummy)); #elif defined ZMQ_HAVE_VXWORKS unsigned char dummy = 0; while (true) { diff --git a/src/socket_base.cpp b/src/socket_base.cpp index fcead300..de4c58fc 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -1317,12 +1317,7 @@ void zmq::socket_base_t::start_reaping (poller_t *poller_) int zmq::socket_base_t::process_commands (int timeout_, bool throttle_) { - int rc; - command_t cmd; - if (timeout_ != 0) { - // If we are asked to wait, simply ask mailbox to wait. - rc = _mailbox->recv (&cmd, timeout_); - } else { + if (timeout_ == 0) { // If we are asked not to wait, check whether we haven't processed // commands recently, so that we can throttle the new commands. @@ -1343,11 +1338,12 @@ int zmq::socket_base_t::process_commands (int timeout_, bool throttle_) return 0; _last_tsc = tsc; } - - // Check whether there are any commands pending for this thread. - rc = _mailbox->recv (&cmd, 0); } + // Check whether there are any commands pending for this thread. + command_t cmd; + int rc = _mailbox->recv (&cmd, timeout_); + // Process all available commands. while (rc == 0) { cmd.destination->process_command (cmd); diff --git a/src/timers.cpp b/src/timers.cpp index e981957f..eb792ec7 100644 --- a/src/timers.cpp +++ b/src/timers.cpp @@ -135,65 +135,50 @@ int zmq::timers_t::reset (int timer_id_) long zmq::timers_t::timeout () { - timersmap_t::iterator it = _timers.begin (); + const uint64_t now = _clock.now_ms (); + long res = -1; - uint64_t now = _clock.now_ms (); - - while (it != _timers.end ()) { - cancelled_timers_t::iterator cancelled_it = - _cancelled_timers.find (it->second.timer_id); - - // Live timer, lets return the timeout - if (cancelled_it == _cancelled_timers.end ()) { - if (it->first > now) - return static_cast (it->first - now); - - return 0; + const timersmap_t::iterator begin = _timers.begin (); + const timersmap_t::iterator end = _timers.end (); + timersmap_t::iterator it = begin; + for (; it != end; ++it) { + if (0 == _cancelled_timers.erase (it->second.timer_id)) { + // Live timer, lets return the timeout + res = it->first > now ? static_cast (it->first - now) : 0; + break; } - - // Let's remove it from the beginning of the list - timersmap_t::iterator old = it; - ++it; - _timers.erase (old); - _cancelled_timers.erase (cancelled_it); } - // Wait forever as no timers are alive - return -1; + // Remove timed-out timers + _timers.erase (begin, it); + + return res; } int zmq::timers_t::execute () { + const uint64_t now = _clock.now_ms (); + + const timersmap_t::iterator begin = _timers.begin (); + const timersmap_t::iterator end = _timers.end (); timersmap_t::iterator it = _timers.begin (); + for (; it != end; ++it) { + if (0 == _cancelled_timers.erase (it->second.timer_id)) { + // Timer is not cancelled - uint64_t now = _clock.now_ms (); + // Map is ordered, if we have to wait for current timer we can stop. + if (it->first > now) + break; - while (it != _timers.end ()) { - cancelled_timers_t::iterator cancelled_it = - _cancelled_timers.find (it->second.timer_id); + const timer_t &timer = it->second; - // Dead timer, lets remove it and continue - if (cancelled_it != _cancelled_timers.end ()) { - timersmap_t::iterator old = it; - ++it; - _timers.erase (old); - _cancelled_timers.erase (cancelled_it); - continue; + timer.handler (timer.timer_id, timer.arg); + + _timers.insert ( + timersmap_t::value_type (now + timer.interval, timer)); } - - // Map is ordered, if we have to wait for current timer we can stop. - if (it->first > now) - break; - - timer_t timer = it->second; - - timer.handler (timer.timer_id, timer.arg); - - timersmap_t::iterator old = it; - ++it; - _timers.erase (old); - _timers.insert (timersmap_t::value_type (now + timer.interval, timer)); } + _timers.erase (begin, it); return 0; }