mirror of
https://github.com/zeromq/libzmq.git
synced 2025-01-19 00:46:05 +01:00
Merge pull request #1535 from somdoron/master
problem: trying to support polling on thread safe sockets with zmq_poll failed
This commit is contained in:
commit
709a1e9a2d
@ -410,8 +410,16 @@ typedef struct zmq_pollitem_t
|
||||
#define ZMQ_POLLITEMS_DFLT 16
|
||||
|
||||
ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout);
|
||||
|
||||
/******************************************************************************/
|
||||
/* Pollfd polling on thread safe socket */
|
||||
/******************************************************************************/
|
||||
|
||||
ZMQ_EXPORT void *zmq_pollfd_new ();
|
||||
ZMQ_EXPORT int zmq_pollfd_close (void *p);
|
||||
ZMQ_EXPORT void zmq_pollfd_recv (void *p);
|
||||
ZMQ_EXPORT int zmq_pollfd_wait (void *p, int timeout_);
|
||||
ZMQ_EXPORT int zmq_pollfd_poll (void *p, zmq_pollitem_t *items, int nitems, long timeout);
|
||||
|
||||
#if defined _WIN32
|
||||
ZMQ_EXPORT SOCKET zmq_pollfd_fd (void *p);
|
||||
|
574
src/zmq.cpp
574
src/zmq.cpp
@ -709,38 +709,11 @@ const char *zmq_msg_gets (zmq_msg_t *msg_, const char *property_)
|
||||
}
|
||||
}
|
||||
|
||||
// Create pollfd
|
||||
|
||||
void *zmq_pollfd_new ()
|
||||
{
|
||||
return new zmq::signaler_t ();
|
||||
}
|
||||
|
||||
// Close pollfd
|
||||
|
||||
int zmq_pollfd_close (void* p)
|
||||
{
|
||||
zmq::signaler_t *s = (zmq::signaler_t*)p;
|
||||
LIBZMQ_DELETE(s);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Get poller fd
|
||||
#if defined _WIN32
|
||||
SOCKET zmq_pollfd_fd (void *p)
|
||||
#else
|
||||
int zmq_pollfd_fd (void *p)
|
||||
#endif
|
||||
{
|
||||
zmq::signaler_t *s = (zmq::signaler_t*)p;
|
||||
return s->get_fd ();
|
||||
}
|
||||
|
||||
// Polling.
|
||||
|
||||
int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
|
||||
{
|
||||
// TODO: the function implementation can just call zmq_pollfd_poll with pollfd as NULL, however pollfd is not yet stable
|
||||
#if defined ZMQ_POLL_BASED_ON_POLL
|
||||
if (unlikely (nitems_ < 0)) {
|
||||
errno = EINVAL;
|
||||
@ -782,36 +755,14 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
|
||||
// If the poll item is a 0MQ socket, we poll on the file descriptor
|
||||
// retrieved by the ZMQ_FD socket option.
|
||||
if (items_ [i].socket) {
|
||||
int thread_safe;
|
||||
size_t thread_safe_size = sizeof(int);
|
||||
|
||||
if (zmq_getsockopt (items_ [i].socket, ZMQ_THREAD_SAFE, &thread_safe,
|
||||
&thread_safe_size) == -1) {
|
||||
size_t zmq_fd_size = sizeof (zmq::fd_t);
|
||||
if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &pollfds [i].fd,
|
||||
&zmq_fd_size) == -1) {
|
||||
if (pollfds != spollfds)
|
||||
free (pollfds);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (thread_safe) {
|
||||
if (!items_ [i].fd) {
|
||||
if (pollfds != spollfds)
|
||||
free (pollfds);
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
|
||||
pollfds [i].fd = items_ [i].fd;
|
||||
}
|
||||
else {
|
||||
size_t zmq_fd_size = sizeof (zmq::fd_t);
|
||||
if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &pollfds [i].fd,
|
||||
&zmq_fd_size) == -1) {
|
||||
if (pollfds != spollfds)
|
||||
free (pollfds);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
pollfds [i].events = items_ [i].events ? POLLIN : 0;
|
||||
pollfds [i].events = items_ [i].events ? POLLIN : 0;
|
||||
}
|
||||
// Else, the poll item is a raw file descriptor. Just convert the
|
||||
// events to normal POLLIN/POLLOUT for poll ().
|
||||
@ -967,29 +918,11 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
|
||||
// If the poll item is a 0MQ socket we are interested in input on the
|
||||
// notification file descriptor retrieved by the ZMQ_FD socket option.
|
||||
if (items_ [i].socket) {
|
||||
int thread_safe;
|
||||
size_t thread_safe_size = sizeof(int);
|
||||
|
||||
if (zmq_getsockopt (items_ [i].socket, ZMQ_THREAD_SAFE, &thread_safe,
|
||||
&thread_safe_size) == -1)
|
||||
return -1;
|
||||
|
||||
size_t zmq_fd_size = sizeof (zmq::fd_t);
|
||||
zmq::fd_t notify_fd;
|
||||
|
||||
if (thread_safe) {
|
||||
if (!items_ [i].fd) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
|
||||
notify_fd = items_ [i].fd;
|
||||
}
|
||||
else {
|
||||
size_t zmq_fd_size = sizeof (zmq::fd_t);
|
||||
if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, ¬ify_fd,
|
||||
&zmq_fd_size) == -1)
|
||||
return -1;
|
||||
}
|
||||
if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, ¬ify_fd,
|
||||
&zmq_fd_size) == -1)
|
||||
return -1;
|
||||
if (items_ [i].events) {
|
||||
FD_SET (notify_fd, &pollset_in);
|
||||
if (maxfd < notify_fd)
|
||||
@ -1119,6 +1052,495 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
|
||||
continue;
|
||||
}
|
||||
|
||||
// Find out whether timeout have expired.
|
||||
now = clock.now_ms ();
|
||||
if (now >= end)
|
||||
break;
|
||||
}
|
||||
|
||||
return nevents;
|
||||
|
||||
#else
|
||||
// Exotic platforms that support neither poll() nor select().
|
||||
errno = ENOTSUP;
|
||||
return -1;
|
||||
#endif
|
||||
}
|
||||
|
||||
// Create pollfd
|
||||
|
||||
void *zmq_pollfd_new ()
|
||||
{
|
||||
return new zmq::signaler_t ();
|
||||
}
|
||||
|
||||
// Close pollfd
|
||||
|
||||
int zmq_pollfd_close (void* p_)
|
||||
{
|
||||
zmq::signaler_t *s = (zmq::signaler_t*)p_;
|
||||
LIBZMQ_DELETE(s);
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Recv signal from pollfd
|
||||
|
||||
void zmq_pollfd_recv(void *p_)
|
||||
{
|
||||
zmq::signaler_t *s = (zmq::signaler_t*)p_;
|
||||
s->recv ();
|
||||
}
|
||||
|
||||
// Wait until pollfd is signalled
|
||||
|
||||
int zmq_pollfd_wait(void *p_, int timeout_)
|
||||
{
|
||||
zmq::signaler_t *s = (zmq::signaler_t*)p_;
|
||||
return s->wait (timeout_);
|
||||
}
|
||||
|
||||
// Get pollfd fd
|
||||
|
||||
#if defined _WIN32
|
||||
SOCKET zmq_pollfd_fd (void *p_)
|
||||
#else
|
||||
int zmq_pollfd_fd (void *p_)
|
||||
#endif
|
||||
{
|
||||
zmq::signaler_t *s = (zmq::signaler_t*)p_;
|
||||
return s->get_fd ();
|
||||
}
|
||||
|
||||
// Polling thread safe sockets version
|
||||
|
||||
int zmq_pollfd_poll (void* p_, zmq_pollitem_t *items_, int nitems_, long timeout_)
|
||||
{
|
||||
#if defined ZMQ_POLL_BASED_ON_POLL
|
||||
if (unlikely (nitems_ < 0)) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
if (unlikely (nitems_ == 0)) {
|
||||
if (timeout_ == 0)
|
||||
return 0;
|
||||
#if defined ZMQ_HAVE_WINDOWS
|
||||
Sleep (timeout_ > 0 ? timeout_ : INFINITE);
|
||||
return 0;
|
||||
#elif defined ZMQ_HAVE_ANDROID
|
||||
usleep (timeout_ * 1000);
|
||||
return 0;
|
||||
#else
|
||||
return usleep (timeout_ * 1000);
|
||||
#endif
|
||||
}
|
||||
|
||||
if (!items_) {
|
||||
errno = EFAULT;
|
||||
return -1;
|
||||
}
|
||||
|
||||
zmq::clock_t clock;
|
||||
uint64_t now = 0;
|
||||
uint64_t end = 0;
|
||||
pollfd spollfds[ZMQ_POLLITEMS_DFLT];
|
||||
pollfd *pollfds = spollfds;
|
||||
int pollfds_size = 0;
|
||||
int pollfds_index = 0;
|
||||
bool use_pollfd = false;
|
||||
|
||||
for (int i = 0; i != nitems_; i++) {
|
||||
if (items_ [i].socket) {
|
||||
int thread_safe;
|
||||
size_t thread_safe_size = sizeof(int);
|
||||
|
||||
if (zmq_getsockopt (items_ [i].socket, ZMQ_THREAD_SAFE, &thread_safe,
|
||||
&thread_safe_size) == -1) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
// All thread safe sockets share same fd
|
||||
if (thread_safe) {
|
||||
|
||||
// if poll fd is not set yet and events are set for this socket
|
||||
if (!use_pollfd && items_ [i].events) {
|
||||
use_pollfd = true;
|
||||
pollfds_size++;
|
||||
}
|
||||
}
|
||||
else
|
||||
pollfds_size++;
|
||||
}
|
||||
else
|
||||
pollfds_size++;
|
||||
}
|
||||
|
||||
if (pollfds_size > ZMQ_POLLITEMS_DFLT) {
|
||||
pollfds = (pollfd*) malloc (pollfds_size * sizeof (pollfd));
|
||||
alloc_assert (pollfds);
|
||||
}
|
||||
|
||||
// If we have at least one thread safe socket we set pollfd first
|
||||
if (use_pollfd) {
|
||||
pollfds [0].fd = zmq_pollfd_fd (p_);
|
||||
pollfds [0].events = POLLIN;
|
||||
pollfds_index = 1;
|
||||
}
|
||||
|
||||
// Build pollset for poll () system call.
|
||||
for (int i = 0; i != nitems_; i++) {
|
||||
|
||||
// If the poll item is a 0MQ socket, we poll on the file descriptor
|
||||
// retrieved by the ZMQ_FD socket option.
|
||||
if (items_ [i].socket) {
|
||||
int thread_safe;
|
||||
size_t thread_safe_size = sizeof(int);
|
||||
|
||||
if (zmq_getsockopt (items_ [i].socket, ZMQ_THREAD_SAFE, &thread_safe,
|
||||
&thread_safe_size) == -1) {
|
||||
if (pollfds != spollfds)
|
||||
free (pollfds);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// We already handled the thread safe sockets
|
||||
if (!thread_safe) {
|
||||
size_t zmq_fd_size = sizeof (zmq::fd_t);
|
||||
if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &pollfds [pollfds_index].fd,
|
||||
&zmq_fd_size) == -1) {
|
||||
if (pollfds != spollfds)
|
||||
free (pollfds);
|
||||
return -1;
|
||||
}
|
||||
pollfds [pollfds_index].events = items_ [i].events ? POLLIN : 0;
|
||||
pollfds_index++;
|
||||
}
|
||||
}
|
||||
// Else, the poll item is a raw file descriptor. Just convert the
|
||||
// events to normal POLLIN/POLLOUT for poll ().
|
||||
else {
|
||||
pollfds [pollfds_index].fd = items_ [i].fd;
|
||||
pollfds [pollfds_index].events =
|
||||
(items_ [i].events & ZMQ_POLLIN ? POLLIN : 0) |
|
||||
(items_ [i].events & ZMQ_POLLOUT ? POLLOUT : 0) |
|
||||
(items_ [i].events & ZMQ_POLLPRI ? POLLPRI : 0);
|
||||
pollfds_index++;
|
||||
}
|
||||
}
|
||||
|
||||
bool first_pass = true;
|
||||
int nevents = 0;
|
||||
|
||||
while (true) {
|
||||
// Compute the timeout for the subsequent poll.
|
||||
int timeout;
|
||||
if (first_pass)
|
||||
timeout = 0;
|
||||
else
|
||||
if (timeout_ < 0)
|
||||
timeout = -1;
|
||||
else
|
||||
timeout = end - now;
|
||||
|
||||
// Wait for events.
|
||||
while (true) {
|
||||
int rc = poll (pollfds, pollfds_size, timeout);
|
||||
if (rc == -1 && errno == EINTR) {
|
||||
if (pollfds != spollfds)
|
||||
free (pollfds);
|
||||
return -1;
|
||||
}
|
||||
errno_assert (rc >= 0);
|
||||
break;
|
||||
}
|
||||
|
||||
// Receive the signal from pollfd
|
||||
if (use_pollfd && pollfds[0].revents & POLLIN)
|
||||
zmq_pollfd_recv (p_);
|
||||
|
||||
// Check for the events.
|
||||
for (int i = 0; i != nitems_; i++) {
|
||||
|
||||
items_ [i].revents = 0;
|
||||
|
||||
// The poll item is a 0MQ socket. Retrieve pending events
|
||||
// using the ZMQ_EVENTS socket option.
|
||||
if (items_ [i].socket) {
|
||||
size_t zmq_events_size = sizeof (uint32_t);
|
||||
uint32_t zmq_events;
|
||||
if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
|
||||
&zmq_events_size) == -1) {
|
||||
if (pollfds != spollfds)
|
||||
free (pollfds);
|
||||
return -1;
|
||||
}
|
||||
if ((items_ [i].events & ZMQ_POLLOUT) &&
|
||||
(zmq_events & ZMQ_POLLOUT))
|
||||
items_ [i].revents |= ZMQ_POLLOUT;
|
||||
if ((items_ [i].events & ZMQ_POLLIN) &&
|
||||
(zmq_events & ZMQ_POLLIN))
|
||||
items_ [i].revents |= ZMQ_POLLIN;
|
||||
}
|
||||
// Else, the poll item is a raw file descriptor, simply convert
|
||||
// the events to zmq_pollitem_t-style format.
|
||||
else {
|
||||
if (pollfds [i].revents & POLLIN)
|
||||
items_ [i].revents |= ZMQ_POLLIN;
|
||||
if (pollfds [i].revents & POLLOUT)
|
||||
items_ [i].revents |= ZMQ_POLLOUT;
|
||||
if (pollfds [i].revents & POLLPRI)
|
||||
items_ [i].revents |= ZMQ_POLLPRI;
|
||||
if (pollfds [i].revents & ~(POLLIN | POLLOUT | POLLPRI))
|
||||
items_ [i].revents |= ZMQ_POLLERR;
|
||||
}
|
||||
|
||||
if (items_ [i].revents)
|
||||
nevents++;
|
||||
}
|
||||
|
||||
// If timout is zero, exit immediately whether there are events or not.
|
||||
if (timeout_ == 0)
|
||||
break;
|
||||
|
||||
// If there are events to return, we can exit immediately.
|
||||
if (nevents)
|
||||
break;
|
||||
|
||||
// At this point we are meant to wait for events but there are none.
|
||||
// If timeout is infinite we can just loop until we get some events.
|
||||
if (timeout_ < 0) {
|
||||
if (first_pass)
|
||||
first_pass = false;
|
||||
continue;
|
||||
}
|
||||
|
||||
// The timeout is finite and there are no events. In the first pass
|
||||
// we get a timestamp of when the polling have begun. (We assume that
|
||||
// first pass have taken negligible time). We also compute the time
|
||||
// when the polling should time out.
|
||||
if (first_pass) {
|
||||
now = clock.now_ms ();
|
||||
end = now + timeout_;
|
||||
if (now == end)
|
||||
break;
|
||||
first_pass = false;
|
||||
continue;
|
||||
}
|
||||
|
||||
// Find out whether timeout have expired.
|
||||
now = clock.now_ms ();
|
||||
if (now >= end)
|
||||
break;
|
||||
}
|
||||
|
||||
if (pollfds != spollfds)
|
||||
free (pollfds);
|
||||
return nevents;
|
||||
|
||||
#elif defined ZMQ_POLL_BASED_ON_SELECT
|
||||
|
||||
if (unlikely (nitems_ < 0)) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
if (unlikely (nitems_ == 0)) {
|
||||
if (timeout_ == 0)
|
||||
return 0;
|
||||
#if defined ZMQ_HAVE_WINDOWS
|
||||
Sleep (timeout_ > 0 ? timeout_ : INFINITE);
|
||||
return 0;
|
||||
#else
|
||||
return usleep (timeout_ * 1000);
|
||||
#endif
|
||||
}
|
||||
zmq::clock_t clock;
|
||||
uint64_t now = 0;
|
||||
uint64_t end = 0;
|
||||
|
||||
// Ensure we do not attempt to select () on more than FD_SETSIZE
|
||||
// file descriptors.
|
||||
zmq_assert (nitems_ <= FD_SETSIZE);
|
||||
|
||||
fd_set pollset_in;
|
||||
FD_ZERO (&pollset_in);
|
||||
fd_set pollset_out;
|
||||
FD_ZERO (&pollset_out);
|
||||
fd_set pollset_err;
|
||||
FD_ZERO (&pollset_err);
|
||||
|
||||
bool use_pollfd = false;
|
||||
|
||||
for (int i = 0; i != nitems_; i++) {
|
||||
if (items_ [i].socket) {
|
||||
int thread_safe;
|
||||
size_t thread_safe_size = sizeof(int);
|
||||
|
||||
if (zmq_getsockopt (items_ [i].socket, ZMQ_THREAD_SAFE, &thread_safe,
|
||||
&thread_safe_size) == -1)
|
||||
return -1;
|
||||
|
||||
if (thread_safe && items_ [i].events) {
|
||||
use_pollfd = true;
|
||||
FD_SET (zmq_pollfd_fd (p_), &pollset_in);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
zmq::fd_t maxfd = 0;
|
||||
|
||||
// Build the fd_sets for passing to select ().
|
||||
for (int i = 0; i != nitems_; i++) {
|
||||
|
||||
// If the poll item is a 0MQ socket we are interested in input on the
|
||||
// notification file descriptor retrieved by the ZMQ_FD socket option.
|
||||
if (items_ [i].socket) {
|
||||
int thread_safe;
|
||||
size_t thread_safe_size = sizeof(int);
|
||||
|
||||
if (zmq_getsockopt (items_ [i].socket, ZMQ_THREAD_SAFE, &thread_safe,
|
||||
&thread_safe_size) == -1)
|
||||
return -1;
|
||||
|
||||
if (!thread_safe) {
|
||||
zmq::fd_t notify_fd;
|
||||
size_t zmq_fd_size = sizeof (zmq::fd_t);
|
||||
if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, ¬ify_fd,
|
||||
&zmq_fd_size) == -1)
|
||||
return -1;
|
||||
|
||||
if (items_ [i].events) {
|
||||
FD_SET (notify_fd, &pollset_in);
|
||||
if (maxfd < notify_fd)
|
||||
maxfd = notify_fd;
|
||||
}
|
||||
}
|
||||
}
|
||||
// Else, the poll item is a raw file descriptor. Convert the poll item
|
||||
// events to the appropriate fd_sets.
|
||||
else {
|
||||
if (items_ [i].events & ZMQ_POLLIN)
|
||||
FD_SET (items_ [i].fd, &pollset_in);
|
||||
if (items_ [i].events & ZMQ_POLLOUT)
|
||||
FD_SET (items_ [i].fd, &pollset_out);
|
||||
if (items_ [i].events & ZMQ_POLLERR)
|
||||
FD_SET (items_ [i].fd, &pollset_err);
|
||||
if (maxfd < items_ [i].fd)
|
||||
maxfd = items_ [i].fd;
|
||||
}
|
||||
}
|
||||
|
||||
bool first_pass = true;
|
||||
int nevents = 0;
|
||||
fd_set inset, outset, errset;
|
||||
|
||||
while (true) {
|
||||
|
||||
// Compute the timeout for the subsequent poll.
|
||||
timeval timeout;
|
||||
timeval *ptimeout;
|
||||
if (first_pass) {
|
||||
timeout.tv_sec = 0;
|
||||
timeout.tv_usec = 0;
|
||||
ptimeout = &timeout;
|
||||
}
|
||||
else
|
||||
if (timeout_ < 0)
|
||||
ptimeout = NULL;
|
||||
else {
|
||||
timeout.tv_sec = (long) ((end - now) / 1000);
|
||||
timeout.tv_usec = (long) ((end - now) % 1000 * 1000);
|
||||
ptimeout = &timeout;
|
||||
}
|
||||
|
||||
// Wait for events. Ignore interrupts if there's infinite timeout.
|
||||
while (true) {
|
||||
memcpy (&inset, &pollset_in, sizeof (fd_set));
|
||||
memcpy (&outset, &pollset_out, sizeof (fd_set));
|
||||
memcpy (&errset, &pollset_err, sizeof (fd_set));
|
||||
#if defined ZMQ_HAVE_WINDOWS
|
||||
int rc = select (0, &inset, &outset, &errset, ptimeout);
|
||||
if (unlikely (rc == SOCKET_ERROR)) {
|
||||
errno = zmq::wsa_error_to_errno (WSAGetLastError ());
|
||||
wsa_assert (errno == ENOTSOCK);
|
||||
return -1;
|
||||
}
|
||||
#else
|
||||
int rc = select (maxfd + 1, &inset, &outset, &errset, ptimeout);
|
||||
if (unlikely (rc == -1)) {
|
||||
errno_assert (errno == EINTR || errno == EBADF);
|
||||
return -1;
|
||||
}
|
||||
#endif
|
||||
break;
|
||||
}
|
||||
|
||||
if (use_pollfd && FD_ISSET (zmq_pollfd_fd (p_), &inset))
|
||||
zmq_pollfd_recv (p_);
|
||||
|
||||
// Check for the events.
|
||||
for (int i = 0; i != nitems_; i++) {
|
||||
|
||||
items_ [i].revents = 0;
|
||||
|
||||
// The poll item is a 0MQ socket. Retrieve pending events
|
||||
// using the ZMQ_EVENTS socket option.
|
||||
if (items_ [i].socket) {
|
||||
size_t zmq_events_size = sizeof (uint32_t);
|
||||
uint32_t zmq_events;
|
||||
if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
|
||||
&zmq_events_size) == -1)
|
||||
return -1;
|
||||
if ((items_ [i].events & ZMQ_POLLOUT) &&
|
||||
(zmq_events & ZMQ_POLLOUT))
|
||||
items_ [i].revents |= ZMQ_POLLOUT;
|
||||
if ((items_ [i].events & ZMQ_POLLIN) &&
|
||||
(zmq_events & ZMQ_POLLIN))
|
||||
items_ [i].revents |= ZMQ_POLLIN;
|
||||
}
|
||||
// Else, the poll item is a raw file descriptor, simply convert
|
||||
// the events to zmq_pollitem_t-style format.
|
||||
else {
|
||||
if (FD_ISSET (items_ [i].fd, &inset))
|
||||
items_ [i].revents |= ZMQ_POLLIN;
|
||||
if (FD_ISSET (items_ [i].fd, &outset))
|
||||
items_ [i].revents |= ZMQ_POLLOUT;
|
||||
if (FD_ISSET (items_ [i].fd, &errset))
|
||||
items_ [i].revents |= ZMQ_POLLERR;
|
||||
}
|
||||
|
||||
if (items_ [i].revents)
|
||||
nevents++;
|
||||
}
|
||||
|
||||
// If timout is zero, exit immediately whether there are events or not.
|
||||
if (timeout_ == 0)
|
||||
break;
|
||||
|
||||
// If there are events to return, we can exit immediately.
|
||||
if (nevents)
|
||||
break;
|
||||
|
||||
// At this point we are meant to wait for events but there are none.
|
||||
// If timeout is infinite we can just loop until we get some events.
|
||||
if (timeout_ < 0) {
|
||||
if (first_pass)
|
||||
first_pass = false;
|
||||
continue;
|
||||
}
|
||||
|
||||
// The timeout is finite and there are no events. In the first pass
|
||||
// we get a timestamp of when the polling have begun. (We assume that
|
||||
// first pass have taken negligible time). We also compute the time
|
||||
// when the polling should time out.
|
||||
if (first_pass) {
|
||||
now = clock.now_ms ();
|
||||
end = now + timeout_;
|
||||
if (now == end)
|
||||
break;
|
||||
first_pass = false;
|
||||
continue;
|
||||
}
|
||||
|
||||
// Find out whether timeout have expired.
|
||||
now = clock.now_ms ();
|
||||
if (now >= end)
|
||||
|
@ -50,8 +50,8 @@ int main (void)
|
||||
assert (rc == 0);
|
||||
|
||||
zmq_pollitem_t items[] = {
|
||||
{server, zmq_pollfd_fd(pollfd), ZMQ_POLLIN, 0},
|
||||
{server2, zmq_pollfd_fd(pollfd), ZMQ_POLLIN, 0}};
|
||||
{server, 0, ZMQ_POLLIN, 0},
|
||||
{server2, 0, ZMQ_POLLIN, 0}};
|
||||
|
||||
rc = zmq_bind (server, "tcp://127.0.0.1:5560");
|
||||
assert (rc == 0);
|
||||
@ -63,7 +63,7 @@ int main (void)
|
||||
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_poll (items, 2, -1);
|
||||
rc = zmq_pollfd_poll (pollfd, items, 2, -1);
|
||||
assert (rc == 1);
|
||||
|
||||
assert (items[0].revents == ZMQ_POLLIN);
|
||||
@ -74,7 +74,7 @@ int main (void)
|
||||
rc = zmq_msg_recv(&msg, server, ZMQ_DONTWAIT);
|
||||
assert (rc == 1);
|
||||
|
||||
rc = zmq_poll (items, 2, -1);
|
||||
rc = zmq_pollfd_poll (pollfd, items, 2, -1);
|
||||
assert (rc == 1);
|
||||
|
||||
assert (items[0].revents == 0);
|
||||
@ -83,7 +83,7 @@ int main (void)
|
||||
rc = zmq_msg_recv(&msg, server2, ZMQ_DONTWAIT);
|
||||
assert (rc == 1);
|
||||
|
||||
rc = zmq_poll (items, 2, 0);
|
||||
rc = zmq_pollfd_poll (pollfd, items, 2, 0);
|
||||
assert (rc == 0);
|
||||
|
||||
assert (items[0].revents == 0);
|
||||
|
Loading…
x
Reference in New Issue
Block a user