problem: zmq_poll is slow because FD is being created on every call

making the creation of FD only when thread safe sockets are in used
within the zmq_poller which improve the zmq_poll performance.
This commit is contained in:
somdoron 2017-03-11 10:57:29 +02:00
parent 651f81e8af
commit f694a2d985
2 changed files with 18 additions and 9 deletions

View File

@ -33,6 +33,7 @@
zmq::socket_poller_t::socket_poller_t () :
tag (0xCAFEBABE),
signaler (NULL),
need_rebuild (true),
use_signaler (false),
poll_size(0)
@ -62,10 +63,15 @@ zmq::socket_poller_t::~socket_poller_t ()
size_t thread_safe_size = sizeof(int);
if (it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == 0 && thread_safe)
it->socket->remove_signaler (&signaler);
it->socket->remove_signaler (signaler);
}
}
if (signaler != NULL) {
delete signaler;
signaler = NULL;
}
#if defined ZMQ_POLL_BASED_ON_POLL
if (pollfds) {
free (pollfds);
@ -95,7 +101,10 @@ int zmq::socket_poller_t::add (socket_base_t *socket_, void* user_data_, short e
return -1;
if (thread_safe) {
if (socket_->add_signaler (&signaler) == -1)
if (signaler == NULL)
signaler = new signaler_t ();
if (socket_->add_signaler (signaler) == -1)
return -1;
}
@ -193,7 +202,7 @@ int zmq::socket_poller_t::remove (socket_base_t *socket_)
size_t thread_safe_size = sizeof(int);
if (socket_->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == 0 && thread_safe)
socket_->remove_signaler (&signaler);
socket_->remove_signaler (signaler);
return 0;
}
@ -264,7 +273,7 @@ int zmq::socket_poller_t::rebuild ()
if (use_signaler) {
item_nbr = 1;
pollfds[0].fd = signaler.get_fd();
pollfds[0].fd = signaler->get_fd();
pollfds[0].events = POLLIN;
}
@ -323,7 +332,7 @@ int zmq::socket_poller_t::rebuild ()
if (thread_safe && it->events) {
use_signaler = true;
FD_SET (signaler.get_fd (), &pollset_in);
FD_SET (signaler->get_fd (), &pollset_in);
poll_size = 1;
break;
}
@ -436,7 +445,7 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev
// Receive the signal from pollfd
if (use_signaler && pollfds[0].revents & POLLIN)
signaler.recv ();
signaler->recv ();
// Check for the events.
int found = 0;
@ -596,8 +605,8 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev
break;
}
if (use_signaler && FD_ISSET (signaler.get_fd (), &inset))
signaler.recv ();
if (use_signaler && FD_ISSET (signaler->get_fd (), &inset))
signaler->recv ();
// Check for the events.
int found = 0;

View File

@ -87,7 +87,7 @@ namespace zmq
uint32_t tag;
// Signaler used for thread safe sockets polling
signaler_t signaler;
signaler_t* signaler;
typedef struct item_t {
socket_base_t *socket;