From c59104a01df69f5a926de566438fcbce3e68000e Mon Sep 17 00:00:00 2001 From: Francesco Montorsi Date: Tue, 29 Nov 2022 13:00:11 +0100 Subject: [PATCH] Add ZMQ_TOPICS_COUNT socket option (#4459) --- .gitignore | 1 + Makefile.am | 11 +- doc/zmq_getsockopt.txt | 14 +++ include/zmq.h | 1 + src/generic_mtrie.hpp | 7 ++ src/generic_mtrie_impl.hpp | 9 +- src/radix_tree.cpp | 12 +- src/radix_tree.hpp | 4 +- src/socket_base.cpp | 12 ++ src/socket_base.hpp | 5 + src/trie.hpp | 48 ++++++++ src/xpub.cpp | 15 +++ src/xpub.hpp | 1 + src/xsub.cpp | 21 ++++ src/xsub.hpp | 3 +- src/zmq_draft.h | 1 + tests/CMakeLists.txt | 179 ++++++++++++++++------------- tests/test_pubsub_topics_count.cpp | 151 ++++++++++++++++++++++++ unittests/unittest_mtrie.cpp | 25 +++- 19 files changed, 428 insertions(+), 92 deletions(-) create mode 100644 tests/test_pubsub_topics_count.cpp diff --git a/.gitignore b/.gitignore index 6170b15a..22223e57 100644 --- a/.gitignore +++ b/.gitignore @@ -125,3 +125,4 @@ zeromq-*.tar.gz zeromq-*.zip core +mybuild diff --git a/Makefile.am b/Makefile.am index 5008f7e4..7b796570 100755 --- a/Makefile.am +++ b/Makefile.am @@ -472,8 +472,8 @@ test_apps = \ tests/test_unbind_wildcard \ tests/test_ctx_options \ tests/test_ctx_destroy \ - tests/test_security_no_zap_handler \ - tests/test_security_null \ + tests/test_security_no_zap_handler \ + tests/test_security_null \ tests/test_security_plain \ tests/test_security_zap \ tests/test_iov \ @@ -1067,7 +1067,8 @@ test_apps += tests/test_poller \ tests/test_channel \ tests/test_hiccup_msg \ tests/test_zmq_ppoll_fd \ - tests/test_xsub_verbose + tests/test_xsub_verbose \ + tests/test_pubsub_topics_count tests_test_poller_SOURCES = tests/test_poller.cpp tests_test_poller_LDADD = ${TESTUTIL_LIBS} src/libzmq.la @@ -1145,6 +1146,10 @@ tests_test_xsub_verbose_SOURCES = tests/test_xsub_verbose.cpp tests_test_xsub_verbose_LDADD = ${TESTUTIL_LIBS} src/libzmq.la tests_test_xsub_verbose_CPPFLAGS = ${TESTUTIL_CPPFLAGS} +tests_test_pubsub_topics_count_SOURCES = tests/test_pubsub_topics_count.cpp +tests_test_pubsub_topics_count_LDADD = ${TESTUTIL_LIBS} src/libzmq.la +tests_test_pubsub_topics_count_CPPFLAGS = ${TESTUTIL_CPPFLAGS} + if HAVE_FORK test_apps += tests/test_zmq_ppoll_signals diff --git a/doc/zmq_getsockopt.txt b/doc/zmq_getsockopt.txt index 67eb6181..d912980d 100644 --- a/doc/zmq_getsockopt.txt +++ b/doc/zmq_getsockopt.txt @@ -983,6 +983,20 @@ Default value:: 8192 Applicable socket types:: All, when using TCP, IPC, PGM or NORM transport. +ZMQ_TOPICS_COUNT: Number of topic subscriptions received +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Gets the number of topic (prefix) subscriptions either +* received on a (X)PUB socket from all the connected (X)SUB sockets or +* acknowledged on an (X)SUB socket from all the connected (X)PUB sockets + +NOTE: in DRAFT state, not yet available in stable releases. + +[horizontal] +Option value type:: int +Option value unit:: N/A +Default value:: 0 +Applicable socket types:: ZMQ_PUB, ZMQ_XPUB, ZMQ_SUB, ZMQ_XSUB + RETURN VALUE ------------ diff --git a/include/zmq.h b/include/zmq.h index 4f1fb975..5f226018 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -680,6 +680,7 @@ ZMQ_EXPORT void zmq_threadclose (void *thread_); #define ZMQ_BUSY_POLL 113 #define ZMQ_HICCUP_MSG 114 #define ZMQ_XSUB_VERBOSE_UNSUBSCRIBE 115 +#define ZMQ_TOPICS_COUNT 116 /* DRAFT ZMQ_RECONNECT_STOP options */ #define ZMQ_RECONNECT_STOP_CONN_REFUSED 0x1 diff --git a/src/generic_mtrie.hpp b/src/generic_mtrie.hpp index 15dfad63..6c309894 100644 --- a/src/generic_mtrie.hpp +++ b/src/generic_mtrie.hpp @@ -35,6 +35,7 @@ along with this program. If not, see . #include "macros.hpp" #include "stdint.hpp" +#include "atomic_counter.hpp" namespace zmq { @@ -83,12 +84,18 @@ template class generic_mtrie_t void (*func_) (value_t *value_, Arg arg_), Arg arg_); + // Retrieve the number of prefixes stored in this trie (added - removed) + // Note this is a multithread safe function. + uint32_t num_prefixes () const { return _num_prefixes.get (); } + private: bool is_redundant () const; typedef std::set pipes_t; pipes_t *_pipes; + atomic_counter_t _num_prefixes; + unsigned char _min; unsigned short _count; unsigned short _live_nodes; diff --git a/src/generic_mtrie_impl.hpp b/src/generic_mtrie_impl.hpp index 19f3694e..96eec584 100644 --- a/src/generic_mtrie_impl.hpp +++ b/src/generic_mtrie_impl.hpp @@ -45,7 +45,7 @@ namespace zmq { template generic_mtrie_t::generic_mtrie_t () : - _pipes (0), _min (0), _count (0), _live_nodes (0) + _pipes (0), _num_prefixes (0), _min (0), _count (0), _live_nodes (0) { } @@ -144,6 +144,8 @@ bool generic_mtrie_t::add (prefix_t prefix_, size_t size_, value_t *pipe_) if (!it->_pipes) { it->_pipes = new (std::nothrow) pipes_t; alloc_assert (it->_pipes); + + _num_prefixes.add (1); } it->_pipes->insert (pipe_); @@ -535,6 +537,11 @@ generic_mtrie_t::rm (prefix_t prefix_, size_t size_, value_t *pipe_) } } + if (ret == last_value_removed) { + zmq_assert (_num_prefixes.get () > 0); + _num_prefixes.sub (1); + } + return ret; } diff --git a/src/radix_tree.cpp b/src/radix_tree.cpp index 2a38ebf0..12e1534c 100644 --- a/src/radix_tree.cpp +++ b/src/radix_tree.cpp @@ -326,7 +326,7 @@ bool zmq::radix_tree_t::add (const unsigned char *key_, size_t key_size_) _root._data = current_node._data; else parent_node.set_node_at (edge_index, current_node); - ++_size; + _size.add (1); return true; } @@ -362,7 +362,7 @@ bool zmq::radix_tree_t::add (const unsigned char *key_, size_t key_size_) current_node.set_edge_at (0, key_node.prefix ()[0], key_node); current_node.set_edge_at (1, split_node.prefix ()[0], split_node); - ++_size; + _size.add (1); parent_node.set_node_at (edge_index, current_node); return true; } @@ -394,7 +394,7 @@ bool zmq::radix_tree_t::add (const unsigned char *key_, size_t key_size_) current_node.set_edge_at (0, split_node.prefix ()[0], split_node); current_node.set_refcount (1); - ++_size; + _size.add (1); parent_node.set_node_at (edge_index, current_node); return true; } @@ -402,7 +402,7 @@ bool zmq::radix_tree_t::add (const unsigned char *key_, size_t key_size_) zmq_assert (key_bytes_matched == key_size_); zmq_assert (prefix_bytes_matched == current_node.prefix_length ()); - ++_size; + _size.add (1); current_node.set_refcount (current_node.refcount () + 1); return current_node.refcount () == 1; } @@ -424,7 +424,7 @@ bool zmq::radix_tree_t::rm (const unsigned char *key_, size_t key_size_) return false; current_node.set_refcount (current_node.refcount () - 1); - --_size; + _size.sub (1); if (current_node.refcount () > 0) return false; @@ -574,5 +574,5 @@ void zmq::radix_tree_t::apply ( size_t zmq::radix_tree_t::size () const { - return _size; + return _size.get (); } diff --git a/src/radix_tree.hpp b/src/radix_tree.hpp index 02e74969..4c4aa0c0 100644 --- a/src/radix_tree.hpp +++ b/src/radix_tree.hpp @@ -33,6 +33,7 @@ #include #include "stdint.hpp" +#include "atomic_counter.hpp" // Wrapper type for a node's data layout. // @@ -133,6 +134,7 @@ class radix_tree_t void apply (void (*func_) (unsigned char *data, size_t size, void *arg), void *arg_); + // Retrieve size of the radix tree. Note this is a multithread safe function. size_t size () const; private: @@ -140,7 +142,7 @@ class radix_tree_t match (const unsigned char *key_, size_t key_size_, bool is_lookup_) const; node_t _root; - size_t _size; + atomic_counter_t _size; }; } diff --git a/src/socket_base.cpp b/src/socket_base.cpp index ee78d8b7..26fb277e 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -460,6 +460,12 @@ int zmq::socket_base_t::getsockopt (int option_, return -1; } + // First, check whether specific socket type overloads the option. + int rc = xgetsockopt (option_, optval_, optvallen_); + if (rc == 0 || errno != EINVAL) { + return rc; + } + if (option_ == ZMQ_RCVMORE) { return do_getsockopt (optval_, optvallen_, _rcvmore ? 1 : 0); } @@ -1619,6 +1625,12 @@ int zmq::socket_base_t::xsetsockopt (int, const void *, size_t) return -1; } +int zmq::socket_base_t::xgetsockopt (int, void *, size_t *) +{ + errno = EINVAL; + return -1; +} + bool zmq::socket_base_t::xhas_out () { return false; diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 92deb9f7..01920e72 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -186,6 +186,11 @@ class socket_base_t : public own_t, virtual int xsetsockopt (int option_, const void *optval_, size_t optvallen_); + // The default implementation assumes there are no specific socket + // options for the particular socket type. If not so, ZMQ_FINAL this + // method. + virtual int xgetsockopt (int option_, void *optval_, size_t *optvallen_); + // The default implementation assumes that send is not supported. virtual bool xhas_out (); virtual int xsend (zmq::msg_t *msg_); diff --git a/src/trie.hpp b/src/trie.hpp index 32a2c2cb..8457c3ce 100644 --- a/src/trie.hpp +++ b/src/trie.hpp @@ -34,6 +34,7 @@ #include "macros.hpp" #include "stdint.hpp" +#include "atomic_counter.hpp" namespace zmq { @@ -80,6 +81,53 @@ class trie_t ZMQ_NON_COPYABLE_NOR_MOVABLE (trie_t) }; + + +// lightweight wrapper around trie_t adding tracking of total number of prefixes +class trie_with_size_t +{ + public: + trie_with_size_t () {} + ~trie_with_size_t () {} + + bool add (unsigned char *prefix_, size_t size_) + { + if (_trie.add (prefix_, size_)) { + _num_prefixes.add (1); + return true; + } else + return false; + } + + bool rm (unsigned char *prefix_, size_t size_) + { + if (_trie.rm (prefix_, size_)) { + _num_prefixes.sub (1); + return true; + } else + return false; + } + + bool check (const unsigned char *data_, size_t size_) const + { + return _trie.check (data_, size_); + } + + void apply (void (*func_) (unsigned char *data_, size_t size_, void *arg_), + void *arg_) + { + _trie.apply (func_, arg_); + } + + // Retrieve the number of prefixes stored in this trie (added - removed) + // Note this is a multithread safe function. + uint32_t num_prefixes () const { return _num_prefixes.get (); } + + private: + atomic_counter_t _num_prefixes; + trie_t _trie; +}; + } #endif diff --git a/src/xpub.cpp b/src/xpub.cpp index a71543a3..acaed4c9 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -255,6 +255,21 @@ int zmq::xpub_t::xsetsockopt (int option_, return 0; } +int zmq::xpub_t::xgetsockopt (int option_, void *optval_, size_t *optvallen_) +{ + if (option_ == ZMQ_TOPICS_COUNT) { + // make sure to use a multi-thread safe function to avoid race conditions with I/O threads + // where subscriptions are processed: + return do_getsockopt (optval_, optvallen_, + (int) _subscriptions.num_prefixes ()); + } + + // room for future options here + + errno = EINVAL; + return -1; +} + static void stub (zmq::mtrie_t::prefix_t data_, size_t size_, void *arg_) { LIBZMQ_UNUSED (data_); diff --git a/src/xpub.hpp b/src/xpub.hpp index 82504b0e..32d7325a 100644 --- a/src/xpub.hpp +++ b/src/xpub.hpp @@ -62,6 +62,7 @@ class xpub_t : public socket_base_t void xwrite_activated (zmq::pipe_t *pipe_) ZMQ_FINAL; int xsetsockopt (int option_, const void *optval_, size_t optvallen_) ZMQ_FINAL; + int xgetsockopt (int option_, void *optval_, size_t *optvallen_) ZMQ_FINAL; void xpipe_terminated (zmq::pipe_t *pipe_) ZMQ_FINAL; private: diff --git a/src/xsub.cpp b/src/xsub.cpp index 795e4bf3..d2192c90 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -121,6 +121,27 @@ int zmq::xsub_t::xsetsockopt (int option_, return -1; } +int zmq::xsub_t::xgetsockopt (int option_, void *optval_, size_t *optvallen_) +{ + if (option_ == ZMQ_TOPICS_COUNT) { + // make sure to use a multi-thread safe function to avoid race conditions with I/O threads + // where subscriptions are processed: +#ifdef ZMQ_USE_RADIX_TREE + uint64_t num_subscriptions = _subscriptions.size (); +#else + uint64_t num_subscriptions = _subscriptions.num_prefixes (); +#endif + + return do_getsockopt (optval_, optvallen_, + (int) num_subscriptions); + } + + // room for future options here + + errno = EINVAL; + return -1; +} + int zmq::xsub_t::xsend (msg_t *msg_) { size_t size = msg_->size (); diff --git a/src/xsub.hpp b/src/xsub.hpp index 2fc37ea5..14f3c7bb 100644 --- a/src/xsub.hpp +++ b/src/xsub.hpp @@ -60,6 +60,7 @@ class xsub_t : public socket_base_t int xsetsockopt (int option_, const void *optval_, size_t optvallen_) ZMQ_OVERRIDE; + int xgetsockopt (int option_, void *optval_, size_t *optvallen_) ZMQ_FINAL; int xsend (zmq::msg_t *msg_) ZMQ_OVERRIDE; bool xhas_out () ZMQ_OVERRIDE; int xrecv (zmq::msg_t *msg_) ZMQ_FINAL; @@ -88,7 +89,7 @@ class xsub_t : public socket_base_t #ifdef ZMQ_USE_RADIX_TREE radix_tree_t _subscriptions; #else - trie_t _subscriptions; + trie_with_size_t _subscriptions; #endif // If true, send all unsubscription messages upstream, not just diff --git a/src/zmq_draft.h b/src/zmq_draft.h index 5378ce51..dfb005ad 100644 --- a/src/zmq_draft.h +++ b/src/zmq_draft.h @@ -72,6 +72,7 @@ #define ZMQ_BUSY_POLL 113 #define ZMQ_HICCUP_MSG 114 #define ZMQ_XSUB_VERBOSE_UNSUBSCRIBE 115 +#define ZMQ_TOPICS_COUNT 116 /* DRAFT ZMQ_RECONNECT_STOP options */ #define ZMQ_RECONNECT_STOP_CONN_REFUSED 0x1 diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 63e3c37b..469e60d4 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -5,76 +5,76 @@ cmake_minimum_required(VERSION "2.8.1") project(tests) set(tests - test_ancillaries - test_system - test_pair_inproc - test_pair_tcp - test_reqrep_inproc - test_reqrep_tcp - test_hwm - test_hwm_pubsub - test_reqrep_device - test_sub_forward - test_invalid_rep - test_msg_flags - test_msg_ffn - test_connect_resolve - test_immediate - test_last_endpoint - test_term_endpoint - test_router_mandatory - test_probe_router - test_stream - test_stream_empty - test_stream_disconnect - test_disconnect_inproc - test_unbind_wildcard - test_ctx_options - test_ctx_destroy - test_security_no_zap_handler - test_security_null - test_security_plain - test_security_zap - test_iov - test_spec_req - test_spec_rep - test_spec_dealer - test_spec_router - test_spec_pushpull - test_req_correlate - test_req_relaxed - test_conflate - test_inproc_connect - test_issue_566 - test_shutdown_stress - test_timeo - test_many_sockets - test_diffserv - test_connect_rid - test_xpub_nodrop - test_pub_invert_matching - test_setsockopt - test_sockopt_hwm - test_heartbeats - test_atomics - test_bind_src_address - test_capabilities - test_metadata - test_router_handover - test_srcfd - test_stream_timeout - test_xpub_manual - test_xpub_welcome_msg - test_xpub_verbose - test_base85 - test_bind_after_connect_tcp - test_sodium - test_monitor - test_socket_null - test_reconnect_ivl - test_reconnect_options - test_tcp_accept_filter - test_mock_pub_sub) + test_ancillaries + test_system + test_pair_inproc + test_pair_tcp + test_reqrep_inproc + test_reqrep_tcp + test_hwm + test_hwm_pubsub + test_reqrep_device + test_sub_forward + test_invalid_rep + test_msg_flags + test_msg_ffn + test_connect_resolve + test_immediate + test_last_endpoint + test_term_endpoint + test_router_mandatory + test_probe_router + test_stream + test_stream_empty + test_stream_disconnect + test_disconnect_inproc + test_unbind_wildcard + test_ctx_options + test_ctx_destroy + test_security_no_zap_handler + test_security_null + test_security_plain + test_security_zap + test_iov + test_spec_req + test_spec_rep + test_spec_dealer + test_spec_router + test_spec_pushpull + test_req_correlate + test_req_relaxed + test_conflate + test_inproc_connect + test_issue_566 + test_shutdown_stress + test_timeo + test_many_sockets + test_diffserv + test_connect_rid + test_xpub_nodrop + test_pub_invert_matching + test_setsockopt + test_sockopt_hwm + test_heartbeats + test_atomics + test_bind_src_address + test_capabilities + test_metadata + test_router_handover + test_srcfd + test_stream_timeout + test_xpub_manual + test_xpub_welcome_msg + test_xpub_verbose + test_base85 + test_bind_after_connect_tcp + test_sodium + test_monitor + test_socket_null + test_reconnect_ivl + test_reconnect_options + test_tcp_accept_filter + test_mock_pub_sub) if(NOT WIN32) list(APPEND tests test_security_gssapi test_socks test_connect_null_fuzzer test_bind_null_fuzzer test_connect_fuzzer test_bind_fuzzer) @@ -85,12 +85,14 @@ if(ZMQ_HAVE_CURVE) if(NOT CMAKE_SYSTEM_NAME MATCHES "Linux") list(APPEND tests test_security_curve) endif() + if(NOT WIN32) list(APPEND tests test_connect_curve_fuzzer test_bind_curve_fuzzer test_z85_decode_fuzzer) endif() endif() option(ENABLE_CAPSH "Run tests that require sudo and capsh (for cap_net_admin)" OFF) + if(ENABLE_CAPSH) find_program(CAPSH_PROGRAM NAMES capsh) @@ -119,11 +121,14 @@ if(NOT WIN32) test_router_mandatory_hwm test_use_fd test_zmq_poll_fd) + if(HAVE_FORK) list(APPEND tests test_fork) endif() + if(CMAKE_SYSTEM_NAME MATCHES "Linux") list(APPEND tests test_abstract_ipc) + if(ZMQ_HAVE_TIPC) list( APPEND @@ -167,10 +172,13 @@ if(ENABLE_DRAFTS) test_hiccup_msg test_zmq_ppoll_fd test_xsub_verbose -) + test_pubsub_topics_count + ) + if(HAVE_FORK) list(APPEND tests test_zmq_ppoll_signals) endif() + if(ZMQ_HAVE_BUSY_POLL) list(APPEND tests test_busy_poll) endif() @@ -178,6 +186,7 @@ endif() if(ZMQ_HAVE_WS) list(APPEND tests test_ws_transport) + if(ZMQ_HAVE_WSS) list(APPEND tests test_wss_transport) endif() @@ -187,6 +196,7 @@ endif() if(WIN32) add_definitions(-DZMQ_CUSTOM_PLATFORM_HPP) add_definitions(-D_WINSOCK_DEPRECATED_NO_WARNINGS) + # Same name on 64bit systems link_libraries(ws2_32.lib) endif() @@ -200,22 +210,25 @@ target_compile_definitions(unity PUBLIC "UNITY_USE_COMMAND_LINE_ARGS" "UNITY_EXC target_include_directories(unity PUBLIC "${CMAKE_CURRENT_LIST_DIR}/../external/unity") set(TESTUTIL_SOURCES - testutil.cpp - testutil.hpp - testutil_monitoring.cpp - testutil_monitoring.hpp - testutil_security.cpp - testutil_security.hpp - testutil_unity.cpp - testutil_unity.hpp) + testutil.cpp + testutil.hpp + testutil_monitoring.cpp + testutil_monitoring.hpp + testutil_security.cpp + testutil_security.hpp + testutil_unity.cpp + testutil_unity.hpp) + if(BUILD_STATIC) add_library(testutil-static STATIC ${TESTUTIL_SOURCES}) target_link_libraries(testutil-static libzmq-static ${OPTIONAL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} unity) endif() + if(BUILD_SHARED) add_library(testutil STATIC ${TESTUTIL_SOURCES}) target_link_libraries(testutil libzmq ${OPTIONAL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} unity) endif() + if(BUILD_STATIC AND NOT BUILD_SHARED) # use testutil-static for both tests and unit tests set(TESTUTIL_LIB testutil-static) @@ -234,6 +247,7 @@ endif() # add include dirs for all targets include_directories("${ZeroMQ_SOURCE_DIR}/../include" "${ZeroMQ_BINARY_DIR}") + if(WIN32) add_definitions(-D_CRT_NONSTDC_NO_DEPRECATE) endif() @@ -250,11 +264,14 @@ foreach(test ${tests}) else() add_executable(${test} ${test}.cpp) endif() + target_link_libraries(${test} ${TESTUTIL_LIB}) + if(WIN32) # This is the output for Debug dynamic builds on Visual Studio 6.0 You should provide the correct directory, don't # know how to do it automatically find_path(LIBZMQ_PATH "libzmq.lib" PATHS "../bin/Win32/Debug/v120/dynamic") + if(NOT ${LIBZMQ_PATH} STREQUAL "LIBZMQ_PATH-NOTFOUND") set_target_properties(${test} PROPERTIES LINK_FLAGS "/LIBPATH:${LIBZMQ_PATH}") endif() @@ -286,6 +303,7 @@ foreach(test ${tests}) add_test(NAME ${test} COMMAND ${test}) endif() endif() + set_tests_properties(${test} PROPERTIES TIMEOUT 10) set_tests_properties(${test} PROPERTIES SKIP_RETURN_CODE 77) endforeach() @@ -301,6 +319,7 @@ if(NOT CMAKE_SYSTEM_NAME MATCHES "Linux") if(ZMQ_HAVE_CURVE) set_tests_properties(test_security_curve PROPERTIES TIMEOUT 60) endif() + # add additional required flags ZMQ_USE_TWEETNACL will already be defined when not using sodium if(ZMQ_HAVE_CURVE AND NOT ZMQ_USE_TWEETNACL) target_compile_definitions(test_security_curve PRIVATE "-DZMQ_USE_TWEETNACL") @@ -313,9 +332,11 @@ set_tests_properties(test_reconnect_ivl PROPERTIES TIMEOUT 15) # Check whether all tests in the current folder are present file(READ "${CMAKE_CURRENT_LIST_FILE}" CURRENT_LIST_FILE_CONTENT) file(GLOB ALL_TEST_SOURCES "test_*.cpp") + foreach(TEST_SOURCE ${ALL_TEST_SOURCES}) get_filename_component(TESTNAME "${TEST_SOURCE}" NAME_WE) string(REGEX MATCH "${TESTNAME}" MATCH_TESTNAME "${CURRENT_LIST_FILE_CONTENT}") + if(NOT MATCH_TESTNAME) message(AUTHOR_WARNING "Test '${TESTNAME}' is not known to CTest.") endif() diff --git a/tests/test_pubsub_topics_count.cpp b/tests/test_pubsub_topics_count.cpp new file mode 100644 index 00000000..f4fdbcc0 --- /dev/null +++ b/tests/test_pubsub_topics_count.cpp @@ -0,0 +1,151 @@ +/* + Copyright (c) 2007-2020 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "testutil.hpp" +#include "testutil_unity.hpp" +#include + +SETUP_TEARDOWN_TESTCONTEXT + + +void settle_subscriptions (void *skt) +{ + // To kick the application thread, do a dummy getsockopt - users here + // should use the monitor and the other sockets in a poll. + unsigned long int dummy; + size_t dummy_size = sizeof (dummy); + msleep (SETTLE_TIME); + zmq_getsockopt (skt, ZMQ_EVENTS, &dummy, &dummy_size); +} + +int get_subscription_count (void *skt) +{ + int num_subs = 0; + size_t num_subs_len = sizeof (num_subs); + + settle_subscriptions (skt); + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_getsockopt (skt, ZMQ_TOPICS_COUNT, &num_subs, &num_subs_len)); + + return num_subs; +} + +void test_independent_topic_prefixes () +{ + // Create a publisher + void *publisher = test_context_socket (ZMQ_PUB); + char my_endpoint[MAX_SOCKET_STRING]; + + // Bind publisher + test_bind (publisher, "inproc://soname", my_endpoint, MAX_SOCKET_STRING); + + // Create a subscriber + void *subscriber = test_context_socket (ZMQ_SUB); + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (subscriber, my_endpoint)); + + // Subscribe to 3 topics + TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt ( + subscriber, ZMQ_SUBSCRIBE, "topicprefix1", strlen ("topicprefix1"))); + TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt ( + subscriber, ZMQ_SUBSCRIBE, "topicprefix2", strlen ("topicprefix2"))); + TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt ( + subscriber, ZMQ_SUBSCRIBE, "topicprefix3", strlen ("topicprefix3"))); + TEST_ASSERT_EQUAL_INT (get_subscription_count (subscriber), 3); + TEST_ASSERT_EQUAL_INT (get_subscription_count (publisher), 3); + + // Remove first subscription and check subscriptions went 3 -> 2 + TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt ( + subscriber, ZMQ_UNSUBSCRIBE, "topicprefix3", strlen ("topicprefix3"))); + TEST_ASSERT_EQUAL_INT (get_subscription_count (subscriber), 2); + TEST_ASSERT_EQUAL_INT (get_subscription_count (publisher), 2); + + // Remove other 2 subscriptions and check we're back to 0 subscriptions + TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt ( + subscriber, ZMQ_UNSUBSCRIBE, "topicprefix1", strlen ("topicprefix1"))); + TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt ( + subscriber, ZMQ_UNSUBSCRIBE, "topicprefix2", strlen ("topicprefix2"))); + TEST_ASSERT_EQUAL_INT (get_subscription_count (subscriber), 0); + TEST_ASSERT_EQUAL_INT (get_subscription_count (publisher), 0); + + // Clean up. + test_context_socket_close (publisher); + test_context_socket_close (subscriber); +} + +void test_nested_topic_prefixes () +{ + // Create a publisher + void *publisher = test_context_socket (ZMQ_PUB); + char my_endpoint[MAX_SOCKET_STRING]; + + // Bind publisher + test_bind (publisher, "inproc://soname", my_endpoint, MAX_SOCKET_STRING); + + // Create a subscriber + void *subscriber = test_context_socket (ZMQ_SUB); + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (subscriber, my_endpoint)); + + // Subscribe to 3 (nested) topics + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "a", strlen ("a"))); + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "ab", strlen ("ab"))); + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "abc", strlen ("abc"))); + + // Even if the subscriptions are nested one into the other, the number of subscriptions + // received on the subscriber/publisher socket will be 3: + TEST_ASSERT_EQUAL_INT (get_subscription_count (subscriber), 3); + TEST_ASSERT_EQUAL_INT (get_subscription_count (publisher), 3); + + // Subscribe to other 3 (nested) topics + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "xyz", strlen ("a"))); + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "xy", strlen ("ab"))); + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "x", strlen ("abc"))); + + TEST_ASSERT_EQUAL_INT (get_subscription_count (subscriber), 6); + TEST_ASSERT_EQUAL_INT (get_subscription_count (publisher), 6); + + // Clean up. + test_context_socket_close (publisher); + test_context_socket_close (subscriber); +} + +int main () +{ + setup_test_environment (); + + UNITY_BEGIN (); + RUN_TEST (test_independent_topic_prefixes); + RUN_TEST (test_nested_topic_prefixes); + return UNITY_END (); +} diff --git a/unittests/unittest_mtrie.cpp b/unittests/unittest_mtrie.cpp index e85f7f36..a03ace2c 100644 --- a/unittests/unittest_mtrie.cpp +++ b/unittests/unittest_mtrie.cpp @@ -81,6 +81,8 @@ void test_add_single_entry_match_exact () bool res = mtrie.add (test_name, getlen (test_name), &pipe); TEST_ASSERT_TRUE (res); + TEST_ASSERT_EQUAL_INT (1, mtrie.num_prefixes ()); + int count = 0; mtrie.match (test_name, getlen (test_name), mtrie_count, &count); TEST_ASSERT_EQUAL_INT (1, count); @@ -96,9 +98,11 @@ void test_add_single_entry_twice_match_exact () bool res = mtrie.add (test_name, getlen (test_name), &pipe); TEST_ASSERT_TRUE (res); + TEST_ASSERT_EQUAL_INT (1, mtrie.num_prefixes ()); res = mtrie.add (test_name, getlen (test_name), &pipe); TEST_ASSERT_FALSE (res); + TEST_ASSERT_EQUAL_INT (1, mtrie.num_prefixes ()); int count = 0; mtrie.match (test_name, getlen (test_name), mtrie_count, &count); @@ -115,9 +119,11 @@ void test_add_two_entries_with_same_name_match_exact () bool res = mtrie.add (test_name, getlen (test_name), &pipe_1); TEST_ASSERT_TRUE (res); + TEST_ASSERT_EQUAL_INT (1, mtrie.num_prefixes ()); res = mtrie.add (test_name, getlen (test_name), &pipe_2); TEST_ASSERT_FALSE (res); + TEST_ASSERT_EQUAL_INT (1, mtrie.num_prefixes ()); int count = 0; mtrie.match (test_name, getlen (test_name), mtrie_count, &count); @@ -136,9 +142,11 @@ void test_add_two_entries_match_prefix_and_exact () bool res = mtrie.add (test_name_prefix, getlen (test_name_prefix), &pipe_1); TEST_ASSERT_TRUE (res); + TEST_ASSERT_EQUAL_INT (1, mtrie.num_prefixes ()); res = mtrie.add (test_name_full, getlen (test_name_full), &pipe_2); TEST_ASSERT_TRUE (res); + TEST_ASSERT_EQUAL_INT (2, mtrie.num_prefixes ()); int count = 0; mtrie.match (test_name_full, getlen (test_name_full), mtrie_count, &count); @@ -153,9 +161,11 @@ void test_add_rm_single_entry_match_exact () reinterpret_cast::prefix_t> ("foo"); mtrie.add (test_name, getlen (test_name), &pipe); + TEST_ASSERT_EQUAL_INT (1, mtrie.num_prefixes ()); zmq::generic_mtrie_t::rm_result res = mtrie.rm (test_name, getlen (test_name), &pipe); TEST_ASSERT_EQUAL (zmq::generic_mtrie_t::last_value_removed, res); + TEST_ASSERT_EQUAL_INT (0, mtrie.num_prefixes ()); int count = 0; mtrie.match (test_name, getlen (test_name), mtrie_count, &count); @@ -169,6 +179,7 @@ void test_rm_nonexistent_0_size_empty () zmq::generic_mtrie_t::rm_result res = mtrie.rm (0, 0, &pipe); TEST_ASSERT_EQUAL (zmq::generic_mtrie_t::not_found, res); + TEST_ASSERT_EQUAL_INT (0, mtrie.num_prefixes ()); } void test_rm_nonexistent_empty () @@ -181,6 +192,7 @@ void test_rm_nonexistent_empty () zmq::generic_mtrie_t::rm_result res = mtrie.rm (test_name, getlen (test_name), &pipe); TEST_ASSERT_EQUAL (zmq::generic_mtrie_t::not_found, res); + TEST_ASSERT_EQUAL_INT (0, mtrie.num_prefixes ()); int count = 0; mtrie.match (test_name, getlen (test_name), mtrie_count, &count); @@ -197,10 +209,12 @@ void test_add_and_rm_other (const char *add_name_, const char *rm_name_) reinterpret_cast::prefix_t> (rm_name_); mtrie.add (add_name_data, getlen (add_name_data), &addpipe); + TEST_ASSERT_EQUAL_INT (1, mtrie.num_prefixes ()); zmq::generic_mtrie_t::rm_result res = mtrie.rm (rm_name_data, getlen (rm_name_data), &rmpipe); TEST_ASSERT_EQUAL (zmq::generic_mtrie_t::not_found, res); + TEST_ASSERT_EQUAL_INT (1, mtrie.num_prefixes ()); { int count = 0; @@ -249,7 +263,9 @@ void add_indexed_expect_unique (zmq::generic_mtrie_t &mtrie_, reinterpret_cast::prefix_t> (names_[i_]); bool res = mtrie_.add (name_data, getlen (name_data), &pipes_[i_]); - TEST_ASSERT_EQUAL (zmq::generic_mtrie_t::last_value_removed, res); + TEST_ASSERT_EQUAL ( + zmq::generic_mtrie_t::last_value_removed, + res); // FIXME asserting equality between enum and bool? I think first arg for macro should be "true" } void test_rm_nonexistent_between () @@ -260,6 +276,7 @@ void test_rm_nonexistent_between () zmq::generic_mtrie_t mtrie; add_indexed_expect_unique (mtrie, pipes, names, 0); add_indexed_expect_unique (mtrie, pipes, names, 2); + TEST_ASSERT_EQUAL_INT (2, mtrie.num_prefixes ()); const zmq::generic_mtrie_t::prefix_t name_data = reinterpret_cast::prefix_t> (names[1]); @@ -267,6 +284,7 @@ void test_rm_nonexistent_between () zmq::generic_mtrie_t::rm_result res = mtrie.rm (name_data, getlen (name_data), &pipes[1]); TEST_ASSERT_EQUAL (zmq::generic_mtrie_t::not_found, res); + TEST_ASSERT_EQUAL_INT (2, mtrie.num_prefixes ()); } template @@ -277,6 +295,7 @@ void add_entries (zmq::generic_mtrie_t &mtrie_, for (size_t i = 0; i < N; ++i) { add_indexed_expect_unique (mtrie_, pipes_, names_, i); } + TEST_ASSERT_EQUAL_INT (N, mtrie_.num_prefixes ()); } void test_add_multiple () @@ -306,6 +325,7 @@ void test_add_multiple_reverse () add_indexed_expect_unique (mtrie, pipes, names, static_cast (i)); } + TEST_ASSERT_EQUAL_INT (3, mtrie.num_prefixes ()); for (size_t i = 0; i < 3; ++i) { const zmq::generic_mtrie_t::prefix_t name_data = @@ -330,6 +350,7 @@ template void add_and_rm_entries (const char *(&names_)[N]) mtrie.rm (name_data, getlen (name_data), &pipes[i]); TEST_ASSERT_EQUAL (zmq::generic_mtrie_t::last_value_removed, res); } + TEST_ASSERT_EQUAL_INT (0, mtrie.num_prefixes ()); } void test_rm_multiple_in_order () @@ -394,8 +415,10 @@ void add_duplicate_entry (zmq::generic_mtrie_t &mtrie_, int (&pipes_)[2]) bool res = mtrie_.add (name_data, getlen (name_data), &pipes_[0]); TEST_ASSERT_TRUE (res); + TEST_ASSERT_EQUAL_INT (1, mtrie_.num_prefixes ()); res = mtrie_.add (name_data, getlen (name_data), &pipes_[1]); TEST_ASSERT_FALSE (res); + TEST_ASSERT_EQUAL_INT (1, mtrie_.num_prefixes ()); } void test_rm_with_callback_duplicate ()