mirror of
https://github.com/zeromq/libzmq.git
synced 2025-11-06 05:00:07 +01:00
polling on thread safe sockets
This commit is contained in:
42
src/zmq.cpp
42
src/zmq.cpp
@@ -779,6 +779,27 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
|
|||||||
// If the poll item is a 0MQ socket, we poll on the file descriptor
|
// If the poll item is a 0MQ socket, we poll on the file descriptor
|
||||||
// retrieved by the ZMQ_FD socket option.
|
// retrieved by the ZMQ_FD socket option.
|
||||||
if (items_ [i].socket) {
|
if (items_ [i].socket) {
|
||||||
|
int thread_safe;
|
||||||
|
size_t thread_safe_size = sizeof(int);
|
||||||
|
|
||||||
|
if (zmq_getsockopt (items_ [i].socket, ZMQ_THREAD_SAFE, &thread_safe,
|
||||||
|
&thread_safe_size) == -1) {
|
||||||
|
if (pollfds != spollfds)
|
||||||
|
free (pollfds);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (thread_safe) {
|
||||||
|
if (!items_ [i].poller) {
|
||||||
|
if (pollfds != spollfds)
|
||||||
|
free (pollfds);
|
||||||
|
errno = EINVAL;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pollfds [i].fd = zmq_poller_get_fd (items_ [i].poller);
|
||||||
|
}
|
||||||
|
else {
|
||||||
size_t zmq_fd_size = sizeof (zmq::fd_t);
|
size_t zmq_fd_size = sizeof (zmq::fd_t);
|
||||||
if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &pollfds [i].fd,
|
if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &pollfds [i].fd,
|
||||||
&zmq_fd_size) == -1) {
|
&zmq_fd_size) == -1) {
|
||||||
@@ -786,6 +807,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
|
|||||||
free (pollfds);
|
free (pollfds);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
pollfds [i].events = items_ [i].events ? POLLIN : 0;
|
pollfds [i].events = items_ [i].events ? POLLIN : 0;
|
||||||
}
|
}
|
||||||
// Else, the poll item is a raw file descriptor. Just convert the
|
// Else, the poll item is a raw file descriptor. Just convert the
|
||||||
@@ -942,11 +964,29 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
|
|||||||
// If the poll item is a 0MQ socket we are interested in input on the
|
// If the poll item is a 0MQ socket we are interested in input on the
|
||||||
// notification file descriptor retrieved by the ZMQ_FD socket option.
|
// notification file descriptor retrieved by the ZMQ_FD socket option.
|
||||||
if (items_ [i].socket) {
|
if (items_ [i].socket) {
|
||||||
size_t zmq_fd_size = sizeof (zmq::fd_t);
|
int thread_safe;
|
||||||
|
size_t thread_safe_size = sizeof(int);
|
||||||
|
|
||||||
|
if (zmq_getsockopt (items_ [i].socket, ZMQ_THREAD_SAFE, &thread_safe,
|
||||||
|
&thread_safe_size) == -1)
|
||||||
|
return -1;
|
||||||
|
|
||||||
zmq::fd_t notify_fd;
|
zmq::fd_t notify_fd;
|
||||||
|
|
||||||
|
if (thread_safe) {
|
||||||
|
if (!items_ [i].poller) {
|
||||||
|
errno = EINVAL;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
notify_fd = zmq_poller_get_fd (items_ [i].poller);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
size_t zmq_fd_size = sizeof (zmq::fd_t);
|
||||||
if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, ¬ify_fd,
|
if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, ¬ify_fd,
|
||||||
&zmq_fd_size) == -1)
|
&zmq_fd_size) == -1)
|
||||||
return -1;
|
return -1;
|
||||||
|
}
|
||||||
if (items_ [i].events) {
|
if (items_ [i].events) {
|
||||||
FD_SET (notify_fd, &pollset_in);
|
FD_SET (notify_fd, &pollset_in);
|
||||||
if (maxfd < notify_fd)
|
if (maxfd < notify_fd)
|
||||||
|
|||||||
Reference in New Issue
Block a user