diff --git a/CMakeLists.txt b/CMakeLists.txt index 8eabb802..e10537c3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -611,6 +611,8 @@ set (cxx-sources fd.hpp fq.hpp gather.hpp + generic_mtrie.hpp + generic_mtrie_impl.hpp gssapi_client.hpp gssapi_mechanism_base.hpp gssapi_server.hpp diff --git a/Makefile.am b/Makefile.am index 230a93d3..7f753f93 100644 --- a/Makefile.am +++ b/Makefile.am @@ -64,6 +64,8 @@ src_libzmq_la_SOURCES = \ src/fq.hpp \ src/gather.cpp \ src/gather.hpp \ + src/generic_mtrie.hpp \ + src/generic_mtrie_impl.hpp \ src/gssapi_mechanism_base.cpp \ src/gssapi_mechanism_base.hpp \ src/gssapi_client.cpp \ @@ -865,7 +867,8 @@ if ENABLE_STATIC # unit tests - these include individual source files and test the internal functions test_apps += \ unittests/unittest_poller \ - unittests/unittest_ypipe + unittests/unittest_ypipe \ + unittests/unittest_mtrie unittests_unittest_poller_SOURCES = unittests/unittest_poller.cpp unittests_unittest_poller_CPPFLAGS = -I$(top_srcdir)/src ${UNITY_CPPFLAGS} @@ -878,6 +881,12 @@ unittests_unittest_ypipe_CPPFLAGS = -I$(top_srcdir)/src ${UNITY_CPPFLAGS} unittests_unittest_ypipe_LDADD = $(top_builddir)/src/.libs/libzmq.a \ ${src_libzmq_la_LIBADD} \ ${UNITY_LIBS} + +unittests_unittest_mtrie_SOURCES = unittests/unittest_mtrie.cpp +unittests_unittest_mtrie_CPPFLAGS = -I$(top_srcdir)/src ${UNITY_CPPFLAGS} +unittests_unittest_mtrie_LDADD = $(top_builddir)/src/.libs/libzmq.a \ + ${src_libzmq_la_LIBADD} \ + ${UNITY_LIBS} endif check_PROGRAMS = ${test_apps} diff --git a/src/generic_mtrie.hpp b/src/generic_mtrie.hpp new file mode 100644 index 00000000..d40208e7 --- /dev/null +++ b/src/generic_mtrie.hpp @@ -0,0 +1,104 @@ +/* +Copyright (c) 2018 Contributors as noted in the AUTHORS file + +This file is part of libzmq, the ZeroMQ core engine in C++. + +libzmq is free software; you can redistribute it and/or modify it under +the terms of the GNU Lesser General Public License (LGPL) as published +by the Free Software Foundation; either version 3 of the License, or +(at your option) any later version. + +As a special exception, the Contributors give you permission to link +this library with independent modules to produce an executable, +regardless of the license terms of these independent modules, and to +copy and distribute the resulting executable under terms of your choice, +provided that you also meet, for each linked independent module, the +terms and conditions of the license of that module. An independent +module is a module which is not derived from or based on this library. +If you modify this library, you must extend this exception to your +version of the library. + +libzmq is distributed in the hope that it will be useful, but WITHOUT +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public +License for more details. + +You should have received a copy of the GNU Lesser General Public License +along with this program. If not, see . +*/ + +#ifndef __ZMQ_GENERIC_MTRIE_HPP_INCLUDED__ +#define __ZMQ_GENERIC_MTRIE_HPP_INCLUDED__ + +#include +#include + +#include "stdint.hpp" + +namespace zmq +{ +// Multi-trie. Each node in the trie is a set of pointers to pipes. +template class generic_mtrie_t +{ + public: + typedef T value_t; + typedef const unsigned char *prefix_t; + + generic_mtrie_t (); + ~generic_mtrie_t (); + + // Add key to the trie. Returns true if it's a new subscription + // rather than a duplicate. + bool add (prefix_t prefix_, size_t size_, value_t *pipe_); + + // Remove all subscriptions for a specific peer from the trie. + // The call_on_uniq_ flag controls if the callback is invoked + // when there are no subscriptions left on some topics or on + // every removal. + void + rm (value_t *pipe_, + void (*func_) (const unsigned char *data_, size_t size_, void *arg_), + void *arg_, + bool call_on_uniq_); + + // Remove specific subscription from the trie. Return true is it was + // actually removed rather than de-duplicated. + bool rm (prefix_t prefix_, size_t size_, value_t *pipe_); + + // Signal all the matching pipes. + void match (prefix_t data_, + size_t size_, + void (*func_) (value_t *pipe_, void *arg_), + void *arg_); + + private: + bool add_helper (prefix_t prefix_, size_t size_, value_t *pipe_); + void rm_helper (value_t *pipe_, + unsigned char **buff_, + size_t buffsize_, + size_t maxbuffsize_, + void (*func_) (prefix_t data_, size_t size_, void *arg_), + void *arg_, + bool call_on_uniq_); + bool rm_helper (prefix_t prefix_, size_t size_, value_t *pipe_); + bool is_redundant () const; + + typedef std::set pipes_t; + pipes_t *pipes; + + unsigned char min; + unsigned short count; + unsigned short live_nodes; + union + { + class generic_mtrie_t *node; + class generic_mtrie_t **table; + } next; + + generic_mtrie_t (const generic_mtrie_t &); + const generic_mtrie_t & + operator= (const generic_mtrie_t &); +}; +} + +#endif diff --git a/src/generic_mtrie_impl.hpp b/src/generic_mtrie_impl.hpp new file mode 100644 index 00000000..f17df75c --- /dev/null +++ b/src/generic_mtrie_impl.hpp @@ -0,0 +1,450 @@ +/* +Copyright (c) 2018 Contributors as noted in the AUTHORS file + +This file is part of libzmq, the ZeroMQ core engine in C++. + +libzmq is free software; you can redistribute it and/or modify it under +the terms of the GNU Lesser General Public License (LGPL) as published +by the Free Software Foundation; either version 3 of the License, or +(at your option) any later version. + +As a special exception, the Contributors give you permission to link +this library with independent modules to produce an executable, +regardless of the license terms of these independent modules, and to +copy and distribute the resulting executable under terms of your choice, +provided that you also meet, for each linked independent module, the +terms and conditions of the license of that module. An independent +module is a module which is not derived from or based on this library. +If you modify this library, you must extend this exception to your +version of the library. + +libzmq is distributed in the hope that it will be useful, but WITHOUT +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public +License for more details. + +You should have received a copy of the GNU Lesser General Public License +along with this program. If not, see . +*/ + +#ifndef __ZMQ_GENERIC_MTRIE_IMPL_HPP_INCLUDED__ +#define __ZMQ_GENERIC_MTRIE_IMPL_HPP_INCLUDED__ + + +#include + +#include +#include + +#include "err.hpp" +#include "pipe.hpp" +#include "macros.hpp" +#include "generic_mtrie.hpp" + +template +zmq::generic_mtrie_t::generic_mtrie_t () : + pipes (0), + min (0), + count (0), + live_nodes (0) +{ +} + +template zmq::generic_mtrie_t::~generic_mtrie_t () +{ + LIBZMQ_DELETE (pipes); + + if (count == 1) { + zmq_assert (next.node); + LIBZMQ_DELETE (next.node); + } else if (count > 1) { + for (unsigned short i = 0; i != count; ++i) { + LIBZMQ_DELETE (next.table[i]); + } + free (next.table); + } +} + +template +bool zmq::generic_mtrie_t::add (prefix_t prefix_, + size_t size_, + value_t *pipe_) +{ + return add_helper (prefix_, size_, pipe_); +} + +template +bool zmq::generic_mtrie_t::add_helper (prefix_t prefix_, + size_t size_, + value_t *pipe_) +{ + // We are at the node corresponding to the prefix. We are done. + if (!size_) { + bool result = !pipes; + if (!pipes) { + pipes = new (std::nothrow) pipes_t; + alloc_assert (pipes); + } + pipes->insert (pipe_); + return result; + } + + unsigned char c = *prefix_; + if (c < min || c >= min + count) { + // The character is out of range of currently handled + // characters. We have to extend the table. + if (!count) { + min = c; + count = 1; + next.node = NULL; + } else if (count == 1) { + unsigned char oldc = min; + generic_mtrie_t *oldp = next.node; + count = (min < c ? c - min : min - c) + 1; + next.table = + (generic_mtrie_t **) malloc (sizeof (generic_mtrie_t *) * count); + alloc_assert (next.table); + for (unsigned short i = 0; i != count; ++i) + next.table[i] = 0; + min = std::min (min, c); + next.table[oldc - min] = oldp; + } else if (min < c) { + // The new character is above the current character range. + unsigned short old_count = count; + count = c - min + 1; + next.table = (generic_mtrie_t **) realloc ( + next.table, sizeof (generic_mtrie_t *) * count); + alloc_assert (next.table); + for (unsigned short i = old_count; i != count; i++) + next.table[i] = NULL; + } else { + // The new character is below the current character range. + unsigned short old_count = count; + count = (min + old_count) - c; + next.table = (generic_mtrie_t **) realloc ( + next.table, sizeof (generic_mtrie_t *) * count); + alloc_assert (next.table); + memmove (next.table + min - c, next.table, + old_count * sizeof (generic_mtrie_t *)); + for (unsigned short i = 0; i != min - c; i++) + next.table[i] = NULL; + min = c; + } + } + + // If next node does not exist, create one. + if (count == 1) { + if (!next.node) { + next.node = new (std::nothrow) generic_mtrie_t; + alloc_assert (next.node); + ++live_nodes; + } + return next.node->add_helper (prefix_ + 1, size_ - 1, pipe_); + } else { + if (!next.table[c - min]) { + next.table[c - min] = new (std::nothrow) generic_mtrie_t; + alloc_assert (next.table[c - min]); + ++live_nodes; + } + return next.table[c - min]->add_helper (prefix_ + 1, size_ - 1, pipe_); + } +} + + +template +void zmq::generic_mtrie_t::rm (value_t *pipe_, + void (*func_) (prefix_t data_, + size_t size_, + void *arg_), + void *arg_, + bool call_on_uniq_) +{ + unsigned char *buff = NULL; + rm_helper (pipe_, &buff, 0, 0, func_, arg_, call_on_uniq_); + free (buff); +} + +template +void zmq::generic_mtrie_t::rm_helper (value_t *pipe_, + unsigned char **buff_, + size_t buffsize_, + size_t maxbuffsize_, + void (*func_) (prefix_t data_, + size_t size_, + void *arg_), + void *arg_, + bool call_on_uniq_) +{ + // Remove the subscription from this node. + if (pipes && pipes->erase (pipe_)) { + if (!call_on_uniq_ || pipes->empty ()) { + func_ (*buff_, buffsize_, arg_); + } + + if (pipes->empty ()) { + LIBZMQ_DELETE (pipes); + } + } + + // Adjust the buffer. + if (buffsize_ >= maxbuffsize_) { + maxbuffsize_ = buffsize_ + 256; + *buff_ = (unsigned char *) realloc (*buff_, maxbuffsize_); + alloc_assert (*buff_); + } + + // If there are no subnodes in the trie, return. + if (count == 0) + return; + + // If there's one subnode (optimisation). + if (count == 1) { + (*buff_)[buffsize_] = min; + buffsize_++; + next.node->rm_helper (pipe_, buff_, buffsize_, maxbuffsize_, func_, + arg_, call_on_uniq_); + + // Prune the node if it was made redundant by the removal + if (next.node->is_redundant ()) { + LIBZMQ_DELETE (next.node); + count = 0; + --live_nodes; + zmq_assert (live_nodes == 0); + } + return; + } + + // If there are multiple subnodes. + // + // New min non-null character in the node table after the removal + unsigned char new_min = min + count - 1; + // New max non-null character in the node table after the removal + unsigned char new_max = min; + for (unsigned short c = 0; c != count; c++) { + (*buff_)[buffsize_] = min + c; + if (next.table[c]) { + next.table[c]->rm_helper (pipe_, buff_, buffsize_ + 1, maxbuffsize_, + func_, arg_, call_on_uniq_); + + // Prune redundant nodes from the mtrie + if (next.table[c]->is_redundant ()) { + LIBZMQ_DELETE (next.table[c]); + + zmq_assert (live_nodes > 0); + --live_nodes; + } else { + // The node is not redundant, so it's a candidate for being + // the new min/max node. + // + // We loop through the node array from left to right, so the + // first non-null, non-redundant node encountered is the new + // minimum index. Conversely, the last non-redundant, non-null + // node encountered is the new maximum index. + if (c + min < new_min) + new_min = c + min; + if (c + min > new_max) + new_max = c + min; + } + } + } + + zmq_assert (count > 1); + + // Free the node table if it's no longer used. + if (live_nodes == 0) { + free (next.table); + next.table = NULL; + count = 0; + } + // Compact the node table if possible + else if (live_nodes == 1) { + // If there's only one live node in the table we can + // switch to using the more compact single-node + // representation + zmq_assert (new_min == new_max); + zmq_assert (new_min >= min && new_min < min + count); + generic_mtrie_t *node = next.table[new_min - min]; + zmq_assert (node); + free (next.table); + next.node = node; + count = 1; + min = new_min; + } else if (new_min > min || new_max < min + count - 1) { + zmq_assert (new_max - new_min + 1 > 1); + + generic_mtrie_t **old_table = next.table; + zmq_assert (new_min > min || new_max < min + count - 1); + zmq_assert (new_min >= min); + zmq_assert (new_max <= min + count - 1); + zmq_assert (new_max - new_min + 1 < count); + + count = new_max - new_min + 1; + next.table = + (generic_mtrie_t **) malloc (sizeof (generic_mtrie_t *) * count); + alloc_assert (next.table); + + memmove (next.table, old_table + (new_min - min), + sizeof (generic_mtrie_t *) * count); + free (old_table); + + min = new_min; + } +} + +template +bool zmq::generic_mtrie_t::rm (prefix_t prefix_, + size_t size_, + value_t *pipe_) +{ + return rm_helper (prefix_, size_, pipe_); +} + +template +bool zmq::generic_mtrie_t::rm_helper (prefix_t prefix_, + size_t size_, + value_t *pipe_) +{ + if (!size_) { + if (pipes) { + typename pipes_t::size_type erased = pipes->erase (pipe_); + zmq_assert (erased == 1); + if (pipes->empty ()) { + LIBZMQ_DELETE (pipes); + } + } + return !pipes; + } + + unsigned char c = *prefix_; + if (!count || c < min || c >= min + count) + return false; + + generic_mtrie_t *next_node = count == 1 ? next.node : next.table[c - min]; + + if (!next_node) + return false; + + bool ret = next_node->rm_helper (prefix_ + 1, size_ - 1, pipe_); + + if (next_node->is_redundant ()) { + LIBZMQ_DELETE (next_node); + zmq_assert (count > 0); + + if (count == 1) { + next.node = 0; + count = 0; + --live_nodes; + zmq_assert (live_nodes == 0); + } else { + next.table[c - min] = 0; + zmq_assert (live_nodes > 1); + --live_nodes; + + // Compact the table if possible + if (live_nodes == 1) { + // If there's only one live node in the table we can + // switch to using the more compact single-node + // representation + unsigned short i; + for (i = 0; i < count; ++i) + if (next.table[i]) + break; + + zmq_assert (i < count); + min += i; + count = 1; + generic_mtrie_t *oldp = next.table[i]; + free (next.table); + next.node = oldp; + } else if (c == min) { + // We can compact the table "from the left" + unsigned short i; + for (i = 1; i < count; ++i) + if (next.table[i]) + break; + + zmq_assert (i < count); + min += i; + count -= i; + generic_mtrie_t **old_table = next.table; + next.table = (generic_mtrie_t **) malloc ( + sizeof (generic_mtrie_t *) * count); + alloc_assert (next.table); + memmove (next.table, old_table + i, + sizeof (generic_mtrie_t *) * count); + free (old_table); + } else if (c == min + count - 1) { + // We can compact the table "from the right" + unsigned short i; + for (i = 1; i < count; ++i) + if (next.table[count - 1 - i]) + break; + + zmq_assert (i < count); + count -= i; + generic_mtrie_t **old_table = next.table; + next.table = (generic_mtrie_t **) malloc ( + sizeof (generic_mtrie_t *) * count); + alloc_assert (next.table); + memmove (next.table, old_table, + sizeof (generic_mtrie_t *) * count); + free (old_table); + } + } + } + + return ret; +} + +template +void zmq::generic_mtrie_t::match (prefix_t data_, + size_t size_, + void (*func_) (value_t *pipe_, void *arg_), + void *arg_) +{ + generic_mtrie_t *current = this; + while (true) { + // Signal the pipes attached to this node. + if (current->pipes) { + for (typename pipes_t::iterator it = current->pipes->begin (); + it != current->pipes->end (); ++it) + func_ (*it, arg_); + } + + // If we are at the end of the message, there's nothing more to match. + if (!size_) + break; + + // If there are no subnodes in the trie, return. + if (current->count == 0) + break; + + // If there's one subnode (optimisation). + if (current->count == 1) { + if (data_[0] != current->min) + break; + current = current->next.node; + data_++; + size_--; + continue; + } + + // If there are multiple subnodes. + if (data_[0] < current->min + || data_[0] >= current->min + current->count) + break; + if (!current->next.table[data_[0] - current->min]) + break; + current = current->next.table[data_[0] - current->min]; + data_++; + size_--; + } +} + +template bool zmq::generic_mtrie_t::is_redundant () const +{ + return !pipes && live_nodes == 0; +} + + +#endif diff --git a/src/mtrie.cpp b/src/mtrie.cpp index 7b22e4ba..30a84e32 100644 --- a/src/mtrie.cpp +++ b/src/mtrie.cpp @@ -28,389 +28,10 @@ */ #include "precompiled.hpp" -#include - -#include -#include - -#include "err.hpp" -#include "pipe.hpp" -#include "macros.hpp" #include "mtrie.hpp" +#include "generic_mtrie_impl.hpp" -zmq::mtrie_t::mtrie_t () : pipes (0), min (0), count (0), live_nodes (0) +namespace zmq { -} - -zmq::mtrie_t::~mtrie_t () -{ - LIBZMQ_DELETE (pipes); - - if (count == 1) { - zmq_assert (next.node); - LIBZMQ_DELETE (next.node); - } else if (count > 1) { - for (unsigned short i = 0; i != count; ++i) { - LIBZMQ_DELETE (next.table[i]); - } - free (next.table); - } -} - -bool zmq::mtrie_t::add (prefix_t prefix_, size_t size_, pipe_t *pipe_) -{ - return add_helper (prefix_, size_, pipe_); -} - -bool zmq::mtrie_t::add_helper (prefix_t prefix_, size_t size_, pipe_t *pipe_) -{ - // We are at the node corresponding to the prefix. We are done. - if (!size_) { - bool result = !pipes; - if (!pipes) { - pipes = new (std::nothrow) pipes_t; - alloc_assert (pipes); - } - pipes->insert (pipe_); - return result; - } - - unsigned char c = *prefix_; - if (c < min || c >= min + count) { - // The character is out of range of currently handled - // characters. We have to extend the table. - if (!count) { - min = c; - count = 1; - next.node = NULL; - } else if (count == 1) { - unsigned char oldc = min; - mtrie_t *oldp = next.node; - count = (min < c ? c - min : min - c) + 1; - next.table = (mtrie_t **) malloc (sizeof (mtrie_t *) * count); - alloc_assert (next.table); - for (unsigned short i = 0; i != count; ++i) - next.table[i] = 0; - min = std::min (min, c); - next.table[oldc - min] = oldp; - } else if (min < c) { - // The new character is above the current character range. - unsigned short old_count = count; - count = c - min + 1; - next.table = - (mtrie_t **) realloc (next.table, sizeof (mtrie_t *) * count); - alloc_assert (next.table); - for (unsigned short i = old_count; i != count; i++) - next.table[i] = NULL; - } else { - // The new character is below the current character range. - unsigned short old_count = count; - count = (min + old_count) - c; - next.table = - (mtrie_t **) realloc (next.table, sizeof (mtrie_t *) * count); - alloc_assert (next.table); - memmove (next.table + min - c, next.table, - old_count * sizeof (mtrie_t *)); - for (unsigned short i = 0; i != min - c; i++) - next.table[i] = NULL; - min = c; - } - } - - // If next node does not exist, create one. - if (count == 1) { - if (!next.node) { - next.node = new (std::nothrow) mtrie_t; - alloc_assert (next.node); - ++live_nodes; - } - return next.node->add_helper (prefix_ + 1, size_ - 1, pipe_); - } else { - if (!next.table[c - min]) { - next.table[c - min] = new (std::nothrow) mtrie_t; - alloc_assert (next.table[c - min]); - ++live_nodes; - } - return next.table[c - min]->add_helper (prefix_ + 1, size_ - 1, pipe_); - } -} - - -void zmq::mtrie_t::rm (pipe_t *pipe_, - void (*func_) (prefix_t data_, size_t size_, void *arg_), - void *arg_, - bool call_on_uniq_) -{ - unsigned char *buff = NULL; - rm_helper (pipe_, &buff, 0, 0, func_, arg_, call_on_uniq_); - free (buff); -} - -void zmq::mtrie_t::rm_helper (pipe_t *pipe_, - unsigned char **buff_, - size_t buffsize_, - size_t maxbuffsize_, - void (*func_) (prefix_t data_, - size_t size_, - void *arg_), - void *arg_, - bool call_on_uniq_) -{ - // Remove the subscription from this node. - if (pipes && pipes->erase (pipe_)) { - if (!call_on_uniq_ || pipes->empty ()) { - func_ (*buff_, buffsize_, arg_); - } - - if (pipes->empty ()) { - LIBZMQ_DELETE (pipes); - } - } - - // Adjust the buffer. - if (buffsize_ >= maxbuffsize_) { - maxbuffsize_ = buffsize_ + 256; - *buff_ = (unsigned char *) realloc (*buff_, maxbuffsize_); - alloc_assert (*buff_); - } - - // If there are no subnodes in the trie, return. - if (count == 0) - return; - - // If there's one subnode (optimisation). - if (count == 1) { - (*buff_)[buffsize_] = min; - buffsize_++; - next.node->rm_helper (pipe_, buff_, buffsize_, maxbuffsize_, func_, - arg_, call_on_uniq_); - - // Prune the node if it was made redundant by the removal - if (next.node->is_redundant ()) { - LIBZMQ_DELETE (next.node); - count = 0; - --live_nodes; - zmq_assert (live_nodes == 0); - } - return; - } - - // If there are multiple subnodes. - // - // New min non-null character in the node table after the removal - unsigned char new_min = min + count - 1; - // New max non-null character in the node table after the removal - unsigned char new_max = min; - for (unsigned short c = 0; c != count; c++) { - (*buff_)[buffsize_] = min + c; - if (next.table[c]) { - next.table[c]->rm_helper (pipe_, buff_, buffsize_ + 1, maxbuffsize_, - func_, arg_, call_on_uniq_); - - // Prune redundant nodes from the mtrie - if (next.table[c]->is_redundant ()) { - LIBZMQ_DELETE (next.table[c]); - - zmq_assert (live_nodes > 0); - --live_nodes; - } else { - // The node is not redundant, so it's a candidate for being - // the new min/max node. - // - // We loop through the node array from left to right, so the - // first non-null, non-redundant node encountered is the new - // minimum index. Conversely, the last non-redundant, non-null - // node encountered is the new maximum index. - if (c + min < new_min) - new_min = c + min; - if (c + min > new_max) - new_max = c + min; - } - } - } - - zmq_assert (count > 1); - - // Free the node table if it's no longer used. - if (live_nodes == 0) { - free (next.table); - next.table = NULL; - count = 0; - } - // Compact the node table if possible - else if (live_nodes == 1) { - // If there's only one live node in the table we can - // switch to using the more compact single-node - // representation - zmq_assert (new_min == new_max); - zmq_assert (new_min >= min && new_min < min + count); - mtrie_t *node = next.table[new_min - min]; - zmq_assert (node); - free (next.table); - next.node = node; - count = 1; - min = new_min; - } else if (new_min > min || new_max < min + count - 1) { - zmq_assert (new_max - new_min + 1 > 1); - - mtrie_t **old_table = next.table; - zmq_assert (new_min > min || new_max < min + count - 1); - zmq_assert (new_min >= min); - zmq_assert (new_max <= min + count - 1); - zmq_assert (new_max - new_min + 1 < count); - - count = new_max - new_min + 1; - next.table = (mtrie_t **) malloc (sizeof (mtrie_t *) * count); - alloc_assert (next.table); - - memmove (next.table, old_table + (new_min - min), - sizeof (mtrie_t *) * count); - free (old_table); - - min = new_min; - } -} - -bool zmq::mtrie_t::rm (prefix_t prefix_, size_t size_, pipe_t *pipe_) -{ - return rm_helper (prefix_, size_, pipe_); -} - -bool zmq::mtrie_t::rm_helper (prefix_t prefix_, size_t size_, pipe_t *pipe_) -{ - if (!size_) { - if (pipes) { - pipes_t::size_type erased = pipes->erase (pipe_); - zmq_assert (erased == 1); - if (pipes->empty ()) { - LIBZMQ_DELETE (pipes); - } - } - return !pipes; - } - - unsigned char c = *prefix_; - if (!count || c < min || c >= min + count) - return false; - - mtrie_t *next_node = count == 1 ? next.node : next.table[c - min]; - - if (!next_node) - return false; - - bool ret = next_node->rm_helper (prefix_ + 1, size_ - 1, pipe_); - - if (next_node->is_redundant ()) { - LIBZMQ_DELETE (next_node); - zmq_assert (count > 0); - - if (count == 1) { - next.node = 0; - count = 0; - --live_nodes; - zmq_assert (live_nodes == 0); - } else { - next.table[c - min] = 0; - zmq_assert (live_nodes > 1); - --live_nodes; - - // Compact the table if possible - if (live_nodes == 1) { - // If there's only one live node in the table we can - // switch to using the more compact single-node - // representation - unsigned short i; - for (i = 0; i < count; ++i) - if (next.table[i]) - break; - - zmq_assert (i < count); - min += i; - count = 1; - mtrie_t *oldp = next.table[i]; - free (next.table); - next.node = oldp; - } else if (c == min) { - // We can compact the table "from the left" - unsigned short i; - for (i = 1; i < count; ++i) - if (next.table[i]) - break; - - zmq_assert (i < count); - min += i; - count -= i; - mtrie_t **old_table = next.table; - next.table = (mtrie_t **) malloc (sizeof (mtrie_t *) * count); - alloc_assert (next.table); - memmove (next.table, old_table + i, sizeof (mtrie_t *) * count); - free (old_table); - } else if (c == min + count - 1) { - // We can compact the table "from the right" - unsigned short i; - for (i = 1; i < count; ++i) - if (next.table[count - 1 - i]) - break; - - zmq_assert (i < count); - count -= i; - mtrie_t **old_table = next.table; - next.table = (mtrie_t **) malloc (sizeof (mtrie_t *) * count); - alloc_assert (next.table); - memmove (next.table, old_table, sizeof (mtrie_t *) * count); - free (old_table); - } - } - } - - return ret; -} - -void zmq::mtrie_t::match (prefix_t data_, - size_t size_, - void (*func_) (pipe_t *pipe_, void *arg_), - void *arg_) -{ - mtrie_t *current = this; - while (true) { - // Signal the pipes attached to this node. - if (current->pipes) { - for (pipes_t::iterator it = current->pipes->begin (); - it != current->pipes->end (); ++it) - func_ (*it, arg_); - } - - // If we are at the end of the message, there's nothing more to match. - if (!size_) - break; - - // If there are no subnodes in the trie, return. - if (current->count == 0) - break; - - // If there's one subnode (optimisation). - if (current->count == 1) { - if (data_[0] != current->min) - break; - current = current->next.node; - data_++; - size_--; - continue; - } - - // If there are multiple subnodes. - if (data_[0] < current->min - || data_[0] >= current->min + current->count) - break; - if (!current->next.table[data_[0] - current->min]) - break; - current = current->next.table[data_[0] - current->min]; - data_++; - size_--; - } -} - -bool zmq::mtrie_t::is_redundant () const -{ - return !pipes && live_nodes == 0; +template class generic_mtrie_t; } diff --git a/src/mtrie.hpp b/src/mtrie.hpp index ec93ad30..4d5080af 100644 --- a/src/mtrie.hpp +++ b/src/mtrie.hpp @@ -30,76 +30,23 @@ #ifndef __ZMQ_MTRIE_HPP_INCLUDED__ #define __ZMQ_MTRIE_HPP_INCLUDED__ -#include -#include +#include "generic_mtrie.hpp" -#include "stdint.hpp" +#if __cplusplus >= 201103L || defined(_MSC_VER) +#define ZMQ_HAS_EXTERN_TEMPLATE 1 +#else +#define ZMQ_HAS_EXTERN_TEMPLATE 0 +#endif namespace zmq { class pipe_t; -// Multi-trie. Each node in the trie is a set of pointers to pipes. +#if ZMQ_HAS_EXTERN_TEMPLATE +extern template class generic_mtrie_t; +#endif -class mtrie_t -{ - public: - typedef const unsigned char *prefix_t; - - mtrie_t (); - ~mtrie_t (); - - // Add key to the trie. Returns true if it's a new subscription - // rather than a duplicate. - bool add (prefix_t prefix_, size_t size_, zmq::pipe_t *pipe_); - - // Remove all subscriptions for a specific peer from the trie. - // The call_on_uniq_ flag controls if the callback is invoked - // when there are no subscriptions left on some topics or on - // every removal. - void - rm (zmq::pipe_t *pipe_, - void (*func_) (const unsigned char *data_, size_t size_, void *arg_), - void *arg_, - bool call_on_uniq_); - - // Remove specific subscription from the trie. Return true is it was - // actually removed rather than de-duplicated. - bool rm (prefix_t prefix_, size_t size_, zmq::pipe_t *pipe_); - - // Signal all the matching pipes. - void match (prefix_t data_, - size_t size_, - void (*func_) (zmq::pipe_t *pipe_, void *arg_), - void *arg_); - - private: - bool add_helper (prefix_t prefix_, size_t size_, zmq::pipe_t *pipe_); - void rm_helper (zmq::pipe_t *pipe_, - unsigned char **buff_, - size_t buffsize_, - size_t maxbuffsize_, - void (*func_) (prefix_t data_, size_t size_, void *arg_), - void *arg_, - bool call_on_uniq_); - bool rm_helper (prefix_t prefix_, size_t size_, zmq::pipe_t *pipe_); - bool is_redundant () const; - - typedef std::set pipes_t; - pipes_t *pipes; - - unsigned char min; - unsigned short count; - unsigned short live_nodes; - union - { - class mtrie_t *node; - class mtrie_t **table; - } next; - - mtrie_t (const mtrie_t &); - const mtrie_t &operator= (const mtrie_t &); -}; +typedef generic_mtrie_t mtrie_t; } #endif diff --git a/unittests/CMakeLists.txt b/unittests/CMakeLists.txt index 57ebdda7..137f4a18 100644 --- a/unittests/CMakeLists.txt +++ b/unittests/CMakeLists.txt @@ -4,6 +4,7 @@ cmake_minimum_required(VERSION "2.8.1") set(unittests unittest_ypipe unittest_poller + unittest_mtrie ) #IF (ENABLE_DRAFTS) diff --git a/unittests/unittest_mtrie.cpp b/unittests/unittest_mtrie.cpp new file mode 100644 index 00000000..6e0ec1ed --- /dev/null +++ b/unittests/unittest_mtrie.cpp @@ -0,0 +1,113 @@ +/* +Copyright (c) 2018 Contributors as noted in the AUTHORS file + +This file is part of 0MQ. + +0MQ is free software; you can redistribute it and/or modify it under +the terms of the GNU Lesser General Public License as published by +the Free Software Foundation; either version 3 of the License, or +(at your option) any later version. + +0MQ is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Lesser General Public License for more details. + +You should have received a copy of the GNU Lesser General Public License +along with this program. If not, see . +*/ + +#include "../tests/testutil.hpp" + +#if defined(min) +#undef min +#endif + +#include + +#include + +void setUp () +{ +} +void tearDown () +{ +} + +void test_create () +{ + zmq::generic_mtrie_t mtrie; +} + +void mtrie_count (int *pipe, void *arg) +{ + LIBZMQ_UNUSED (pipe); + int *count = static_cast (arg); + ++*count; +} + +void test_check_empty_match_nonempty_data () +{ + zmq::generic_mtrie_t mtrie; + const zmq::generic_mtrie_t::prefix_t test_name = + reinterpret_cast::prefix_t> ("foo"); + + int count = 0; + mtrie.match (test_name, 3, mtrie_count, &count); + TEST_ASSERT_EQUAL_INT (0, count); +} + +void test_check_empty_match_empty_data () +{ + zmq::generic_mtrie_t mtrie; + + int count = 0; + mtrie.match (NULL, 0, mtrie_count, &count); + TEST_ASSERT_EQUAL_INT (0, count); +} + +void test_add_single_entry_match_exact () +{ + int pipe; + + zmq::generic_mtrie_t mtrie; + const zmq::generic_mtrie_t::prefix_t test_name = + reinterpret_cast::prefix_t> ("foo"); + + bool res = mtrie.add (test_name, 3, &pipe); + TEST_ASSERT_TRUE (res); + + int count = 0; + mtrie.match (test_name, 3, mtrie_count, &count); + TEST_ASSERT_EQUAL_INT (1, count); +} + +void test_add_rm_single_entry_match_exact () +{ + int pipe; + zmq::generic_mtrie_t mtrie; + const zmq::generic_mtrie_t::prefix_t test_name = + reinterpret_cast::prefix_t> ("foo"); + + mtrie.add (test_name, 3, &pipe); + bool res = mtrie.rm (test_name, 3, &pipe); + TEST_ASSERT_TRUE (res); + + int count = 0; + mtrie.match (test_name, 3, mtrie_count, &count); + TEST_ASSERT_EQUAL_INT (0, count); +} + +int main (void) +{ + setup_test_environment (); + + UNITY_BEGIN (); + RUN_TEST (test_create); + RUN_TEST (test_check_empty_match_nonempty_data); + RUN_TEST (test_check_empty_match_empty_data); + RUN_TEST (test_add_single_entry_match_exact); + RUN_TEST (test_add_rm_single_entry_match_exact); + + return UNITY_END (); +}