diff --git a/CMakeLists.txt b/CMakeLists.txt index 082bce5e..01936b89 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -624,6 +624,7 @@ set (cxx-sources plain_server.cpp poll.cpp poller_base.cpp + polling_util.cpp pollset.cpp proxy.cpp pub.cpp @@ -746,6 +747,7 @@ set (cxx-sources poll.hpp poller.hpp poller_base.hpp + polling_util.hpp pollset.hpp precompiled.hpp proxy.hpp diff --git a/Makefile.am b/Makefile.am index 70dd7d81..354d0492 100644 --- a/Makefile.am +++ b/Makefile.am @@ -143,6 +143,8 @@ src_libzmq_la_SOURCES = \ src/poller.hpp \ src/poller_base.cpp \ src/poller_base.hpp \ + src/polling_util.cpp \ + src/polling_util.hpp \ src/pollset.cpp \ src/pollset.hpp \ src/precompiled.cpp \ diff --git a/src/polling_util.cpp b/src/polling_util.cpp new file mode 100644 index 00000000..c8e42eb8 --- /dev/null +++ b/src/polling_util.cpp @@ -0,0 +1,51 @@ +/* + Copyright (c) 2007-2018 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "precompiled.hpp" +#include "polling_util.hpp" + +#if defined ZMQ_POLL_BASED_ON_POLL +#include +#include + +zmq::timeout_t zmq::compute_timeout (const bool first_pass_, + const long timeout_, + const uint64_t now_, + const uint64_t end_) +{ + if (first_pass_) + return 0; + + if (timeout_ < 0) + return -1; + + return static_cast ( + std::min (end_ - now_, INT_MAX)); +} +#endif diff --git a/src/polling_util.hpp b/src/polling_util.hpp new file mode 100644 index 00000000..381a77e7 --- /dev/null +++ b/src/polling_util.hpp @@ -0,0 +1,122 @@ +/* + Copyright (c) 2007-2018 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_SOCKET_POLLING_UTIL_HPP_INCLUDED__ +#define __ZMQ_SOCKET_POLLING_UTIL_HPP_INCLUDED__ + +#include + +#include "stdint.hpp" +#include "platform.hpp" +#include "err.hpp" + +namespace zmq +{ +template class fast_vector_t +{ + public: + explicit fast_vector_t (const size_t nitems_) + { + if (nitems_ > S) { + _buf = static_cast (malloc (nitems_ * sizeof (T))); + // TODO since this function is called by a client, we could return errno == ENOMEM here + alloc_assert (_buf); + } else { + _buf = _static_buf; + } + } + + T &operator[] (const size_t i) { return _buf[i]; } + + ~fast_vector_t () + { + if (_buf != _static_buf) + free (_buf); + } + + private: + fast_vector_t (const fast_vector_t &); + fast_vector_t &operator= (const fast_vector_t &); + + T _static_buf[S]; + T *_buf; +}; + +#if defined ZMQ_POLL_BASED_ON_POLL +typedef int timeout_t; + +timeout_t compute_timeout (const bool first_pass_, + const long timeout_, + const uint64_t now_, + const uint64_t end_); + +#elif defined ZMQ_POLL_BASED_ON_SELECT +inline size_t valid_pollset_bytes (const fd_set &pollset_) +{ +#if defined ZMQ_HAVE_WINDOWS + // On Windows we don't need to copy the whole fd_set. + // SOCKETS are continuous from the beginning of fd_array in fd_set. + // We just need to copy fd_count elements of fd_array. + // We gain huge memcpy() improvement if number of used SOCKETs is much lower than FD_SETSIZE. + return reinterpret_cast ( + &pollset_.fd_array[pollset_.fd_count]) + - reinterpret_cast (&pollset_); +#else + return sizeof (fd_set); +#endif +} + +#if defined ZMQ_HAVE_WINDOWS +class optimized_fd_set_t +{ + public: + explicit optimized_fd_set_t (size_t nevents_) : _fd_set (nevents_) {} + + fd_set *get () { return reinterpret_cast (&_fd_set[0]); } + + private: + fast_vector_t + _fd_set; +}; +#else +class optimized_fd_set_t +{ + public: + explicit optimized_fd_set_t (size_t /*nevents_*/) {} + + fd_set *get () { return &_fd_set; } + + private: + fd_set _fd_set; +}; +#endif +#endif +} + +#endif diff --git a/src/zmq.cpp b/src/zmq.cpp index 504eda6a..0ce4f531 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -41,6 +41,7 @@ #include "macros.hpp" #include "poller.hpp" +#if !defined ZMQ_HAVE_POLLER // On AIX platform, poll.h has to be included first to get consistent // definition of pollfd structure (AIX uses 'reqevents' and 'retnevents' // instead of 'events' and 'revents' and defines macros to map from POSIX-y @@ -49,6 +50,9 @@ #include #endif +#include "polling_util.hpp" +#endif + // TODO: determine if this is an issue, since zmq.h is being loaded from pch. // zmq.h must be included *after* poll.h for AIX to build properly //#include "../include/zmq.h" @@ -792,95 +796,6 @@ inline int zmq_poller_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) } #endif // ZMQ_HAVE_POLLER -#if !defined ZMQ_HAVE_POLLER -template class fast_vector_t -{ - public: - fast_vector_t (const size_t nitems_) - { - if (nitems_ > S) { - _buf = static_cast (malloc (nitems_ * sizeof (T))); - // TODO since this function is called by a client, we could return errno == ENOMEM here - alloc_assert (_buf); - } else { - _buf = _static_buf; - } - } - - T &operator[] (const size_t i) { return _buf[i]; } - - ~fast_vector_t () - { - if (_buf != _static_buf) - free (_buf); - } - - private: - fast_vector_t (const fast_vector_t &); - fast_vector_t &operator= (const fast_vector_t &); - - T _static_buf[S]; - T *_buf; -}; - -#if defined ZMQ_POLL_BASED_ON_POLL -typedef int timeout_t; - -static timeout_t compute_timeout (const bool first_pass_, - const long timeout_, - const uint64_t now_, - const uint64_t end_) -{ - if (first_pass_) - return 0; - else if (timeout_ < 0) - return -1; - else - return static_cast ( - std::min (end_ - now_, INT_MAX)); -} -#elif defined ZMQ_POLL_BASED_ON_SELECT -static size_t valid_pollset_bytes (const fd_set &pollset_) -{ -#if defined ZMQ_HAVE_WINDOWS - // On Windows we don't need to copy the whole fd_set. - // SOCKETS are continuous from the beginning of fd_array in fd_set. - // We just need to copy fd_count elements of fd_array. - // We gain huge memcpy() improvement if number of used SOCKETs is much lower than FD_SETSIZE. - return reinterpret_cast (pollset_.fd_array + pollset_.fd_count) - - reinterpret_cast (&pollset_); -#else - return sizeof (fd_set); -#endif -} - -#if defined ZMQ_HAVE_WINDOWS -class optimized_fd_set_t -{ - public: - optimized_fd_set_t (size_t nevents_) : _fd_set (nevents_) {} - - fd_set *get () { return reinterpret_cast (&_fd_set[0]); } - - private: - fast_vector_t - _fd_set; -}; -#else -class optimized_fd_set_t -{ - public: - optimized_fd_set_t (size_t /*nevents_*/) {} - - fd_set *get () { return &_fd_set; } - - private: - fd_set _fd_set; -}; -#endif -#endif -#endif - int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) { // TODO: the function implementation can just call zmq_pollfd_poll with @@ -918,7 +833,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) uint64_t now = 0; uint64_t end = 0; #if defined ZMQ_POLL_BASED_ON_POLL - fast_vector_t pollfds (nitems_); + zmq::fast_vector_t pollfds (nitems_); // Build pollset for poll () system call. for (int i = 0; i != nitems_; i++) { @@ -949,11 +864,11 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) // TODO since this function is called by a client, we could return errno EINVAL/ENOMEM/... here zmq_assert (nitems_ <= FD_SETSIZE); - optimized_fd_set_t pollset_in (nitems_); + zmq::optimized_fd_set_t pollset_in (nitems_); FD_ZERO (pollset_in.get ()); - optimized_fd_set_t pollset_out (nitems_); + zmq::optimized_fd_set_t pollset_out (nitems_); FD_ZERO (pollset_out.get ()); - optimized_fd_set_t pollset_err (nitems_); + zmq::optimized_fd_set_t pollset_err (nitems_); FD_ZERO (pollset_err.get ()); zmq::fd_t maxfd = 0; @@ -989,9 +904,9 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) } } - optimized_fd_set_t inset (nitems_); - optimized_fd_set_t outset (nitems_); - optimized_fd_set_t errset (nitems_); + zmq::optimized_fd_set_t inset (nitems_); + zmq::optimized_fd_set_t outset (nitems_); + zmq::optimized_fd_set_t errset (nitems_); #endif bool first_pass = true; @@ -1001,7 +916,8 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) #if defined ZMQ_POLL_BASED_ON_POLL // Compute the timeout for the subsequent poll. - timeout_t timeout = compute_timeout (first_pass, timeout_, now, end); + zmq::timeout_t timeout = + zmq::compute_timeout (first_pass, timeout_, now, end); // Wait for events. { @@ -1069,11 +985,11 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) // Wait for events. Ignore interrupts if there's infinite timeout. while (true) { memcpy (inset.get (), pollset_in.get (), - valid_pollset_bytes (*pollset_in.get ())); + zmq::valid_pollset_bytes (*pollset_in.get ())); memcpy (outset.get (), pollset_out.get (), - valid_pollset_bytes (*pollset_out.get ())); + zmq::valid_pollset_bytes (*pollset_out.get ())); memcpy (errset.get (), pollset_err.get (), - valid_pollset_bytes (*pollset_err.get ())); + zmq::valid_pollset_bytes (*pollset_err.get ())); #if defined ZMQ_HAVE_WINDOWS int rc = select (0, inset.get (), outset.get (), errset.get (), ptimeout);