mirror of
https://github.com/zeromq/libzmq.git
synced 2025-10-16 18:56:55 +02:00
Problem: code duplication and unnecessary nesting around ZMQ_THREAD_SAFE
querying Solution: remove code duplication and rearrange conditions
This commit is contained in:
@@ -31,6 +31,17 @@
|
|||||||
#include "socket_poller.hpp"
|
#include "socket_poller.hpp"
|
||||||
#include "err.hpp"
|
#include "err.hpp"
|
||||||
|
|
||||||
|
static bool is_thread_safe (zmq::socket_base_t &socket)
|
||||||
|
{
|
||||||
|
int thread_safe;
|
||||||
|
size_t thread_safe_size = sizeof (int);
|
||||||
|
|
||||||
|
int rc =
|
||||||
|
socket.getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size);
|
||||||
|
zmq_assert (rc == 0);
|
||||||
|
return thread_safe;
|
||||||
|
}
|
||||||
|
|
||||||
zmq::socket_poller_t::socket_poller_t () :
|
zmq::socket_poller_t::socket_poller_t () :
|
||||||
tag (0xCAFEBABE),
|
tag (0xCAFEBABE),
|
||||||
signaler (NULL),
|
signaler (NULL),
|
||||||
@@ -67,15 +78,10 @@ zmq::socket_poller_t::~socket_poller_t ()
|
|||||||
tag = 0xdeadbeef;
|
tag = 0xdeadbeef;
|
||||||
|
|
||||||
for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
|
for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
|
||||||
if (it->socket && it->socket->check_tag ()) {
|
// TODO shouldn't this zmq_assert (it->socket->check_tag ()) instead?
|
||||||
int thread_safe;
|
if (it->socket && it->socket->check_tag ()
|
||||||
size_t thread_safe_size = sizeof (int);
|
&& is_thread_safe (*it->socket)) {
|
||||||
|
it->socket->remove_signaler (signaler);
|
||||||
if (it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe,
|
|
||||||
&thread_safe_size)
|
|
||||||
== 0
|
|
||||||
&& thread_safe)
|
|
||||||
it->socket->remove_signaler (signaler);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -108,14 +114,7 @@ int zmq::socket_poller_t::add (socket_base_t *socket_,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int thread_safe;
|
if (is_thread_safe (*socket_)) {
|
||||||
size_t thread_safe_size = sizeof (int);
|
|
||||||
|
|
||||||
int rc =
|
|
||||||
socket_->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size);
|
|
||||||
zmq_assert (rc == 0);
|
|
||||||
|
|
||||||
if (thread_safe) {
|
|
||||||
if (signaler == NULL) {
|
if (signaler == NULL) {
|
||||||
signaler = new (std::nothrow) signaler_t ();
|
signaler = new (std::nothrow) signaler_t ();
|
||||||
if (!signaler) {
|
if (!signaler) {
|
||||||
@@ -233,13 +232,9 @@ int zmq::socket_poller_t::remove (socket_base_t *socket_)
|
|||||||
items.erase (it);
|
items.erase (it);
|
||||||
need_rebuild = true;
|
need_rebuild = true;
|
||||||
|
|
||||||
int thread_safe;
|
if (is_thread_safe (*socket_)) {
|
||||||
size_t thread_safe_size = sizeof (int);
|
|
||||||
|
|
||||||
if (socket_->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size)
|
|
||||||
== 0
|
|
||||||
&& thread_safe)
|
|
||||||
socket_->remove_signaler (signaler);
|
socket_->remove_signaler (signaler);
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
@@ -279,21 +274,11 @@ void zmq::socket_poller_t::rebuild ()
|
|||||||
|
|
||||||
for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
|
for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
|
||||||
if (it->events) {
|
if (it->events) {
|
||||||
if (it->socket) {
|
if (it->socket && is_thread_safe (*it->socket)) {
|
||||||
int thread_safe;
|
if (!use_signaler) {
|
||||||
size_t thread_safe_size = sizeof (int);
|
use_signaler = true;
|
||||||
|
|
||||||
int rc = it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe,
|
|
||||||
&thread_safe_size);
|
|
||||||
zmq_assert (rc == 0);
|
|
||||||
|
|
||||||
if (thread_safe) {
|
|
||||||
if (!use_signaler) {
|
|
||||||
use_signaler = true;
|
|
||||||
poll_size++;
|
|
||||||
}
|
|
||||||
} else
|
|
||||||
poll_size++;
|
poll_size++;
|
||||||
|
}
|
||||||
} else
|
} else
|
||||||
poll_size++;
|
poll_size++;
|
||||||
}
|
}
|
||||||
@@ -316,17 +301,10 @@ void zmq::socket_poller_t::rebuild ()
|
|||||||
for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
|
for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
|
||||||
if (it->events) {
|
if (it->events) {
|
||||||
if (it->socket) {
|
if (it->socket) {
|
||||||
int thread_safe;
|
if (!is_thread_safe (*it->socket)) {
|
||||||
size_t thread_safe_size = sizeof (int);
|
|
||||||
|
|
||||||
int rc = it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe,
|
|
||||||
&thread_safe_size);
|
|
||||||
zmq_assert (rc == 0);
|
|
||||||
|
|
||||||
if (!thread_safe) {
|
|
||||||
size_t fd_size = sizeof (zmq::fd_t);
|
size_t fd_size = sizeof (zmq::fd_t);
|
||||||
rc = it->socket->getsockopt (ZMQ_FD, &pollfds[item_nbr].fd,
|
int rc = it->socket->getsockopt (
|
||||||
&fd_size);
|
ZMQ_FD, &pollfds[item_nbr].fd, &fd_size);
|
||||||
zmq_assert (rc == 0);
|
zmq_assert (rc == 0);
|
||||||
|
|
||||||
pollfds[item_nbr].events = POLLIN;
|
pollfds[item_nbr].events = POLLIN;
|
||||||
@@ -359,20 +337,11 @@ void zmq::socket_poller_t::rebuild ()
|
|||||||
use_signaler = false;
|
use_signaler = false;
|
||||||
|
|
||||||
for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
|
for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
|
||||||
if (it->socket) {
|
if (it->socket && is_thread_safe (*it->socket) && it->events) {
|
||||||
int thread_safe;
|
use_signaler = true;
|
||||||
size_t thread_safe_size = sizeof (int);
|
FD_SET (signaler->get_fd (), &pollset_in);
|
||||||
|
poll_size = 1;
|
||||||
int rc = it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe,
|
break;
|
||||||
&thread_safe_size);
|
|
||||||
zmq_assert (rc == 0);
|
|
||||||
|
|
||||||
if (thread_safe && it->events) {
|
|
||||||
use_signaler = true;
|
|
||||||
FD_SET (signaler->get_fd (), &pollset_in);
|
|
||||||
poll_size = 1;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -384,17 +353,11 @@ void zmq::socket_poller_t::rebuild ()
|
|||||||
// If the poll item is a 0MQ socket we are interested in input on the
|
// If the poll item is a 0MQ socket we are interested in input on the
|
||||||
// notification file descriptor retrieved by the ZMQ_FD socket option.
|
// notification file descriptor retrieved by the ZMQ_FD socket option.
|
||||||
if (it->socket) {
|
if (it->socket) {
|
||||||
int thread_safe;
|
if (!is_thread_safe (*it->socket)) {
|
||||||
size_t thread_safe_size = sizeof (int);
|
|
||||||
|
|
||||||
int rc = it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe,
|
|
||||||
&thread_safe_size);
|
|
||||||
zmq_assert (rc == 0);
|
|
||||||
|
|
||||||
if (!thread_safe) {
|
|
||||||
zmq::fd_t notify_fd;
|
zmq::fd_t notify_fd;
|
||||||
size_t fd_size = sizeof (zmq::fd_t);
|
size_t fd_size = sizeof (zmq::fd_t);
|
||||||
rc = it->socket->getsockopt (ZMQ_FD, ¬ify_fd, &fd_size);
|
int rc =
|
||||||
|
it->socket->getsockopt (ZMQ_FD, ¬ify_fd, &fd_size);
|
||||||
zmq_assert (rc == 0);
|
zmq_assert (rc == 0);
|
||||||
|
|
||||||
FD_SET (notify_fd, &pollset_in);
|
FD_SET (notify_fd, &pollset_in);
|
||||||
|
Reference in New Issue
Block a user