mirror of
https://github.com/zeromq/libzmq.git
synced 2025-01-19 08:46:44 +01:00
zmq_poll implemented on Win32 platform
This commit is contained in:
parent
986ab66b8f
commit
d4fdc26efc
@ -26,6 +26,9 @@ extern "C" {
|
|||||||
|
|
||||||
#include <errno.h>
|
#include <errno.h>
|
||||||
#include <stddef.h>
|
#include <stddef.h>
|
||||||
|
#if defined _WIN32
|
||||||
|
#include "winsock2.h"
|
||||||
|
#endif
|
||||||
|
|
||||||
// Microsoft Visual Studio uses non-standard way to export/import symbols.
|
// Microsoft Visual Studio uses non-standard way to export/import symbols.
|
||||||
#if defined ZMQ_BUILDING_LIBZMQ_WITH_MSVC
|
#if defined ZMQ_BUILDING_LIBZMQ_WITH_MSVC
|
||||||
@ -185,7 +188,11 @@ ZMQ_EXPORT int zmq_recv (void *s, zmq_msg_t *msg, int flags);
|
|||||||
typedef struct
|
typedef struct
|
||||||
{
|
{
|
||||||
void *socket;
|
void *socket;
|
||||||
|
#if defined _WIN32
|
||||||
|
SOCKET fd;
|
||||||
|
#else
|
||||||
int fd;
|
int fd;
|
||||||
|
#endif
|
||||||
short events;
|
short events;
|
||||||
short revents;
|
short revents;
|
||||||
} zmq_pollitem_t;
|
} zmq_pollitem_t;
|
||||||
|
125
src/zmq.cpp
125
src/zmq.cpp
@ -31,6 +31,7 @@
|
|||||||
#include "platform.hpp"
|
#include "platform.hpp"
|
||||||
#include "stdint.hpp"
|
#include "stdint.hpp"
|
||||||
#include "err.hpp"
|
#include "err.hpp"
|
||||||
|
#include "fd.hpp"
|
||||||
|
|
||||||
#if defined ZMQ_HAVE_LINUX
|
#if defined ZMQ_HAVE_LINUX
|
||||||
#include <poll.h>
|
#include <poll.h>
|
||||||
@ -263,12 +264,10 @@ int zmq_recv (void *s_, zmq_msg_t *msg_, int flags_)
|
|||||||
|
|
||||||
int zmq_poll (zmq_pollitem_t *items_, int nitems_)
|
int zmq_poll (zmq_pollitem_t *items_, int nitems_)
|
||||||
{
|
{
|
||||||
// TODO: Replace the polling mechanism by the virtualised framework
|
#if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\
|
||||||
// used in 0MQ I/O threads. That'll make the thing work on all platforms.
|
defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\
|
||||||
#if !defined ZMQ_HAVE_LINUX
|
defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_QNXNTO ||\
|
||||||
errno = ENOTSUP;
|
defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX
|
||||||
return -1;
|
|
||||||
#else
|
|
||||||
|
|
||||||
pollfd *pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd));
|
pollfd *pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd));
|
||||||
zmq_assert (pollfds);
|
zmq_assert (pollfds);
|
||||||
@ -368,6 +367,119 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_)
|
|||||||
free (pollfds);
|
free (pollfds);
|
||||||
return nevents;
|
return nevents;
|
||||||
|
|
||||||
|
#elif defined ZMQ_HAVE_WINDOWS || defined ZMQ_HAVE_OPENVMS
|
||||||
|
|
||||||
|
fd_set pollset_in;
|
||||||
|
FD_ZERO (&pollset_in);
|
||||||
|
fd_set pollset_out;
|
||||||
|
FD_ZERO (&pollset_out);
|
||||||
|
fd_set pollset_err;
|
||||||
|
FD_ZERO (&pollset_err);
|
||||||
|
|
||||||
|
zmq::app_thread_t *app_thread = NULL;
|
||||||
|
int nsockets = 0;
|
||||||
|
zmq::fd_t maxfd = zmq::retired_fd;
|
||||||
|
zmq::fd_t notify_fd = zmq::retired_fd;
|
||||||
|
|
||||||
|
for (int i = 0; i != nitems_; i++) {
|
||||||
|
|
||||||
|
// 0MQ sockets.
|
||||||
|
if (items_ [i].socket) {
|
||||||
|
|
||||||
|
// Get the app_thread the socket is living in. If there are two
|
||||||
|
// sockets in the same pollset with different app threads, fail.
|
||||||
|
zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket;
|
||||||
|
if (app_thread) {
|
||||||
|
if (app_thread != s->get_thread ()) {
|
||||||
|
errno = EFAULT;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
app_thread = s->get_thread ();
|
||||||
|
|
||||||
|
nsockets++;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Raw file descriptors.
|
||||||
|
if (items_ [i].events & ZMQ_POLLIN)
|
||||||
|
FD_SET (items_ [i].fd, &pollset_in);
|
||||||
|
if (items_ [i].events & ZMQ_POLLOUT)
|
||||||
|
FD_SET (items_ [i].fd, &pollset_out);
|
||||||
|
if (maxfd == zmq::retired_fd || maxfd < items_ [i].fd)
|
||||||
|
maxfd = items_ [i].fd;
|
||||||
|
}
|
||||||
|
|
||||||
|
// If there's at least one 0MQ socket in the pollset we have to poll
|
||||||
|
// for 0MQ commands. If ZMQ_POLL was not set, fail.
|
||||||
|
if (nsockets) {
|
||||||
|
notify_fd = app_thread->get_signaler ()->get_fd ();
|
||||||
|
if (notify_fd == zmq::retired_fd) {
|
||||||
|
errno = ENOTSUP;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
FD_SET (notify_fd, &pollset_in);
|
||||||
|
if (maxfd == zmq::retired_fd || maxfd < notify_fd)
|
||||||
|
maxfd = notify_fd;
|
||||||
|
}
|
||||||
|
|
||||||
|
int nevents = 0;
|
||||||
|
bool initial = true;
|
||||||
|
while (!nevents) {
|
||||||
|
|
||||||
|
// Wait for activity. In the first iteration just check for events,
|
||||||
|
// don't wait. Waiting would prevent exiting on any events that may
|
||||||
|
// already be signaled on 0MQ sockets.
|
||||||
|
timeval timeout = {0, 0};
|
||||||
|
int rc = select (maxfd, &pollset_in, &pollset_out, &pollset_err,
|
||||||
|
initial ? &timeout : NULL);
|
||||||
|
#if defined ZMQ_HAVE_WINDOWS
|
||||||
|
wsa_assert (rc != SOCKET_ERROR);
|
||||||
|
#else
|
||||||
|
if (rc == -1 && errno == EINTR)
|
||||||
|
continue;
|
||||||
|
#endif
|
||||||
|
|
||||||
|
errno_assert (rc >= 0);
|
||||||
|
initial = false;
|
||||||
|
|
||||||
|
// Process 0MQ commands if needed.
|
||||||
|
if (nsockets && FD_ISSET (notify_fd, &pollset_in))
|
||||||
|
app_thread->process_commands (false, false);
|
||||||
|
|
||||||
|
// Check for the events.
|
||||||
|
int pollfd_pos = 0;
|
||||||
|
for (int i = 0; i != nitems_; i++) {
|
||||||
|
|
||||||
|
// If the poll item is a raw file descriptor, simply convert
|
||||||
|
// the events to zmq_pollitem_t-style format.
|
||||||
|
if (!items_ [i].socket) {
|
||||||
|
items_ [i].revents =
|
||||||
|
(FD_ISSET (items_ [i].fd, &pollset_in) ? ZMQ_POLLIN : 0) |
|
||||||
|
(FD_ISSET (items_ [i].fd, &pollset_out) ? ZMQ_POLLOUT : 0);
|
||||||
|
if (items_ [i].revents)
|
||||||
|
nevents++;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// The poll item is a 0MQ socket.
|
||||||
|
zmq::socket_base_t *s = (zmq::socket_base_t*) items_ [i].socket;
|
||||||
|
items_ [i].revents = 0;
|
||||||
|
if ((items_ [i].events & ZMQ_POLLOUT) && s->has_out ())
|
||||||
|
items_ [i].revents |= ZMQ_POLLOUT;
|
||||||
|
if ((items_ [i].events & ZMQ_POLLIN) && s->has_in ())
|
||||||
|
items_ [i].revents |= ZMQ_POLLIN;
|
||||||
|
if (items_ [i].revents)
|
||||||
|
nevents++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nevents;
|
||||||
|
|
||||||
|
#else
|
||||||
|
errno = ENOTSUP;
|
||||||
|
return -1;
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -428,4 +540,3 @@ unsigned long zmq_stopwatch_stop (void *watch_)
|
|||||||
free (watch_);
|
free (watch_);
|
||||||
return (unsigned long) (end - start);
|
return (unsigned long) (end - start);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user