mirror of
https://github.com/zeromq/libzmq.git
synced 2025-01-31 22:45:38 +01:00
Reduced number of calls to container end method
This commit is contained in:
parent
05e400a3e0
commit
9e2cf35b66
@ -137,8 +137,8 @@ int zmq::ctx_t::terminate ()
|
|||||||
|
|
||||||
// Connect up any pending inproc connections, otherwise we will hang
|
// Connect up any pending inproc connections, otherwise we will hang
|
||||||
pending_connections_t copy = _pending_connections;
|
pending_connections_t copy = _pending_connections;
|
||||||
for (pending_connections_t::iterator p = copy.begin (); p != copy.end ();
|
for (pending_connections_t::iterator p = copy.begin (), end = copy.end ();
|
||||||
++p) {
|
p != end; ++p) {
|
||||||
zmq::socket_base_t *s = create_socket (ZMQ_PAIR);
|
zmq::socket_base_t *s = create_socket (ZMQ_PAIR);
|
||||||
// create_socket might fail eg: out of memory/sockets limit reached
|
// create_socket might fail eg: out of memory/sockets limit reached
|
||||||
zmq_assert (s);
|
zmq_assert (s);
|
||||||
@ -528,8 +528,9 @@ void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_)
|
|||||||
{
|
{
|
||||||
scoped_lock_t locker (_endpoints_sync);
|
scoped_lock_t locker (_endpoints_sync);
|
||||||
|
|
||||||
for (endpoints_t::iterator it = _endpoints.begin ();
|
for (endpoints_t::iterator it = _endpoints.begin (),
|
||||||
it != _endpoints.end ();) {
|
end = _endpoints.end ();
|
||||||
|
it != end;) {
|
||||||
if (it->second.socket == socket_)
|
if (it->second.socket == socket_)
|
||||||
#if __cplusplus >= 201103L
|
#if __cplusplus >= 201103L
|
||||||
it = _endpoints.erase (it);
|
it = _endpoints.erase (it);
|
||||||
|
11
src/dish.cpp
11
src/dish.cpp
@ -100,16 +100,12 @@ int zmq::dish_t::xjoin (const char *group_)
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
subscriptions_t::iterator it = _subscriptions.find (group);
|
|
||||||
|
|
||||||
// User cannot join same group twice
|
// User cannot join same group twice
|
||||||
if (it != _subscriptions.end ()) {
|
if (!_subscriptions.insert (group).second) {
|
||||||
errno = EINVAL;
|
errno = EINVAL;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
_subscriptions.insert (group);
|
|
||||||
|
|
||||||
msg_t msg;
|
msg_t msg;
|
||||||
int rc = msg.init_join ();
|
int rc = msg.init_join ();
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
@ -230,8 +226,9 @@ const zmq::blob_t &zmq::dish_t::get_credential () const
|
|||||||
|
|
||||||
void zmq::dish_t::send_subscriptions (pipe_t *pipe_)
|
void zmq::dish_t::send_subscriptions (pipe_t *pipe_)
|
||||||
{
|
{
|
||||||
for (subscriptions_t::iterator it = _subscriptions.begin ();
|
for (subscriptions_t::iterator it = _subscriptions.begin (),
|
||||||
it != _subscriptions.end (); ++it) {
|
end = _subscriptions.end ();
|
||||||
|
it != end; ++it) {
|
||||||
msg_t msg;
|
msg_t msg;
|
||||||
int rc = msg.init_join ();
|
int rc = msg.init_join ();
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
|
@ -75,8 +75,8 @@ zmq::epoll_t::~epoll_t ()
|
|||||||
#else
|
#else
|
||||||
close (_epoll_fd);
|
close (_epoll_fd);
|
||||||
#endif
|
#endif
|
||||||
for (retired_t::iterator it = _retired.begin (); it != _retired.end ();
|
for (retired_t::iterator it = _retired.begin (), end = _retired.end ();
|
||||||
++it) {
|
it != end; ++it) {
|
||||||
LIBZMQ_DELETE (*it);
|
LIBZMQ_DELETE (*it);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -207,8 +207,8 @@ void zmq::epoll_t::loop ()
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Destroy retired event sources.
|
// Destroy retired event sources.
|
||||||
for (retired_t::iterator it = _retired.begin (); it != _retired.end ();
|
for (retired_t::iterator it = _retired.begin (), end = _retired.end ();
|
||||||
++it) {
|
it != end; ++it) {
|
||||||
LIBZMQ_DELETE (*it);
|
LIBZMQ_DELETE (*it);
|
||||||
}
|
}
|
||||||
_retired.clear ();
|
_retired.clear ();
|
||||||
|
@ -61,10 +61,11 @@ void zmq::mailbox_safe_t::add_signaler (signaler_t *signaler_)
|
|||||||
void zmq::mailbox_safe_t::remove_signaler (signaler_t *signaler_)
|
void zmq::mailbox_safe_t::remove_signaler (signaler_t *signaler_)
|
||||||
{
|
{
|
||||||
// TODO: make a copy of array and signal outside the lock
|
// TODO: make a copy of array and signal outside the lock
|
||||||
|
const std::vector<zmq::signaler_t *>::iterator end = _signalers.end ();
|
||||||
std::vector<signaler_t *>::iterator it =
|
std::vector<signaler_t *>::iterator it =
|
||||||
std::find (_signalers.begin (), _signalers.end (), signaler_);
|
std::find (_signalers.begin (), end, signaler_);
|
||||||
|
|
||||||
if (it != _signalers.end ())
|
if (it != end)
|
||||||
_signalers.erase (it);
|
_signalers.erase (it);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -81,8 +82,10 @@ void zmq::mailbox_safe_t::send (const command_t &cmd_)
|
|||||||
|
|
||||||
if (!ok) {
|
if (!ok) {
|
||||||
_cond_var.broadcast ();
|
_cond_var.broadcast ();
|
||||||
for (std::vector<signaler_t *>::iterator it = _signalers.begin ();
|
|
||||||
it != _signalers.end (); ++it) {
|
for (std::vector<signaler_t *>::iterator it = _signalers.begin (),
|
||||||
|
end = _signalers.end ();
|
||||||
|
it != end; ++it) {
|
||||||
(*it)->send ();
|
(*it)->send ();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -178,12 +178,14 @@ size_t zmq::mechanism_t::add_basic_properties (unsigned char *ptr_,
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
for (std::map<std::string, std::string>::const_iterator it =
|
for (std::map<std::string, std::string>::const_iterator
|
||||||
options.app_metadata.begin ();
|
it = options.app_metadata.begin (),
|
||||||
it != options.app_metadata.end (); ++it)
|
end = options.app_metadata.end ();
|
||||||
|
it != end; ++it) {
|
||||||
ptr +=
|
ptr +=
|
||||||
add_property (ptr, ptr_capacity_ - (ptr - ptr_), it->first.c_str (),
|
add_property (ptr, ptr_capacity_ - (ptr - ptr_), it->first.c_str (),
|
||||||
it->second.c_str (), strlen (it->second.c_str ()));
|
it->second.c_str (), strlen (it->second.c_str ()));
|
||||||
|
}
|
||||||
|
|
||||||
return ptr - ptr_;
|
return ptr - ptr_;
|
||||||
}
|
}
|
||||||
@ -193,9 +195,10 @@ size_t zmq::mechanism_t::basic_properties_len () const
|
|||||||
const char *socket_type = socket_type_string (options.type);
|
const char *socket_type = socket_type_string (options.type);
|
||||||
size_t meta_len = 0;
|
size_t meta_len = 0;
|
||||||
|
|
||||||
for (std::map<std::string, std::string>::const_iterator it =
|
for (std::map<std::string, std::string>::const_iterator
|
||||||
options.app_metadata.begin ();
|
it = options.app_metadata.begin (),
|
||||||
it != options.app_metadata.end (); ++it) {
|
end = options.app_metadata.end ();
|
||||||
|
it != end; ++it) {
|
||||||
meta_len +=
|
meta_len +=
|
||||||
property_len (it->first.c_str (), strlen (it->second.c_str ()));
|
property_len (it->first.c_str (), strlen (it->second.c_str ()));
|
||||||
}
|
}
|
||||||
|
@ -158,7 +158,8 @@ void zmq::own_t::process_term (int linger_)
|
|||||||
zmq_assert (!_terminating);
|
zmq_assert (!_terminating);
|
||||||
|
|
||||||
// Send termination request to all owned objects.
|
// Send termination request to all owned objects.
|
||||||
for (owned_t::iterator it = _owned.begin (); it != _owned.end (); ++it)
|
for (owned_t::iterator it = _owned.begin (), end = _owned.end (); it != end;
|
||||||
|
++it)
|
||||||
send_term (*it, linger_);
|
send_term (*it, linger_);
|
||||||
register_term_acks (static_cast<int> (_owned.size ()));
|
register_term_acks (static_cast<int> (_owned.size ()));
|
||||||
_owned.clear ();
|
_owned.clear ();
|
||||||
|
@ -86,7 +86,8 @@ void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_,
|
|||||||
void zmq::pgm_receiver_t::unplug ()
|
void zmq::pgm_receiver_t::unplug ()
|
||||||
{
|
{
|
||||||
// Delete decoders.
|
// Delete decoders.
|
||||||
for (peers_t::iterator it = peers.begin (); it != peers.end (); ++it) {
|
for (peers_t::iterator it = peers.begin (), end = peers.end (); it != end;
|
||||||
|
++it) {
|
||||||
if (it->second.decoder != NULL) {
|
if (it->second.decoder != NULL) {
|
||||||
LIBZMQ_DELETE (it->second.decoder);
|
LIBZMQ_DELETE (it->second.decoder);
|
||||||
}
|
}
|
||||||
|
@ -65,7 +65,8 @@ void zmq::poller_base_t::add_timer (int timeout_, i_poll_events *sink_, int id_)
|
|||||||
void zmq::poller_base_t::cancel_timer (i_poll_events *sink_, int id_)
|
void zmq::poller_base_t::cancel_timer (i_poll_events *sink_, int id_)
|
||||||
{
|
{
|
||||||
// Complexity of this operation is O(n). We assume it is rarely used.
|
// Complexity of this operation is O(n). We assume it is rarely used.
|
||||||
for (timers_t::iterator it = _timers.begin (); it != _timers.end (); ++it)
|
for (timers_t::iterator it = _timers.begin (), end = _timers.end ();
|
||||||
|
it != end; ++it)
|
||||||
if (it->second.sink == sink_ && it->second.id == id_) {
|
if (it->second.sink == sink_ && it->second.id == id_) {
|
||||||
_timers.erase (it);
|
_timers.erase (it);
|
||||||
return;
|
return;
|
||||||
|
@ -122,8 +122,9 @@ int zmq::radio_t::xsetsockopt (int option_,
|
|||||||
|
|
||||||
void zmq::radio_t::xpipe_terminated (pipe_t *pipe_)
|
void zmq::radio_t::xpipe_terminated (pipe_t *pipe_)
|
||||||
{
|
{
|
||||||
for (subscriptions_t::iterator it = _subscriptions.begin ();
|
for (subscriptions_t::iterator it = _subscriptions.begin (),
|
||||||
it != _subscriptions.end ();) {
|
end = _subscriptions.end ();
|
||||||
|
it != end;) {
|
||||||
if (it->second == pipe_) {
|
if (it->second == pipe_) {
|
||||||
#if __cplusplus >= 201103L
|
#if __cplusplus >= 201103L
|
||||||
it = _subscriptions.erase (it);
|
it = _subscriptions.erase (it);
|
||||||
@ -135,10 +136,13 @@ void zmq::radio_t::xpipe_terminated (pipe_t *pipe_)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
udp_pipes_t::iterator it =
|
{
|
||||||
std::find (_udp_pipes.begin (), _udp_pipes.end (), pipe_);
|
const udp_pipes_t::iterator end = _udp_pipes.end ();
|
||||||
if (it != _udp_pipes.end ())
|
const udp_pipes_t::iterator it =
|
||||||
_udp_pipes.erase (it);
|
std::find (_udp_pipes.begin (), end, pipe_);
|
||||||
|
if (it != end)
|
||||||
|
_udp_pipes.erase (it);
|
||||||
|
}
|
||||||
|
|
||||||
_dist.pipe_terminated (pipe_);
|
_dist.pipe_terminated (pipe_);
|
||||||
}
|
}
|
||||||
@ -159,8 +163,9 @@ int zmq::radio_t::xsend (msg_t *msg_)
|
|||||||
for (subscriptions_t::iterator it = range.first; it != range.second; ++it)
|
for (subscriptions_t::iterator it = range.first; it != range.second; ++it)
|
||||||
_dist.match (it->second);
|
_dist.match (it->second);
|
||||||
|
|
||||||
for (udp_pipes_t::iterator it = _udp_pipes.begin ();
|
for (udp_pipes_t::iterator it = _udp_pipes.begin (),
|
||||||
it != _udp_pipes.end (); ++it)
|
end = _udp_pipes.end ();
|
||||||
|
it != end; ++it)
|
||||||
_dist.match (*it);
|
_dist.match (*it);
|
||||||
|
|
||||||
int rc = -1;
|
int rc = -1;
|
||||||
|
@ -165,7 +165,7 @@ void zmq::router_t::xread_activated (pipe_t *pipe_)
|
|||||||
if (it == _anonymous_pipes.end ())
|
if (it == _anonymous_pipes.end ())
|
||||||
_fq.activated (pipe_);
|
_fq.activated (pipe_);
|
||||||
else {
|
else {
|
||||||
bool routing_id_ok = identify_peer (pipe_, false);
|
const bool routing_id_ok = identify_peer (pipe_, false);
|
||||||
if (routing_id_ok) {
|
if (routing_id_ok) {
|
||||||
_anonymous_pipes.erase (it);
|
_anonymous_pipes.erase (it);
|
||||||
_fq.attach (pipe_);
|
_fq.attach (pipe_);
|
||||||
|
@ -86,8 +86,9 @@ void zmq::server_t::xread_activated (pipe_t *pipe_)
|
|||||||
|
|
||||||
void zmq::server_t::xwrite_activated (pipe_t *pipe_)
|
void zmq::server_t::xwrite_activated (pipe_t *pipe_)
|
||||||
{
|
{
|
||||||
|
const out_pipes_t::iterator end = _out_pipes.end ();
|
||||||
out_pipes_t::iterator it;
|
out_pipes_t::iterator it;
|
||||||
for (it = _out_pipes.begin (); it != _out_pipes.end (); ++it)
|
for (it = _out_pipes.begin (); it != end; ++it)
|
||||||
if (it->second.pipe == pipe_)
|
if (it->second.pipe == pipe_)
|
||||||
break;
|
break;
|
||||||
|
|
||||||
|
@ -786,12 +786,11 @@ int zmq::socket_base_t::connect (const char *addr_)
|
|||||||
options.connected = true;
|
options.connected = true;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
bool is_single_connect =
|
const bool is_single_connect =
|
||||||
(options.type == ZMQ_DEALER || options.type == ZMQ_SUB
|
(options.type == ZMQ_DEALER || options.type == ZMQ_SUB
|
||||||
|| options.type == ZMQ_PUB || options.type == ZMQ_REQ);
|
|| options.type == ZMQ_PUB || options.type == ZMQ_REQ);
|
||||||
if (unlikely (is_single_connect)) {
|
if (unlikely (is_single_connect)) {
|
||||||
const endpoints_t::iterator it = _endpoints.find (addr_);
|
if (0 != _endpoints.count (addr_)) {
|
||||||
if (it != _endpoints.end ()) {
|
|
||||||
// There is no valid use for multiple connects for SUB-PUB nor
|
// There is no valid use for multiple connects for SUB-PUB nor
|
||||||
// DEALER-ROUTER nor REQ-REP. Multiple connects produces
|
// DEALER-ROUTER nor REQ-REP. Multiple connects produces
|
||||||
// nonsensical results.
|
// nonsensical results.
|
||||||
@ -1551,8 +1550,8 @@ void zmq::socket_base_t::pipe_terminated (pipe_t *pipe_)
|
|||||||
xpipe_terminated (pipe_);
|
xpipe_terminated (pipe_);
|
||||||
|
|
||||||
// Remove pipe from inproc pipes
|
// Remove pipe from inproc pipes
|
||||||
for (inprocs_t::iterator it = _inprocs.begin (); it != _inprocs.end ();
|
for (inprocs_t::iterator it = _inprocs.begin (), end = _inprocs.end ();
|
||||||
++it)
|
it != end; ++it)
|
||||||
if (it->second == pipe_) {
|
if (it->second == pipe_) {
|
||||||
_inprocs.erase (it);
|
_inprocs.erase (it);
|
||||||
break;
|
break;
|
||||||
@ -1790,12 +1789,13 @@ int zmq::routing_socket_base_t::xsetsockopt (int option_,
|
|||||||
|
|
||||||
void zmq::routing_socket_base_t::xwrite_activated (pipe_t *pipe_)
|
void zmq::routing_socket_base_t::xwrite_activated (pipe_t *pipe_)
|
||||||
{
|
{
|
||||||
|
const out_pipes_t::iterator end = _out_pipes.end ();
|
||||||
out_pipes_t::iterator it;
|
out_pipes_t::iterator it;
|
||||||
for (it = _out_pipes.begin (); it != _out_pipes.end (); ++it)
|
for (it = _out_pipes.begin (); it != end; ++it)
|
||||||
if (it->second.pipe == pipe_)
|
if (it->second.pipe == pipe_)
|
||||||
break;
|
break;
|
||||||
|
|
||||||
zmq_assert (it != _out_pipes.end ());
|
zmq_assert (it != end);
|
||||||
zmq_assert (!it->second.active);
|
zmq_assert (!it->second.active);
|
||||||
it->second.active = true;
|
it->second.active = true;
|
||||||
}
|
}
|
||||||
|
@ -31,6 +31,7 @@
|
|||||||
#include "socket_poller.hpp"
|
#include "socket_poller.hpp"
|
||||||
#include "err.hpp"
|
#include "err.hpp"
|
||||||
#include "polling_util.hpp"
|
#include "polling_util.hpp"
|
||||||
|
#include "macros.hpp"
|
||||||
|
|
||||||
#include <limits.h>
|
#include <limits.h>
|
||||||
|
|
||||||
@ -59,7 +60,8 @@ zmq::socket_poller_t::~socket_poller_t ()
|
|||||||
// Mark the socket_poller as dead
|
// Mark the socket_poller as dead
|
||||||
_tag = 0xdeadbeef;
|
_tag = 0xdeadbeef;
|
||||||
|
|
||||||
for (items_t::iterator it = _items.begin (); it != _items.end (); ++it) {
|
for (items_t::iterator it = _items.begin (), end = _items.end (); it != end;
|
||||||
|
++it) {
|
||||||
// TODO shouldn't this zmq_assert (it->socket->check_tag ()) instead?
|
// TODO shouldn't this zmq_assert (it->socket->check_tag ()) instead?
|
||||||
if (it->socket && it->socket->check_tag ()
|
if (it->socket && it->socket->check_tag ()
|
||||||
&& is_thread_safe (*it->socket)) {
|
&& is_thread_safe (*it->socket)) {
|
||||||
@ -68,8 +70,7 @@ zmq::socket_poller_t::~socket_poller_t ()
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (_signaler != NULL) {
|
if (_signaler != NULL) {
|
||||||
delete _signaler;
|
LIBZMQ_DELETE (_signaler);
|
||||||
_signaler = NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#if defined ZMQ_POLL_BASED_ON_POLL
|
#if defined ZMQ_POLL_BASED_ON_POLL
|
||||||
@ -89,7 +90,8 @@ int zmq::socket_poller_t::add (socket_base_t *socket_,
|
|||||||
void *user_data_,
|
void *user_data_,
|
||||||
short events_)
|
short events_)
|
||||||
{
|
{
|
||||||
for (items_t::iterator it = _items.begin (); it != _items.end (); ++it) {
|
for (items_t::iterator it = _items.begin (), end = _items.end (); it != end;
|
||||||
|
++it) {
|
||||||
if (it->socket == socket_) {
|
if (it->socket == socket_) {
|
||||||
errno = EINVAL;
|
errno = EINVAL;
|
||||||
return -1;
|
return -1;
|
||||||
@ -138,7 +140,8 @@ int zmq::socket_poller_t::add (socket_base_t *socket_,
|
|||||||
|
|
||||||
int zmq::socket_poller_t::add_fd (fd_t fd_, void *user_data_, short events_)
|
int zmq::socket_poller_t::add_fd (fd_t fd_, void *user_data_, short events_)
|
||||||
{
|
{
|
||||||
for (items_t::iterator it = _items.begin (); it != _items.end (); ++it) {
|
for (items_t::iterator it = _items.begin (), end = _items.end (); it != end;
|
||||||
|
++it) {
|
||||||
if (!it->socket && it->fd == fd_) {
|
if (!it->socket && it->fd == fd_) {
|
||||||
errno = EINVAL;
|
errno = EINVAL;
|
||||||
return -1;
|
return -1;
|
||||||
@ -169,14 +172,15 @@ int zmq::socket_poller_t::add_fd (fd_t fd_, void *user_data_, short events_)
|
|||||||
|
|
||||||
int zmq::socket_poller_t::modify (socket_base_t *socket_, short events_)
|
int zmq::socket_poller_t::modify (socket_base_t *socket_, short events_)
|
||||||
{
|
{
|
||||||
|
const items_t::iterator end = _items.end ();
|
||||||
items_t::iterator it;
|
items_t::iterator it;
|
||||||
|
|
||||||
for (it = _items.begin (); it != _items.end (); ++it) {
|
for (it = _items.begin (); it != end; ++it) {
|
||||||
if (it->socket == socket_)
|
if (it->socket == socket_)
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (it == _items.end ()) {
|
if (it == end) {
|
||||||
errno = EINVAL;
|
errno = EINVAL;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
@ -190,14 +194,15 @@ int zmq::socket_poller_t::modify (socket_base_t *socket_, short events_)
|
|||||||
|
|
||||||
int zmq::socket_poller_t::modify_fd (fd_t fd_, short events_)
|
int zmq::socket_poller_t::modify_fd (fd_t fd_, short events_)
|
||||||
{
|
{
|
||||||
|
const items_t::iterator end = _items.end ();
|
||||||
items_t::iterator it;
|
items_t::iterator it;
|
||||||
|
|
||||||
for (it = _items.begin (); it != _items.end (); ++it) {
|
for (it = _items.begin (); it != end; ++it) {
|
||||||
if (!it->socket && it->fd == fd_)
|
if (!it->socket && it->fd == fd_)
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (it == _items.end ()) {
|
if (it == end) {
|
||||||
errno = EINVAL;
|
errno = EINVAL;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
@ -211,14 +216,15 @@ int zmq::socket_poller_t::modify_fd (fd_t fd_, short events_)
|
|||||||
|
|
||||||
int zmq::socket_poller_t::remove (socket_base_t *socket_)
|
int zmq::socket_poller_t::remove (socket_base_t *socket_)
|
||||||
{
|
{
|
||||||
|
const items_t::iterator end = _items.end ();
|
||||||
items_t::iterator it;
|
items_t::iterator it;
|
||||||
|
|
||||||
for (it = _items.begin (); it != _items.end (); ++it) {
|
for (it = _items.begin (); it != end; ++it) {
|
||||||
if (it->socket == socket_)
|
if (it->socket == socket_)
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (it == _items.end ()) {
|
if (it == end) {
|
||||||
errno = EINVAL;
|
errno = EINVAL;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
@ -235,14 +241,15 @@ int zmq::socket_poller_t::remove (socket_base_t *socket_)
|
|||||||
|
|
||||||
int zmq::socket_poller_t::remove_fd (fd_t fd_)
|
int zmq::socket_poller_t::remove_fd (fd_t fd_)
|
||||||
{
|
{
|
||||||
|
const items_t::iterator end = _items.end ();
|
||||||
items_t::iterator it;
|
items_t::iterator it;
|
||||||
|
|
||||||
for (it = _items.begin (); it != _items.end (); ++it) {
|
for (it = _items.begin (); it != end; ++it) {
|
||||||
if (!it->socket && it->fd == fd_)
|
if (!it->socket && it->fd == fd_)
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (it == _items.end ()) {
|
if (it == end) {
|
||||||
errno = EINVAL;
|
errno = EINVAL;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
@ -266,7 +273,8 @@ void zmq::socket_poller_t::rebuild ()
|
|||||||
_pollfds = NULL;
|
_pollfds = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (items_t::iterator it = _items.begin (); it != _items.end (); ++it) {
|
for (items_t::iterator it = _items.begin (), end = _items.end (); it != end;
|
||||||
|
++it) {
|
||||||
if (it->events) {
|
if (it->events) {
|
||||||
if (it->socket && is_thread_safe (*it->socket)) {
|
if (it->socket && is_thread_safe (*it->socket)) {
|
||||||
if (!_use_signaler) {
|
if (!_use_signaler) {
|
||||||
@ -292,7 +300,8 @@ void zmq::socket_poller_t::rebuild ()
|
|||||||
_pollfds[0].events = POLLIN;
|
_pollfds[0].events = POLLIN;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (items_t::iterator it = _items.begin (); it != _items.end (); ++it) {
|
for (items_t::iterator it = _items.begin (), end = _items.end (); it != end;
|
||||||
|
++it) {
|
||||||
if (it->events) {
|
if (it->events) {
|
||||||
if (it->socket) {
|
if (it->socket) {
|
||||||
if (!is_thread_safe (*it->socket)) {
|
if (!is_thread_safe (*it->socket)) {
|
||||||
@ -330,7 +339,8 @@ void zmq::socket_poller_t::rebuild ()
|
|||||||
FD_ZERO (_pollset_out.get ());
|
FD_ZERO (_pollset_out.get ());
|
||||||
FD_ZERO (_pollset_err.get ());
|
FD_ZERO (_pollset_err.get ());
|
||||||
|
|
||||||
for (items_t::iterator it = _items.begin (); it != _items.end (); ++it) {
|
for (items_t::iterator it = _items.begin (), end = _items.end (); it != end;
|
||||||
|
++it) {
|
||||||
if (it->socket && is_thread_safe (*it->socket) && it->events) {
|
if (it->socket && is_thread_safe (*it->socket) && it->events) {
|
||||||
_use_signaler = true;
|
_use_signaler = true;
|
||||||
FD_SET (_signaler->get_fd (), _pollset_in.get ());
|
FD_SET (_signaler->get_fd (), _pollset_in.get ());
|
||||||
@ -342,7 +352,8 @@ void zmq::socket_poller_t::rebuild ()
|
|||||||
_max_fd = 0;
|
_max_fd = 0;
|
||||||
|
|
||||||
// Build the fd_sets for passing to select ().
|
// Build the fd_sets for passing to select ().
|
||||||
for (items_t::iterator it = _items.begin (); it != _items.end (); ++it) {
|
for (items_t::iterator it = _items.begin (), end = _items.end (); it != end;
|
||||||
|
++it) {
|
||||||
if (it->events) {
|
if (it->events) {
|
||||||
// If the poll item is a 0MQ socket we are interested in input on the
|
// If the poll item is a 0MQ socket we are interested in input on the
|
||||||
// notification file descriptor retrieved by the ZMQ_FD socket option.
|
// notification file descriptor retrieved by the ZMQ_FD socket option.
|
||||||
@ -404,8 +415,8 @@ int zmq::socket_poller_t::check_events (zmq::socket_poller_t::event_t *events_,
|
|||||||
#endif
|
#endif
|
||||||
{
|
{
|
||||||
int found = 0;
|
int found = 0;
|
||||||
for (items_t::iterator it = _items.begin ();
|
for (items_t::iterator it = _items.begin (), end = _items.end ();
|
||||||
it != _items.end () && found < n_events_; ++it) {
|
it != end && found < n_events_; ++it) {
|
||||||
// The poll item is a 0MQ socket. Retrieve pending events
|
// The poll item is a 0MQ socket. Retrieve pending events
|
||||||
// using the ZMQ_EVENTS socket option.
|
// using the ZMQ_EVENTS socket option.
|
||||||
if (it->socket) {
|
if (it->socket) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user