From 6eddbd98bce3cd089668139f148a30e75c8416eb Mon Sep 17 00:00:00 2001 From: somdoron Date: Sun, 16 Aug 2015 13:55:41 +0300 Subject: [PATCH] polling on thread safe sockets --- src/zmq.cpp | 56 +++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 48 insertions(+), 8 deletions(-) diff --git a/src/zmq.cpp b/src/zmq.cpp index 339c486c..9c4048fb 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -779,14 +779,36 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) // If the poll item is a 0MQ socket, we poll on the file descriptor // retrieved by the ZMQ_FD socket option. if (items_ [i].socket) { - size_t zmq_fd_size = sizeof (zmq::fd_t); - if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &pollfds [i].fd, - &zmq_fd_size) == -1) { + int thread_safe; + size_t thread_safe_size = sizeof(int); + + if (zmq_getsockopt (items_ [i].socket, ZMQ_THREAD_SAFE, &thread_safe, + &thread_safe_size) == -1) { if (pollfds != spollfds) free (pollfds); return -1; } - pollfds [i].events = items_ [i].events ? POLLIN : 0; + + if (thread_safe) { + if (!items_ [i].poller) { + if (pollfds != spollfds) + free (pollfds); + errno = EINVAL; + return -1; + } + + pollfds [i].fd = zmq_poller_get_fd (items_ [i].poller); + } + else { + size_t zmq_fd_size = sizeof (zmq::fd_t); + if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &pollfds [i].fd, + &zmq_fd_size) == -1) { + if (pollfds != spollfds) + free (pollfds); + return -1; + } + } + pollfds [i].events = items_ [i].events ? POLLIN : 0; } // Else, the poll item is a raw file descriptor. Just convert the // events to normal POLLIN/POLLOUT for poll (). @@ -942,11 +964,29 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) // If the poll item is a 0MQ socket we are interested in input on the // notification file descriptor retrieved by the ZMQ_FD socket option. if (items_ [i].socket) { - size_t zmq_fd_size = sizeof (zmq::fd_t); - zmq::fd_t notify_fd; - if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, ¬ify_fd, - &zmq_fd_size) == -1) + int thread_safe; + size_t thread_safe_size = sizeof(int); + + if (zmq_getsockopt (items_ [i].socket, ZMQ_THREAD_SAFE, &thread_safe, + &thread_safe_size) == -1) return -1; + + zmq::fd_t notify_fd; + + if (thread_safe) { + if (!items_ [i].poller) { + errno = EINVAL; + return -1; + } + + notify_fd = zmq_poller_get_fd (items_ [i].poller); + } + else { + size_t zmq_fd_size = sizeof (zmq::fd_t); + if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, ¬ify_fd, + &zmq_fd_size) == -1) + return -1; + } if (items_ [i].events) { FD_SET (notify_fd, &pollset_in); if (maxfd < notify_fd)