diff --git a/include/zmq.h b/include/zmq.h index 699f8417..6de1107e 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -541,8 +541,14 @@ ZMQ_EXPORT void zmq_atomic_counter_destroy (void **counter_p); /* Starts the stopwatch. Returns the handle to the watch. */ ZMQ_EXPORT void *zmq_stopwatch_start (void); +#ifdef ZMQ_BUILD_DRAFT_API +/* Returns the number of microseconds elapsed since the stopwatch was */ +/* started, but does not stop or deallocate the stopwatch. */ +ZMQ_EXPORT unsigned long zmq_stopwatch_intermediate (void *watch_); +#endif + /* Stops the stopwatch. Returns the number of microseconds elapsed since */ -/* the stopwatch was started. */ +/* the stopwatch was started, and deallocates that watch. */ ZMQ_EXPORT unsigned long zmq_stopwatch_stop (void *watch_); /* Sleeps for specified number of seconds. */ diff --git a/src/select.cpp b/src/select.cpp index c21f3d84..e6945846 100644 --- a/src/select.cpp +++ b/src/select.cpp @@ -47,6 +47,8 @@ #include "config.hpp" #include "i_poll_events.hpp" +#include + zmq::select_t::select_t (const zmq::ctx_t &ctx_) : ctx (ctx_), #if defined ZMQ_HAVE_WINDOWS @@ -70,10 +72,13 @@ zmq::select_t::~select_t () stop (); worker.stop (); } + zmq_assert (get_load () == 0); } zmq::select_t::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_) { + zmq_assert (fd_ != retired_fd); + fd_entry_t fd_entry; fd_entry.fd = fd_; fd_entry.events = events_; @@ -92,7 +97,6 @@ zmq::select_t::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_) #endif adjust_load (1); - assert_load_consistent (); return fd_; } @@ -110,27 +114,6 @@ zmq::select_t::find_fd_entry_by_handle (fd_entries_t &fd_entries, return fd_entry_it; } -size_t zmq::select_t::count_non_retired (const fd_entries_t &fd_entries) -{ - return fd_entries.size () - - std::count_if (fd_entries.begin (), fd_entries.end (), - is_retired_fd); -} - -void zmq::select_t::assert_load_consistent () const -{ -#ifdef _DEBUG - // check if load is consistent with number of non-retired fd entries - int expected = get_load (); - - for (family_entries_t::const_iterator family_it = family_entries.begin (); - family_it != family_entries.end (); ++family_it) { - expected -= (int) count_non_retired (family_it->second.fd_entries); - } - zmq_assert (expected == 0); -#endif -} - void zmq::select_t::trigger_events (const fd_entries_t &fd_entries_, const fds_set_t &local_fds_set_, int event_count_) @@ -179,8 +162,13 @@ int zmq::select_t::try_retire_fd_entry ( fd_entries_t::iterator fd_entry_it = find_fd_entry_by_handle (family_entry.fd_entries, handle_); + if (fd_entry_it == family_entry.fd_entries.end ()) return 0; + + fd_entry_t &fd_entry = *fd_entry_it; + zmq_assert (fd_entry.fd != retired_fd); + if (family_entry_it != current_family_entry_it) { // Family is not currently being iterated and can be safely // modified in-place. So later it can be skipped without @@ -189,7 +177,7 @@ int zmq::select_t::try_retire_fd_entry ( } else { // Otherwise mark removed entries as retired. It will be cleaned up // at the end of the iteration. See zmq::select_t::loop - fd_entry_it->fd = retired_fd; + fd_entry.fd = retired_fd; family_entry.has_retired = true; } family_entry.fds_set.remove_fd (handle_); @@ -225,11 +213,12 @@ void zmq::select_t::rm_fd (handle_t handle_) find_fd_entry_by_handle (family_entry.fd_entries, handle_); assert (fd_entry_it != fd_entries.end ()); - ++retired; - + zmq_assert (fd_entry_it->fd != retired_fd); fd_entry_it->fd = retired_fd; family_entry.fds_set.remove_fd (handle_); + ++retired; + if (handle_ == maxfd) { maxfd = retired_fd; for (fd_entry_it = family_entry.fd_entries.begin (); @@ -242,8 +231,6 @@ void zmq::select_t::rm_fd (handle_t handle_) #endif zmq_assert (retired == 1); adjust_load (-1); - - assert_load_consistent (); } void zmq::select_t::set_pollin (handle_t handle_) @@ -302,11 +289,30 @@ int zmq::select_t::max_fds () return FD_SETSIZE; } +// TODO should this be configurable? +const int max_shutdown_timeout = 250; + void zmq::select_t::loop () { - while (!stopping) { + void *stopwatch = NULL; + while (!stopwatch || get_load ()) { + int max_timeout = INT_MAX; + if (stopping) { + if (stopwatch) { + max_timeout = max_shutdown_timeout + - (int) zmq_stopwatch_intermediate (stopwatch); + + // bail out eventually, when max_shutdown_timeout has reached, + // to avoid spinning forever in case of some error + zmq_assert (max_timeout > 0); + } else { + stopwatch = zmq_stopwatch_start (); + max_timeout = max_shutdown_timeout; + } + } + // Execute any due timers. - int timeout = (int) execute_timers (); + int timeout = std::min ((int) execute_timers (), max_timeout); #if defined ZMQ_HAVE_OSX struct timeval tv = {(long) (timeout / 1000), timeout % 1000 * 1000}; @@ -407,6 +413,7 @@ void zmq::select_t::loop () select_family_entry (family_entry, maxfd, timeout > 0, tv); #endif } + zmq_stopwatch_stop (stopwatch); } void zmq::select_t::select_family_entry (family_entry_t &family_entry_, diff --git a/src/select.hpp b/src/select.hpp index 9e253b53..ed85ee37 100644 --- a/src/select.hpp +++ b/src/select.hpp @@ -166,10 +166,6 @@ class select_t : public poller_base_t static fd_entries_t::iterator find_fd_entry_by_handle (fd_entries_t &fd_entries, handle_t handle_); - static size_t - count_non_retired (const zmq::select_t::fd_entries_t &fd_entries); - void assert_load_consistent () const; - // If true, start has been called. bool started; diff --git a/src/zmq_draft.h b/src/zmq_draft.h index b3c1cbff..ae4494fc 100644 --- a/src/zmq_draft.h +++ b/src/zmq_draft.h @@ -37,6 +37,10 @@ #ifndef ZMQ_BUILD_DRAFT_API +/* Returns the number of microseconds elapsed since the stopwatch was */ +/* started, but does not stop or deallocate the stopwatch. */ +unsigned long zmq_stopwatch_intermediate (void *watch_); + /* DRAFT Socket types. */ #define ZMQ_SERVER 12 #define ZMQ_CLIENT 13 diff --git a/src/zmq_utils.cpp b/src/zmq_utils.cpp index c5243937..d07fc924 100644 --- a/src/zmq_utils.cpp +++ b/src/zmq_utils.cpp @@ -66,14 +66,20 @@ void *zmq_stopwatch_start () return (void *) watch; } -unsigned long zmq_stopwatch_stop (void *watch_) +unsigned long zmq_stopwatch_intermediate (void *watch_) { uint64_t end = zmq::clock_t::now_us (); uint64_t start = *(uint64_t *) watch_; - free (watch_); return (unsigned long) (end - start); } +unsigned long zmq_stopwatch_stop (void *watch_) +{ + unsigned long res = zmq_stopwatch_intermediate (watch_); + free (watch_); + return res; +} + void *zmq_threadstart (zmq_thread_fn *func, void *arg) { zmq::thread_t *thread = new (std::nothrow) zmq::thread_t; diff --git a/tests/test_timers.cpp b/tests/test_timers.cpp index f29ea519..1c2d4b00 100644 --- a/tests/test_timers.cpp +++ b/tests/test_timers.cpp @@ -154,13 +154,22 @@ int main (void) bool timer_invoked = false; - int timer_id = zmq_timers_add (timers, 100, handler, &timer_invoked); + const int full_timeout = 100; + void *const stopwatch = zmq_stopwatch_start (); + + int timer_id = + zmq_timers_add (timers, full_timeout, handler, &timer_invoked); assert (timer_id); - // Timer should be invoked yet + // Timer should not have been invoked yet int rc = zmq_timers_execute (timers); assert (rc == 0); - assert (!timer_invoked); + +#ifdef ZMQ_BUILD_DRAFT_API + if (zmq_stopwatch_intermediate (stopwatch) < full_timeout) { + assert (!timer_invoked); + } +#endif // Wait half the time and check again long timeout = zmq_timers_timeout (timers); @@ -168,7 +177,11 @@ int main (void) msleep (timeout / 2); rc = zmq_timers_execute (timers); assert (rc == 0); - assert (!timer_invoked); +#ifdef ZMQ_BUILD_DRAFT_API + if (zmq_stopwatch_intermediate (stopwatch) < full_timeout) { + assert (!timer_invoked); + } +#endif // Wait until the end rc = sleep_and_execute (timers); @@ -182,7 +195,11 @@ int main (void) msleep (timeout / 2); rc = zmq_timers_execute (timers); assert (rc == 0); - assert (!timer_invoked); +#ifdef ZMQ_BUILD_DRAFT_API + if (zmq_stopwatch_intermediate (stopwatch) < 2 * full_timeout) { + assert (!timer_invoked); + } +#endif // Reset timer and wait half of the time left rc = zmq_timers_reset (timers, timer_id); @@ -190,7 +207,9 @@ int main (void) msleep (timeout / 2); rc = zmq_timers_execute (timers); assert (rc == 0); - assert (!timer_invoked); + if (zmq_stopwatch_stop (stopwatch) < 2 * full_timeout) { + assert (!timer_invoked); + } // Wait until the end rc = sleep_and_execute (timers);