diff --git a/src/ctx.cpp b/src/ctx.cpp index 69dd8c22..c7ffb47d 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -137,8 +137,8 @@ int zmq::ctx_t::terminate () // Connect up any pending inproc connections, otherwise we will hang pending_connections_t copy = _pending_connections; - for (pending_connections_t::iterator p = copy.begin (); p != copy.end (); - ++p) { + for (pending_connections_t::iterator p = copy.begin (), end = copy.end (); + p != end; ++p) { zmq::socket_base_t *s = create_socket (ZMQ_PAIR); // create_socket might fail eg: out of memory/sockets limit reached zmq_assert (s); @@ -528,8 +528,9 @@ void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_) { scoped_lock_t locker (_endpoints_sync); - for (endpoints_t::iterator it = _endpoints.begin (); - it != _endpoints.end ();) { + for (endpoints_t::iterator it = _endpoints.begin (), + end = _endpoints.end (); + it != end;) { if (it->second.socket == socket_) #if __cplusplus >= 201103L it = _endpoints.erase (it); diff --git a/src/dish.cpp b/src/dish.cpp index 4d6ea976..5a7b67a4 100644 --- a/src/dish.cpp +++ b/src/dish.cpp @@ -100,16 +100,12 @@ int zmq::dish_t::xjoin (const char *group_) return -1; } - subscriptions_t::iterator it = _subscriptions.find (group); - // User cannot join same group twice - if (it != _subscriptions.end ()) { + if (!_subscriptions.insert (group).second) { errno = EINVAL; return -1; } - _subscriptions.insert (group); - msg_t msg; int rc = msg.init_join (); errno_assert (rc == 0); @@ -230,8 +226,9 @@ const zmq::blob_t &zmq::dish_t::get_credential () const void zmq::dish_t::send_subscriptions (pipe_t *pipe_) { - for (subscriptions_t::iterator it = _subscriptions.begin (); - it != _subscriptions.end (); ++it) { + for (subscriptions_t::iterator it = _subscriptions.begin (), + end = _subscriptions.end (); + it != end; ++it) { msg_t msg; int rc = msg.init_join (); errno_assert (rc == 0); diff --git a/src/epoll.cpp b/src/epoll.cpp index 050783bf..86634c07 100644 --- a/src/epoll.cpp +++ b/src/epoll.cpp @@ -75,8 +75,8 @@ zmq::epoll_t::~epoll_t () #else close (_epoll_fd); #endif - for (retired_t::iterator it = _retired.begin (); it != _retired.end (); - ++it) { + for (retired_t::iterator it = _retired.begin (), end = _retired.end (); + it != end; ++it) { LIBZMQ_DELETE (*it); } } @@ -207,8 +207,8 @@ void zmq::epoll_t::loop () } // Destroy retired event sources. - for (retired_t::iterator it = _retired.begin (); it != _retired.end (); - ++it) { + for (retired_t::iterator it = _retired.begin (), end = _retired.end (); + it != end; ++it) { LIBZMQ_DELETE (*it); } _retired.clear (); diff --git a/src/generic_mtrie.hpp b/src/generic_mtrie.hpp index 521de66d..8063fa60 100644 --- a/src/generic_mtrie.hpp +++ b/src/generic_mtrie.hpp @@ -92,6 +92,17 @@ template class generic_mtrie_t void (*func_) (prefix_t data_, size_t size_, Arg arg_), Arg arg_, bool call_on_uniq_); + template + void rm_helper_multiple_subnodes (unsigned char **buff_, + size_t buffsize_, + size_t maxbuffsize_, + void (*func_) (prefix_t data_, + size_t size_, + Arg arg_), + Arg arg_, + bool call_on_uniq_, + value_t *pipe_); + rm_result rm_helper (prefix_t prefix_, size_t size_, value_t *value_); bool is_redundant () const; diff --git a/src/generic_mtrie_impl.hpp b/src/generic_mtrie_impl.hpp index 36245495..35162bab 100644 --- a/src/generic_mtrie_impl.hpp +++ b/src/generic_mtrie_impl.hpp @@ -37,7 +37,6 @@ along with this program. If not, see . #include #include "err.hpp" -#include "pipe.hpp" #include "macros.hpp" #include "generic_mtrie.hpp" @@ -80,7 +79,7 @@ bool zmq::generic_mtrie_t::add_helper (prefix_t prefix_, { // We are at the node corresponding to the prefix. We are done. if (!size_) { - bool result = !_pipes; + const bool result = !_pipes; if (!_pipes) { _pipes = new (std::nothrow) pipes_t; alloc_assert (_pipes); @@ -89,7 +88,7 @@ bool zmq::generic_mtrie_t::add_helper (prefix_t prefix_, return result; } - unsigned char c = *prefix_; + const unsigned char c = *prefix_; if (c < _min || c >= _min + _count) { // The character is out of range of currently handled // characters. We have to extend the table. @@ -98,11 +97,11 @@ bool zmq::generic_mtrie_t::add_helper (prefix_t prefix_, _count = 1; _next.node = NULL; } else if (_count == 1) { - unsigned char oldc = _min; + const unsigned char oldc = _min; generic_mtrie_t *oldp = _next.node; _count = (_min < c ? c - _min : _min - c) + 1; - _next.table = - (generic_mtrie_t **) malloc (sizeof (generic_mtrie_t *) * _count); + _next.table = static_cast ( + malloc (sizeof (generic_mtrie_t *) * _count)); alloc_assert (_next.table); for (unsigned short i = 0; i != _count; ++i) _next.table[i] = 0; @@ -110,19 +109,19 @@ bool zmq::generic_mtrie_t::add_helper (prefix_t prefix_, _next.table[oldc - _min] = oldp; } else if (_min < c) { // The new character is above the current character range. - unsigned short old_count = _count; + const unsigned short old_count = _count; _count = c - _min + 1; - _next.table = (generic_mtrie_t **) realloc ( - _next.table, sizeof (generic_mtrie_t *) * _count); + _next.table = static_cast ( + realloc (_next.table, sizeof (generic_mtrie_t *) * _count)); alloc_assert (_next.table); for (unsigned short i = old_count; i != _count; i++) _next.table[i] = NULL; } else { // The new character is below the current character range. - unsigned short old_count = _count; + const unsigned short old_count = _count; _count = (_min + old_count) - c; - _next.table = (generic_mtrie_t **) realloc ( - _next.table, sizeof (generic_mtrie_t *) * _count); + _next.table = static_cast ( + realloc (_next.table, sizeof (generic_mtrie_t *) * _count)); alloc_assert (_next.table); memmove (_next.table + _min - c, _next.table, old_count * sizeof (generic_mtrie_t *)); @@ -194,29 +193,45 @@ void zmq::generic_mtrie_t::rm_helper (value_t *pipe_, alloc_assert (*buff_); } - // If there are no subnodes in the trie, return. - if (_count == 0) - return; + switch (_count) { + case 0: + // If there are no subnodes in the trie, return. + break; + case 1: + // If there's one subnode (optimisation). - // If there's one subnode (optimisation). - if (_count == 1) { - (*buff_)[buffsize_] = _min; - buffsize_++; - _next.node->rm_helper (pipe_, buff_, buffsize_, maxbuffsize_, func_, - arg_, call_on_uniq_); + (*buff_)[buffsize_] = _min; + buffsize_++; + _next.node->rm_helper (pipe_, buff_, buffsize_, maxbuffsize_, func_, + arg_, call_on_uniq_); - // Prune the node if it was made redundant by the removal - if (_next.node->is_redundant ()) { - LIBZMQ_DELETE (_next.node); - _count = 0; - --_live_nodes; - zmq_assert (_live_nodes == 0); - } - return; + // Prune the node if it was made redundant by the removal + if (_next.node->is_redundant ()) { + LIBZMQ_DELETE (_next.node); + _count = 0; + --_live_nodes; + zmq_assert (_live_nodes == 0); + } + break; + default: + // If there are multiple subnodes. + rm_helper_multiple_subnodes (buff_, buffsize_, maxbuffsize_, func_, + arg_, call_on_uniq_, pipe_); + break; } +} - // If there are multiple subnodes. - // +template +template +void zmq::generic_mtrie_t::rm_helper_multiple_subnodes ( + unsigned char **buff_, + size_t buffsize_, + size_t maxbuffsize_, + void (*func_) (prefix_t data_, size_t size_, Arg arg_), + Arg arg_, + bool call_on_uniq_, + value_t *pipe_) +{ // New min non-null character in the node table after the removal unsigned char new_min = _min + _count - 1; // New max non-null character in the node table after the removal @@ -253,46 +268,52 @@ void zmq::generic_mtrie_t::rm_helper (value_t *pipe_, zmq_assert (_count > 1); // Free the node table if it's no longer used. - if (_live_nodes == 0) { - free (_next.table); - _next.table = NULL; - _count = 0; - } - // Compact the node table if possible - else if (_live_nodes == 1) { - // If there's only one live node in the table we can - // switch to using the more compact single-node - // representation - zmq_assert (new_min == new_max); - zmq_assert (new_min >= _min && new_min < _min + _count); - generic_mtrie_t *node = _next.table[new_min - _min]; - zmq_assert (node); - free (_next.table); - _next.node = node; - _count = 1; - _min = new_min; - } else if (new_min > _min || new_max < _min + _count - 1) { - zmq_assert (new_max - new_min + 1 > 1); + switch (_live_nodes) { + case 0: + free (_next.table); + _next.table = NULL; + _count = 0; + break; + case 1: + // Compact the node table if possible - generic_mtrie_t **old_table = _next.table; - zmq_assert (new_min > _min || new_max < _min + _count - 1); - zmq_assert (new_min >= _min); - zmq_assert (new_max <= _min + _count - 1); - zmq_assert (new_max - new_min + 1 < _count); + // If there's only one live node in the table we can + // switch to using the more compact single-node + // representation + zmq_assert (new_min == new_max); + zmq_assert (new_min >= _min && new_min < _min + _count); + { + generic_mtrie_t *node = _next.table[new_min - _min]; + zmq_assert (node); + free (_next.table); + _next.node = node; + } + _count = 1; + _min = new_min; + break; + default: + if (new_min > _min || new_max < _min + _count - 1) { + zmq_assert (new_max - new_min + 1 > 1); - _count = new_max - new_min + 1; - _next.table = - (generic_mtrie_t **) malloc (sizeof (generic_mtrie_t *) * _count); - alloc_assert (_next.table); + generic_mtrie_t **old_table = _next.table; + zmq_assert (new_min > _min || new_max < _min + _count - 1); + zmq_assert (new_min >= _min); + zmq_assert (new_max <= _min + _count - 1); + zmq_assert (new_max - new_min + 1 < _count); - memmove (_next.table, old_table + (new_min - _min), - sizeof (generic_mtrie_t *) * _count); - free (old_table); + _count = new_max - new_min + 1; + _next.table = static_cast ( + malloc (sizeof (generic_mtrie_t *) * _count)); + alloc_assert (_next.table); - _min = new_min; + memmove (_next.table, old_table + (new_min - _min), + sizeof (generic_mtrie_t *) * _count); + free (old_table); + + _min = new_min; + } } } - template typename zmq::generic_mtrie_t::rm_result zmq::generic_mtrie_t::rm (prefix_t prefix_, size_t size_, value_t *pipe_) @@ -317,7 +338,7 @@ typename zmq::generic_mtrie_t::rm_result zmq::generic_mtrie_t::rm_helper ( return (erased == 1) ? values_remain : not_found; } - unsigned char c = *prefix_; + const unsigned char c = *prefix_; if (!_count || c < _min || c >= _min + _count) return not_found; @@ -327,7 +348,7 @@ typename zmq::generic_mtrie_t::rm_result zmq::generic_mtrie_t::rm_helper ( if (!next_node) return not_found; - rm_result ret = next_node->rm_helper (prefix_ + 1, size_ - 1, pipe_); + const rm_result ret = next_node->rm_helper (prefix_ + 1, size_ - 1, pipe_); if (next_node->is_redundant ()) { LIBZMQ_DELETE (next_node); @@ -370,8 +391,8 @@ typename zmq::generic_mtrie_t::rm_result zmq::generic_mtrie_t::rm_helper ( _min += i; _count -= i; generic_mtrie_t **old_table = _next.table; - _next.table = (generic_mtrie_t **) malloc ( - sizeof (generic_mtrie_t *) * _count); + _next.table = static_cast ( + malloc (sizeof (generic_mtrie_t *) * _count)); alloc_assert (_next.table); memmove (_next.table, old_table + i, sizeof (generic_mtrie_t *) * _count); @@ -386,8 +407,8 @@ typename zmq::generic_mtrie_t::rm_result zmq::generic_mtrie_t::rm_helper ( zmq_assert (i < _count); _count -= i; generic_mtrie_t **old_table = _next.table; - _next.table = (generic_mtrie_t **) malloc ( - sizeof (generic_mtrie_t *) * _count); + _next.table = static_cast ( + malloc (sizeof (generic_mtrie_t *) * _count)); alloc_assert (_next.table); memmove (_next.table, old_table, sizeof (generic_mtrie_t *) * _count); diff --git a/src/mailbox_safe.cpp b/src/mailbox_safe.cpp index 624e8543..509aa85a 100644 --- a/src/mailbox_safe.cpp +++ b/src/mailbox_safe.cpp @@ -61,10 +61,11 @@ void zmq::mailbox_safe_t::add_signaler (signaler_t *signaler_) void zmq::mailbox_safe_t::remove_signaler (signaler_t *signaler_) { // TODO: make a copy of array and signal outside the lock + const std::vector::iterator end = _signalers.end (); std::vector::iterator it = - std::find (_signalers.begin (), _signalers.end (), signaler_); + std::find (_signalers.begin (), end, signaler_); - if (it != _signalers.end ()) + if (it != end) _signalers.erase (it); } @@ -81,8 +82,10 @@ void zmq::mailbox_safe_t::send (const command_t &cmd_) if (!ok) { _cond_var.broadcast (); - for (std::vector::iterator it = _signalers.begin (); - it != _signalers.end (); ++it) { + + for (std::vector::iterator it = _signalers.begin (), + end = _signalers.end (); + it != end; ++it) { (*it)->send (); } } diff --git a/src/mechanism.cpp b/src/mechanism.cpp index 53a015fb..ad5959bc 100644 --- a/src/mechanism.cpp +++ b/src/mechanism.cpp @@ -178,12 +178,14 @@ size_t zmq::mechanism_t::add_basic_properties (unsigned char *ptr_, } - for (std::map::const_iterator it = - options.app_metadata.begin (); - it != options.app_metadata.end (); ++it) + for (std::map::const_iterator + it = options.app_metadata.begin (), + end = options.app_metadata.end (); + it != end; ++it) { ptr += add_property (ptr, ptr_capacity_ - (ptr - ptr_), it->first.c_str (), it->second.c_str (), strlen (it->second.c_str ())); + } return ptr - ptr_; } @@ -193,9 +195,10 @@ size_t zmq::mechanism_t::basic_properties_len () const const char *socket_type = socket_type_string (options.type); size_t meta_len = 0; - for (std::map::const_iterator it = - options.app_metadata.begin (); - it != options.app_metadata.end (); ++it) { + for (std::map::const_iterator + it = options.app_metadata.begin (), + end = options.app_metadata.end (); + it != end; ++it) { meta_len += property_len (it->first.c_str (), strlen (it->second.c_str ())); } diff --git a/src/own.cpp b/src/own.cpp index e7f3c6cd..7588dd07 100644 --- a/src/own.cpp +++ b/src/own.cpp @@ -158,7 +158,8 @@ void zmq::own_t::process_term (int linger_) zmq_assert (!_terminating); // Send termination request to all owned objects. - for (owned_t::iterator it = _owned.begin (); it != _owned.end (); ++it) + for (owned_t::iterator it = _owned.begin (), end = _owned.end (); it != end; + ++it) send_term (*it, linger_); register_term_acks (static_cast (_owned.size ())); _owned.clear (); diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index cbbaf16d..b70c21b5 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -86,7 +86,8 @@ void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_, void zmq::pgm_receiver_t::unplug () { // Delete decoders. - for (peers_t::iterator it = peers.begin (); it != peers.end (); ++it) { + for (peers_t::iterator it = peers.begin (), end = peers.end (); it != end; + ++it) { if (it->second.decoder != NULL) { LIBZMQ_DELETE (it->second.decoder); } diff --git a/src/pipe.cpp b/src/pipe.cpp index c37cf903..346d6468 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -520,8 +520,8 @@ void zmq::pipe_t::hiccup () void zmq::pipe_t::set_hwms (int inhwm_, int outhwm_) { - int in = inhwm_ + (_in_hwm_boost > 0 ? _in_hwm_boost : 0); - int out = outhwm_ + (_out_hwm_boost > 0 ? _out_hwm_boost : 0); + int in = inhwm_ + std::max (_in_hwm_boost, 0); + int out = outhwm_ + std::max (_out_hwm_boost, 0); // if either send or recv side has hwm <= 0 it means infinite so we should set hwms infinite if (inhwm_ <= 0 || _in_hwm_boost == 0) diff --git a/src/poller_base.cpp b/src/poller_base.cpp index ebcc6997..803389a8 100644 --- a/src/poller_base.cpp +++ b/src/poller_base.cpp @@ -65,7 +65,8 @@ void zmq::poller_base_t::add_timer (int timeout_, i_poll_events *sink_, int id_) void zmq::poller_base_t::cancel_timer (i_poll_events *sink_, int id_) { // Complexity of this operation is O(n). We assume it is rarely used. - for (timers_t::iterator it = _timers.begin (); it != _timers.end (); ++it) + for (timers_t::iterator it = _timers.begin (), end = _timers.end (); + it != end; ++it) if (it->second.sink == sink_ && it->second.id == id_) { _timers.erase (it); return; diff --git a/src/radio.cpp b/src/radio.cpp index e31e0dc2..486732ac 100644 --- a/src/radio.cpp +++ b/src/radio.cpp @@ -122,8 +122,9 @@ int zmq::radio_t::xsetsockopt (int option_, void zmq::radio_t::xpipe_terminated (pipe_t *pipe_) { - for (subscriptions_t::iterator it = _subscriptions.begin (); - it != _subscriptions.end ();) { + for (subscriptions_t::iterator it = _subscriptions.begin (), + end = _subscriptions.end (); + it != end;) { if (it->second == pipe_) { #if __cplusplus >= 201103L it = _subscriptions.erase (it); @@ -135,10 +136,13 @@ void zmq::radio_t::xpipe_terminated (pipe_t *pipe_) } } - udp_pipes_t::iterator it = - std::find (_udp_pipes.begin (), _udp_pipes.end (), pipe_); - if (it != _udp_pipes.end ()) - _udp_pipes.erase (it); + { + const udp_pipes_t::iterator end = _udp_pipes.end (); + const udp_pipes_t::iterator it = + std::find (_udp_pipes.begin (), end, pipe_); + if (it != end) + _udp_pipes.erase (it); + } _dist.pipe_terminated (pipe_); } @@ -159,8 +163,9 @@ int zmq::radio_t::xsend (msg_t *msg_) for (subscriptions_t::iterator it = range.first; it != range.second; ++it) _dist.match (it->second); - for (udp_pipes_t::iterator it = _udp_pipes.begin (); - it != _udp_pipes.end (); ++it) + for (udp_pipes_t::iterator it = _udp_pipes.begin (), + end = _udp_pipes.end (); + it != end; ++it) _dist.match (*it); int rc = -1; diff --git a/src/router.cpp b/src/router.cpp index 03d117cd..82f94968 100644 --- a/src/router.cpp +++ b/src/router.cpp @@ -165,7 +165,7 @@ void zmq::router_t::xread_activated (pipe_t *pipe_) if (it == _anonymous_pipes.end ()) _fq.activated (pipe_); else { - bool routing_id_ok = identify_peer (pipe_, false); + const bool routing_id_ok = identify_peer (pipe_, false); if (routing_id_ok) { _anonymous_pipes.erase (it); _fq.attach (pipe_); diff --git a/src/server.cpp b/src/server.cpp index b8c32b9b..becd4eca 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -86,8 +86,9 @@ void zmq::server_t::xread_activated (pipe_t *pipe_) void zmq::server_t::xwrite_activated (pipe_t *pipe_) { + const out_pipes_t::iterator end = _out_pipes.end (); out_pipes_t::iterator it; - for (it = _out_pipes.begin (); it != _out_pipes.end (); ++it) + for (it = _out_pipes.begin (); it != end; ++it) if (it->second.pipe == pipe_) break; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index de4c58fc..14f87232 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -543,8 +543,6 @@ int zmq::socket_base_t::bind (const char *addr_) session_base_t::create (io_thread, true, this, options, paddr); errno_assert (session); - pipe_t *newpipe = NULL; - // Create a bi-directional pipe. object_t *parents[2] = {this, session}; pipe_t *new_pipes[2] = {NULL, NULL}; @@ -556,7 +554,7 @@ int zmq::socket_base_t::bind (const char *addr_) // Attach local end of the pipe to the socket object. attach_pipe (new_pipes[0], true, true); - newpipe = new_pipes[0]; + pipe_t *const newpipe = new_pipes[0]; // Attach remote end of the pipe to the session object later on. session->attach_pipe (new_pipes[1]); @@ -564,7 +562,7 @@ int zmq::socket_base_t::bind (const char *addr_) // Save last endpoint URI paddr->to_string (_last_endpoint); - add_endpoint (addr_, (own_t *) session, newpipe); + add_endpoint (addr_, static_cast (session), newpipe); return 0; } @@ -786,12 +784,11 @@ int zmq::socket_base_t::connect (const char *addr_) options.connected = true; return 0; } - bool is_single_connect = + const bool is_single_connect = (options.type == ZMQ_DEALER || options.type == ZMQ_SUB || options.type == ZMQ_PUB || options.type == ZMQ_REQ); if (unlikely (is_single_connect)) { - const endpoints_t::iterator it = _endpoints.find (addr_); - if (it != _endpoints.end ()) { + if (0 != _endpoints.count (addr_)) { // There is no valid use for multiple connects for SUB-PUB nor // DEALER-ROUTER nor REQ-REP. Multiple connects produces // nonsensical results. @@ -1551,8 +1548,8 @@ void zmq::socket_base_t::pipe_terminated (pipe_t *pipe_) xpipe_terminated (pipe_); // Remove pipe from inproc pipes - for (inprocs_t::iterator it = _inprocs.begin (); it != _inprocs.end (); - ++it) + for (inprocs_t::iterator it = _inprocs.begin (), end = _inprocs.end (); + it != end; ++it) if (it->second == pipe_) { _inprocs.erase (it); break; @@ -1790,12 +1787,13 @@ int zmq::routing_socket_base_t::xsetsockopt (int option_, void zmq::routing_socket_base_t::xwrite_activated (pipe_t *pipe_) { + const out_pipes_t::iterator end = _out_pipes.end (); out_pipes_t::iterator it; - for (it = _out_pipes.begin (); it != _out_pipes.end (); ++it) + for (it = _out_pipes.begin (); it != end; ++it) if (it->second.pipe == pipe_) break; - zmq_assert (it != _out_pipes.end ()); + zmq_assert (it != end); zmq_assert (!it->second.active); it->second.active = true; } diff --git a/src/socket_poller.cpp b/src/socket_poller.cpp index c0258a6d..0b9ebc78 100644 --- a/src/socket_poller.cpp +++ b/src/socket_poller.cpp @@ -31,6 +31,7 @@ #include "socket_poller.hpp" #include "err.hpp" #include "polling_util.hpp" +#include "macros.hpp" #include @@ -59,7 +60,8 @@ zmq::socket_poller_t::~socket_poller_t () // Mark the socket_poller as dead _tag = 0xdeadbeef; - for (items_t::iterator it = _items.begin (); it != _items.end (); ++it) { + for (items_t::iterator it = _items.begin (), end = _items.end (); it != end; + ++it) { // TODO shouldn't this zmq_assert (it->socket->check_tag ()) instead? if (it->socket && it->socket->check_tag () && is_thread_safe (*it->socket)) { @@ -68,8 +70,7 @@ zmq::socket_poller_t::~socket_poller_t () } if (_signaler != NULL) { - delete _signaler; - _signaler = NULL; + LIBZMQ_DELETE (_signaler); } #if defined ZMQ_POLL_BASED_ON_POLL @@ -89,7 +90,8 @@ int zmq::socket_poller_t::add (socket_base_t *socket_, void *user_data_, short events_) { - for (items_t::iterator it = _items.begin (); it != _items.end (); ++it) { + for (items_t::iterator it = _items.begin (), end = _items.end (); it != end; + ++it) { if (it->socket == socket_) { errno = EINVAL; return -1; @@ -138,7 +140,8 @@ int zmq::socket_poller_t::add (socket_base_t *socket_, int zmq::socket_poller_t::add_fd (fd_t fd_, void *user_data_, short events_) { - for (items_t::iterator it = _items.begin (); it != _items.end (); ++it) { + for (items_t::iterator it = _items.begin (), end = _items.end (); it != end; + ++it) { if (!it->socket && it->fd == fd_) { errno = EINVAL; return -1; @@ -169,14 +172,15 @@ int zmq::socket_poller_t::add_fd (fd_t fd_, void *user_data_, short events_) int zmq::socket_poller_t::modify (socket_base_t *socket_, short events_) { + const items_t::iterator end = _items.end (); items_t::iterator it; - for (it = _items.begin (); it != _items.end (); ++it) { + for (it = _items.begin (); it != end; ++it) { if (it->socket == socket_) break; } - if (it == _items.end ()) { + if (it == end) { errno = EINVAL; return -1; } @@ -190,14 +194,15 @@ int zmq::socket_poller_t::modify (socket_base_t *socket_, short events_) int zmq::socket_poller_t::modify_fd (fd_t fd_, short events_) { + const items_t::iterator end = _items.end (); items_t::iterator it; - for (it = _items.begin (); it != _items.end (); ++it) { + for (it = _items.begin (); it != end; ++it) { if (!it->socket && it->fd == fd_) break; } - if (it == _items.end ()) { + if (it == end) { errno = EINVAL; return -1; } @@ -211,14 +216,15 @@ int zmq::socket_poller_t::modify_fd (fd_t fd_, short events_) int zmq::socket_poller_t::remove (socket_base_t *socket_) { + const items_t::iterator end = _items.end (); items_t::iterator it; - for (it = _items.begin (); it != _items.end (); ++it) { + for (it = _items.begin (); it != end; ++it) { if (it->socket == socket_) break; } - if (it == _items.end ()) { + if (it == end) { errno = EINVAL; return -1; } @@ -235,14 +241,15 @@ int zmq::socket_poller_t::remove (socket_base_t *socket_) int zmq::socket_poller_t::remove_fd (fd_t fd_) { + const items_t::iterator end = _items.end (); items_t::iterator it; - for (it = _items.begin (); it != _items.end (); ++it) { + for (it = _items.begin (); it != end; ++it) { if (!it->socket && it->fd == fd_) break; } - if (it == _items.end ()) { + if (it == end) { errno = EINVAL; return -1; } @@ -266,7 +273,8 @@ void zmq::socket_poller_t::rebuild () _pollfds = NULL; } - for (items_t::iterator it = _items.begin (); it != _items.end (); ++it) { + for (items_t::iterator it = _items.begin (), end = _items.end (); it != end; + ++it) { if (it->events) { if (it->socket && is_thread_safe (*it->socket)) { if (!_use_signaler) { @@ -292,7 +300,8 @@ void zmq::socket_poller_t::rebuild () _pollfds[0].events = POLLIN; } - for (items_t::iterator it = _items.begin (); it != _items.end (); ++it) { + for (items_t::iterator it = _items.begin (), end = _items.end (); it != end; + ++it) { if (it->events) { if (it->socket) { if (!is_thread_safe (*it->socket)) { @@ -330,7 +339,8 @@ void zmq::socket_poller_t::rebuild () FD_ZERO (_pollset_out.get ()); FD_ZERO (_pollset_err.get ()); - for (items_t::iterator it = _items.begin (); it != _items.end (); ++it) { + for (items_t::iterator it = _items.begin (), end = _items.end (); it != end; + ++it) { if (it->socket && is_thread_safe (*it->socket) && it->events) { _use_signaler = true; FD_SET (_signaler->get_fd (), _pollset_in.get ()); @@ -342,7 +352,8 @@ void zmq::socket_poller_t::rebuild () _max_fd = 0; // Build the fd_sets for passing to select (). - for (items_t::iterator it = _items.begin (); it != _items.end (); ++it) { + for (items_t::iterator it = _items.begin (), end = _items.end (); it != end; + ++it) { if (it->events) { // If the poll item is a 0MQ socket we are interested in input on the // notification file descriptor retrieved by the ZMQ_FD socket option. @@ -404,8 +415,8 @@ int zmq::socket_poller_t::check_events (zmq::socket_poller_t::event_t *events_, #endif { int found = 0; - for (items_t::iterator it = _items.begin (); - it != _items.end () && found < n_events_; ++it) { + for (items_t::iterator it = _items.begin (), end = _items.end (); + it != end && found < n_events_; ++it) { // The poll item is a 0MQ socket. Retrieve pending events // using the ZMQ_EVENTS socket option. if (it->socket) { diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 9dc96696..f29bdd19 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -105,7 +105,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, // Put the socket into non-blocking mode. unblock_socket (_s); - int family = get_peer_ip_address (_s, _peer_address); + const int family = get_peer_ip_address (_s, _peer_address); if (family == 0) _peer_address.clear (); #if defined ZMQ_HAVE_SO_PEERCRED @@ -485,229 +485,23 @@ void zmq::stream_engine_t::restart_input () } } +// Position of the revision field in the greeting. +const size_t revision_pos = 10; + bool zmq::stream_engine_t::handshake () { zmq_assert (_handshaking); zmq_assert (_greeting_bytes_read < _greeting_size); // Receive the greeting. - while (_greeting_bytes_read < _greeting_size) { - const int n = tcp_read (_s, _greeting_recv + _greeting_bytes_read, - _greeting_size - _greeting_bytes_read); - if (n == 0) { - errno = EPIPE; - error (connection_error); - return false; - } - if (n == -1) { - if (errno != EAGAIN) - error (connection_error); - return false; - } + const int rc = receive_greeting (); + if (rc == -1) + return false; + const bool unversioned = rc != 0; - _greeting_bytes_read += n; - - // We have received at least one byte from the peer. - // If the first byte is not 0xff, we know that the - // peer is using unversioned protocol. - if (_greeting_recv[0] != 0xff) - break; - - if (_greeting_bytes_read < signature_size) - continue; - - // Inspect the right-most bit of the 10th byte (which coincides - // with the 'flags' field if a regular message was sent). - // Zero indicates this is a header of a routing id message - // (i.e. the peer is using the unversioned protocol). - if (!(_greeting_recv[9] & 0x01)) - break; - - // The peer is using versioned protocol. - // Send the major version number. - if (_outpos + _outsize == _greeting_send + signature_size) { - if (_outsize == 0) - set_pollout (_handle); - _outpos[_outsize++] = 3; // Major version number - } - - if (_greeting_bytes_read > signature_size) { - if (_outpos + _outsize == _greeting_send + signature_size + 1) { - if (_outsize == 0) - set_pollout (_handle); - - // Use ZMTP/2.0 to talk to older peers. - if (_greeting_recv[10] == ZMTP_1_0 - || _greeting_recv[10] == ZMTP_2_0) - _outpos[_outsize++] = _options.type; - else { - _outpos[_outsize++] = 0; // Minor version number - memset (_outpos + _outsize, 0, 20); - - zmq_assert (_options.mechanism == ZMQ_NULL - || _options.mechanism == ZMQ_PLAIN - || _options.mechanism == ZMQ_CURVE - || _options.mechanism == ZMQ_GSSAPI); - - if (_options.mechanism == ZMQ_NULL) - memcpy (_outpos + _outsize, "NULL", 4); - else if (_options.mechanism == ZMQ_PLAIN) - memcpy (_outpos + _outsize, "PLAIN", 5); - else if (_options.mechanism == ZMQ_GSSAPI) - memcpy (_outpos + _outsize, "GSSAPI", 6); - else if (_options.mechanism == ZMQ_CURVE) - memcpy (_outpos + _outsize, "CURVE", 5); - _outsize += 20; - memset (_outpos + _outsize, 0, 32); - _outsize += 32; - _greeting_size = v3_greeting_size; - } - } - } - } - - // Position of the revision field in the greeting. - const size_t revision_pos = 10; - - // Is the peer using ZMTP/1.0 with no revision number? - // If so, we send and receive rest of routing id message - if (_greeting_recv[0] != 0xff || !(_greeting_recv[9] & 0x01)) { - if (_session->zap_enabled ()) { - // reject ZMTP 1.0 connections if ZAP is enabled - error (protocol_error); - return false; - } - - _encoder = new (std::nothrow) v1_encoder_t (out_batch_size); - alloc_assert (_encoder); - - _decoder = - new (std::nothrow) v1_decoder_t (in_batch_size, _options.maxmsgsize); - alloc_assert (_decoder); - - // We have already sent the message header. - // Since there is no way to tell the encoder to - // skip the message header, we simply throw that - // header data away. - const size_t header_size = - _options.routing_id_size + 1 >= UCHAR_MAX ? 10 : 2; - unsigned char tmp[10], *bufferp = tmp; - - // Prepare the routing id message and load it into encoder. - // Then consume bytes we have already sent to the peer. - const int rc = _tx_msg.init_size (_options.routing_id_size); - zmq_assert (rc == 0); - memcpy (_tx_msg.data (), _options.routing_id, _options.routing_id_size); - _encoder->load_msg (&_tx_msg); - size_t buffer_size = _encoder->encode (&bufferp, header_size); - zmq_assert (buffer_size == header_size); - - // Make sure the decoder sees the data we have already received. - _inpos = _greeting_recv; - _insize = _greeting_bytes_read; - - // To allow for interoperability with peers that do not forward - // their subscriptions, we inject a phantom subscription message - // message into the incoming message stream. - if (_options.type == ZMQ_PUB || _options.type == ZMQ_XPUB) - _subscription_required = true; - - // We are sending our routing id now and the next message - // will come from the socket. - _next_msg = &stream_engine_t::pull_msg_from_session; - - // We are expecting routing id message. - _process_msg = &stream_engine_t::process_routing_id_msg; - } else if (_greeting_recv[revision_pos] == ZMTP_1_0) { - if (_session->zap_enabled ()) { - // reject ZMTP 1.0 connections if ZAP is enabled - error (protocol_error); - return false; - } - - _encoder = new (std::nothrow) v1_encoder_t (out_batch_size); - alloc_assert (_encoder); - - _decoder = - new (std::nothrow) v1_decoder_t (in_batch_size, _options.maxmsgsize); - alloc_assert (_decoder); - } else if (_greeting_recv[revision_pos] == ZMTP_2_0) { - if (_session->zap_enabled ()) { - // reject ZMTP 2.0 connections if ZAP is enabled - error (protocol_error); - return false; - } - - _encoder = new (std::nothrow) v2_encoder_t (out_batch_size); - alloc_assert (_encoder); - - _decoder = new (std::nothrow) - v2_decoder_t (in_batch_size, _options.maxmsgsize, _options.zero_copy); - alloc_assert (_decoder); - } else { - _encoder = new (std::nothrow) v2_encoder_t (out_batch_size); - alloc_assert (_encoder); - - _decoder = new (std::nothrow) - v2_decoder_t (in_batch_size, _options.maxmsgsize, _options.zero_copy); - alloc_assert (_decoder); - - if (_options.mechanism == ZMQ_NULL - && memcmp (_greeting_recv + 12, - "NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) - == 0) { - _mechanism = new (std::nothrow) - null_mechanism_t (_session, _peer_address, _options); - alloc_assert (_mechanism); - } else if (_options.mechanism == ZMQ_PLAIN - && memcmp (_greeting_recv + 12, - "PLAIN\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) - == 0) { - if (_options.as_server) - _mechanism = new (std::nothrow) - plain_server_t (_session, _peer_address, _options); - else - _mechanism = - new (std::nothrow) plain_client_t (_session, _options); - alloc_assert (_mechanism); - } -#ifdef ZMQ_HAVE_CURVE - else if (_options.mechanism == ZMQ_CURVE - && memcmp (_greeting_recv + 12, - "CURVE\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) - == 0) { - if (_options.as_server) - _mechanism = new (std::nothrow) - curve_server_t (_session, _peer_address, _options); - else - _mechanism = - new (std::nothrow) curve_client_t (_session, _options); - alloc_assert (_mechanism); - } -#endif -#ifdef HAVE_LIBGSSAPI_KRB5 - else if (_options.mechanism == ZMQ_GSSAPI - && memcmp (_greeting_recv + 12, - "GSSAPI\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) - == 0) { - if (_options.as_server) - _mechanism = new (std::nothrow) - gssapi_server_t (_session, _peer_address, _options); - else - _mechanism = - new (std::nothrow) gssapi_client_t (_session, _options); - alloc_assert (_mechanism); - } -#endif - else { - _session->get_socket ()->event_handshake_failed_protocol ( - _session->get_endpoint (), - ZMQ_PROTOCOL_ERROR_ZMTP_MECHANISM_MISMATCH); - error (protocol_error); - return false; - } - _next_msg = &stream_engine_t::next_handshake_command; - _process_msg = &stream_engine_t::process_handshake_command; - } + if (!(this + ->*select_handshake_fun (unversioned, + _greeting_recv[revision_pos])) ()) + return false; // Start polling for output if necessary. if (_outsize == 0) @@ -725,6 +519,269 @@ bool zmq::stream_engine_t::handshake () return true; } +int zmq::stream_engine_t::receive_greeting () +{ + bool unversioned = false; + while (_greeting_bytes_read < _greeting_size) { + const int n = tcp_read (_s, _greeting_recv + _greeting_bytes_read, + _greeting_size - _greeting_bytes_read); + if (n == 0) { + errno = EPIPE; + error (connection_error); + return -1; + } + if (n == -1) { + if (errno != EAGAIN) + error (connection_error); + return -1; + } + + _greeting_bytes_read += n; + + // We have received at least one byte from the peer. + // If the first byte is not 0xff, we know that the + // peer is using unversioned protocol. + if (_greeting_recv[0] != 0xff) { + unversioned = true; + break; + } + + if (_greeting_bytes_read < signature_size) + continue; + + // Inspect the right-most bit of the 10th byte (which coincides + // with the 'flags' field if a regular message was sent). + // Zero indicates this is a header of a routing id message + // (i.e. the peer is using the unversioned protocol). + if (!(_greeting_recv[9] & 0x01)) { + unversioned = true; + break; + } + + // The peer is using versioned protocol. + receive_greeting_versioned (); + } + return unversioned ? 1 : 0; +} + +void zmq::stream_engine_t::receive_greeting_versioned () +{ + // Send the major version number. + if (_outpos + _outsize == _greeting_send + signature_size) { + if (_outsize == 0) + set_pollout (_handle); + _outpos[_outsize++] = 3; // Major version number + } + + if (_greeting_bytes_read > signature_size) { + if (_outpos + _outsize == _greeting_send + signature_size + 1) { + if (_outsize == 0) + set_pollout (_handle); + + // Use ZMTP/2.0 to talk to older peers. + if (_greeting_recv[revision_pos] == ZMTP_1_0 + || _greeting_recv[revision_pos] == ZMTP_2_0) + _outpos[_outsize++] = _options.type; + else { + _outpos[_outsize++] = 0; // Minor version number + memset (_outpos + _outsize, 0, 20); + + zmq_assert (_options.mechanism == ZMQ_NULL + || _options.mechanism == ZMQ_PLAIN + || _options.mechanism == ZMQ_CURVE + || _options.mechanism == ZMQ_GSSAPI); + + if (_options.mechanism == ZMQ_NULL) + memcpy (_outpos + _outsize, "NULL", 4); + else if (_options.mechanism == ZMQ_PLAIN) + memcpy (_outpos + _outsize, "PLAIN", 5); + else if (_options.mechanism == ZMQ_GSSAPI) + memcpy (_outpos + _outsize, "GSSAPI", 6); + else if (_options.mechanism == ZMQ_CURVE) + memcpy (_outpos + _outsize, "CURVE", 5); + _outsize += 20; + memset (_outpos + _outsize, 0, 32); + _outsize += 32; + _greeting_size = v3_greeting_size; + } + } + } +} + +zmq::stream_engine_t::handshake_fun_t +zmq::stream_engine_t::select_handshake_fun (bool unversioned, + unsigned char revision) +{ + // Is the peer using ZMTP/1.0 with no revision number? + if (unversioned) { + return &stream_engine_t::handshake_v1_0_unversioned; + } + switch (revision) { + case ZMTP_1_0: + return &stream_engine_t::handshake_v1_0; + case ZMTP_2_0: + return &stream_engine_t::handshake_v2_0; + default: + return &stream_engine_t::handshake_v3_0; + } +} + +bool zmq::stream_engine_t::handshake_v1_0_unversioned () +{ + // We send and receive rest of routing id message + if (_session->zap_enabled ()) { + // reject ZMTP 1.0 connections if ZAP is enabled + error (protocol_error); + return false; + } + + _encoder = new (std::nothrow) v1_encoder_t (out_batch_size); + alloc_assert (_encoder); + + _decoder = + new (std::nothrow) v1_decoder_t (in_batch_size, _options.maxmsgsize); + alloc_assert (_decoder); + + // We have already sent the message header. + // Since there is no way to tell the encoder to + // skip the message header, we simply throw that + // header data away. + const size_t header_size = + _options.routing_id_size + 1 >= UCHAR_MAX ? 10 : 2; + unsigned char tmp[10], *bufferp = tmp; + + // Prepare the routing id message and load it into encoder. + // Then consume bytes we have already sent to the peer. + const int rc = _tx_msg.init_size (_options.routing_id_size); + zmq_assert (rc == 0); + memcpy (_tx_msg.data (), _options.routing_id, _options.routing_id_size); + _encoder->load_msg (&_tx_msg); + const size_t buffer_size = _encoder->encode (&bufferp, header_size); + zmq_assert (buffer_size == header_size); + + // Make sure the decoder sees the data we have already received. + _inpos = _greeting_recv; + _insize = _greeting_bytes_read; + + // To allow for interoperability with peers that do not forward + // their subscriptions, we inject a phantom subscription message + // message into the incoming message stream. + if (_options.type == ZMQ_PUB || _options.type == ZMQ_XPUB) + _subscription_required = true; + + // We are sending our routing id now and the next message + // will come from the socket. + _next_msg = &stream_engine_t::pull_msg_from_session; + + // We are expecting routing id message. + _process_msg = &stream_engine_t::process_routing_id_msg; + + return true; +} + +bool zmq::stream_engine_t::handshake_v1_0 () +{ + if (_session->zap_enabled ()) { + // reject ZMTP 1.0 connections if ZAP is enabled + error (protocol_error); + return false; + } + + _encoder = new (std::nothrow) v1_encoder_t (out_batch_size); + alloc_assert (_encoder); + + _decoder = + new (std::nothrow) v1_decoder_t (in_batch_size, _options.maxmsgsize); + alloc_assert (_decoder); + + return true; +} + +bool zmq::stream_engine_t::handshake_v2_0 () +{ + if (_session->zap_enabled ()) { + // reject ZMTP 2.0 connections if ZAP is enabled + error (protocol_error); + return false; + } + + _encoder = new (std::nothrow) v2_encoder_t (out_batch_size); + alloc_assert (_encoder); + + _decoder = new (std::nothrow) + v2_decoder_t (in_batch_size, _options.maxmsgsize, _options.zero_copy); + alloc_assert (_decoder); + + return true; +} + +bool zmq::stream_engine_t::handshake_v3_0 () +{ + _encoder = new (std::nothrow) v2_encoder_t (out_batch_size); + alloc_assert (_encoder); + + _decoder = new (std::nothrow) + v2_decoder_t (in_batch_size, _options.maxmsgsize, _options.zero_copy); + alloc_assert (_decoder); + + if (_options.mechanism == ZMQ_NULL + && memcmp (_greeting_recv + 12, "NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", + 20) + == 0) { + _mechanism = new (std::nothrow) + null_mechanism_t (_session, _peer_address, _options); + alloc_assert (_mechanism); + } else if (_options.mechanism == ZMQ_PLAIN + && memcmp (_greeting_recv + 12, + "PLAIN\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) + == 0) { + if (_options.as_server) + _mechanism = new (std::nothrow) + plain_server_t (_session, _peer_address, _options); + else + _mechanism = new (std::nothrow) plain_client_t (_session, _options); + alloc_assert (_mechanism); + } +#ifdef ZMQ_HAVE_CURVE + else if (_options.mechanism == ZMQ_CURVE + && memcmp (_greeting_recv + 12, + "CURVE\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) + == 0) { + if (_options.as_server) + _mechanism = new (std::nothrow) + curve_server_t (_session, _peer_address, _options); + else + _mechanism = new (std::nothrow) curve_client_t (_session, _options); + alloc_assert (_mechanism); + } +#endif +#ifdef HAVE_LIBGSSAPI_KRB5 + else if (_options.mechanism == ZMQ_GSSAPI + && memcmp (_greeting_recv + 12, + "GSSAPI\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) + == 0) { + if (_options.as_server) + _mechanism = new (std::nothrow) + gssapi_server_t (_session, _peer_address, _options); + else + _mechanism = + new (std::nothrow) gssapi_client_t (_session, _options); + alloc_assert (_mechanism); + } +#endif + else { + _session->get_socket ()->event_handshake_failed_protocol ( + _session->get_endpoint (), + ZMQ_PROTOCOL_ERROR_ZMTP_MECHANISM_MISMATCH); + error (protocol_error); + return false; + } + _next_msg = &stream_engine_t::next_handshake_command; + _process_msg = &stream_engine_t::process_handshake_command; + + return true; +} + int zmq::stream_engine_t::routing_id_msg (msg_t *msg_) { int rc = msg_->init_size (_options.routing_id_size); @@ -1037,20 +1094,18 @@ void zmq::stream_engine_t::timer_event (int id_) int zmq::stream_engine_t::produce_ping_message (msg_t *msg_) { - int rc = 0; // 16-bit TTL + \4PING == 7 const size_t ping_ttl_len = msg_t::ping_cmd_name_size + 2; zmq_assert (_mechanism != NULL); - rc = msg_->init_size (ping_ttl_len); + int rc = msg_->init_size (ping_ttl_len); errno_assert (rc == 0); msg_->set_flags (msg_t::command); // Copy in the command message memcpy (msg_->data (), "\4PING", msg_t::ping_cmd_name_size); uint16_t ttl_val = htons (_options.heartbeat_ttl); - memcpy ((static_cast (msg_->data ())) - + msg_t::ping_cmd_name_size, + memcpy (static_cast (msg_->data ()) + msg_t::ping_cmd_name_size, &ttl_val, sizeof (ttl_val)); rc = _mechanism->encode (msg_); @@ -1064,10 +1119,9 @@ int zmq::stream_engine_t::produce_ping_message (msg_t *msg_) int zmq::stream_engine_t::produce_pong_message (msg_t *msg_) { - int rc = 0; zmq_assert (_mechanism != NULL); - rc = msg_->move (_pong_msg); + int rc = msg_->move (_pong_msg); errno_assert (rc == 0); rc = _mechanism->encode (msg_); @@ -1103,17 +1157,17 @@ int zmq::stream_engine_t::process_heartbeat_message (msg_t *msg_) // here and store it. Truncate it if it's too long. // Given the engine goes straight to out_event, sequential PINGs will // not be a problem. - size_t context_len = msg_->size () - ping_ttl_len > ping_max_ctx_len - ? ping_max_ctx_len - : msg_->size () - ping_ttl_len; - int rc = _pong_msg.init_size (msg_t::ping_cmd_name_size + context_len); + const size_t context_len = + std::min (msg_->size () - ping_ttl_len, ping_max_ctx_len); + const int rc = + _pong_msg.init_size (msg_t::ping_cmd_name_size + context_len); errno_assert (rc == 0); _pong_msg.set_flags (msg_t::command); memcpy (_pong_msg.data (), "\4PONG", msg_t::ping_cmd_name_size); if (context_len > 0) - memcpy ((static_cast (_pong_msg.data ())) + memcpy (static_cast (_pong_msg.data ()) + msg_t::ping_cmd_name_size, - (static_cast (msg_->data ())) + ping_ttl_len, + static_cast (msg_->data ()) + ping_ttl_len, context_len); _next_msg = &stream_engine_t::produce_pong_message; diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp index e714fbd9..2c25ce55 100644 --- a/src/stream_engine.hpp +++ b/src/stream_engine.hpp @@ -93,12 +93,22 @@ class stream_engine_t : public io_object_t, public i_engine // Function to handle network disconnections. void error (error_reason_t reason_); - // Receives the greeting message from the peer. - int receive_greeting (); - // Detects the protocol used by the peer. bool handshake (); + // Receive the greeting from the peer. + int receive_greeting (); + void receive_greeting_versioned (); + + typedef bool (stream_engine_t::*handshake_fun_t) (); + static handshake_fun_t select_handshake_fun (bool unversioned, + unsigned char revision); + + bool handshake_v1_0_unversioned (); + bool handshake_v1_0 (); + bool handshake_v2_0 (); + bool handshake_v3_0 (); + int routing_id_msg (msg_t *msg_); int process_routing_id_msg (msg_t *msg_); diff --git a/src/timers.cpp b/src/timers.cpp index eb792ec7..74c1b716 100644 --- a/src/timers.cpp +++ b/src/timers.cpp @@ -144,7 +144,7 @@ long zmq::timers_t::timeout () for (; it != end; ++it) { if (0 == _cancelled_timers.erase (it->second.timer_id)) { // Live timer, lets return the timeout - res = it->first > now ? static_cast (it->first - now) : 0; + res = std::max (static_cast (it->first - now), 0l); break; } }