mirror of
https://github.com/zeromq/libzmq.git
synced 2025-03-01 10:57:59 +01:00
Problem: irregular loop conditions
Solution: use standard loop constructs and optimize some loops
This commit is contained in:
parent
628adf1cb7
commit
f8f7913737
23
src/ctx.cpp
23
src/ctx.cpp
@ -117,12 +117,13 @@ zmq::ctx_t::~ctx_t ()
|
|||||||
|
|
||||||
// Ask I/O threads to terminate. If stop signal wasn't sent to I/O
|
// Ask I/O threads to terminate. If stop signal wasn't sent to I/O
|
||||||
// thread subsequent invocation of destructor would hang-up.
|
// thread subsequent invocation of destructor would hang-up.
|
||||||
for (io_threads_t::size_type i = 0; i != _io_threads.size (); i++) {
|
const io_threads_t::size_type io_threads_size = _io_threads.size ();
|
||||||
|
for (io_threads_t::size_type i = 0; i != io_threads_size; i++) {
|
||||||
_io_threads[i]->stop ();
|
_io_threads[i]->stop ();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait till I/O threads actually terminate.
|
// Wait till I/O threads actually terminate.
|
||||||
for (io_threads_t::size_type i = 0; i != _io_threads.size (); i++) {
|
for (io_threads_t::size_type i = 0; i != io_threads_size; i++) {
|
||||||
LIBZMQ_DELETE (_io_threads[i]);
|
LIBZMQ_DELETE (_io_threads[i]);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -176,9 +177,10 @@ int zmq::ctx_t::terminate ()
|
|||||||
if (_pid != getpid ()) {
|
if (_pid != getpid ()) {
|
||||||
// we are a forked child process. Close all file descriptors
|
// we are a forked child process. Close all file descriptors
|
||||||
// inherited from the parent.
|
// inherited from the parent.
|
||||||
for (sockets_t::size_type i = 0; i != _sockets.size (); i++)
|
for (sockets_t::size_type i = 0, size = _sockets.size (); i != size;
|
||||||
|
i++) {
|
||||||
_sockets[i]->get_mailbox ()->forked ();
|
_sockets[i]->get_mailbox ()->forked ();
|
||||||
|
}
|
||||||
_term_mailbox.forked ();
|
_term_mailbox.forked ();
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
@ -193,8 +195,10 @@ int zmq::ctx_t::terminate ()
|
|||||||
// First send stop command to sockets so that any blocking calls
|
// First send stop command to sockets so that any blocking calls
|
||||||
// can be interrupted. If there are no sockets we can ask reaper
|
// can be interrupted. If there are no sockets we can ask reaper
|
||||||
// thread to stop.
|
// thread to stop.
|
||||||
for (sockets_t::size_type i = 0; i != _sockets.size (); i++)
|
for (sockets_t::size_type i = 0, size = _sockets.size (); i != size;
|
||||||
|
i++) {
|
||||||
_sockets[i]->stop ();
|
_sockets[i]->stop ();
|
||||||
|
}
|
||||||
if (_sockets.empty ())
|
if (_sockets.empty ())
|
||||||
_reaper->stop ();
|
_reaper->stop ();
|
||||||
}
|
}
|
||||||
@ -239,8 +243,10 @@ int zmq::ctx_t::shutdown ()
|
|||||||
// Send stop command to sockets so that any blocking calls
|
// Send stop command to sockets so that any blocking calls
|
||||||
// can be interrupted. If there are no sockets we can ask reaper
|
// can be interrupted. If there are no sockets we can ask reaper
|
||||||
// thread to stop.
|
// thread to stop.
|
||||||
for (sockets_t::size_type i = 0; i != _sockets.size (); i++)
|
for (sockets_t::size_type i = 0, size = _sockets.size (); i != size;
|
||||||
|
i++) {
|
||||||
_sockets[i]->stop ();
|
_sockets[i]->stop ();
|
||||||
|
}
|
||||||
if (_sockets.empty ())
|
if (_sockets.empty ())
|
||||||
_reaper->stop ();
|
_reaper->stop ();
|
||||||
}
|
}
|
||||||
@ -666,9 +672,10 @@ zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_)
|
|||||||
// Find the I/O thread with minimum load.
|
// Find the I/O thread with minimum load.
|
||||||
int min_load = -1;
|
int min_load = -1;
|
||||||
io_thread_t *selected_io_thread = NULL;
|
io_thread_t *selected_io_thread = NULL;
|
||||||
for (io_threads_t::size_type i = 0; i != _io_threads.size (); i++) {
|
for (io_threads_t::size_type i = 0, size = _io_threads.size (); i != size;
|
||||||
|
i++) {
|
||||||
if (!affinity_ || (affinity_ & (uint64_t (1) << i))) {
|
if (!affinity_ || (affinity_ & (uint64_t (1) << i))) {
|
||||||
int load = _io_threads[i]->get_load ();
|
const int load = _io_threads[i]->get_load ();
|
||||||
if (selected_io_thread == NULL || load < min_load) {
|
if (selected_io_thread == NULL || load < min_load) {
|
||||||
min_load = load;
|
min_load = load;
|
||||||
selected_io_thread = _io_threads[i];
|
selected_io_thread = _io_threads[i];
|
||||||
|
@ -427,13 +427,14 @@ void zmq::generic_mtrie_t<T>::match (prefix_t data_,
|
|||||||
void (*func_) (value_t *pipe_, Arg arg_),
|
void (*func_) (value_t *pipe_, Arg arg_),
|
||||||
Arg arg_)
|
Arg arg_)
|
||||||
{
|
{
|
||||||
generic_mtrie_t *current = this;
|
for (generic_mtrie_t *current = this; current; data_++, size_--) {
|
||||||
while (true) {
|
|
||||||
// Signal the pipes attached to this node.
|
// Signal the pipes attached to this node.
|
||||||
if (current->_pipes) {
|
if (current->_pipes) {
|
||||||
for (typename pipes_t::iterator it = current->_pipes->begin ();
|
for (typename pipes_t::iterator it = current->_pipes->begin (),
|
||||||
it != current->_pipes->end (); ++it)
|
end = current->_pipes->end ();
|
||||||
|
it != end; ++it) {
|
||||||
func_ (*it, arg_);
|
func_ (*it, arg_);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// If we are at the end of the message, there's nothing more to match.
|
// If we are at the end of the message, there's nothing more to match.
|
||||||
@ -444,25 +445,20 @@ void zmq::generic_mtrie_t<T>::match (prefix_t data_,
|
|||||||
if (current->_count == 0)
|
if (current->_count == 0)
|
||||||
break;
|
break;
|
||||||
|
|
||||||
// If there's one subnode (optimisation).
|
|
||||||
if (current->_count == 1) {
|
if (current->_count == 1) {
|
||||||
if (data_[0] != current->_min)
|
// If there's one subnode (optimisation).
|
||||||
|
if (data_[0] != current->_min) {
|
||||||
break;
|
break;
|
||||||
|
}
|
||||||
current = current->_next.node;
|
current = current->_next.node;
|
||||||
data_++;
|
} else {
|
||||||
size_--;
|
// If there are multiple subnodes.
|
||||||
continue;
|
if (data_[0] < current->_min
|
||||||
|
|| data_[0] >= current->_min + current->_count) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
current = current->_next.table[data_[0] - current->_min];
|
||||||
}
|
}
|
||||||
|
|
||||||
// If there are multiple subnodes.
|
|
||||||
if (data_[0] < current->_min
|
|
||||||
|| data_[0] >= current->_min + current->_count)
|
|
||||||
break;
|
|
||||||
if (!current->_next.table[data_[0] - current->_min])
|
|
||||||
break;
|
|
||||||
current = current->_next.table[data_[0] - current->_min];
|
|
||||||
data_++;
|
|
||||||
size_--;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -253,9 +253,10 @@ bool zmq::ipc_listener_t::filter (fd_t sock_)
|
|||||||
|
|
||||||
if (!(pw = getpwuid (cred.uid)))
|
if (!(pw = getpwuid (cred.uid)))
|
||||||
return false;
|
return false;
|
||||||
for (options_t::ipc_gid_accept_filters_t::const_iterator it =
|
for (options_t::ipc_gid_accept_filters_t::const_iterator
|
||||||
options.ipc_gid_accept_filters.begin ();
|
it = options.ipc_gid_accept_filters.begin (),
|
||||||
it != options.ipc_gid_accept_filters.end (); it++) {
|
end = options.ipc_gid_accept_filters.end ();
|
||||||
|
it != end; it++) {
|
||||||
if (!(gr = getgrgid (*it)))
|
if (!(gr = getgrgid (*it)))
|
||||||
continue;
|
continue;
|
||||||
for (char **mem = gr->gr_mem; *mem; mem++) {
|
for (char **mem = gr->gr_mem; *mem; mem++) {
|
||||||
|
@ -188,7 +188,7 @@ bool zmq::pipe_t::read (msg_t *msg_)
|
|||||||
if (unlikely (_state != active && _state != waiting_for_delimiter))
|
if (unlikely (_state != active && _state != waiting_for_delimiter))
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
for (bool payload_read = false; !payload_read;) {
|
while (true) {
|
||||||
if (!_in_pipe->read (msg_)) {
|
if (!_in_pipe->read (msg_)) {
|
||||||
_in_active = false;
|
_in_active = false;
|
||||||
return false;
|
return false;
|
||||||
@ -198,8 +198,9 @@ bool zmq::pipe_t::read (msg_t *msg_)
|
|||||||
if (unlikely (msg_->is_credential ())) {
|
if (unlikely (msg_->is_credential ())) {
|
||||||
const int rc = msg_->close ();
|
const int rc = msg_->close ();
|
||||||
zmq_assert (rc == 0);
|
zmq_assert (rc == 0);
|
||||||
} else
|
} else {
|
||||||
payload_read = true;
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// If delimiter was read, start termination process of the pipe.
|
// If delimiter was read, start termination process of the pipe.
|
||||||
|
@ -34,6 +34,7 @@
|
|||||||
|
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
|
#include <iterator>
|
||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
node_t::node_t (unsigned char *data_) : _data (data_)
|
node_t::node_t (unsigned char *data_) : _data (data_)
|
||||||
@ -187,7 +188,7 @@ zmq::radix_tree_t::radix_tree_t () : _root (make_node (0, 0, 0)), _size (0)
|
|||||||
|
|
||||||
static void free_nodes (node_t node_)
|
static void free_nodes (node_t node_)
|
||||||
{
|
{
|
||||||
for (size_t i = 0; i < node_.edgecount (); ++i)
|
for (size_t i = 0, count = node_.edgecount (); i < count; ++i)
|
||||||
free_nodes (node_.node_at (i));
|
free_nodes (node_.node_at (i));
|
||||||
free (node_._data);
|
free (node_._data);
|
||||||
}
|
}
|
||||||
@ -234,18 +235,19 @@ match_result_t zmq::radix_tree_t::match (const unsigned char *key_,
|
|||||||
size_t parent_edge_index = 0;
|
size_t parent_edge_index = 0;
|
||||||
|
|
||||||
while (current_node.prefix_length () > 0 || current_node.edgecount () > 0) {
|
while (current_node.prefix_length () > 0 || current_node.edgecount () > 0) {
|
||||||
|
const unsigned char *const prefix = current_node.prefix ();
|
||||||
|
const size_t prefix_length = current_node.prefix_length ();
|
||||||
|
|
||||||
for (prefix_byte_index = 0;
|
for (prefix_byte_index = 0;
|
||||||
prefix_byte_index < current_node.prefix_length ()
|
prefix_byte_index < prefix_length && key_byte_index < key_size_;
|
||||||
&& key_byte_index < key_size_;
|
|
||||||
++prefix_byte_index, ++key_byte_index) {
|
++prefix_byte_index, ++key_byte_index) {
|
||||||
if (current_node.prefix ()[prefix_byte_index]
|
if (prefix[prefix_byte_index] != key_[key_byte_index])
|
||||||
!= key_[key_byte_index])
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Even if a prefix of the key matches and we're doing a
|
// Even if a prefix of the key matches and we're doing a
|
||||||
// lookup, this means we've found a matching subscription.
|
// lookup, this means we've found a matching subscription.
|
||||||
if (is_lookup_ && prefix_byte_index == current_node.prefix_length ()
|
if (is_lookup_ && prefix_byte_index == prefix_length
|
||||||
&& current_node.refcount () > 0) {
|
&& current_node.refcount () > 0) {
|
||||||
key_byte_index = key_size_;
|
key_byte_index = key_size_;
|
||||||
break;
|
break;
|
||||||
@ -253,14 +255,14 @@ match_result_t zmq::radix_tree_t::match (const unsigned char *key_,
|
|||||||
|
|
||||||
// There was a mismatch or we've matched the whole key, so
|
// There was a mismatch or we've matched the whole key, so
|
||||||
// there's nothing more to do.
|
// there's nothing more to do.
|
||||||
if (prefix_byte_index != current_node.prefix_length ()
|
if (prefix_byte_index != prefix_length || key_byte_index == key_size_)
|
||||||
|| key_byte_index == key_size_)
|
|
||||||
break;
|
break;
|
||||||
|
|
||||||
// We need to match the rest of the key. Check if there's an
|
// We need to match the rest of the key. Check if there's an
|
||||||
// outgoing edge from this node.
|
// outgoing edge from this node.
|
||||||
node_t next_node = current_node;
|
node_t next_node = current_node;
|
||||||
for (size_t i = 0; i < current_node.edgecount (); ++i) {
|
for (size_t i = 0, edgecount = current_node.edgecount (); i < edgecount;
|
||||||
|
++i) {
|
||||||
if (current_node.first_byte_at (i) == key_[key_byte_index]) {
|
if (current_node.first_byte_at (i) == key_[key_byte_index]) {
|
||||||
parent_edge_index = edge_index;
|
parent_edge_index = edge_index;
|
||||||
edge_index = i;
|
edge_index = i;
|
||||||
@ -543,18 +545,20 @@ visit_keys (node_t node_,
|
|||||||
void (*func_) (unsigned char *data_, size_t size_, void *arg_),
|
void (*func_) (unsigned char *data_, size_t size_, void *arg_),
|
||||||
void *arg_)
|
void *arg_)
|
||||||
{
|
{
|
||||||
for (size_t i = 0; i < node_.prefix_length (); ++i)
|
const size_t prefix_length = node_.prefix_length ();
|
||||||
buffer_.push_back (node_.prefix ()[i]);
|
buffer_.reserve (buffer_.size () + prefix_length);
|
||||||
|
std::copy (node_.prefix (), node_.prefix () + prefix_length,
|
||||||
|
std::back_inserter (buffer_));
|
||||||
|
|
||||||
if (node_.refcount () > 0) {
|
if (node_.refcount () > 0) {
|
||||||
zmq_assert (!buffer_.empty ());
|
zmq_assert (!buffer_.empty ());
|
||||||
func_ (&buffer_[0], buffer_.size (), arg_);
|
func_ (&buffer_[0], buffer_.size (), arg_);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (size_t i = 0; i < node_.edgecount (); ++i)
|
for (size_t i = 0, edgecount = node_.edgecount (); i < edgecount; ++i) {
|
||||||
visit_keys (node_.node_at (i), buffer_, func_, arg_);
|
visit_keys (node_.node_at (i), buffer_, func_, arg_);
|
||||||
for (size_t i = 0; i < node_.prefix_length (); ++i)
|
}
|
||||||
buffer_.pop_back ();
|
buffer_.resize (buffer_.size () - prefix_length);
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::radix_tree_t::apply (
|
void zmq::radix_tree_t::apply (
|
||||||
|
@ -1469,8 +1469,9 @@ void zmq::socket_base_t::process_term (int linger_)
|
|||||||
unregister_endpoints (this);
|
unregister_endpoints (this);
|
||||||
|
|
||||||
// Ask all attached pipes to terminate.
|
// Ask all attached pipes to terminate.
|
||||||
for (pipes_t::size_type i = 0; i != _pipes.size (); ++i)
|
for (pipes_t::size_type i = 0, size = _pipes.size (); i != size; ++i) {
|
||||||
_pipes[i]->terminate (false);
|
_pipes[i]->terminate (false);
|
||||||
|
}
|
||||||
register_term_acks (static_cast<int> (_pipes.size ()));
|
register_term_acks (static_cast<int> (_pipes.size ()));
|
||||||
|
|
||||||
// Continue the termination process immediately.
|
// Continue the termination process immediately.
|
||||||
@ -1515,7 +1516,7 @@ int zmq::socket_base_t::query_pipes_stats ()
|
|||||||
errno = EAGAIN;
|
errno = EAGAIN;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
for (pipes_t::size_type i = 0; i != _pipes.size (); ++i) {
|
for (pipes_t::size_type i = 0, size = _pipes.size (); i != size; ++i) {
|
||||||
_pipes[i]->send_stats_to_peer (this);
|
_pipes[i]->send_stats_to_peer (this);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1525,7 +1526,7 @@ int zmq::socket_base_t::query_pipes_stats ()
|
|||||||
void zmq::socket_base_t::update_pipe_options (int option_)
|
void zmq::socket_base_t::update_pipe_options (int option_)
|
||||||
{
|
{
|
||||||
if (option_ == ZMQ_SNDHWM || option_ == ZMQ_RCVHWM) {
|
if (option_ == ZMQ_SNDHWM || option_ == ZMQ_RCVHWM) {
|
||||||
for (pipes_t::size_type i = 0; i != _pipes.size (); ++i) {
|
for (pipes_t::size_type i = 0, size = _pipes.size (); i != size; ++i) {
|
||||||
_pipes[i]->set_hwms (options.rcvhwm, options.sndhwm);
|
_pipes[i]->set_hwms (options.rcvhwm, options.sndhwm);
|
||||||
_pipes[i]->send_hwms_to_peer (options.sndhwm, options.rcvhwm);
|
_pipes[i]->send_hwms_to_peer (options.sndhwm, options.rcvhwm);
|
||||||
}
|
}
|
||||||
|
@ -376,8 +376,9 @@ class routing_socket_base_t : public socket_base_t
|
|||||||
template <typename Func> bool any_of_out_pipes (Func func_)
|
template <typename Func> bool any_of_out_pipes (Func func_)
|
||||||
{
|
{
|
||||||
bool res = false;
|
bool res = false;
|
||||||
for (out_pipes_t::iterator it = _out_pipes.begin ();
|
for (out_pipes_t::iterator it = _out_pipes.begin (),
|
||||||
it != _out_pipes.end () && !res; ++it) {
|
end = _out_pipes.end ();
|
||||||
|
it != end && !res; ++it) {
|
||||||
res |= func_ (*it->second.pipe);
|
res |= func_ (*it->second.pipe);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -232,8 +232,10 @@ zmq::fd_t zmq::tcp_listener_t::accept ()
|
|||||||
|
|
||||||
if (!options.tcp_accept_filters.empty ()) {
|
if (!options.tcp_accept_filters.empty ()) {
|
||||||
bool matched = false;
|
bool matched = false;
|
||||||
for (options_t::tcp_accept_filters_t::size_type i = 0;
|
for (options_t::tcp_accept_filters_t::size_type
|
||||||
i != options.tcp_accept_filters.size (); ++i) {
|
i = 0,
|
||||||
|
size = options.tcp_accept_filters.size ();
|
||||||
|
i != size; ++i) {
|
||||||
if (options.tcp_accept_filters[i].match_address (
|
if (options.tcp_accept_filters[i].match_address (
|
||||||
reinterpret_cast<struct sockaddr *> (&ss), ss_len)) {
|
reinterpret_cast<struct sockaddr *> (&ss), ss_len)) {
|
||||||
matched = true;
|
matched = true;
|
||||||
|
@ -355,8 +355,9 @@ void zmq::thread_t::
|
|||||||
if (!_thread_affinity_cpus.empty ()) {
|
if (!_thread_affinity_cpus.empty ()) {
|
||||||
cpu_set_t cpuset;
|
cpu_set_t cpuset;
|
||||||
CPU_ZERO (&cpuset);
|
CPU_ZERO (&cpuset);
|
||||||
for (std::set<int>::const_iterator it = _thread_affinity_cpus.begin ();
|
for (std::set<int>::const_iterator it = _thread_affinity_cpus.begin (),
|
||||||
it != _thread_affinity_cpus.end (); it++) {
|
end = _thread_affinity_cpus.end ();
|
||||||
|
it != end; it++) {
|
||||||
CPU_SET ((int) (*it), &cpuset);
|
CPU_SET ((int) (*it), &cpuset);
|
||||||
}
|
}
|
||||||
rc =
|
rc =
|
||||||
|
@ -117,7 +117,8 @@ void zmq::ws_encoder_t::size_ready ()
|
|||||||
static_cast<unsigned char *> (_masked_msg.data ());
|
static_cast<unsigned char *> (_masked_msg.data ());
|
||||||
unsigned char *src =
|
unsigned char *src =
|
||||||
static_cast<unsigned char *> (in_progress ()->data ());
|
static_cast<unsigned char *> (in_progress ()->data ());
|
||||||
for (size_t i = 0; i < in_progress ()->size (); ++i, mask_index++)
|
for (size_t i = 0, size = in_progress ()->size (); i < size;
|
||||||
|
++i, mask_index++)
|
||||||
dest[i] = src[i] ^ _mask[mask_index % 4];
|
dest[i] = src[i] ^ _mask[mask_index % 4];
|
||||||
|
|
||||||
next_step (_masked_msg.data (), _masked_msg.size (),
|
next_step (_masked_msg.data (), _masked_msg.size (),
|
||||||
|
Loading…
x
Reference in New Issue
Block a user