From 34a9b3aa6a9d6ade41cecdf290435c39c0bd64bb Mon Sep 17 00:00:00 2001 From: Charles Cabergs Date: Wed, 23 Apr 2025 21:30:24 +0200 Subject: [PATCH] Add zmq_timers support (#657) * Add zmq_timers support * Add zmq_timers unit tests --- CMakeLists.txt | 1 + tests/CMakeLists.txt | 1 + tests/timers.cpp | 80 ++++++++++++++++++++++++++++++++++++++++++++ zmq.hpp | 79 +++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 161 insertions(+) create mode 100644 tests/timers.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index b45e91d..3035755 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,6 +1,7 @@ cmake_minimum_required(VERSION 3.11) list (APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/cmake") +set(CMAKE_EXPORT_COMPILE_COMMANDS ON) include (DetectCPPZMQVersion) diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index e20aaaa..3821c5c 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -30,6 +30,7 @@ add_executable( codec_multipart.cpp monitor.cpp utilities.cpp + timers.cpp ) target_include_directories(unit_tests PUBLIC ${CATCH_MODULE_PATH}) diff --git a/tests/timers.cpp b/tests/timers.cpp new file mode 100644 index 0000000..4229fd5 --- /dev/null +++ b/tests/timers.cpp @@ -0,0 +1,80 @@ +#include +#include + +#include +#include +#include + +#if defined(ZMQ_CPP11) && defined(ZMQ_HAVE_TIMERS) + +static_assert(std::is_default_constructible::value); +static_assert(!std::is_copy_constructible::value); +static_assert(!std::is_copy_assignable::value); + +TEST_CASE("timers constructor", "[timers]") +{ + zmq::timers timers; + CHECK(!timers.timeout().has_value()); +} + +TEST_CASE("timers add/execute", "[timers]") +{ + using namespace std::chrono_literals; + zmq::timers timers; + bool handler_ran = false; + timers.add(4ms, [](auto, void *arg) { *(bool *) arg = true; }, &handler_ran); + CHECK(timers.timeout().has_value()); + CHECK(!handler_ran); + std::this_thread::sleep_for(10ms); + timers.execute(); + CHECK(handler_ran); +} + +TEST_CASE("timers add/cancel", "[timers]") +{ + using namespace std::chrono_literals; + zmq::timers timers; + bool handler_ran = false; + auto id = + timers.add(4ms, [](auto, void *arg) { *(bool *) arg = true; }, &handler_ran); + CHECK(timers.timeout().has_value()); + CHECK(!handler_ran); + timers.cancel(id); + CHECK(!timers.timeout().has_value()); + CHECK(!handler_ran); +} + +TEST_CASE("timers set_interval", "[timers]") +{ + using namespace std::chrono_literals; + zmq::timers timers; + bool handler_ran = false; + // Interval of 4 hours should never run like this + auto id = + timers.add(4h, [](auto, void *arg) { *(bool *) arg = true; }, &handler_ran); + CHECK(timers.timeout().has_value()); + CHECK(!handler_ran); + // Change the interval to 4ms and wait for it to timeout + timers.set_interval(id, 4ms); + std::this_thread::sleep_for(10ms); + timers.execute(); + CHECK(handler_ran); +} + +TEST_CASE("timers reset", "[timers]") +{ + using namespace std::chrono_literals; + zmq::timers timers; + bool handler_ran = false; + auto id = + timers.add(4ms, [](auto, void *arg) { *(bool *) arg = true; }, &handler_ran); + CHECK(timers.timeout().has_value()); + std::this_thread::sleep_for(10ms); + // Available to be executed but we reset it + timers.reset(id); + CHECK(timers.timeout().has_value()); + CHECK(!handler_ran); + +} + +#endif // defined(ZMQ_CPP11) && defined(ZMQ_HAVE_TIMERS) diff --git a/zmq.hpp b/zmq.hpp index 5b22687..363f74b 100644 --- a/zmq.hpp +++ b/zmq.hpp @@ -2794,6 +2794,85 @@ inline std::ostream &operator<<(std::ostream &os, const message_t &msg) return os << msg.str(); } +#if defined(ZMQ_CPP11) && defined(ZMQ_HAVE_TIMERS) + +class timers +{ + public: + using id_t = int; + using fn_t = zmq_timer_fn; + +#if CPPZMQ_HAS_OPTIONAL + using timeout_result_t = std::optional; +#else + using timeout_result_t = detail::trivial_optional; +#endif + + timers() : _timers(zmq_timers_new()) + { + if (_timers == nullptr) + throw error_t(); + } + + timers(const timers &other) = delete; + timers &operator=(const timers &other) = delete; + + ~timers() + { + int rc = zmq_timers_destroy(&_timers); + ZMQ_ASSERT(rc == 0); + } + + id_t add(std::chrono::milliseconds interval, zmq_timer_fn handler, void *arg) + { + id_t timer_id = zmq_timers_add(_timers, interval.count(), handler, arg); + if (timer_id == -1) + throw zmq::error_t(); + return timer_id; + } + + void cancel(id_t timer_id) + { + int rc = zmq_timers_cancel(_timers, timer_id); + if (rc == -1) + throw zmq::error_t(); + } + + void set_interval(id_t timer_id, std::chrono::milliseconds interval) + { + int rc = zmq_timers_set_interval(_timers, timer_id, interval.count()); + if (rc == -1) + throw zmq::error_t(); + } + + void reset(id_t timer_id) + { + int rc = zmq_timers_reset(_timers, timer_id); + if (rc == -1) + throw zmq::error_t(); + } + + timeout_result_t timeout() const + { + int timeout = zmq_timers_timeout(_timers); + if (timeout == -1) + return timeout_result_t{}; + return std::chrono::milliseconds{timeout}; + } + + void execute() + { + int rc = zmq_timers_execute(_timers); + if (rc == -1) + throw zmq::error_t(); + } + + private: + void *_timers; +}; + +#endif // defined(ZMQ_CPP11) && defined(ZMQ_HAVE_TIMERS) + } // namespace zmq #endif // __ZMQ_HPP_INCLUDED__