diff --git a/src/socket_poller.cpp b/src/socket_poller.cpp index 2cd8cbe7..5e9dea30 100644 --- a/src/socket_poller.cpp +++ b/src/socket_poller.cpp @@ -31,6 +31,17 @@ #include "socket_poller.hpp" #include "err.hpp" +static bool is_thread_safe (zmq::socket_base_t &socket) +{ + int thread_safe; + size_t thread_safe_size = sizeof (int); + + int rc = + socket.getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size); + zmq_assert (rc == 0); + return thread_safe; +} + zmq::socket_poller_t::socket_poller_t () : tag (0xCAFEBABE), signaler (NULL), @@ -67,15 +78,10 @@ zmq::socket_poller_t::~socket_poller_t () tag = 0xdeadbeef; for (items_t::iterator it = items.begin (); it != items.end (); ++it) { - if (it->socket && it->socket->check_tag ()) { - int thread_safe; - size_t thread_safe_size = sizeof (int); - - if (it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, - &thread_safe_size) - == 0 - && thread_safe) - it->socket->remove_signaler (signaler); + // TODO shouldn't this zmq_assert (it->socket->check_tag ()) instead? + if (it->socket && it->socket->check_tag () + && is_thread_safe (*it->socket)) { + it->socket->remove_signaler (signaler); } } @@ -108,14 +114,7 @@ int zmq::socket_poller_t::add (socket_base_t *socket_, } } - int thread_safe; - size_t thread_safe_size = sizeof (int); - - int rc = - socket_->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size); - zmq_assert (rc == 0); - - if (thread_safe) { + if (is_thread_safe (*socket_)) { if (signaler == NULL) { signaler = new (std::nothrow) signaler_t (); if (!signaler) { @@ -233,13 +232,9 @@ int zmq::socket_poller_t::remove (socket_base_t *socket_) items.erase (it); need_rebuild = true; - int thread_safe; - size_t thread_safe_size = sizeof (int); - - if (socket_->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) - == 0 - && thread_safe) + if (is_thread_safe (*socket_)) { socket_->remove_signaler (signaler); + } return 0; } @@ -279,21 +274,11 @@ void zmq::socket_poller_t::rebuild () for (items_t::iterator it = items.begin (); it != items.end (); ++it) { if (it->events) { - if (it->socket) { - int thread_safe; - size_t thread_safe_size = sizeof (int); - - int rc = it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, - &thread_safe_size); - zmq_assert (rc == 0); - - if (thread_safe) { - if (!use_signaler) { - use_signaler = true; - poll_size++; - } - } else + if (it->socket && is_thread_safe (*it->socket)) { + if (!use_signaler) { + use_signaler = true; poll_size++; + } } else poll_size++; } @@ -316,17 +301,10 @@ void zmq::socket_poller_t::rebuild () for (items_t::iterator it = items.begin (); it != items.end (); ++it) { if (it->events) { if (it->socket) { - int thread_safe; - size_t thread_safe_size = sizeof (int); - - int rc = it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, - &thread_safe_size); - zmq_assert (rc == 0); - - if (!thread_safe) { + if (!is_thread_safe (*it->socket)) { size_t fd_size = sizeof (zmq::fd_t); - rc = it->socket->getsockopt (ZMQ_FD, &pollfds[item_nbr].fd, - &fd_size); + int rc = it->socket->getsockopt ( + ZMQ_FD, &pollfds[item_nbr].fd, &fd_size); zmq_assert (rc == 0); pollfds[item_nbr].events = POLLIN; @@ -359,20 +337,11 @@ void zmq::socket_poller_t::rebuild () use_signaler = false; for (items_t::iterator it = items.begin (); it != items.end (); ++it) { - if (it->socket) { - int thread_safe; - size_t thread_safe_size = sizeof (int); - - int rc = it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, - &thread_safe_size); - zmq_assert (rc == 0); - - if (thread_safe && it->events) { - use_signaler = true; - FD_SET (signaler->get_fd (), &pollset_in); - poll_size = 1; - break; - } + if (it->socket && is_thread_safe (*it->socket) && it->events) { + use_signaler = true; + FD_SET (signaler->get_fd (), &pollset_in); + poll_size = 1; + break; } } @@ -384,17 +353,11 @@ void zmq::socket_poller_t::rebuild () // If the poll item is a 0MQ socket we are interested in input on the // notification file descriptor retrieved by the ZMQ_FD socket option. if (it->socket) { - int thread_safe; - size_t thread_safe_size = sizeof (int); - - int rc = it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, - &thread_safe_size); - zmq_assert (rc == 0); - - if (!thread_safe) { + if (!is_thread_safe (*it->socket)) { zmq::fd_t notify_fd; size_t fd_size = sizeof (zmq::fd_t); - rc = it->socket->getsockopt (ZMQ_FD, ¬ify_fd, &fd_size); + int rc = + it->socket->getsockopt (ZMQ_FD, ¬ify_fd, &fd_size); zmq_assert (rc == 0); FD_SET (notify_fd, &pollset_in);