diff --git a/Makefile.am b/Makefile.am index 51a91602..8b31b1a9 100644 --- a/Makefile.am +++ b/Makefile.am @@ -367,7 +367,6 @@ test_apps = \ tests/test_socketopt_hwm \ tests/test_heartbeats \ tests/test_stream_exceeds_buffer \ - tests/test_thread_safe_polling \ tests/test_poller tests_test_system_SOURCES = tests/test_system.cpp @@ -573,9 +572,6 @@ tests_test_heartbeats_LDADD = src/libzmq.la tests_test_stream_exceeds_buffer_SOURCES = tests/test_stream_exceeds_buffer.cpp tests_test_stream_exceeds_buffer_LDADD = src/libzmq.la -tests_test_thread_safe_polling_SOURCES = tests/test_thread_safe_polling.cpp -tests_test_thread_safe_polling_LDADD = src/libzmq.la - tests_test_poller_SOURCES = tests/test_poller.cpp tests_test_poller_LDADD = src/libzmq.la diff --git a/include/zmq.h b/include/zmq.h index 68925048..11a37d71 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -383,8 +383,6 @@ ZMQ_EXPORT int zmq_send (void *s, const void *buf, size_t len, int flags); ZMQ_EXPORT int zmq_send_const (void *s, const void *buf, size_t len, int flags); ZMQ_EXPORT int zmq_recv (void *s, void *buf, size_t len, int flags); ZMQ_EXPORT int zmq_socket_monitor (void *s, const char *addr, int events); -ZMQ_EXPORT int zmq_add_pollfd (void *s, void *p); -ZMQ_EXPORT int zmq_remove_pollfd (void *s, void *p); /******************************************************************************/ /* I/O multiplexing. */ @@ -411,22 +409,6 @@ typedef struct zmq_pollitem_t ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout); -/******************************************************************************/ -/* Pollfd polling on thread safe socket */ -/******************************************************************************/ - -ZMQ_EXPORT void *zmq_pollfd_new (); -ZMQ_EXPORT int zmq_pollfd_close (void *p); -ZMQ_EXPORT void zmq_pollfd_recv (void *p); -ZMQ_EXPORT int zmq_pollfd_wait (void *p, int timeout_); -ZMQ_EXPORT int zmq_pollfd_poll (void *p, zmq_pollitem_t *items, int nitems, long timeout); - -#if defined _WIN32 -ZMQ_EXPORT SOCKET zmq_pollfd_fd (void *p); -#else -ZMQ_EXPORT int zmq_pollfd_fd (void *p); -#endif - /******************************************************************************/ /* Poller polling on sockets,fd and threaf safe sockets */ /******************************************************************************/ diff --git a/src/socket_poller.cpp b/src/socket_poller.cpp index b37c37ba..48ee06af 100644 --- a/src/socket_poller.cpp +++ b/src/socket_poller.cpp @@ -32,10 +32,13 @@ zmq::socket_poller_t::socket_poller_t () : tag (0xCAFEBABE), - poll_set (NULL), - poll_events (NULL) -{ - pollfd = zmq_pollfd_new (); + need_rebuild (true), + use_signaler (false) +#if defined ZMQ_POLL_BASED_ON_POLL + , + pollfds (NULL) +#endif +{ } zmq::socket_poller_t::~socket_poller_t () @@ -43,27 +46,22 @@ zmq::socket_poller_t::~socket_poller_t () // Mark the socket_poller as dead tag = 0xdeadbeef; - for (events_t::iterator it = events.begin(); it != events.end(); ++it) { + for (items_t::iterator it = items.begin(); it != items.end(); ++it) { if (it->socket) { int thread_safe; size_t thread_safe_size = sizeof(int); - if (zmq_getsockopt(it->socket, ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == 0 && thread_safe) - zmq_remove_pollfd(it->socket, pollfd); + if (it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == 0 && thread_safe) + it->socket->remove_signaler (&signaler); } } - zmq_pollfd_close (pollfd); - - if (poll_set) { - free (poll_set); - poll_set = NULL; - } - - if (poll_events) { - free (poll_events); - poll_events = NULL; +#if defined ZMQ_POLL_BASED_ON_POLL + if (pollfds) { + free (pollfds); + pollfds = NULL; } +#endif } bool zmq::socket_poller_t::check_tag () @@ -71,9 +69,9 @@ bool zmq::socket_poller_t::check_tag () return tag == 0xCAFEBABE; } -int zmq::socket_poller_t::add (void *socket_, void* user_data_, short events_) +int zmq::socket_poller_t::add (socket_base_t *socket_, void* user_data_, short events_) { - for (events_t::iterator it = events.begin (); it != events.end (); ++it) { + for (items_t::iterator it = items.begin (); it != items.end (); ++it) { if (it->socket == socket_) { errno = EINVAL; return -1; @@ -82,52 +80,48 @@ int zmq::socket_poller_t::add (void *socket_, void* user_data_, short events_) int thread_safe; size_t thread_safe_size = sizeof(int); - - if (zmq_getsockopt (socket_, ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1) + + if (socket_->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1) return -1; if (thread_safe) { - if (zmq_add_pollfd (socket_, pollfd) == -1) + if (socket_->add_signaler (&signaler) == -1) return -1; } - event_t event = {socket_, 0, user_data_, events_}; - events.push_back (event); + item_t item = {socket_, 0, user_data_, events_}; + items.push_back (item); need_rebuild = true; return 0; } -#if defined _WIN32 -int zmq::socket_poller_t::add_fd (SOCKET fd_, void *user_data_, short events_) -#else -int zmq::socket_poller_t::add_fd (int fd_, void *user_data_, short events_) -#endif +int zmq::socket_poller_t::add_fd (fd_t fd_, void *user_data_, short events_) { - for (events_t::iterator it = events.begin (); it != events.end (); ++it) { + for (items_t::iterator it = items.begin (); it != items.end (); ++it) { if (!it->socket && it->fd == fd_) { errno = EINVAL; return -1; } } - event_t event = {NULL, fd_, user_data_, events_}; - events.push_back (event); + item_t item = {NULL, fd_, user_data_, events_}; + items.push_back (item); need_rebuild = true; return 0; } -int zmq::socket_poller_t::modify (void *socket_, short events_) +int zmq::socket_poller_t::modify (socket_base_t *socket_, short events_) { - events_t::iterator it; + items_t::iterator it; - for (it = events.begin (); it != events.end (); ++it) { + for (it = items.begin (); it != items.end (); ++it) { if (it->socket == socket_) break; } - if (it == events.end()) { + if (it == items.end()) { errno = EINVAL; return -1; } @@ -139,20 +133,16 @@ int zmq::socket_poller_t::modify (void *socket_, short events_) } -#if defined _WIN32 -int zmq::socket_poller_t::modify_fd (SOCKET fd_, short events_) -#else -int zmq::socket_poller_t::modify_fd (int fd_, short events_) -#endif +int zmq::socket_poller_t::modify_fd (fd_t fd_, short events_) { - events_t::iterator it; + items_t::iterator it; - for (it = events.begin (); it != events.end (); ++it) { + for (it = items.begin (); it != items.end (); ++it) { if (!it->socket && it->fd == fd_) break; } - if (it == events.end()) { + if (it == items.end()) { errno = EINVAL; return -1; } @@ -164,118 +154,499 @@ int zmq::socket_poller_t::modify_fd (int fd_, short events_) } -int zmq::socket_poller_t::remove (void* socket_) +int zmq::socket_poller_t::remove (socket_base_t *socket_) { - events_t::iterator it; + items_t::iterator it; - for (it = events.begin (); it != events.end (); ++it) { + for (it = items.begin (); it != items.end (); ++it) { if (it->socket == socket_) break; } - if (it == events.end()) { + if (it == items.end()) { errno = EINVAL; return -1; } - + int thread_safe; size_t thread_safe_size = sizeof(int); - if (zmq_getsockopt (socket_, ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1) + if (socket_->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1) return -1; if (thread_safe) { - if (zmq_remove_pollfd (socket_, pollfd) == -1) + if (socket_->remove_signaler (&signaler) == -1) return -1; } - events.erase (it); + items.erase (it); need_rebuild = true; return 0; } -#if defined _WIN32 -int zmq::socket_poller_t::remove_fd (SOCKET fd_) -#else -int zmq::socket_poller_t::remove_fd (int fd_) -#endif +int zmq::socket_poller_t::remove_fd (fd_t fd_) { - events_t::iterator it; + items_t::iterator it; - for (it = events.begin (); it != events.end (); ++it) { + for (it = items.begin (); it != items.end (); ++it) { if (!it->socket && it->fd == fd_) break; } - if (it == events.end()) { + if (it == items.end()) { errno = EINVAL; return -1; } - events.erase (it); + items.erase (it); need_rebuild = true; return 0; -} +} + +int zmq::socket_poller_t::rebuild () +{ +#if defined ZMQ_POLL_BASED_ON_POLL + + if (pollfds) { + free (pollfds); + pollfds = NULL; + } + + use_signaler = false; + + poll_size = 0; + + for (items_t::iterator it = items.begin (); it != items.end (); ++it) { + if (it->events) { + if (it->socket) { + int thread_safe; + size_t thread_safe_size = sizeof(int); + + if (it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1) + return -1; + + if (thread_safe) { + if (!use_signaler) { + use_signaler = true; + poll_size++; + } + } + else + poll_size++; + } + else + poll_size++; + } + } + + if (poll_size == 0) + return 0; + + pollfds = (pollfd*) malloc (poll_size * sizeof (pollfd)); + alloc_assert (pollfds); + + int item_nbr = 0; + + if (use_signaler) { + item_nbr = 1; + pollfds[0].fd = signaler.get_fd(); + pollfds[0].events = POLLIN; + } + + for (items_t::iterator it = items.begin (); it != items.end (); ++it) { + if (it->events) { + if (it->socket) { + int thread_safe; + size_t thread_safe_size = sizeof(int); + + if (it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1) + return -1; + + if (!thread_safe) { + size_t fd_size = sizeof (zmq::fd_t); + if (it->socket->getsockopt (ZMQ_FD, &pollfds [item_nbr].fd, &fd_size) == -1) { + return -1; + } + + pollfds [item_nbr].events = POLLIN; + item_nbr++; + } + } + else { + pollfds [item_nbr].fd = it->fd; + pollfds [item_nbr].events = + (it->events & ZMQ_POLLIN ? POLLIN : 0) | + (it->events & ZMQ_POLLOUT ? POLLOUT : 0) | + (it->events & ZMQ_POLLPRI ? POLLPRI : 0); + it->pollfd_index = item_nbr; + item_nbr++; + } + } + } + + #elif defined ZMQ_POLL_BASED_ON_SELECT + + FD_ZERO (&pollset_in); + FD_ZERO (&pollset_out); + FD_ZERO (&pollset_err); + + // Ensure we do not attempt to select () on more than FD_SETSIZE + // file descriptors. + zmq_assert (items.size () <= FD_SETSIZE); + + poll_size = 0; + + use_signaler = false; + + for (items_t::iterator it = items.begin (); it != items.end (); ++it) { + if (it->socket) { + int thread_safe; + size_t thread_safe_size = sizeof(int); + + if (it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1) + return -1; + + if (thread_safe && it->events) { + use_signaler = true; + FD_SET (signaler.get_fd (), &pollset_in); + poll_size = 1; + break; + } + } + } + + maxfd = 0; + + // Build the fd_sets for passing to select (). + for (items_t::iterator it = items.begin (); it != items.end (); ++it) { + if (it->events) { + // If the poll item is a 0MQ socket we are interested in input on the + // notification file descriptor retrieved by the ZMQ_FD socket option. + if (it->socket) { + int thread_safe; + size_t thread_safe_size = sizeof(int); + + if (it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1) + return -1; + + if (!thread_safe) { + zmq::fd_t notify_fd; + size_t fd_size = sizeof (zmq::fd_t); + if (it->socket->getsockopt (ZMQ_FD, ¬ify_fd, &fd_size) == -1) + return -1; + + FD_SET (notify_fd, &pollset_in); + if (maxfd < notify_fd) + maxfd = notify_fd; + + poll_size++; + } + } + // Else, the poll item is a raw file descriptor. Convert the poll item + // events to the appropriate fd_sets. + else { + if (it->events & ZMQ_POLLIN) + FD_SET (it->fd, &pollset_in); + if (it->events & ZMQ_POLLOUT) + FD_SET (it->fd, &pollset_out); + if (it->events & ZMQ_POLLERR) + FD_SET (it->fd, &pollset_err); + if (maxfd < it->fd) + maxfd = it->fd; + + poll_size++; + } + } + } + +#endif + + need_rebuild = false; + return 0; +} int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long timeout_) { if (need_rebuild) - rebuild (); + if (rebuild () == -1) + return -1; - int rc = zmq_pollfd_poll (pollfd, poll_set, poll_size, timeout_); - - if (rc == -1) { - return rc; +#if defined ZMQ_POLL_BASED_ON_POLL + if (unlikely (poll_size == 0)) { + if (timeout_ == 0) + return 0; +#if defined ZMQ_HAVE_WINDOWS + Sleep (timeout_ > 0 ? timeout_ : INFINITE); + return 0; +#elif defined ZMQ_HAVE_ANDROID + usleep (timeout_ * 1000); + return 0; +#else + return usleep (timeout_ * 1000); +#endif } - if (rc == 0) { - errno = ETIMEDOUT; - return -1; - } + zmq::clock_t clock; + uint64_t now = 0; + uint64_t end = 0; + + bool first_pass = true; - for (int i = 0; i < poll_size; i++) { - if ((poll_set [i].revents & poll_events [i].events) != 0) { - *event_ = poll_events[i]; - event_->events = poll_set [i].revents & poll_events [i].events; + while (true) { + // Compute the timeout for the subsequent poll. + int timeout; + if (first_pass) + timeout = 0; + else + if (timeout_ < 0) + timeout = -1; + else + timeout = end - now; + // Wait for events. + while (true) { + int rc = poll (pollfds, poll_size, timeout); + if (rc == -1 && errno == EINTR) { + return -1; + } + errno_assert (rc >= 0); break; } + + // Receive the signal from pollfd + if (use_signaler && pollfds[0].revents & POLLIN) + signaler.recv (); + + // Check for the events. + for (items_t::iterator it = items.begin (); it != items.end (); ++it) { + + // The poll item is a 0MQ socket. Retrieve pending events + // using the ZMQ_EVENTS socket option. + if (it->socket) { + size_t events_size = sizeof (uint32_t); + uint32_t events; + if (it->socket->getsockopt (ZMQ_EVENTS, &events, &events_size) == -1) { + return -1; + } + + if (it->events & events) { + event_->socket = it->socket; + event_->user_data = it->user_data; + event_->events = it->events & events; + + // If there is event to return, we can exit immediately. + return 0; + } + } + // Else, the poll item is a raw file descriptor, simply convert + // the events to zmq_pollitem_t-style format. + else { + short revents = pollfds [it->pollfd_index].revents; + short events = 0; + + if (revents & POLLIN) + events |= ZMQ_POLLIN; + if (revents & POLLOUT) + events |= ZMQ_POLLOUT; + if (revents & POLLPRI) + events |= ZMQ_POLLPRI; + if (revents & ~(POLLIN | POLLOUT | POLLPRI)) + events |= ZMQ_POLLERR; + + if (events) { + event_->socket = NULL; + event_->user_data = it->user_data; + event_->fd = it->fd; + event_->events = events; + + // If there is event to return, we can exit immediately. + return 0; + } + } + } + + // If timeout is zero, exit immediately whether there are events or not. + if (timeout_ == 0) + break; + + // At this point we are meant to wait for events but there are none. + // If timeout is infinite we can just loop until we get some events. + if (timeout_ < 0) { + if (first_pass) + first_pass = false; + continue; + } + + // The timeout is finite and there are no events. In the first pass + // we get a timestamp of when the polling have begun. (We assume that + // first pass have taken negligible time). We also compute the time + // when the polling should time out. + if (first_pass) { + now = clock.now_ms (); + end = now + timeout_; + if (now == end) + break; + first_pass = false; + continue; + } + + // Find out whether timeout have expired. + now = clock.now_ms (); + if (now >= end) + break; } + + errno = ETIMEDOUT; + return -1; + +#elif defined ZMQ_POLL_BASED_ON_SELECT + + if (unlikely (poll_size == 0)) { + if (timeout_ == 0) + return 0; +#if defined ZMQ_HAVE_WINDOWS + Sleep (timeout_ > 0 ? timeout_ : INFINITE); + return 0; +#else + return usleep (timeout_ * 1000); +#endif + } + zmq::clock_t clock; + uint64_t now = 0; + uint64_t end = 0; + + bool first_pass = true; + fd_set inset, outset, errset; + + while (true) { + + // Compute the timeout for the subsequent poll. + timeval timeout; + timeval *ptimeout; + if (first_pass) { + timeout.tv_sec = 0; + timeout.tv_usec = 0; + ptimeout = &timeout; + } + else + if (timeout_ < 0) + ptimeout = NULL; + else { + timeout.tv_sec = (long) ((end - now) / 1000); + timeout.tv_usec = (long) ((end - now) % 1000 * 1000); + ptimeout = &timeout; + } + + // Wait for events. Ignore interrupts if there's infinite timeout. + while (true) { + memcpy (&inset, &pollset_in, sizeof (fd_set)); + memcpy (&outset, &pollset_out, sizeof (fd_set)); + memcpy (&errset, &pollset_err, sizeof (fd_set)); +#if defined ZMQ_HAVE_WINDOWS + int rc = select (0, &inset, &outset, &errset, ptimeout); + if (unlikely (rc == SOCKET_ERROR)) { + errno = zmq::wsa_error_to_errno (WSAGetLastError ()); + wsa_assert (errno == ENOTSOCK); + return -1; + } +#else + int rc = select (maxfd + 1, &inset, &outset, &errset, ptimeout); + if (unlikely (rc == -1)) { + errno_assert (errno == EINTR || errno == EBADF); + return -1; + } +#endif + break; + } + + if (use_signaler && FD_ISSET (signaler.get_fd (), &inset)) + signaler.recv (); + + // Check for the events. + for (items_t::iterator it = items.begin (); it != items.end (); ++it) { + + // The poll item is a 0MQ socket. Retrieve pending events + // using the ZMQ_EVENTS socket option. + if (it->socket) { + size_t events_size = sizeof (uint32_t); + uint32_t events; + if (it->socket->getsockopt (ZMQ_EVENTS, &events, &events_size) == -1) + return -1; + + if (it->events & events) { + event_->socket = it->socket; + event_->user_data = it->user_data; + event_->events = it->events & events; + + // If there is event to return, we can exit immediately. + return 0; + } + } + // Else, the poll item is a raw file descriptor, simply convert + // the events to zmq_pollitem_t-style format. + else { + short events = 0; + + if (FD_ISSET (it->fd, &inset)) + events |= ZMQ_POLLIN; + if (FD_ISSET (it->fd, &outset)) + events |= ZMQ_POLLOUT; + if (FD_ISSET (it->fd, &errset)) + events |= ZMQ_POLLERR; - return 0; + if (events) { + event_->socket = NULL; + event_->user_data = it->user_data; + event_->fd = it->fd; + event_->events = events; + + // If there is event to return, we can exit immediately. + return 0; + } + } + } + + // If timeout is zero, exit immediately whether there are events or not. + if (timeout_ == 0) + break; + + // At this point we are meant to wait for events but there are none. + // If timeout is infinite we can just loop until we get some events. + if (timeout_ < 0) { + if (first_pass) + first_pass = false; + continue; + } + + // The timeout is finite and there are no events. In the first pass + // we get a timestamp of when the polling have begun. (We assume that + // first pass have taken negligible time). We also compute the time + // when the polling should time out. + if (first_pass) { + now = clock.now_ms (); + end = now + timeout_; + if (now == end) + break; + first_pass = false; + continue; + } + + // Find out whether timeout have expired. + now = clock.now_ms (); + if (now >= end) + break; + } + + errno = ETIMEDOUT; + return -1; + +#else + // Exotic platforms that support neither poll() nor select(). + errno = ENOTSUP; + return -1; +#endif } -void zmq::socket_poller_t::rebuild () -{ - if (poll_set) { - free (poll_set); - poll_set = NULL; - } - if (poll_events) { - free (poll_events); - poll_events = NULL; - } - - poll_size = events.size (); - - poll_set = (zmq_pollitem_t*) malloc (poll_size * sizeof (zmq_pollitem_t)); - alloc_assert (poll_set); - - poll_events = (event_t*) malloc (poll_size * sizeof (event_t)); - - int event_nbr = 0; - for (events_t::iterator it = events.begin (); it != events.end (); ++it, event_nbr++) { - poll_set [event_nbr].socket = it->socket; - - if (!it->socket) - poll_set [event_nbr].fd = it->fd; - - poll_set [event_nbr].events = it->events; - poll_events [event_nbr] = *it; - } - - need_rebuild = false; -} diff --git a/src/socket_poller.hpp b/src/socket_poller.hpp index 95f7dad0..95e5a1fa 100644 --- a/src/socket_poller.hpp +++ b/src/socket_poller.hpp @@ -30,10 +30,23 @@ #ifndef __ZMQ_SOCKET_POLLER_HPP_INCLUDED__ #define __ZMQ_SOCKET_POLLER_HPP_INCLUDED__ +#include "poller.hpp" + +#if defined ZMQ_POLL_BASED_ON_POLL +#include +#endif + +#if defined ZMQ_HAVE_WINDOWS +#include "windows.hpp" +#else +#include +#endif + #include #include -#include "../include/zmq.h" +#include "socket_base.hpp" +#include "signaler.hpp" namespace zmq { @@ -46,28 +59,19 @@ namespace zmq typedef struct event_t { - void *socket; -#if defined _WIN32 - SOCKET fd; -#else - int fd; -#endif + socket_base_t *socket; + fd_t fd; void *user_data; short events; } event_t; - int add (void *socket, void *user_data, short events); - int modify (void *socket, short events); - int remove (void *socket); -#if defined _WIN32 - int add_fd (SOCKET fd, void *user_data, short events); - int modify_fd (SOCKET fd, short events); - int remove_fd (SOCKET fd); -#else - int add_fd (int fd, void *user_data, short events); - int modify_fd (int fd, short events); - int remove_fd (int fd); -#endif + int add (socket_base_t *socket, void *user_data, short events); + int modify (socket_base_t *socket, short events); + int remove (socket_base_t *socket); + + int add_fd (fd_t fd, void *user_data, short events); + int modify_fd (fd_t fd, short events); + int remove_fd (fd_t fd); int wait (event_t *event, long timeout); @@ -75,29 +79,45 @@ namespace zmq bool check_tag (); private: - void rebuild (); + int rebuild (); // Used to check whether the object is a socket_poller. uint32_t tag; - // Pollfd used for thread safe sockets polling - void *pollfd; + // Signaler used for thread safe sockets polling + signaler_t signaler; + + typedef struct item_t { + socket_base_t *socket; + fd_t fd; + void *user_data; + short events; +#if defined ZMQ_POLL_BASED_ON_POLL + int pollfd_index; +#endif + } item_t; // List of sockets - typedef std::vector events_t; - events_t events; - - // Current zmq_poll set - zmq_pollitem_t *poll_set; - - // Matching set to events - event_t *poll_events; - - // Size of the pollset - int poll_size; + typedef std::vector items_t; + items_t items; // Does the pollset needs rebuilding? bool need_rebuild; + + // Should the signaler be used for the thread safe polling? + bool use_signaler; + + // Size of the pollset + int poll_size; + +#if defined ZMQ_POLL_BASED_ON_POLL + pollfd *pollfds; +#elif defined ZMQ_POLL_BASED_ON_SELECT + fd_set pollset_in; + fd_set pollset_out; + fd_set pollset_err; + zmq::fd_t maxfd; +#endif socket_poller_t (const socket_poller_t&); const socket_poller_t &operator = (const socket_poller_t&); diff --git a/src/zmq.cpp b/src/zmq.cpp index 4413b61c..7bb21409 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -565,34 +565,6 @@ int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_) return nread; } -// Add/remove pollfd from a socket - -int zmq_add_pollfd (void *s_, void *p_) -{ - if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { - errno = ENOTSOCK; - return -1; - } - zmq::socket_base_t *s = (zmq::socket_base_t *) s_; - zmq::signaler_t *p = (zmq::signaler_t *) p_; - - return s->add_signaler(p); -} - -int zmq_remove_pollfd (void *s_, void *p_) -{ - if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { - errno = ENOTSOCK; - return -1; - } - zmq::socket_base_t *s = (zmq::socket_base_t *) s_; - zmq::signaler_t *p = (zmq::signaler_t *) p_; - - return s->remove_signaler(p); -} - - - // Message manipulators. int zmq_msg_init (zmq_msg_t *msg_) @@ -1070,495 +1042,6 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) #endif } -// Create pollfd - -void *zmq_pollfd_new () -{ - return new zmq::signaler_t (); -} - -// Close pollfd - -int zmq_pollfd_close (void* p_) -{ - zmq::signaler_t *s = (zmq::signaler_t*)p_; - LIBZMQ_DELETE(s); - return 0; -} - -// Recv signal from pollfd - -void zmq_pollfd_recv(void *p_) -{ - zmq::signaler_t *s = (zmq::signaler_t*)p_; - s->recv (); -} - -// Wait until pollfd is signalled - -int zmq_pollfd_wait(void *p_, int timeout_) -{ - zmq::signaler_t *s = (zmq::signaler_t*)p_; - return s->wait (timeout_); -} - -// Get pollfd fd - -#if defined _WIN32 -SOCKET zmq_pollfd_fd (void *p_) -#else -int zmq_pollfd_fd (void *p_) -#endif -{ - zmq::signaler_t *s = (zmq::signaler_t*)p_; - return s->get_fd (); -} - -// Polling thread safe sockets version - -int zmq_pollfd_poll (void* p_, zmq_pollitem_t *items_, int nitems_, long timeout_) -{ -#if defined ZMQ_POLL_BASED_ON_POLL - if (unlikely (nitems_ < 0)) { - errno = EINVAL; - return -1; - } - if (unlikely (nitems_ == 0)) { - if (timeout_ == 0) - return 0; -#if defined ZMQ_HAVE_WINDOWS - Sleep (timeout_ > 0 ? timeout_ : INFINITE); - return 0; -#elif defined ZMQ_HAVE_ANDROID - usleep (timeout_ * 1000); - return 0; -#else - return usleep (timeout_ * 1000); -#endif - } - - if (!items_) { - errno = EFAULT; - return -1; - } - - zmq::clock_t clock; - uint64_t now = 0; - uint64_t end = 0; - pollfd spollfds[ZMQ_POLLITEMS_DFLT]; - pollfd *pollfds = spollfds; - int pollfds_size = 0; - int pollfds_index = 0; - bool use_pollfd = false; - - for (int i = 0; i != nitems_; i++) { - if (items_ [i].socket) { - int thread_safe; - size_t thread_safe_size = sizeof(int); - - if (zmq_getsockopt (items_ [i].socket, ZMQ_THREAD_SAFE, &thread_safe, - &thread_safe_size) == -1) { - return -1; - } - - // All thread safe sockets share same fd - if (thread_safe) { - - // if poll fd is not set yet and events are set for this socket - if (!use_pollfd && items_ [i].events) { - use_pollfd = true; - pollfds_size++; - } - } - else - pollfds_size++; - } - else - pollfds_size++; - } - - if (pollfds_size > ZMQ_POLLITEMS_DFLT) { - pollfds = (pollfd*) malloc (pollfds_size * sizeof (pollfd)); - alloc_assert (pollfds); - } - - // If we have at least one thread safe socket we set pollfd first - if (use_pollfd) { - pollfds [0].fd = zmq_pollfd_fd (p_); - pollfds [0].events = POLLIN; - pollfds_index = 1; - } - - // Build pollset for poll () system call. - for (int i = 0; i != nitems_; i++) { - - // If the poll item is a 0MQ socket, we poll on the file descriptor - // retrieved by the ZMQ_FD socket option. - if (items_ [i].socket) { - int thread_safe; - size_t thread_safe_size = sizeof(int); - - if (zmq_getsockopt (items_ [i].socket, ZMQ_THREAD_SAFE, &thread_safe, - &thread_safe_size) == -1) { - if (pollfds != spollfds) - free (pollfds); - return -1; - } - - // We already handled the thread safe sockets - if (!thread_safe) { - size_t zmq_fd_size = sizeof (zmq::fd_t); - if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &pollfds [pollfds_index].fd, - &zmq_fd_size) == -1) { - if (pollfds != spollfds) - free (pollfds); - return -1; - } - pollfds [pollfds_index].events = items_ [i].events ? POLLIN : 0; - pollfds_index++; - } - } - // Else, the poll item is a raw file descriptor. Just convert the - // events to normal POLLIN/POLLOUT for poll (). - else { - pollfds [pollfds_index].fd = items_ [i].fd; - pollfds [pollfds_index].events = - (items_ [i].events & ZMQ_POLLIN ? POLLIN : 0) | - (items_ [i].events & ZMQ_POLLOUT ? POLLOUT : 0) | - (items_ [i].events & ZMQ_POLLPRI ? POLLPRI : 0); - pollfds_index++; - } - } - - bool first_pass = true; - int nevents = 0; - - while (true) { - // Compute the timeout for the subsequent poll. - int timeout; - if (first_pass) - timeout = 0; - else - if (timeout_ < 0) - timeout = -1; - else - timeout = end - now; - - // Wait for events. - while (true) { - int rc = poll (pollfds, pollfds_size, timeout); - if (rc == -1 && errno == EINTR) { - if (pollfds != spollfds) - free (pollfds); - return -1; - } - errno_assert (rc >= 0); - break; - } - - // Receive the signal from pollfd - if (use_pollfd && pollfds[0].revents & POLLIN) - zmq_pollfd_recv (p_); - - // Check for the events. - for (int i = 0; i != nitems_; i++) { - - items_ [i].revents = 0; - - // The poll item is a 0MQ socket. Retrieve pending events - // using the ZMQ_EVENTS socket option. - if (items_ [i].socket) { - size_t zmq_events_size = sizeof (uint32_t); - uint32_t zmq_events; - if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events, - &zmq_events_size) == -1) { - if (pollfds != spollfds) - free (pollfds); - return -1; - } - if ((items_ [i].events & ZMQ_POLLOUT) && - (zmq_events & ZMQ_POLLOUT)) - items_ [i].revents |= ZMQ_POLLOUT; - if ((items_ [i].events & ZMQ_POLLIN) && - (zmq_events & ZMQ_POLLIN)) - items_ [i].revents |= ZMQ_POLLIN; - } - // Else, the poll item is a raw file descriptor, simply convert - // the events to zmq_pollitem_t-style format. - else { - if (pollfds [i].revents & POLLIN) - items_ [i].revents |= ZMQ_POLLIN; - if (pollfds [i].revents & POLLOUT) - items_ [i].revents |= ZMQ_POLLOUT; - if (pollfds [i].revents & POLLPRI) - items_ [i].revents |= ZMQ_POLLPRI; - if (pollfds [i].revents & ~(POLLIN | POLLOUT | POLLPRI)) - items_ [i].revents |= ZMQ_POLLERR; - } - - if (items_ [i].revents) - nevents++; - } - - // If timeout is zero, exit immediately whether there are events or not. - if (timeout_ == 0) - break; - - // If there are events to return, we can exit immediately. - if (nevents) - break; - - // At this point we are meant to wait for events but there are none. - // If timeout is infinite we can just loop until we get some events. - if (timeout_ < 0) { - if (first_pass) - first_pass = false; - continue; - } - - // The timeout is finite and there are no events. In the first pass - // we get a timestamp of when the polling have begun. (We assume that - // first pass have taken negligible time). We also compute the time - // when the polling should time out. - if (first_pass) { - now = clock.now_ms (); - end = now + timeout_; - if (now == end) - break; - first_pass = false; - continue; - } - - // Find out whether timeout have expired. - now = clock.now_ms (); - if (now >= end) - break; - } - - if (pollfds != spollfds) - free (pollfds); - return nevents; - -#elif defined ZMQ_POLL_BASED_ON_SELECT - - if (unlikely (nitems_ < 0)) { - errno = EINVAL; - return -1; - } - if (unlikely (nitems_ == 0)) { - if (timeout_ == 0) - return 0; -#if defined ZMQ_HAVE_WINDOWS - Sleep (timeout_ > 0 ? timeout_ : INFINITE); - return 0; -#else - return usleep (timeout_ * 1000); -#endif - } - zmq::clock_t clock; - uint64_t now = 0; - uint64_t end = 0; - - // Ensure we do not attempt to select () on more than FD_SETSIZE - // file descriptors. - zmq_assert (nitems_ <= FD_SETSIZE); - - fd_set pollset_in; - FD_ZERO (&pollset_in); - fd_set pollset_out; - FD_ZERO (&pollset_out); - fd_set pollset_err; - FD_ZERO (&pollset_err); - - bool use_pollfd = false; - - for (int i = 0; i != nitems_; i++) { - if (items_ [i].socket) { - int thread_safe; - size_t thread_safe_size = sizeof(int); - - if (zmq_getsockopt (items_ [i].socket, ZMQ_THREAD_SAFE, &thread_safe, - &thread_safe_size) == -1) - return -1; - - if (thread_safe && items_ [i].events) { - use_pollfd = true; - FD_SET (zmq_pollfd_fd (p_), &pollset_in); - break; - } - } - } - - zmq::fd_t maxfd = 0; - - // Build the fd_sets for passing to select (). - for (int i = 0; i != nitems_; i++) { - - // If the poll item is a 0MQ socket we are interested in input on the - // notification file descriptor retrieved by the ZMQ_FD socket option. - if (items_ [i].socket) { - int thread_safe; - size_t thread_safe_size = sizeof(int); - - if (zmq_getsockopt (items_ [i].socket, ZMQ_THREAD_SAFE, &thread_safe, - &thread_safe_size) == -1) - return -1; - - if (!thread_safe) { - zmq::fd_t notify_fd; - size_t zmq_fd_size = sizeof (zmq::fd_t); - if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, ¬ify_fd, - &zmq_fd_size) == -1) - return -1; - - if (items_ [i].events) { - FD_SET (notify_fd, &pollset_in); - if (maxfd < notify_fd) - maxfd = notify_fd; - } - } - } - // Else, the poll item is a raw file descriptor. Convert the poll item - // events to the appropriate fd_sets. - else { - if (items_ [i].events & ZMQ_POLLIN) - FD_SET (items_ [i].fd, &pollset_in); - if (items_ [i].events & ZMQ_POLLOUT) - FD_SET (items_ [i].fd, &pollset_out); - if (items_ [i].events & ZMQ_POLLERR) - FD_SET (items_ [i].fd, &pollset_err); - if (maxfd < items_ [i].fd) - maxfd = items_ [i].fd; - } - } - - bool first_pass = true; - int nevents = 0; - fd_set inset, outset, errset; - - while (true) { - - // Compute the timeout for the subsequent poll. - timeval timeout; - timeval *ptimeout; - if (first_pass) { - timeout.tv_sec = 0; - timeout.tv_usec = 0; - ptimeout = &timeout; - } - else - if (timeout_ < 0) - ptimeout = NULL; - else { - timeout.tv_sec = (long) ((end - now) / 1000); - timeout.tv_usec = (long) ((end - now) % 1000 * 1000); - ptimeout = &timeout; - } - - // Wait for events. Ignore interrupts if there's infinite timeout. - while (true) { - memcpy (&inset, &pollset_in, sizeof (fd_set)); - memcpy (&outset, &pollset_out, sizeof (fd_set)); - memcpy (&errset, &pollset_err, sizeof (fd_set)); -#if defined ZMQ_HAVE_WINDOWS - int rc = select (0, &inset, &outset, &errset, ptimeout); - if (unlikely (rc == SOCKET_ERROR)) { - errno = zmq::wsa_error_to_errno (WSAGetLastError ()); - wsa_assert (errno == ENOTSOCK); - return -1; - } -#else - int rc = select (maxfd + 1, &inset, &outset, &errset, ptimeout); - if (unlikely (rc == -1)) { - errno_assert (errno == EINTR || errno == EBADF); - return -1; - } -#endif - break; - } - - if (use_pollfd && FD_ISSET (zmq_pollfd_fd (p_), &inset)) - zmq_pollfd_recv (p_); - - // Check for the events. - for (int i = 0; i != nitems_; i++) { - - items_ [i].revents = 0; - - // The poll item is a 0MQ socket. Retrieve pending events - // using the ZMQ_EVENTS socket option. - if (items_ [i].socket) { - size_t zmq_events_size = sizeof (uint32_t); - uint32_t zmq_events; - if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events, - &zmq_events_size) == -1) - return -1; - if ((items_ [i].events & ZMQ_POLLOUT) && - (zmq_events & ZMQ_POLLOUT)) - items_ [i].revents |= ZMQ_POLLOUT; - if ((items_ [i].events & ZMQ_POLLIN) && - (zmq_events & ZMQ_POLLIN)) - items_ [i].revents |= ZMQ_POLLIN; - } - // Else, the poll item is a raw file descriptor, simply convert - // the events to zmq_pollitem_t-style format. - else { - if (FD_ISSET (items_ [i].fd, &inset)) - items_ [i].revents |= ZMQ_POLLIN; - if (FD_ISSET (items_ [i].fd, &outset)) - items_ [i].revents |= ZMQ_POLLOUT; - if (FD_ISSET (items_ [i].fd, &errset)) - items_ [i].revents |= ZMQ_POLLERR; - } - - if (items_ [i].revents) - nevents++; - } - - // If timeout is zero, exit immediately whether there are events or not. - if (timeout_ == 0) - break; - - // If there are events to return, we can exit immediately. - if (nevents) - break; - - // At this point we are meant to wait for events but there are none. - // If timeout is infinite we can just loop until we get some events. - if (timeout_ < 0) { - if (first_pass) - first_pass = false; - continue; - } - - // The timeout is finite and there are no events. In the first pass - // we get a timestamp of when the polling have begun. (We assume that - // first pass have taken negligible time). We also compute the time - // when the polling should time out. - if (first_pass) { - now = clock.now_ms (); - end = now + timeout_; - if (now == end) - break; - first_pass = false; - continue; - } - - // Find out whether timeout have expired. - now = clock.now_ms (); - if (now >= end) - break; - } - - return nevents; - -#else - // Exotic platforms that support neither poll() nor select(). - errno = ENOTSUP; - return -1; -#endif -} - // The poller functionality void* zmq_poller_new () @@ -1579,14 +1062,20 @@ int zmq_poller_close (void *poller_) return 0; } -int zmq_poller_add (void *poller_, void *socket_, void *user_data_, short events_) +int zmq_poller_add (void *poller_, void *s_, void *user_data_, short events_) { if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) { errno = EFAULT; return -1; } - return ((zmq::socket_poller_t*)poller_)->add (socket_, user_data_, events_); + if (!s_ || !((zmq::socket_base_t*)s_)->check_tag ()) { + errno = ENOTSOCK; + return -1; + } + zmq::socket_base_t *socket = (zmq::socket_base_t*)s_; + + return ((zmq::socket_poller_t*)poller_)->add (socket, user_data_, events_); } #if defined _WIN32 @@ -1604,14 +1093,20 @@ int zmq_poller_add_fd (void *poller_, int fd_, void *user_data_, short events_) } -int zmq_poller_modify (void *poller_, void *socket_, short events_) +int zmq_poller_modify (void *poller_, void *s_, short events_) { if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) { errno = EFAULT; return -1; } - return ((zmq::socket_poller_t*)poller_)->modify (socket_, events_); + if (!s_ || !((zmq::socket_base_t*)s_)->check_tag ()) { + errno = ENOTSOCK; + return -1; + } + zmq::socket_base_t *socket = (zmq::socket_base_t*)s_; + + return ((zmq::socket_poller_t*)poller_)->modify (socket, events_); } @@ -1630,13 +1125,19 @@ int zmq_poller_modify_fd (void *poller_, int fd_, short events_) } -int zmq_poller_remove (void *poller_, void *socket) +int zmq_poller_remove (void *poller_, void *s_) { if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) { errno = EFAULT; return -1; } + if (!s_ || !((zmq::socket_base_t*)s_)->check_tag ()) { + errno = ENOTSOCK; + return -1; + } + zmq::socket_base_t *socket = (zmq::socket_base_t*)s_; + return ((zmq::socket_poller_t*)poller_)->remove (socket); } diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index ddc6d78b..cae45258 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -52,7 +52,6 @@ set(tests test_client_server test_sockopt_hwm test_heartbeats - test_thread_safe_polling test_poller ) if(NOT WIN32) diff --git a/tests/test_poller.cpp b/tests/test_poller.cpp index 36ddd51b..ec22e837 100644 --- a/tests/test_poller.cpp +++ b/tests/test_poller.cpp @@ -67,7 +67,7 @@ int main (void) // Send a message char data[1] = {'H'}; rc = zmq_send_const (vent, data, 1, 0); - assert (rc == 1); + assert (rc == 1); // We expect a message only on the sink zmq_poller_event_t event; @@ -77,7 +77,12 @@ int main (void) assert (event.user_data == sink); rc = zmq_recv (sink, data, 1, 0); assert (rc == 1); - + + // We expect timed out + rc = zmq_poller_wait (poller, &event, 0); + assert (rc == -1); + assert (errno == ETIMEDOUT); + // Stop polling sink rc = zmq_poller_remove (poller, sink); assert (rc == 0); @@ -127,7 +132,7 @@ int main (void) assert (event.socket == server); assert (event.user_data == NULL); assert (event.events == ZMQ_POLLOUT); - + // Destory poller, sockets and ctx rc = zmq_poller_close (poller); assert (rc == 0); @@ -141,7 +146,7 @@ int main (void) assert (rc == 0); rc = zmq_close (client); assert (rc == 0); - rc = zmq_ctx_shutdown (ctx); + rc = zmq_ctx_term (ctx); assert (rc == 0); return 0; diff --git a/tests/test_thread_safe_polling.cpp b/tests/test_thread_safe_polling.cpp deleted file mode 100644 index 197bcced..00000000 --- a/tests/test_thread_safe_polling.cpp +++ /dev/null @@ -1,164 +0,0 @@ -/* - Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file - - This file is part of libzmq, the ZeroMQ core engine in C++. - - libzmq is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License (LGPL) as published - by the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - As a special exception, the Contributors give you permission to link - this library with independent modules to produce an executable, - regardless of the license terms of these independent modules, and to - copy and distribute the resulting executable under terms of your choice, - provided that you also meet, for each linked independent module, the - terms and conditions of the license of that module. An independent - module is a module which is not derived from or based on this library. - If you modify this library, you must extend this exception to your - version of the library. - - libzmq is distributed in the hope that it will be useful, but WITHOUT - ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public - License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#include "testutil.hpp" - -void worker(void* s); - -int main (void) -{ - setup_test_environment(); - void *ctx = zmq_ctx_new (); - assert (ctx); - - void *server = zmq_socket (ctx, ZMQ_SERVER); - void *server2 = zmq_socket (ctx, ZMQ_SERVER); - void *pollfd = zmq_pollfd_new (); - - int rc; - - rc = zmq_add_pollfd (server, pollfd); - assert (rc == 0); - - rc = zmq_add_pollfd (server2, pollfd); - assert (rc == 0); - - zmq_pollitem_t items[] = { - {server, 0, ZMQ_POLLIN, 0}, - {server2, 0, ZMQ_POLLIN, 0}}; - - rc = zmq_bind (server, "tcp://127.0.0.1:5560"); - assert (rc == 0); - - rc = zmq_bind (server2, "tcp://127.0.0.1:5561"); - assert (rc == 0); - - void* t = zmq_threadstart(worker, ctx); - - assert (rc == 0); - - rc = zmq_pollfd_poll (pollfd, items, 2, -1); - assert (rc == 1); - - assert (items[0].revents == ZMQ_POLLIN); - assert (items[1].revents == 0); - - zmq_msg_t msg; - rc = zmq_msg_init(&msg); - rc = zmq_msg_recv(&msg, server, ZMQ_DONTWAIT); - assert (rc == 1); - - rc = zmq_pollfd_poll (pollfd, items, 2, -1); - assert (rc == 1); - - assert (items[0].revents == 0); - assert (items[1].revents == ZMQ_POLLIN); - - rc = zmq_msg_recv(&msg, server2, ZMQ_DONTWAIT); - assert (rc == 1); - - rc = zmq_pollfd_poll (pollfd, items, 2, 0); - assert (rc == 0); - - assert (items[0].revents == 0); - assert (items[1].revents == 0); - - zmq_threadclose(t); - - rc = zmq_msg_close(&msg); - assert (rc == 0); - - rc = zmq_remove_pollfd (server, pollfd); - assert (rc == 0); - - rc = zmq_remove_pollfd (server2, pollfd); - assert (rc == 0); - - rc = zmq_pollfd_close (pollfd); - assert (rc == 0); - - rc = zmq_close (server); - assert (rc == 0); - - rc = zmq_close (server2); - assert (rc == 0); - - rc = zmq_ctx_term (ctx); - assert (rc == 0); - - return 0; -} - -void worker(void* ctx) -{ - void *client = zmq_socket (ctx, ZMQ_CLIENT); - - int rc = zmq_connect (client, "tcp://127.0.0.1:5560"); - assert (rc == 0); - - msleep(100); - - zmq_msg_t msg; - rc = zmq_msg_init_size(&msg,1); - assert (rc == 0); - - char * data = (char *)zmq_msg_data(&msg); - data[0] = 1; - - rc = zmq_msg_send(&msg, client, 0); - assert (rc == 1); - - rc = zmq_disconnect (client, "tcp://127.0.0.1:5560"); - assert (rc == 0); - - rc = zmq_connect (client, "tcp://127.0.0.1:5561"); - assert (rc == 0); - - msleep(100); - - rc = zmq_msg_close(&msg); - assert (rc == 0); - - rc = zmq_msg_init_size(&msg,1); - assert (rc == 0); - - data = (char *)zmq_msg_data(&msg); - data[0] = 1; - - rc = zmq_msg_send(&msg, client, 0); - assert (rc == 1); - - rc = zmq_msg_close(&msg); - assert (rc == 0); - - rc = zmq_close (client); - assert (rc == 0); -} - -