diff --git a/src/polling_util.hpp b/src/polling_util.hpp index 381a77e7..c2f3ff20 100644 --- a/src/polling_util.hpp +++ b/src/polling_util.hpp @@ -31,6 +31,7 @@ #define __ZMQ_SOCKET_POLLING_UTIL_HPP_INCLUDED__ #include +#include #include "stdint.hpp" #include "platform.hpp" @@ -68,6 +69,40 @@ template class fast_vector_t T *_buf; }; +template class resizable_fast_vector_t +{ + public: + resizable_fast_vector_t () : _dynamic_buf (NULL) {} + + void resize (const size_t nitems_) + { + if (_dynamic_buf) + _dynamic_buf->resize (nitems_); + if (nitems_ > S) { + _dynamic_buf = new (std::nothrow) std::vector; + // TODO since this function is called by a client, we could return errno == ENOMEM here + alloc_assert (_dynamic_buf); + } + } + + T *get_buf () + { + // e.g. MSVC 2008 does not have std::vector::data, so we use &...[0] + return _dynamic_buf ? &(*_dynamic_buf)[0] : _static_buf; + } + + T &operator[] (const size_t i) { return get_buf ()[i]; } + + ~resizable_fast_vector_t () { delete _dynamic_buf; } + + private: + resizable_fast_vector_t (const resizable_fast_vector_t &); + resizable_fast_vector_t &operator= (const resizable_fast_vector_t &); + + T _static_buf[S]; + std::vector *_dynamic_buf; +}; + #if defined ZMQ_POLL_BASED_ON_POLL typedef int timeout_t; @@ -104,6 +139,20 @@ class optimized_fd_set_t fast_vector_t _fd_set; }; + +class resizable_optimized_fd_set_t +{ + public: + void resize (size_t nevents_) { _fd_set.resize (nevents_); } + + fd_set *get () { return reinterpret_cast (&_fd_set[0]); } + + private: + resizable_fast_vector_t + _fd_set; +}; #else class optimized_fd_set_t { @@ -115,6 +164,14 @@ class optimized_fd_set_t private: fd_set _fd_set; }; + +class resizable_optimized_fd_set_t : public optimized_fd_set_t +{ + public: + resizable_optimized_fd_set_t () : optimized_fd_set_t (0) {} + + void resize (size_t /*nevents_*/) {} +}; #endif #endif } diff --git a/src/socket_poller.cpp b/src/socket_poller.cpp index 592f2063..c0258a6d 100644 --- a/src/socket_poller.cpp +++ b/src/socket_poller.cpp @@ -318,18 +318,22 @@ void zmq::socket_poller_t::rebuild () #elif defined ZMQ_POLL_BASED_ON_SELECT - FD_ZERO (&_pollset_in); - FD_ZERO (&_pollset_out); - FD_ZERO (&_pollset_err); - // Ensure we do not attempt to select () on more than FD_SETSIZE // file descriptors. zmq_assert (_items.size () <= FD_SETSIZE); + _pollset_in.resize (_items.size ()); + _pollset_out.resize (_items.size ()); + _pollset_err.resize (_items.size ()); + + FD_ZERO (_pollset_in.get ()); + FD_ZERO (_pollset_out.get ()); + FD_ZERO (_pollset_err.get ()); + for (items_t::iterator it = _items.begin (); it != _items.end (); ++it) { if (it->socket && is_thread_safe (*it->socket) && it->events) { _use_signaler = true; - FD_SET (_signaler->get_fd (), &_pollset_in); + FD_SET (_signaler->get_fd (), _pollset_in.get ()); _pollset_size = 1; break; } @@ -350,7 +354,7 @@ void zmq::socket_poller_t::rebuild () it->socket->getsockopt (ZMQ_FD, ¬ify_fd, &fd_size); zmq_assert (rc == 0); - FD_SET (notify_fd, &_pollset_in); + FD_SET (notify_fd, _pollset_in.get ()); if (_max_fd < notify_fd) _max_fd = notify_fd; @@ -361,11 +365,11 @@ void zmq::socket_poller_t::rebuild () // events to the appropriate fd_sets. else { if (it->events & ZMQ_POLLIN) - FD_SET (it->fd, &_pollset_in); + FD_SET (it->fd, _pollset_in.get ()); if (it->events & ZMQ_POLLOUT) - FD_SET (it->fd, &_pollset_out); + FD_SET (it->fd, _pollset_out.get ()); if (it->events & ZMQ_POLLERR) - FD_SET (it->fd, &_pollset_err); + FD_SET (it->fd, _pollset_err.get ()); if (_max_fd < it->fd) _max_fd = it->fd; @@ -618,12 +622,12 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, // Wait for events. Ignore interrupts if there's infinite timeout. while (true) { - memcpy (inset.get (), &_pollset_in, - valid_pollset_bytes (_pollset_in)); - memcpy (outset.get (), &_pollset_out, - valid_pollset_bytes (_pollset_out)); - memcpy (errset.get (), &_pollset_err, - valid_pollset_bytes (_pollset_err)); + memcpy (inset.get (), _pollset_in.get (), + valid_pollset_bytes (*_pollset_in.get ())); + memcpy (outset.get (), _pollset_out.get (), + valid_pollset_bytes (*_pollset_out.get ())); + memcpy (errset.get (), _pollset_err.get (), + valid_pollset_bytes (*_pollset_err.get ())); const int rc = select (static_cast (_max_fd + 1), inset.get (), outset.get (), errset.get (), ptimeout); #if defined ZMQ_HAVE_WINDOWS @@ -641,7 +645,7 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, break; } - if (_use_signaler && FD_ISSET (_signaler->get_fd (), &inset)) + if (_use_signaler && FD_ISSET (_signaler->get_fd (), inset.get ())) _signaler->recv (); // Check for the events. diff --git a/src/socket_poller.hpp b/src/socket_poller.hpp index e989b090..5c436f0d 100644 --- a/src/socket_poller.hpp +++ b/src/socket_poller.hpp @@ -50,6 +50,7 @@ #include "socket_base.hpp" #include "signaler.hpp" +#include "polling_util.hpp" namespace zmq { @@ -135,9 +136,9 @@ class socket_poller_t #if defined ZMQ_POLL_BASED_ON_POLL pollfd *_pollfds; #elif defined ZMQ_POLL_BASED_ON_SELECT - fd_set _pollset_in; - fd_set _pollset_out; - fd_set _pollset_err; + resizable_optimized_fd_set_t _pollset_in; + resizable_optimized_fd_set_t _pollset_out; + resizable_optimized_fd_set_t _pollset_err; zmq::fd_t _max_fd; #endif