diff --git a/src/socket_poller.cpp b/src/socket_poller.cpp index 0eae1af9..495feb06 100644 --- a/src/socket_poller.cpp +++ b/src/socket_poller.cpp @@ -412,7 +412,6 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev uint64_t end = 0; bool first_pass = true; - bool found = false; while (true) { // Compute the timeout for the subsequent poll. @@ -440,13 +439,13 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev signaler.recv (); // Check for the events. - int i = 0; - for (items_t::iterator it = items.begin (); it != items.end () && i < n_events_; ++i, ++it) { + int found = 0; + for (items_t::iterator it = items.begin (); it != items.end () && found < n_events_; ++it) { - events_[i].socket = NULL; - events_[i].fd = 0; - events_[i].user_data = NULL; - events_[i].events = 0; + events_[found].socket = NULL; + events_[found].fd = 0; + events_[found].user_data = NULL; + events_[found].events = 0; // The poll item is a 0MQ socket. Retrieve pending events // using the ZMQ_EVENTS socket option. @@ -458,10 +457,10 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev } if (it->events & events) { - events_[i].socket = it->socket; - events_[i].user_data = it->user_data; - events_[i].events = it->events & events; - found = true; + events_[found].socket = it->socket; + events_[found].user_data = it->user_data; + events_[found].events = it->events & events; + ++found; } } // Else, the poll item is a raw file descriptor, simply convert @@ -480,16 +479,22 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev events |= ZMQ_POLLERR; if (events) { - events_[i].socket = NULL; - events_[i].user_data = it->user_data; - events_[i].fd = it->fd; - events_[i].events = events; - found = true; + events_[found].socket = NULL; + events_[found].user_data = it->user_data; + events_[found].fd = it->fd; + events_[found].events = events; + ++found; } } } if (found) { - return 0; + for (int i = found; i < n_events_; ++i) { + events_[i].socket = NULL; + events_[i].fd = 0; + events_[i].user_data = NULL; + events_[i].events = 0; + } + return found; } // If timeout is zero, exit immediately whether there are events or not. @@ -548,7 +553,6 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev uint64_t end = 0; bool first_pass = true; - bool found = false; fd_set inset, outset, errset; while (true) { @@ -596,8 +600,8 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev signaler.recv (); // Check for the events. - int i = 0; - for (items_t::iterator it = items.begin (); it != items.end () && i < n_events_; ++i, ++it) { + int found = 0; + for (items_t::iterator it = items.begin (); it != items.end () && found < n_events_; ++it) { // The poll item is a 0MQ socket. Retrieve pending events // using the ZMQ_EVENTS socket option. @@ -608,10 +612,10 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev return -1; if (it->events & events) { - events_[i].socket = it->socket; - events_[i].user_data = it->user_data; - events_[i].events = it->events & events; - found = true; + events_[found].socket = it->socket; + events_[found].user_data = it->user_data; + events_[found].events = it->events & events; + ++found; } } // Else, the poll item is a raw file descriptor, simply convert @@ -627,16 +631,23 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev events |= ZMQ_POLLERR; if (events) { - events_[i].socket = NULL; - events_[i].user_data = it->user_data; - events_[i].fd = it->fd; - events_[i].events = events; - found = true; + events_[found].socket = NULL; + events_[found].user_data = it->user_data; + events_[found].fd = it->fd; + events_[found].events = events; + ++found; } } } if (found) { - return 0; + // zero-out remaining events + for (int i = found; i < n_events_; ++i) { + events_[i].socket = NULL; + events_[i].fd = 0; + events_[i].user_data = NULL; + events_[i].events = 0; + } + return found; } // If timeout is zero, exit immediately whether there are events or not. diff --git a/src/zmq.cpp b/src/zmq.cpp index 0db3a884..91b6c8a2 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -755,6 +755,7 @@ inline int zmq_poller_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) // Register sockets with poller for (int i = 0; i < nitems_; i++) { + items_[i].revents = 0; if (items_[i].socket) { // Poll item is a 0MQ socket. rc = zmq_poller_add (poller, items_[i].socket, NULL, items_[i].events); @@ -782,9 +783,19 @@ inline int zmq_poller_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) return rc; } - // Put the event information where zmq_poll expects it to go. - for (int i = 0; i < nitems_; i++) { - items_[i].revents = events[i].events; + // Transform poller events into zmq_pollitem events + // items_ contains all items, while events only contains fired events. + // The two are still co-ordered, so the step through items + // Checking for matches only on the first event + int found_events = rc; + for (int i = 0, j = 0; i < nitems_ && j < found_events; i++) { + if ( + (items_[i].socket && items_[i].socket == events[j].socket) || + (items_[i].fd && items_[i].fd == events[j].fd) + ) { + items_[i].revents = events[j].events; + j++; + } } // Cleanup @@ -1284,8 +1295,8 @@ int zmq_poller_wait (void *poller_, zmq_poller_event_t *event, long timeout_) if (rc < 0) { memset (event, 0, sizeof(zmq_poller_event_t)); } - - return rc; + // wait_all returns number of events, but we return 0 for any success + return rc >= 0 ? 0 : rc; } int zmq_poller_wait_all (void *poller_, zmq_poller_event_t *events, int n_events, long timeout_)