diff --git a/CMakeLists.txt b/CMakeLists.txt
index 082bce5e..01936b89 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -624,6 +624,7 @@ set (cxx-sources
     plain_server.cpp
     poll.cpp
     poller_base.cpp
+    polling_util.cpp
     pollset.cpp
     proxy.cpp
     pub.cpp
@@ -746,6 +747,7 @@ set (cxx-sources
     poll.hpp
     poller.hpp
     poller_base.hpp
+    polling_util.hpp
     pollset.hpp
     precompiled.hpp
     proxy.hpp
diff --git a/INSTALL b/INSTALL
index e244fd44..d8ea4ab6 100644
--- a/INSTALL
+++ b/INSTALL
@@ -48,14 +48,7 @@ cmake -H. -B -G"Visual Studio 14 2015 Win64" \
 
 In VS 2012 it is mandatory to increase the default stack size of 1 MB to
 at least 2 MB due to implementation of std::map intermittently requiring
-substantial amount of stack and causing stack overflow. ZeroMQ generally
-needs more stack when FD_SETSIZE is higher.
-In all Windows builds it is recommended to start with at least 2 MB stack
-size unless application using ZeroMQ is using large number of threads which
-can cause substantial consumption of virtual address space, especially if
-32 bit build is used.
-Generally, programmer needs to tune the stack to balance memory consumption
-but never get into situation that stack is overflown.
+substantial amount of stack and causing stack overflow.
 
 Windows Builds - Static
 =======================
diff --git a/Makefile.am b/Makefile.am
index 70dd7d81..354d0492 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -143,6 +143,8 @@ src_libzmq_la_SOURCES = \
     src/poller.hpp \
     src/poller_base.cpp \
     src/poller_base.hpp \
+    src/polling_util.cpp \
+    src/polling_util.hpp \
     src/pollset.cpp \
     src/pollset.hpp \
     src/precompiled.cpp \
diff --git a/appveyor.yml b/appveyor.yml
index 12d338ac..88ce9fd0 100644
--- a/appveyor.yml
+++ b/appveyor.yml
@@ -10,6 +10,7 @@ environment:
   CMAKE_GENERATOR: "Visual Studio 12 2013"
   MSVCVERSION: "v120"
   MSVCYEAR: "vs2013"
+  ENABLE_DRAFTS: ON
   matrix:
     - platform: Win32
       configuration: Release
@@ -47,6 +48,7 @@ environment:
       configuration: Release
      WITH_LIBSODIUM: OFF
      ENABLE_CURVE: OFF
+      ENABLE_DRAFTS: OFF
    - platform: Win32
      configuration: Release
      WITH_LIBSODIUM: ON
@@ -111,7 +113,7 @@ before_build:
 #  - cmd: set BUILDLOG="%LIBZMQ_SRCDIR%\build.log"
   - cmd: md "%LIBZMQ_BUILDDIR%"
   - cd "%LIBZMQ_BUILDDIR%"
-  - cmd: cmake -D CMAKE_INCLUDE_PATH="%SODIUM_INCLUDE_DIR%" -D CMAKE_LIBRARY_PATH="%SODIUM_LIBRARY_DIR%" -D WITH_LIBSODIUM="%WITH_LIBSODIUM%" -D ENABLE_DRAFTS="ON" -D ENABLE_ANALYSIS="%ENABLE_ANALYSIS%" -D ENABLE_CURVE="%ENABLE_CURVE%" -D API_POLLER="%API_POLLER%" -D POLLER="%POLLER%" -D CMAKE_C_FLAGS_RELEASE="/MT" -D CMAKE_C_FLAGS_DEBUG="/MTd" -D WITH_LIBSODIUM="%WITH_LIBSODIUM%" -G "%CMAKE_GENERATOR%" "%APPVEYOR_BUILD_FOLDER%"
+  - cmd: cmake -D CMAKE_INCLUDE_PATH="%SODIUM_INCLUDE_DIR%" -D CMAKE_LIBRARY_PATH="%SODIUM_LIBRARY_DIR%" -D WITH_LIBSODIUM="%WITH_LIBSODIUM%" -D ENABLE_DRAFTS="%ENABLE_DRAFTS%" -D ENABLE_ANALYSIS="%ENABLE_ANALYSIS%" -D ENABLE_CURVE="%ENABLE_CURVE%" -D API_POLLER="%API_POLLER%" -D POLLER="%POLLER%" -D CMAKE_C_FLAGS_RELEASE="/MT" -D CMAKE_C_FLAGS_DEBUG="/MTd" -D WITH_LIBSODIUM="%WITH_LIBSODIUM%" -G "%CMAKE_GENERATOR%" "%APPVEYOR_BUILD_FOLDER%"
   - cmd: cd "%LIBZMQ_SRCDIR%"
 
 build_script:
diff --git a/src/polling_util.cpp b/src/polling_util.cpp
new file mode 100644
index 00000000..c8e42eb8
--- /dev/null
+++ b/src/polling_util.cpp
@@ -0,0 +1,51 @@
+/*
+    Copyright (c) 2007-2018 Contributors as noted in the AUTHORS file
+
+    This file is part of libzmq, the ZeroMQ core engine in C++.
+
+    libzmq is free software; you can redistribute it and/or modify it under
+    the terms of the GNU Lesser General Public License (LGPL) as published
+    by the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    As a special exception, the Contributors give you permission to link
+    this library with independent modules to produce an executable,
+    regardless of the license terms of these independent modules, and to
+    copy and distribute the resulting executable under terms of your choice,
+    provided that you also meet, for each linked independent module, the
+    terms and conditions of the license of that module. An independent
+    module is a module which is not derived from or based on this library.
+    If you modify this library, you must extend this exception to your
+    version of the library.
+
+    libzmq is distributed in the hope that it will be useful, but WITHOUT
+    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+    FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public
+    License for more details.
+
+    You should have received a copy of the GNU Lesser General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "precompiled.hpp"
+#include "polling_util.hpp"
+
+#if defined ZMQ_POLL_BASED_ON_POLL
+#include <limits.h>
+#include <algorithm>
+
+zmq::timeout_t zmq::compute_timeout (const bool first_pass_,
+                                     const long timeout_,
+                                     const uint64_t now_,
+                                     const uint64_t end_)
+{
+    if (first_pass_)
+        return 0;
+
+    if (timeout_ < 0)
+        return -1;
+
+    return static_cast<timeout_t> (
+      std::min<uint64_t> (end_ - now_, INT_MAX));
+}
+#endif
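compute_timeout () folds the two-pass timeout logic that zmq_poll () and socket_poller_t::wait () previously open-coded into one helper: 0 on the first, non-blocking pass, -1 for an infinite wait, otherwise the milliseconds left until the deadline. The driver loop below is an editor's sketch of the intended call pattern, not part of the patch; the function name poll_with_timeout and the simplified loop are illustrative, and the real callers are in zmq.cpp and socket_poller.cpp.

// Illustrative sketch (not part of the patch): how compute_timeout () is
// meant to be driven. Assumes a POSIX build with ZMQ_POLL_BASED_ON_POLL.
#include <poll.h>
#include <stdint.h>
#include "clock.hpp"
#include "polling_util.hpp"

static int poll_with_timeout (pollfd *fds, int nfds, long timeout_ms)
{
    zmq::clock_t clock;
    uint64_t now = 0;
    uint64_t end = 0;
    bool first_pass = true;

    while (true) {
        //  0 on the first pass, -1 for infinite waits, else time remaining.
        const zmq::timeout_t timeout =
          zmq::compute_timeout (first_pass, timeout_ms, now, end);

        const int rc = poll (fds, nfds, timeout);
        if (rc != 0 || timeout_ms == 0)
            return rc;

        if (timeout_ms < 0) {
            //  Infinite timeout: keep polling until something arrives.
            first_pass = false;
            continue;
        }
        if (first_pass) {
            now = clock.now_ms ();
            end = now + timeout_ms;
            first_pass = false;
            continue;
        }
        now = clock.now_ms ();
        if (now >= end)
            return 0;
    }
}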
diff --git a/src/polling_util.hpp b/src/polling_util.hpp
new file mode 100644
index 00000000..c2f3ff20
--- /dev/null
+++ b/src/polling_util.hpp
@@ -0,0 +1,179 @@
+/*
+    Copyright (c) 2007-2018 Contributors as noted in the AUTHORS file
+
+    This file is part of libzmq, the ZeroMQ core engine in C++.
+
+    libzmq is free software; you can redistribute it and/or modify it under
+    the terms of the GNU Lesser General Public License (LGPL) as published
+    by the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    As a special exception, the Contributors give you permission to link
+    this library with independent modules to produce an executable,
+    regardless of the license terms of these independent modules, and to
+    copy and distribute the resulting executable under terms of your choice,
+    provided that you also meet, for each linked independent module, the
+    terms and conditions of the license of that module. An independent
+    module is a module which is not derived from or based on this library.
+    If you modify this library, you must extend this exception to your
+    version of the library.
+
+    libzmq is distributed in the hope that it will be useful, but WITHOUT
+    ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+    FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public
+    License for more details.
+
+    You should have received a copy of the GNU Lesser General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_SOCKET_POLLING_UTIL_HPP_INCLUDED__
+#define __ZMQ_SOCKET_POLLING_UTIL_HPP_INCLUDED__
+
+#include <stdlib.h>
+#include <vector>
+
+#include "stdint.hpp"
+#include "platform.hpp"
+#include "err.hpp"
+
+namespace zmq
+{
+template <typename T, size_t S> class fast_vector_t
+{
+  public:
+    explicit fast_vector_t (const size_t nitems_)
+    {
+        if (nitems_ > S) {
+            _buf = static_cast<T *> (malloc (nitems_ * sizeof (T)));
+            // TODO since this function is called by a client, we could return errno == ENOMEM here
+            alloc_assert (_buf);
+        } else {
+            _buf = _static_buf;
+        }
+    }
+
+    T &operator[] (const size_t i) { return _buf[i]; }
+
+    ~fast_vector_t ()
+    {
+        if (_buf != _static_buf)
+            free (_buf);
+    }
+
+  private:
+    fast_vector_t (const fast_vector_t &);
+    fast_vector_t &operator= (const fast_vector_t &);
+
+    T _static_buf[S];
+    T *_buf;
+};
+
+template <typename T, size_t S> class resizable_fast_vector_t
+{
+  public:
+    resizable_fast_vector_t () : _dynamic_buf (NULL) {}
+
+    void resize (const size_t nitems_)
+    {
+        if (_dynamic_buf)
+            _dynamic_buf->resize (nitems_);
+        if (nitems_ > S) {
+            _dynamic_buf = new (std::nothrow) std::vector<T> (nitems_);
+            // TODO since this function is called by a client, we could return errno == ENOMEM here
+            alloc_assert (_dynamic_buf);
+        }
+    }
+
+    T *get_buf ()
+    {
+        // e.g. MSVC 2008 does not have std::vector::data, so we use &...[0]
+        return _dynamic_buf ? &(*_dynamic_buf)[0] : _static_buf;
+    }
+
+    T &operator[] (const size_t i) { return get_buf ()[i]; }
+
+    ~resizable_fast_vector_t () { delete _dynamic_buf; }
+
+  private:
+    resizable_fast_vector_t (const resizable_fast_vector_t &);
+    resizable_fast_vector_t &operator= (const resizable_fast_vector_t &);
+
+    T _static_buf[S];
+    std::vector<T> *_dynamic_buf;
+};
+
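fast_vector_t and resizable_fast_vector_t are what make the later pollfd and fd_set changes cheap: the common case of up to S entries lives entirely on the stack, and the heap is touched only when a caller registers more items than that. A short usage sketch (editor's illustration, not part of the patch; the 16-slot capacity and the example () function are arbitrary):

// Illustrative only. With nitems <= 16 neither helper allocates.
#include "polling_util.hpp"

void example (size_t nitems)
{
    //  Sized once at construction; non-copyable; frees the heap buffer,
    //  if any, in its destructor.
    zmq::fast_vector_t<int, 16> fds (nitems);
    fds[0] = 42;

    //  Starts on the stack and switches to a heap-backed std::vector the
    //  first time resize () asks for more than 16 slots.
    zmq::resizable_fast_vector_t<int, 16> counters;
    counters.resize (nitems);
    counters[0] = 1;
}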
+#if defined ZMQ_POLL_BASED_ON_POLL
+typedef int timeout_t;
+
+timeout_t compute_timeout (const bool first_pass_,
+                           const long timeout_,
+                           const uint64_t now_,
+                           const uint64_t end_);
+
+#elif defined ZMQ_POLL_BASED_ON_SELECT
+inline size_t valid_pollset_bytes (const fd_set &pollset_)
+{
+#if defined ZMQ_HAVE_WINDOWS
+    // On Windows we don't need to copy the whole fd_set.
+    // SOCKETS are continuous from the beginning of fd_array in fd_set.
+    // We just need to copy fd_count elements of fd_array.
+    // We gain huge memcpy() improvement if number of used SOCKETs is much lower than FD_SETSIZE.
+    return reinterpret_cast<const char *> (
+             &pollset_.fd_array[pollset_.fd_count])
+           - reinterpret_cast<const char *> (&pollset_);
+#else
+    return sizeof (fd_set);
+#endif
+}
+
+#if defined ZMQ_HAVE_WINDOWS
+class optimized_fd_set_t
+{
+  public:
+    explicit optimized_fd_set_t (size_t nevents_) : _fd_set (nevents_) {}
+
+    fd_set *get () { return reinterpret_cast<fd_set *> (&_fd_set[0]); }
+
+  private:
+    fast_vector_t<SOCKET, 1 + ZMQ_POLLITEMS_DFLT>
+      _fd_set;
+};
+
+class resizable_optimized_fd_set_t
+{
+  public:
+    void resize (size_t nevents_) { _fd_set.resize (nevents_); }
+
+    fd_set *get () { return reinterpret_cast<fd_set *> (&_fd_set[0]); }
+
+  private:
+    resizable_fast_vector_t<SOCKET, 1 + ZMQ_POLLITEMS_DFLT>
+      _fd_set;
+};
+#else
+class optimized_fd_set_t
+{
+  public:
+    explicit optimized_fd_set_t (size_t /*nevents_*/) {}
+
+    fd_set *get () { return &_fd_set; }
+
+  private:
+    fd_set _fd_set;
+};
+
+class resizable_optimized_fd_set_t : public optimized_fd_set_t
+{
+  public:
+    resizable_optimized_fd_set_t () : optimized_fd_set_t (0) {}
+
+    void resize (size_t /*nevents_*/) {}
+};
+#endif
+#endif
+}
+
+#endif
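valid_pollset_bytes () plus the optimized_fd_set_t wrappers are the select ()-side counterpart: the prepared fd_sets are copied into scratch sets once per loop iteration, and on Windows only the occupied prefix of fd_array is copied instead of all FD_SETSIZE slots. The fragment below is an editor's sketch of that copy pattern under a select () build; prepare_and_wait, listen_fd and nevents are illustrative names, and the real code is in socket_poller.cpp and zmq.cpp further down.

// Illustrative sketch (not part of the patch) of the copy pattern.
#include "precompiled.hpp" //  pulls in the platform's select () headers
#include "polling_util.hpp"

void prepare_and_wait (zmq::fd_t listen_fd, size_t nevents)
{
    //  Prepared once (e.g. in rebuild ()): sized to the registered items.
    zmq::resizable_optimized_fd_set_t pollset_in;
    pollset_in.resize (nevents);
    FD_ZERO (pollset_in.get ());
    FD_SET (listen_fd, pollset_in.get ());

    //  Scratch copy refreshed before every select (): copy only the bytes
    //  actually in use (fd_count plus the occupied fd_array slots on
    //  Windows, sizeof (fd_set) elsewhere).
    zmq::optimized_fd_set_t inset (nevents);
    memcpy (inset.get (), pollset_in.get (),
            zmq::valid_pollset_bytes (*pollset_in.get ()));

    select (static_cast<int> (listen_fd + 1), inset.get (), NULL, NULL, NULL);
}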
diff --git a/src/socket_poller.cpp b/src/socket_poller.cpp
index ba7ea9eb..c0258a6d 100644
--- a/src/socket_poller.cpp
+++ b/src/socket_poller.cpp
@@ -30,6 +30,7 @@
 #include "precompiled.hpp"
 #include "socket_poller.hpp"
 #include "err.hpp"
+#include "polling_util.hpp"
 
 #include <limits.h>
 
@@ -41,10 +42,7 @@ static bool is_thread_safe (zmq::socket_base_t &socket_)
 
 zmq::socket_poller_t::socket_poller_t () :
     _tag (0xCAFEBABE),
-    _signaler (NULL),
-    _need_rebuild (true),
-    _use_signaler (false),
-    _pollset_size (0)
+    _signaler (NULL)
 #if defined ZMQ_POLL_BASED_ON_POLL
     ,
     _pollfds (NULL)
@@ -53,20 +51,7 @@ zmq::socket_poller_t::socket_poller_t () :
     _max_fd (0)
 #endif
 {
-#if defined ZMQ_POLL_BASED_ON_SELECT
-#if defined ZMQ_HAVE_WINDOWS
-    // On Windows fd_set contains array of SOCKETs, each 4 bytes.
-    // For large fd_sets memset() could be expensive and it is unnecessary.
-    // It is enough to set fd_count to 0, exactly what FD_ZERO() macro does.
-    FD_ZERO (&_pollset_in);
-    FD_ZERO (&_pollset_out);
-    FD_ZERO (&_pollset_err);
-#else
-    memset (&_pollset_in, 0, sizeof (_pollset_in));
-    memset (&_pollset_out, 0, sizeof (_pollset_out));
-    memset (&_pollset_err, 0, sizeof (_pollset_err));
-#endif
-#endif
+    rebuild ();
 }
 
 zmq::socket_poller_t::~socket_poller_t ()
@@ -270,6 +255,10 @@ int zmq::socket_poller_t::remove_fd (fd_t fd_)
 
 void zmq::socket_poller_t::rebuild ()
 {
+    _use_signaler = false;
+    _pollset_size = 0;
+    _need_rebuild = false;
+
 #if defined ZMQ_POLL_BASED_ON_POLL
 
     if (_pollfds) {
@@ -277,10 +266,6 @@ void zmq::socket_poller_t::rebuild ()
         _pollfds = NULL;
     }
 
-    _use_signaler = false;
-
-    _pollset_size = 0;
-
     for (items_t::iterator it = _items.begin (); it != _items.end (); ++it) {
         if (it->events) {
             if (it->socket && is_thread_safe (*it->socket)) {
@@ -333,22 +318,22 @@ void zmq::socket_poller_t::rebuild ()
 
 #elif defined ZMQ_POLL_BASED_ON_SELECT
 
-    FD_ZERO (&_pollset_in);
-    FD_ZERO (&_pollset_out);
-    FD_ZERO (&_pollset_err);
-
     // Ensure we do not attempt to select () on more than FD_SETSIZE
     // file descriptors.
     zmq_assert (_items.size () <= FD_SETSIZE);
 
-    _pollset_size = 0;
+    _pollset_in.resize (_items.size ());
+    _pollset_out.resize (_items.size ());
+    _pollset_err.resize (_items.size ());
 
-    _use_signaler = false;
+    FD_ZERO (_pollset_in.get ());
+    FD_ZERO (_pollset_out.get ());
+    FD_ZERO (_pollset_err.get ());
 
     for (items_t::iterator it = _items.begin (); it != _items.end (); ++it) {
         if (it->socket && is_thread_safe (*it->socket) && it->events) {
             _use_signaler = true;
-            FD_SET (_signaler->get_fd (), &_pollset_in);
+            FD_SET (_signaler->get_fd (), _pollset_in.get ());
             _pollset_size = 1;
             break;
         }
@@ -369,7 +354,7 @@
               it->socket->getsockopt (ZMQ_FD, &notify_fd, &fd_size);
             zmq_assert (rc == 0);
 
-            FD_SET (notify_fd, &_pollset_in);
+            FD_SET (notify_fd, _pollset_in.get ());
 
             if (_max_fd < notify_fd)
                 _max_fd = notify_fd;
@@ -380,11 +365,11 @@
         // events to the appropriate fd_sets.
         else {
             if (it->events & ZMQ_POLLIN)
-                FD_SET (it->fd, &_pollset_in);
+                FD_SET (it->fd, _pollset_in.get ());
             if (it->events & ZMQ_POLLOUT)
-                FD_SET (it->fd, &_pollset_out);
+                FD_SET (it->fd, _pollset_out.get ());
             if (it->events & ZMQ_POLLERR)
-                FD_SET (it->fd, &_pollset_err);
+                FD_SET (it->fd, _pollset_err.get ());
 
             if (_max_fd < it->fd)
                 _max_fd = it->fd;
@@ -394,8 +379,6 @@
         }
     }
 #endif
-
-    _need_rebuild = false;
 }
 
 void zmq::socket_poller_t::zero_trail_events (
@@ -617,7 +600,9 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_,
 
     bool first_pass = true;
 
-    fd_set inset, outset, errset;
+    optimized_fd_set_t inset (_pollset_size);
+    optimized_fd_set_t outset (_pollset_size);
+    optimized_fd_set_t errset (_pollset_size);
 
     while (true) {
         // Compute the timeout for the subsequent poll.
@@ -637,34 +622,21 @@
 
         // Wait for events. Ignore interrupts if there's infinite timeout.
         while (true) {
+            memcpy (inset.get (), _pollset_in.get (),
+                    valid_pollset_bytes (*_pollset_in.get ()));
+            memcpy (outset.get (), _pollset_out.get (),
+                    valid_pollset_bytes (*_pollset_out.get ()));
+            memcpy (errset.get (), _pollset_err.get (),
+                    valid_pollset_bytes (*_pollset_err.get ()));
+            const int rc = select (static_cast<int> (_max_fd + 1), inset.get (),
+                                   outset.get (), errset.get (), ptimeout);
 #if defined ZMQ_HAVE_WINDOWS
-            // On Windows we don't need to copy the whole fd_set.
-            // SOCKETS are continuous from the beginning of fd_array in fd_set.
-            // We just need to copy fd_count elements of fd_array.
-            // We gain huge memcpy() improvement if number of used SOCKETs is much lower than FD_SETSIZE.
-            memcpy (&inset, &_pollset_in,
-                    reinterpret_cast<char *> (_pollset_in.fd_array
-                                              + _pollset_in.fd_count)
-                      - reinterpret_cast<char *> (&_pollset_in));
-            memcpy (&outset, &_pollset_out,
-                    reinterpret_cast<char *> (_pollset_out.fd_array
-                                              + _pollset_out.fd_count)
-                      - reinterpret_cast<char *> (&_pollset_out));
-            memcpy (&errset, &_pollset_err,
-                    reinterpret_cast<char *> (_pollset_err.fd_array
-                                              + _pollset_err.fd_count)
-                      - reinterpret_cast<char *> (&_pollset_err));
-            int rc = select (0, &inset, &outset, &errset, ptimeout);
             if (unlikely (rc == SOCKET_ERROR)) {
-                errno = zmq::wsa_error_to_errno (WSAGetLastError ());
+                errno = wsa_error_to_errno (WSAGetLastError ());
                 wsa_assert (errno == ENOTSOCK);
                 return -1;
             }
 #else
-            memcpy (&inset, &_pollset_in, sizeof (fd_set));
-            memcpy (&outset, &_pollset_out, sizeof (fd_set));
-            memcpy (&errset, &_pollset_err, sizeof (fd_set));
-            int rc = select (_max_fd + 1, &inset, &outset, &errset, ptimeout);
             if (unlikely (rc == -1)) {
                 errno_assert (errno == EINTR || errno == EBADF);
                 return -1;
@@ -673,11 +645,12 @@
             break;
         }
 
-        if (_use_signaler && FD_ISSET (_signaler->get_fd (), &inset))
+        if (_use_signaler && FD_ISSET (_signaler->get_fd (), inset.get ()))
            _signaler->recv ();
 
         // Check for the events.
-        int found = check_events (events_, n_events_, inset, outset, errset);
+        const int found = check_events (events_, n_events_, *inset.get (),
+                                        *outset.get (), *errset.get ());
         if (found) {
             if (found > 0)
                 zero_trail_events (events_, n_events_, found);
diff --git a/src/socket_poller.hpp b/src/socket_poller.hpp
index e989b090..5c436f0d 100644
--- a/src/socket_poller.hpp
+++ b/src/socket_poller.hpp
@@ -50,6 +50,7 @@
 
 #include "socket_base.hpp"
 #include "signaler.hpp"
+#include "polling_util.hpp"
 
 namespace zmq
 {
@@ -135,9 +136,9 @@ class socket_poller_t
 #if defined ZMQ_POLL_BASED_ON_POLL
     pollfd *_pollfds;
 #elif defined ZMQ_POLL_BASED_ON_SELECT
-    fd_set _pollset_in;
-    fd_set _pollset_out;
-    fd_set _pollset_err;
+    resizable_optimized_fd_set_t _pollset_in;
+    resizable_optimized_fd_set_t _pollset_out;
+    resizable_optimized_fd_set_t _pollset_err;
     zmq::fd_t _max_fd;
 #endif
 
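socket_poller_t is the engine behind the draft zmq_poller_* API, which is why the build changes above wire ENABLE_DRAFTS through appveyor.yml and tests/CMakeLists.txt. For orientation, a minimal usage sketch of that API (editor's illustration, not part of the patch; assumes a build with ENABLE_DRAFTS=ON, and poll_one/sub are placeholder names):

// Draft API: requires ZMQ_BUILD_DRAFT_API and a drafts-enabled libzmq.
#define ZMQ_BUILD_DRAFT_API
#include <zmq.h>
#include <assert.h>
#include <stddef.h>

void poll_one (void *sub)
{
    void *poller = zmq_poller_new ();
    assert (poller);

    int rc = zmq_poller_add (poller, sub, NULL, ZMQ_POLLIN);
    assert (rc == 0);

    zmq_poller_event_t event;
    rc = zmq_poller_wait (poller, &event, 1000); //  wait up to 1 s
    if (rc >= 0 && event.socket == sub) {
        //  sub is readable; internally this went through
        //  socket_poller_t::wait () and the resizable fd_sets above.
    }

    zmq_poller_destroy (&poller);
}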
diff --git a/src/zmq.cpp b/src/zmq.cpp
index 43423549..0ce4f531 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -41,6 +41,7 @@
 #include "macros.hpp"
 #include "poller.hpp"
+#if !defined ZMQ_HAVE_POLLER
 
 // On AIX platform, poll.h has to be included first to get consistent
 // definition of pollfd structure (AIX uses 'reqevents' and 'retnevents'
 // instead of 'events' and 'revents' and defines macros to map from POSIX-y
@@ -49,6 +50,9 @@
 #include <poll.h>
 #endif
 
+#include "polling_util.hpp"
+#endif
+
 // TODO: determine if this is an issue, since zmq.h is being loaded from pch.
 // zmq.h must be included *after* poll.h for AIX to build properly
 //#include "../include/zmq.h"
@@ -800,7 +804,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
     // if poller is present, use that.
     return zmq_poller_poll (items_, nitems_, timeout_);
 #else
-#if defined ZMQ_POLL_BASED_ON_POLL
+#if defined ZMQ_POLL_BASED_ON_POLL || defined ZMQ_POLL_BASED_ON_SELECT
     if (unlikely (nitems_ < 0)) {
         errno = EINVAL;
         return -1;
@@ -811,14 +815,15 @@
 #if defined ZMQ_HAVE_WINDOWS
         Sleep (timeout_ > 0 ? timeout_ : INFINITE);
         return 0;
-#elif defined ZMQ_HAVE_ANDROID
-        usleep (timeout_ * 1000);
-        return 0;
+#elif defined ZMQ_HAVE_VXWORKS
+        struct timespec ns_;
+        ns_.tv_sec = timeout_ / 1000;
+        ns_.tv_nsec = timeout_ % 1000 * 1000000;
+        return nanosleep (&ns_, 0);
 #else
         return usleep (timeout_ * 1000);
 #endif
     }
-
     if (!items_) {
         errno = EFAULT;
         return -1;
@@ -827,13 +832,8 @@
     zmq::clock_t clock;
     uint64_t now = 0;
     uint64_t end = 0;
-    pollfd spollfds[ZMQ_POLLITEMS_DFLT];
-    pollfd *pollfds = spollfds;
-
-    if (nitems_ > ZMQ_POLLITEMS_DFLT) {
-        pollfds = static_cast<pollfd *> (malloc (nitems_ * sizeof (pollfd)));
-        alloc_assert (pollfds);
-    }
+#if defined ZMQ_POLL_BASED_ON_POLL
+    zmq::fast_vector_t<pollfd, ZMQ_POLLITEMS_DFLT> pollfds (nitems_);
 
     // Build pollset for poll () system call.
     for (int i = 0; i != nitems_; i++) {
@@ -844,8 +844,6 @@
             if (zmq_getsockopt (items_[i].socket, ZMQ_FD, &pollfds[i].fd,
                                 &zmq_fd_size)
                 == -1) {
-                if (pollfds != spollfds)
-                    free (pollfds);
                 return -1;
             }
             pollfds[i].events = items_[i].events ? POLLIN : 0;
@@ -860,27 +858,71 @@
                   | (items_[i].events & ZMQ_POLLPRI ? POLLPRI : 0);
         }
     }
+#else
+    // Ensure we do not attempt to select () on more than FD_SETSIZE
+    // file descriptors.
+    // TODO since this function is called by a client, we could return errno EINVAL/ENOMEM/... here
+    zmq_assert (nitems_ <= FD_SETSIZE);
+
+    zmq::optimized_fd_set_t pollset_in (nitems_);
+    FD_ZERO (pollset_in.get ());
+    zmq::optimized_fd_set_t pollset_out (nitems_);
+    FD_ZERO (pollset_out.get ());
+    zmq::optimized_fd_set_t pollset_err (nitems_);
+    FD_ZERO (pollset_err.get ());
+
+    zmq::fd_t maxfd = 0;
+
+    // Build the fd_sets for passing to select ().
+    for (int i = 0; i != nitems_; i++) {
+        // If the poll item is a 0MQ socket we are interested in input on the
+        // notification file descriptor retrieved by the ZMQ_FD socket option.
+        if (items_[i].socket) {
+            size_t zmq_fd_size = sizeof (zmq::fd_t);
+            zmq::fd_t notify_fd;
+            if (zmq_getsockopt (items_[i].socket, ZMQ_FD, &notify_fd,
+                                &zmq_fd_size)
+                == -1)
+                return -1;
+            if (items_[i].events) {
+                FD_SET (notify_fd, pollset_in.get ());
+                if (maxfd < notify_fd)
+                    maxfd = notify_fd;
+            }
+        }
+        // Else, the poll item is a raw file descriptor. Convert the poll item
+        // events to the appropriate fd_sets.
+        else {
+            if (items_[i].events & ZMQ_POLLIN)
+                FD_SET (items_[i].fd, pollset_in.get ());
+            if (items_[i].events & ZMQ_POLLOUT)
+                FD_SET (items_[i].fd, pollset_out.get ());
+            if (items_[i].events & ZMQ_POLLERR)
+                FD_SET (items_[i].fd, pollset_err.get ());
+            if (maxfd < items_[i].fd)
+                maxfd = items_[i].fd;
+        }
+    }
+
+    zmq::optimized_fd_set_t inset (nitems_);
+    zmq::optimized_fd_set_t outset (nitems_);
+    zmq::optimized_fd_set_t errset (nitems_);
+#endif
 
     bool first_pass = true;
     int nevents = 0;
 
     while (true) {
+#if defined ZMQ_POLL_BASED_ON_POLL
+
         // Compute the timeout for the subsequent poll.
-        int timeout;
-        if (first_pass)
-            timeout = 0;
-        else if (timeout_ < 0)
-            timeout = -1;
-        else
-            timeout =
-              static_cast<int> (std::min<uint64_t> (end - now, INT_MAX));
+        zmq::timeout_t timeout =
+          zmq::compute_timeout (first_pass, timeout_, now, end);
 
         // Wait for events.
         {
-            int rc = poll (pollfds, nitems_, timeout);
+            int rc = poll (&pollfds[0], nitems_, timeout);
             if (rc == -1 && errno == EINTR) {
-                if (pollfds != spollfds)
-                    free (pollfds);
                 return -1;
             }
             errno_assert (rc >= 0);
@@ -897,8 +939,6 @@
             if (zmq_getsockopt (items_[i].socket, ZMQ_EVENTS, &zmq_events,
                                 &zmq_events_size)
                 == -1) {
-                if (pollfds != spollfds)
-                    free (pollfds);
                 return -1;
             }
             if ((items_[i].events & ZMQ_POLLOUT)
@@ -925,119 +965,8 @@
             nevents++;
         }
 
-        // If timeout is zero, exit immediately whether there are events or not.
-        if (timeout_ == 0)
-            break;
-
-        // If there are events to return, we can exit immediately.
-        if (nevents)
-            break;
-
-        // At this point we are meant to wait for events but there are none.
-        // If timeout is infinite we can just loop until we get some events.
-        if (timeout_ < 0) {
-            if (first_pass)
-                first_pass = false;
-            continue;
-        }
-
-        // The timeout is finite and there are no events. In the first pass
-        // we get a timestamp of when the polling have begun. (We assume that
-        // first pass have taken negligible time). We also compute the time
-        // when the polling should time out.
-        if (first_pass) {
-            now = clock.now_ms ();
-            end = now + timeout_;
-            if (now == end)
-                break;
-            first_pass = false;
-            continue;
-        }
-
-        // Find out whether timeout have expired.
-        now = clock.now_ms ();
-        if (now >= end)
-            break;
-    }
-
-    if (pollfds != spollfds)
-        free (pollfds);
-    return nevents;
-
-#elif defined ZMQ_POLL_BASED_ON_SELECT
-
-    if (unlikely (nitems_ < 0)) {
-        errno = EINVAL;
-        return -1;
-    }
-    if (unlikely (nitems_ == 0)) {
-        if (timeout_ == 0)
-            return 0;
-#if defined ZMQ_HAVE_WINDOWS
-        Sleep (timeout_ > 0 ? timeout_ : INFINITE);
-        return 0;
-#elif defined ZMQ_HAVE_VXWORKS
-        struct timespec ns_;
-        ns_.tv_sec = timeout_ / 1000;
-        ns_.tv_nsec = timeout_ % 1000 * 1000000;
-        return nanosleep (&ns_, 0);
-#else
-        return usleep (timeout_ * 1000);
-#endif
-    }
-    zmq::clock_t clock;
-    uint64_t now = 0;
-    uint64_t end = 0;
-
-    // Ensure we do not attempt to select () on more than FD_SETSIZE
-    // file descriptors.
-    zmq_assert (nitems_ <= FD_SETSIZE);
-
-    fd_set pollset_in;
-    FD_ZERO (&pollset_in);
-    fd_set pollset_out;
-    FD_ZERO (&pollset_out);
-    fd_set pollset_err;
-    FD_ZERO (&pollset_err);
-
-    zmq::fd_t maxfd = 0;
-
-    // Build the fd_sets for passing to select ().
-    for (int i = 0; i != nitems_; i++) {
-        // If the poll item is a 0MQ socket we are interested in input on the
-        // notification file descriptor retrieved by the ZMQ_FD socket option.
-        if (items_[i].socket) {
-            size_t zmq_fd_size = sizeof (zmq::fd_t);
-            zmq::fd_t notify_fd;
-            if (zmq_getsockopt (items_[i].socket, ZMQ_FD, &notify_fd,
-                                &zmq_fd_size)
-                == -1)
-                return -1;
-            if (items_[i].events) {
-                FD_SET (notify_fd, &pollset_in);
-                if (maxfd < notify_fd)
-                    maxfd = notify_fd;
-            }
-        }
-        // Else, the poll item is a raw file descriptor. Convert the poll item
-        // events to the appropriate fd_sets.
-        else {
-            if (items_[i].events & ZMQ_POLLIN)
-                FD_SET (items_[i].fd, &pollset_in);
-            if (items_[i].events & ZMQ_POLLOUT)
-                FD_SET (items_[i].fd, &pollset_out);
-            if (items_[i].events & ZMQ_POLLERR)
-                FD_SET (items_[i].fd, &pollset_err);
-            if (maxfd < items_[i].fd)
-                maxfd = items_[i].fd;
-        }
-    }
-
-    bool first_pass = true;
-    int nevents = 0;
-    fd_set inset, outset, errset;
-
-    while (true) {
+#else
         // Compute the timeout for the subsequent poll.
         timeval timeout;
         timeval *ptimeout;
@@ -1055,34 +984,23 @@
 
         // Wait for events. Ignore interrupts if there's infinite timeout.
         while (true) {
+            memcpy (inset.get (), pollset_in.get (),
+                    zmq::valid_pollset_bytes (*pollset_in.get ()));
+            memcpy (outset.get (), pollset_out.get (),
+                    zmq::valid_pollset_bytes (*pollset_out.get ()));
+            memcpy (errset.get (), pollset_err.get (),
+                    zmq::valid_pollset_bytes (*pollset_err.get ()));
 #if defined ZMQ_HAVE_WINDOWS
-            // On Windows we don't need to copy the whole fd_set.
-            // SOCKETS are continuous from the beginning of fd_array in fd_set.
-            // We just need to copy fd_count elements of fd_array.
-            // We gain huge memcpy() improvement if number of used SOCKETs is much lower than FD_SETSIZE.
-            memcpy (&inset, &pollset_in,
-                    reinterpret_cast<char *> (pollset_in.fd_array
-                                              + pollset_in.fd_count)
-                      - reinterpret_cast<char *> (&pollset_in));
-            memcpy (&outset, &pollset_out,
-                    reinterpret_cast<char *> (pollset_out.fd_array
-                                              + pollset_out.fd_count)
-                      - reinterpret_cast<char *> (&pollset_out));
-            memcpy (&errset, &pollset_err,
-                    reinterpret_cast<char *> (pollset_err.fd_array
-                                              + pollset_err.fd_count)
-                      - reinterpret_cast<char *> (&pollset_err));
-            int rc = select (0, &inset, &outset, &errset, ptimeout);
+            int rc =
+              select (0, inset.get (), outset.get (), errset.get (), ptimeout);
             if (unlikely (rc == SOCKET_ERROR)) {
                 errno = zmq::wsa_error_to_errno (WSAGetLastError ());
                 wsa_assert (errno == ENOTSOCK);
                 return -1;
             }
 #else
-            memcpy (&inset, &pollset_in, sizeof (fd_set));
-            memcpy (&outset, &pollset_out, sizeof (fd_set));
-            memcpy (&errset, &pollset_err, sizeof (fd_set));
-            int rc = select (maxfd + 1, &inset, &outset, &errset, ptimeout);
+            int rc = select (maxfd + 1, inset.get (), outset.get (),
+                             errset.get (), ptimeout);
             if (unlikely (rc == -1)) {
                 errno_assert (errno == EINTR || errno == EBADF);
                 return -1;
@@ -1114,17 +1032,18 @@
             // Else, the poll item is a raw file descriptor, simply convert
             // the events to zmq_pollitem_t-style format.
             else {
-                if (FD_ISSET (items_[i].fd, &inset))
+                if (FD_ISSET (items_[i].fd, inset.get ()))
                     items_[i].revents |= ZMQ_POLLIN;
-                if (FD_ISSET (items_[i].fd, &outset))
+                if (FD_ISSET (items_[i].fd, outset.get ()))
                     items_[i].revents |= ZMQ_POLLOUT;
-                if (FD_ISSET (items_[i].fd, &errset))
+                if (FD_ISSET (items_[i].fd, errset.get ()))
                     items_[i].revents |= ZMQ_POLLERR;
             }
 
             if (items_[i].revents)
                 nevents++;
         }
+#endif
 
         // If timeout is zero, exit immediately whether there are events or not.
         if (timeout_ == 0)
@@ -1162,7 +1081,6 @@
     }
 
     return nevents;
-
 #else
     // Exotic platforms that support neither poll() nor select().
     errno = ENOTSUP;
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 8dd6aca4..df7cb1d2 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -224,7 +224,7 @@ if(WIN32 AND ${POLLER} MATCHES "epoll")
   set_tests_properties(test_many_sockets PROPERTIES TIMEOUT 120)
 endif()
 
-if(WIN32)
+if(WIN32 AND ENABLE_DRAFTS)
   set_tests_properties(test_radio_dish PROPERTIES TIMEOUT 30)
 endif()
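None of the zmq_poll () rework above changes the public contract; both the poll () and select () paths still accept a mix of 0MQ sockets and raw file descriptors. As a closing reminder of what that contract looks like from the caller's side (editor's illustration, not part of the patch; wait_for_input, sub and raw_fd are placeholder names):

#include <zmq.h>

int wait_for_input (void *sub, int raw_fd)
{
    zmq_pollitem_t items[2];

    items[0].socket = sub;  //  0MQ socket: polled via its ZMQ_FD/ZMQ_EVENTS
    items[0].fd = 0;
    items[0].events = ZMQ_POLLIN;
    items[0].revents = 0;

    items[1].socket = NULL; //  raw descriptor: goes straight into pollfd/fd_set
    items[1].fd = raw_fd;
    items[1].events = ZMQ_POLLIN;
    items[1].revents = 0;

    int rc = zmq_poll (items, 2, 1000); //  up to 1000 ms
    if (rc > 0) {
        if (items[0].revents & ZMQ_POLLIN) { /* message available on sub */ }
        if (items[1].revents & ZMQ_POLLIN) { /* data available on raw_fd */ }
    }
    return rc; //  number of ready items, 0 on timeout, -1 on error
}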