From aadaf99011c9fb4ce7b04ec22a776fa7a9b93515 Mon Sep 17 00:00:00 2001 From: somdoron Date: Fri, 18 Dec 2015 12:12:18 +0200 Subject: [PATCH] add timers API to libzmq --- .gitignore | 1 + CMakeLists.txt | 1 + Makefile.am | 9 ++- include/zmq.h | 16 +++- src/timers.cpp | 169 ++++++++++++++++++++++++++++++++++++++++++ src/timers.hpp | 108 +++++++++++++++++++++++++++ src/zmq.cpp | 89 +++++++++++++++++++++- tests/CMakeLists.txt | 1 + tests/test_timers.cpp | 126 +++++++++++++++++++++++++++++++ 9 files changed, 513 insertions(+), 7 deletions(-) create mode 100644 src/timers.cpp create mode 100644 src/timers.hpp create mode 100644 tests/test_timers.cpp diff --git a/.gitignore b/.gitignore index 1efa8ac8..69a5c137 100644 --- a/.gitignore +++ b/.gitignore @@ -117,6 +117,7 @@ test_getsockopt_memset test_setsockopt test_stream_exceeds_buffer test_poller +test_timers tests/test*.log tests/test*.trs src/platform.hpp* diff --git a/CMakeLists.txt b/CMakeLists.txt index 1b06670e..48a49c23 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -449,6 +449,7 @@ set(cxx-sources zmq_utils.cpp decoder_allocators.cpp socket_poller.cpp + timers.cpp config.hpp) set(rc-sources version.rc) diff --git a/Makefile.am b/Makefile.am index 209af418..ca152a38 100644 --- a/Makefile.am +++ b/Makefile.am @@ -179,6 +179,8 @@ src_libzmq_la_SOURCES = \ src/tcp_listener.hpp \ src/thread.cpp \ src/thread.hpp \ + src/timers.cpp \ + src/timers.hpp \ src/tipc_address.cpp \ src/tipc_address.hpp \ src/tipc_connecter.cpp \ @@ -376,7 +378,8 @@ test_apps = \ tests/test_socketopt_hwm \ tests/test_heartbeats \ tests/test_stream_exceeds_buffer \ - tests/test_poller + tests/test_poller \ + tests/test_timers tests_test_system_SOURCES = tests/test_system.cpp tests_test_system_LDADD = src/libzmq.la @@ -587,6 +590,9 @@ tests_test_stream_exceeds_buffer_LDADD = src/libzmq.la tests_test_poller_SOURCES = tests/test_poller.cpp tests_test_poller_LDADD = src/libzmq.la +tests_test_timers_SOURCES = tests/test_timers.cpp +tests_test_timers_LDADD = src/libzmq.la + if !ON_MINGW if !ON_CYGWIN @@ -729,4 +735,3 @@ dist-hook: maintainer-clean-local: -rm -rf $(top_srcdir)/config - diff --git a/include/zmq.h b/include/zmq.h index f53283f8..88525bd8 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -451,6 +451,21 @@ ZMQ_EXPORT int zmq_poller_modify_fd (void *poller, int fd, short events); ZMQ_EXPORT int zmq_poller_remove_fd (void *poller, int fd); #endif +/******************************************************************************/ +/* Scheduling timers */ +/******************************************************************************/ + +typedef void (zmq_timer_fn)(int timer_id, void *arg); + +ZMQ_EXPORT void *zmq_timers_new (); +ZMQ_EXPORT int zmq_timers_close (void *timers); +ZMQ_EXPORT int zmq_timers_add (void *timers, size_t interval, zmq_timer_fn handler, void *arg); +ZMQ_EXPORT int zmq_timers_cancel (void *timers, int timer_id); +ZMQ_EXPORT int zmq_timers_set_interval (void *timers, int timer_id, size_t interval); +ZMQ_EXPORT int zmq_timers_reset (void *timers, int timer_id); +ZMQ_EXPORT long zmq_timers_timeout (void *timers); +ZMQ_EXPORT int zmq_timers_execute (void *timers); + /******************************************************************************/ /* Message proxying */ /******************************************************************************/ @@ -542,4 +557,3 @@ ZMQ_EXPORT void zmq_threadclose (void* thread); #endif #endif - diff --git a/src/timers.cpp b/src/timers.cpp new file mode 100644 index 00000000..f2f3ee99 --- /dev/null +++ b/src/timers.cpp @@ -0,0 +1,169 @@ +/* +Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file + +This file is part of libzmq, the ZeroMQ core engine in C++. + +libzmq is free software; you can redistribute it and/or modify it under +the terms of the GNU Lesser General Public License (LGPL) as published +by the Free Software Foundation; either version 3 of the License, or +(at your option) any later version. + +As a special exception, the Contributors give you permission to link +this library with independent modules to produce an executable, +regardless of the license terms of these independent modules, and to +copy and distribute the resulting executable under terms of your choice, +provided that you also meet, for each linked independent module, the +terms and conditions of the license of that module. An independent +module is a module which is not derived from or based on this library. +If you modify this library, you must extend this exception to your +version of the library. + +libzmq is distributed in the hope that it will be useful, but WITHOUT +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public +License for more details. + +You should have received a copy of the GNU Lesser General Public License +along with this program. If not, see . +*/ + +#include "timers.hpp" +#include "err.hpp" + +zmq::timers_t::timers_t () : +tag (0xCAFEDADA), +next_timer_id (0) +{ + +} + +zmq::timers_t::~timers_t () +{ + // Mark the timers as dead + tag = 0xdeadbeef; +} + +bool zmq::timers_t::check_tag () +{ + return tag == 0xCAFEDADA; +} + +int zmq::timers_t::add (size_t interval_, timers_timer_fn handler_, void* arg_) +{ + uint64_t when = clock.now_ms() + interval_; + timer_t timer = {++next_timer_id, interval_, handler_, arg_}; + timers.insert (timersmap_t::value_type (when, timer)); + + return timer.timer_id; +} + +int zmq::timers_t::cancel (int timer_id_) +{ + cancelled_timers_t::iterator it = cancelled_timers.find (timer_id_); + + if (it != cancelled_timers.end ()) { + errno = EINVAL; + return -1; + } + + cancelled_timers.insert (timer_id_); + + return 0; +} + +int zmq::timers_t::set_interval (int timer_id_, size_t interval_) +{ + for (timersmap_t::iterator it = timers.begin (); it != timers.end (); ++it) { + if (it->second.timer_id == timer_id_) { + timer_t timer = it->second; + timer.interval = interval_; + uint64_t when = clock.now_ms() + interval_; + timers.erase (it); + timers.insert (timersmap_t::value_type (when, timer)); + + return 0; + } + } + + errno = EINVAL; + return -1; +} + +int zmq::timers_t::reset (int timer_id_) { + for (timersmap_t::iterator it = timers.begin (); it != timers.end (); ++it) { + if (it->second.timer_id == timer_id_) { + timer_t timer = it->second; + uint64_t when = clock.now_ms() + timer.interval; + timers.erase (it); + timers.insert (timersmap_t::value_type (when, timer)); + + return 0; + } + } + + errno = EINVAL; + return -1; +} + +long zmq::timers_t::timeout () +{ + timersmap_t::iterator it = timers.begin (); + + uint64_t now = clock.now_ms(); + + while (it != timers.end ()) { + cancelled_timers_t::iterator cancelled_it = cancelled_timers.find (it->second.timer_id); + + // Live timer, lets return the timeout + if (cancelled_it == cancelled_timers.end ()) { + if (it->first > now) + return it->first - now; + else + return 0; + } + + // Let's remove it from the begining of the list + timersmap_t::iterator old = it; + ++it; + timers.erase (old); + cancelled_timers.erase (cancelled_it); + } + + // Wait forever as no timers are alive + return -1; +} + +int zmq::timers_t::execute () +{ + timersmap_t::iterator it = timers.begin (); + + uint64_t now = clock.now_ms(); + + while (it != timers.end ()) { + cancelled_timers_t::iterator cancelled_it = cancelled_timers.find (it->second.timer_id); + + // Dead timer, lets remove it and continue + if (cancelled_it != cancelled_timers.end ()) { + timersmap_t::iterator old = it; + ++it; + timers.erase (old); + cancelled_timers.erase (cancelled_it); + continue; + } + + // Map is ordered, if we have to wait for current timer we can stop. + if (it->first > now) + break; + + timer_t timer = it->second; + + timer.handler (timer.timer_id, timer.arg); + + timersmap_t::iterator old = it; + ++it; + timers.erase (old); + timers.insert (timersmap_t::value_type (now + timer.interval, timer)); + } + + return 0; +} diff --git a/src/timers.hpp b/src/timers.hpp new file mode 100644 index 00000000..86cab51b --- /dev/null +++ b/src/timers.hpp @@ -0,0 +1,108 @@ +/* +Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file + +This file is part of libzmq, the ZeroMQ core engine in C++. + +libzmq is free software; you can redistribute it and/or modify it under +the terms of the GNU Lesser General Public License (LGPL) as published +by the Free Software Foundation; either version 3 of the License, or +(at your option) any later version. + +As a special exception, the Contributors give you permission to link +this library with independent modules to produce an executable, +regardless of the license terms of these independent modules, and to +copy and distribute the resulting executable under terms of your choice, +provided that you also meet, for each linked independent module, the +terms and conditions of the license of that module. An independent +module is a module which is not derived from or based on this library. +If you modify this library, you must extend this exception to your +version of the library. + +libzmq is distributed in the hope that it will be useful, but WITHOUT +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public +License for more details. + +You should have received a copy of the GNU Lesser General Public License +along with this program. If not, see . +*/ + +#ifndef __ZMQ_TIMERS_HPP_INCLUDED__ +#define __ZMQ_TIMERS_HPP_INCLUDED__ + +#include +#include +#include + +#include "clock.hpp" + +namespace zmq +{ + typedef void (timers_timer_fn)( + int timer_id, void *arg); + + class timers_t + { + public: + timers_t (); + ~timers_t (); + + // Add timer to the set, timer repeats forever, or until cancel is called. + // Returns a timer_id that is used to cancel the timer. + // Returns -1 if there was an error. + int add (size_t interval, timers_timer_fn handler, void* arg); + + // Set the interval of the timer. + // This method is slow, cancelling exsting and adding a new timer yield better performance. + // Returns 0 on success and -1 on error. + int set_interval (int timer_id, size_t interval); + + // Reset the timer. + // This method is slow, cancelling exsting and adding a new timer yield better performance. + // Returns 0 on success and -1 on error. + int reset (int timer_id); + + // Cancel a timer. + // Returns 0 on success and -1 on error. + int cancel (int timer_id); + + // Returns the time in millisecond until the next timer. + // Returns -1 if no timer is due. + long timeout (); + + // Execute timers. + // Return 0 if all succeed and -1 if error. + int execute (); + + // Return false if object is not a timers class. + bool check_tag (); + + private: + + // Used to check whether the object is a timers class. + uint32_t tag; + + int next_timer_id; + + // Clock instance. + clock_t clock; + + typedef struct timer_t { + int timer_id; + size_t interval; + timers_timer_fn *handler; + void *arg; + } timer_t; + + typedef std::multimap timersmap_t; + timersmap_t timers; + + typedef std::set cancelled_timers_t; + cancelled_timers_t cancelled_timers; + + timers_t (const timers_t&); + const timers_t &operator = (const timers_t&); + }; + } + + #endif diff --git a/src/zmq.cpp b/src/zmq.cpp index 2e7318c1..63af37e9 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -77,6 +77,7 @@ struct iovec { #include "metadata.hpp" #include "signaler.hpp" #include "socket_poller.hpp" +#include "timers.hpp" #if !defined ZMQ_HAVE_WINDOWS #include @@ -1044,7 +1045,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) // The poller functionality -void* zmq_poller_new () +void* zmq_poller_new () { zmq::socket_poller_t *poller = new (std::nothrow) zmq::socket_poller_t; alloc_assert (poller); @@ -1130,7 +1131,7 @@ int zmq_poller_remove (void *poller_, void *s_) if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) { errno = EFAULT; return -1; - } + } if (!s_ || !((zmq::socket_base_t*)s_)->check_tag ()) { errno = ENOTSOCK; @@ -1154,7 +1155,7 @@ int zmq_poller_remove_fd (void *poller_, int fd_) return ((zmq::socket_poller_t*)poller_)->remove_fd (fd_); } - + int zmq_poller_wait (void *poller_, zmq_poller_event_t *event, long timeout_) { @@ -1169,12 +1170,92 @@ int zmq_poller_wait (void *poller_, zmq_poller_event_t *event, long timeout_) event->socket = e.socket; event->fd = e.fd; - event->user_data = e.user_data; + event->user_data = e.user_data; event->events = e.events; return rc; } +// Timers + +void *zmq_timers_new () +{ + zmq::timers_t *timers = new (std::nothrow) zmq::timers_t; + alloc_assert (timers); + return timers; +} + +int zmq_timers_close (void *timers_) +{ + if (!timers_ || !((zmq::timers_t*)timers_)->check_tag ()) { + errno = EFAULT; + return -1; + } + + delete ((zmq::timers_t*)timers_); + return 0; +} + +int zmq_timers_add (void *timers_, size_t interval_, zmq_timer_fn handler_, void *arg_) +{ + if (!timers_ || !((zmq::timers_t*)timers_)->check_tag ()) { + errno = EFAULT; + return -1; + } + + return ((zmq::timers_t*)timers_)->add (interval_, handler_, arg_); +} + +int zmq_timers_cancel (void *timers_, int timer_id_) +{ + if (!timers_ || !((zmq::timers_t*)timers_)->check_tag ()) { + errno = EFAULT; + return -1; + } + + return ((zmq::timers_t*)timers_)->cancel (timer_id_); +} + +int zmq_timers_set_interval (void *timers_, int timer_id_, size_t interval_) +{ + if (!timers_ || !((zmq::timers_t*)timers_)->check_tag ()) { + errno = EFAULT; + return -1; + } + + return ((zmq::timers_t*)timers_)->set_interval (timer_id_, interval_); +} + +int zmq_timers_reset (void *timers_, int timer_id_) +{ + if (!timers_ || !((zmq::timers_t*)timers_)->check_tag ()) { + errno = EFAULT; + return -1; + } + + return ((zmq::timers_t*)timers_)->reset (timer_id_); +} + +long zmq_timers_timeout (void *timers_) +{ + if (!timers_ || !((zmq::timers_t*)timers_)->check_tag ()) { + errno = EFAULT; + return -1; + } + + return ((zmq::timers_t*)timers_)->timeout (); +} + +int zmq_timers_execute (void *timers_) +{ + if (!timers_ || !((zmq::timers_t*)timers_)->check_tag ()) { + errno = EFAULT; + return -1; + } + + return ((zmq::timers_t*)timers_)->execute (); +} + // The proxy functionality int zmq_proxy (void *frontend_, void *backend_, void *capture_) diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 9192d1a5..7487ca76 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -69,6 +69,7 @@ set(tests test_sub_forward_tipc test_xpub_manual test_xpub_welcome_msg + test_timers ) if(NOT WIN32) list(APPEND tests diff --git a/tests/test_timers.cpp b/tests/test_timers.cpp new file mode 100644 index 00000000..ee70367f --- /dev/null +++ b/tests/test_timers.cpp @@ -0,0 +1,126 @@ +/* + Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "testutil.hpp" +#include "../include/zmq_utils.h" + +#if defined ZMQ_HAVE_WINDOWS +#include "windows.hpp" +#else +#include +#endif + +void _sleep (long timeout_) +{ +#if defined ZMQ_HAVE_WINDOWS + Sleep (timeout_ > 0 ? timeout_ : INFINITE); +#elif defined ZMQ_HAVE_ANDROID + usleep (timeout_ * 1000); +#else + usleep (timeout_ * 1000); +#endif +} + +void handler (int timer_id, void* arg) +{ + *((bool *)arg) = true; +} + +int main (void) +{ + setup_test_environment (); + + void* timers = zmq_timers_new (); + assert (timers); + + bool timer_invoked = false; + + int timer_id = zmq_timers_add (timers, 100, handler, &timer_invoked); + assert (timer_id); + + // Timer should be invoked yet + int rc = zmq_timers_execute (timers); + assert (rc == 0); + assert (!timer_invoked); + + // Wait half the time and check again + _sleep (zmq_timers_timeout (timers) / 2); + rc = zmq_timers_execute (timers); + assert (rc == 0); + assert (!timer_invoked); + + // Wait until the end + _sleep (zmq_timers_timeout (timers)); + rc = zmq_timers_execute (timers); + assert (rc == 0); + assert (timer_invoked); + timer_invoked = false; + + // Wait half the time and check again + long timeout = zmq_timers_timeout (timers); + _sleep (timeout / 2); + rc = zmq_timers_execute (timers); + assert (rc == 0); + assert (!timer_invoked); + + // Reset timer and wait half of the time left + rc = zmq_timers_reset (timers, timer_id); + _sleep (timeout / 2); + rc = zmq_timers_execute (timers); + assert (rc == 0); + assert (!timer_invoked); + + // Wait until the end + _sleep (zmq_timers_timeout (timers)); + rc = zmq_timers_execute (timers); + assert (rc == 0); + assert (timer_invoked); + timer_invoked = false; + + // reschedule + zmq_timers_set_interval (timers, timer_id, 50); + _sleep (51); + rc = zmq_timers_execute (timers); + assert (rc == 0); + assert (timer_invoked); + timer_invoked = false; + + // cancel timer + timeout = zmq_timers_timeout (timers); + zmq_timers_cancel (timers, timer_id); + _sleep (timeout * 2); + rc = zmq_timers_execute (timers); + assert (rc == 0); + assert (!timer_invoked); + + rc = zmq_timers_close (timers); + assert (rc == 0); + + return 0; +}