mirror of
https://github.com/zeromq/libzmq.git
synced 2025-01-21 02:00:36 +01:00
Merge pull request #3205 from sigiesec/code-improvements
Code style improvements
This commit is contained in:
commit
76602516a8
@ -137,8 +137,8 @@ int zmq::ctx_t::terminate ()
|
||||
|
||||
// Connect up any pending inproc connections, otherwise we will hang
|
||||
pending_connections_t copy = _pending_connections;
|
||||
for (pending_connections_t::iterator p = copy.begin (); p != copy.end ();
|
||||
++p) {
|
||||
for (pending_connections_t::iterator p = copy.begin (), end = copy.end ();
|
||||
p != end; ++p) {
|
||||
zmq::socket_base_t *s = create_socket (ZMQ_PAIR);
|
||||
// create_socket might fail eg: out of memory/sockets limit reached
|
||||
zmq_assert (s);
|
||||
@ -528,8 +528,9 @@ void zmq::ctx_t::unregister_endpoints (socket_base_t *socket_)
|
||||
{
|
||||
scoped_lock_t locker (_endpoints_sync);
|
||||
|
||||
for (endpoints_t::iterator it = _endpoints.begin ();
|
||||
it != _endpoints.end ();) {
|
||||
for (endpoints_t::iterator it = _endpoints.begin (),
|
||||
end = _endpoints.end ();
|
||||
it != end;) {
|
||||
if (it->second.socket == socket_)
|
||||
#if __cplusplus >= 201103L
|
||||
it = _endpoints.erase (it);
|
||||
|
11
src/dish.cpp
11
src/dish.cpp
@ -100,16 +100,12 @@ int zmq::dish_t::xjoin (const char *group_)
|
||||
return -1;
|
||||
}
|
||||
|
||||
subscriptions_t::iterator it = _subscriptions.find (group);
|
||||
|
||||
// User cannot join same group twice
|
||||
if (it != _subscriptions.end ()) {
|
||||
if (!_subscriptions.insert (group).second) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
|
||||
_subscriptions.insert (group);
|
||||
|
||||
msg_t msg;
|
||||
int rc = msg.init_join ();
|
||||
errno_assert (rc == 0);
|
||||
@ -230,8 +226,9 @@ const zmq::blob_t &zmq::dish_t::get_credential () const
|
||||
|
||||
void zmq::dish_t::send_subscriptions (pipe_t *pipe_)
|
||||
{
|
||||
for (subscriptions_t::iterator it = _subscriptions.begin ();
|
||||
it != _subscriptions.end (); ++it) {
|
||||
for (subscriptions_t::iterator it = _subscriptions.begin (),
|
||||
end = _subscriptions.end ();
|
||||
it != end; ++it) {
|
||||
msg_t msg;
|
||||
int rc = msg.init_join ();
|
||||
errno_assert (rc == 0);
|
||||
|
@ -75,8 +75,8 @@ zmq::epoll_t::~epoll_t ()
|
||||
#else
|
||||
close (_epoll_fd);
|
||||
#endif
|
||||
for (retired_t::iterator it = _retired.begin (); it != _retired.end ();
|
||||
++it) {
|
||||
for (retired_t::iterator it = _retired.begin (), end = _retired.end ();
|
||||
it != end; ++it) {
|
||||
LIBZMQ_DELETE (*it);
|
||||
}
|
||||
}
|
||||
@ -207,8 +207,8 @@ void zmq::epoll_t::loop ()
|
||||
}
|
||||
|
||||
// Destroy retired event sources.
|
||||
for (retired_t::iterator it = _retired.begin (); it != _retired.end ();
|
||||
++it) {
|
||||
for (retired_t::iterator it = _retired.begin (), end = _retired.end ();
|
||||
it != end; ++it) {
|
||||
LIBZMQ_DELETE (*it);
|
||||
}
|
||||
_retired.clear ();
|
||||
|
@ -92,6 +92,17 @@ template <typename T> class generic_mtrie_t
|
||||
void (*func_) (prefix_t data_, size_t size_, Arg arg_),
|
||||
Arg arg_,
|
||||
bool call_on_uniq_);
|
||||
template <typename Arg>
|
||||
void rm_helper_multiple_subnodes (unsigned char **buff_,
|
||||
size_t buffsize_,
|
||||
size_t maxbuffsize_,
|
||||
void (*func_) (prefix_t data_,
|
||||
size_t size_,
|
||||
Arg arg_),
|
||||
Arg arg_,
|
||||
bool call_on_uniq_,
|
||||
value_t *pipe_);
|
||||
|
||||
rm_result rm_helper (prefix_t prefix_, size_t size_, value_t *value_);
|
||||
bool is_redundant () const;
|
||||
|
||||
|
@ -37,7 +37,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
#include <algorithm>
|
||||
|
||||
#include "err.hpp"
|
||||
#include "pipe.hpp"
|
||||
#include "macros.hpp"
|
||||
#include "generic_mtrie.hpp"
|
||||
|
||||
@ -80,7 +79,7 @@ bool zmq::generic_mtrie_t<T>::add_helper (prefix_t prefix_,
|
||||
{
|
||||
// We are at the node corresponding to the prefix. We are done.
|
||||
if (!size_) {
|
||||
bool result = !_pipes;
|
||||
const bool result = !_pipes;
|
||||
if (!_pipes) {
|
||||
_pipes = new (std::nothrow) pipes_t;
|
||||
alloc_assert (_pipes);
|
||||
@ -89,7 +88,7 @@ bool zmq::generic_mtrie_t<T>::add_helper (prefix_t prefix_,
|
||||
return result;
|
||||
}
|
||||
|
||||
unsigned char c = *prefix_;
|
||||
const unsigned char c = *prefix_;
|
||||
if (c < _min || c >= _min + _count) {
|
||||
// The character is out of range of currently handled
|
||||
// characters. We have to extend the table.
|
||||
@ -98,11 +97,11 @@ bool zmq::generic_mtrie_t<T>::add_helper (prefix_t prefix_,
|
||||
_count = 1;
|
||||
_next.node = NULL;
|
||||
} else if (_count == 1) {
|
||||
unsigned char oldc = _min;
|
||||
const unsigned char oldc = _min;
|
||||
generic_mtrie_t *oldp = _next.node;
|
||||
_count = (_min < c ? c - _min : _min - c) + 1;
|
||||
_next.table =
|
||||
(generic_mtrie_t **) malloc (sizeof (generic_mtrie_t *) * _count);
|
||||
_next.table = static_cast<generic_mtrie_t **> (
|
||||
malloc (sizeof (generic_mtrie_t *) * _count));
|
||||
alloc_assert (_next.table);
|
||||
for (unsigned short i = 0; i != _count; ++i)
|
||||
_next.table[i] = 0;
|
||||
@ -110,19 +109,19 @@ bool zmq::generic_mtrie_t<T>::add_helper (prefix_t prefix_,
|
||||
_next.table[oldc - _min] = oldp;
|
||||
} else if (_min < c) {
|
||||
// The new character is above the current character range.
|
||||
unsigned short old_count = _count;
|
||||
const unsigned short old_count = _count;
|
||||
_count = c - _min + 1;
|
||||
_next.table = (generic_mtrie_t **) realloc (
|
||||
_next.table, sizeof (generic_mtrie_t *) * _count);
|
||||
_next.table = static_cast<generic_mtrie_t **> (
|
||||
realloc (_next.table, sizeof (generic_mtrie_t *) * _count));
|
||||
alloc_assert (_next.table);
|
||||
for (unsigned short i = old_count; i != _count; i++)
|
||||
_next.table[i] = NULL;
|
||||
} else {
|
||||
// The new character is below the current character range.
|
||||
unsigned short old_count = _count;
|
||||
const unsigned short old_count = _count;
|
||||
_count = (_min + old_count) - c;
|
||||
_next.table = (generic_mtrie_t **) realloc (
|
||||
_next.table, sizeof (generic_mtrie_t *) * _count);
|
||||
_next.table = static_cast<generic_mtrie_t **> (
|
||||
realloc (_next.table, sizeof (generic_mtrie_t *) * _count));
|
||||
alloc_assert (_next.table);
|
||||
memmove (_next.table + _min - c, _next.table,
|
||||
old_count * sizeof (generic_mtrie_t *));
|
||||
@ -194,29 +193,45 @@ void zmq::generic_mtrie_t<T>::rm_helper (value_t *pipe_,
|
||||
alloc_assert (*buff_);
|
||||
}
|
||||
|
||||
// If there are no subnodes in the trie, return.
|
||||
if (_count == 0)
|
||||
return;
|
||||
switch (_count) {
|
||||
case 0:
|
||||
// If there are no subnodes in the trie, return.
|
||||
break;
|
||||
case 1:
|
||||
// If there's one subnode (optimisation).
|
||||
|
||||
// If there's one subnode (optimisation).
|
||||
if (_count == 1) {
|
||||
(*buff_)[buffsize_] = _min;
|
||||
buffsize_++;
|
||||
_next.node->rm_helper (pipe_, buff_, buffsize_, maxbuffsize_, func_,
|
||||
arg_, call_on_uniq_);
|
||||
(*buff_)[buffsize_] = _min;
|
||||
buffsize_++;
|
||||
_next.node->rm_helper (pipe_, buff_, buffsize_, maxbuffsize_, func_,
|
||||
arg_, call_on_uniq_);
|
||||
|
||||
// Prune the node if it was made redundant by the removal
|
||||
if (_next.node->is_redundant ()) {
|
||||
LIBZMQ_DELETE (_next.node);
|
||||
_count = 0;
|
||||
--_live_nodes;
|
||||
zmq_assert (_live_nodes == 0);
|
||||
}
|
||||
return;
|
||||
// Prune the node if it was made redundant by the removal
|
||||
if (_next.node->is_redundant ()) {
|
||||
LIBZMQ_DELETE (_next.node);
|
||||
_count = 0;
|
||||
--_live_nodes;
|
||||
zmq_assert (_live_nodes == 0);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
// If there are multiple subnodes.
|
||||
rm_helper_multiple_subnodes (buff_, buffsize_, maxbuffsize_, func_,
|
||||
arg_, call_on_uniq_, pipe_);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// If there are multiple subnodes.
|
||||
//
|
||||
template <typename T>
|
||||
template <typename Arg>
|
||||
void zmq::generic_mtrie_t<T>::rm_helper_multiple_subnodes (
|
||||
unsigned char **buff_,
|
||||
size_t buffsize_,
|
||||
size_t maxbuffsize_,
|
||||
void (*func_) (prefix_t data_, size_t size_, Arg arg_),
|
||||
Arg arg_,
|
||||
bool call_on_uniq_,
|
||||
value_t *pipe_)
|
||||
{
|
||||
// New min non-null character in the node table after the removal
|
||||
unsigned char new_min = _min + _count - 1;
|
||||
// New max non-null character in the node table after the removal
|
||||
@ -253,46 +268,52 @@ void zmq::generic_mtrie_t<T>::rm_helper (value_t *pipe_,
|
||||
zmq_assert (_count > 1);
|
||||
|
||||
// Free the node table if it's no longer used.
|
||||
if (_live_nodes == 0) {
|
||||
free (_next.table);
|
||||
_next.table = NULL;
|
||||
_count = 0;
|
||||
}
|
||||
// Compact the node table if possible
|
||||
else if (_live_nodes == 1) {
|
||||
// If there's only one live node in the table we can
|
||||
// switch to using the more compact single-node
|
||||
// representation
|
||||
zmq_assert (new_min == new_max);
|
||||
zmq_assert (new_min >= _min && new_min < _min + _count);
|
||||
generic_mtrie_t *node = _next.table[new_min - _min];
|
||||
zmq_assert (node);
|
||||
free (_next.table);
|
||||
_next.node = node;
|
||||
_count = 1;
|
||||
_min = new_min;
|
||||
} else if (new_min > _min || new_max < _min + _count - 1) {
|
||||
zmq_assert (new_max - new_min + 1 > 1);
|
||||
switch (_live_nodes) {
|
||||
case 0:
|
||||
free (_next.table);
|
||||
_next.table = NULL;
|
||||
_count = 0;
|
||||
break;
|
||||
case 1:
|
||||
// Compact the node table if possible
|
||||
|
||||
generic_mtrie_t **old_table = _next.table;
|
||||
zmq_assert (new_min > _min || new_max < _min + _count - 1);
|
||||
zmq_assert (new_min >= _min);
|
||||
zmq_assert (new_max <= _min + _count - 1);
|
||||
zmq_assert (new_max - new_min + 1 < _count);
|
||||
// If there's only one live node in the table we can
|
||||
// switch to using the more compact single-node
|
||||
// representation
|
||||
zmq_assert (new_min == new_max);
|
||||
zmq_assert (new_min >= _min && new_min < _min + _count);
|
||||
{
|
||||
generic_mtrie_t *node = _next.table[new_min - _min];
|
||||
zmq_assert (node);
|
||||
free (_next.table);
|
||||
_next.node = node;
|
||||
}
|
||||
_count = 1;
|
||||
_min = new_min;
|
||||
break;
|
||||
default:
|
||||
if (new_min > _min || new_max < _min + _count - 1) {
|
||||
zmq_assert (new_max - new_min + 1 > 1);
|
||||
|
||||
_count = new_max - new_min + 1;
|
||||
_next.table =
|
||||
(generic_mtrie_t **) malloc (sizeof (generic_mtrie_t *) * _count);
|
||||
alloc_assert (_next.table);
|
||||
generic_mtrie_t **old_table = _next.table;
|
||||
zmq_assert (new_min > _min || new_max < _min + _count - 1);
|
||||
zmq_assert (new_min >= _min);
|
||||
zmq_assert (new_max <= _min + _count - 1);
|
||||
zmq_assert (new_max - new_min + 1 < _count);
|
||||
|
||||
memmove (_next.table, old_table + (new_min - _min),
|
||||
sizeof (generic_mtrie_t *) * _count);
|
||||
free (old_table);
|
||||
_count = new_max - new_min + 1;
|
||||
_next.table = static_cast<generic_mtrie_t **> (
|
||||
malloc (sizeof (generic_mtrie_t *) * _count));
|
||||
alloc_assert (_next.table);
|
||||
|
||||
_min = new_min;
|
||||
memmove (_next.table, old_table + (new_min - _min),
|
||||
sizeof (generic_mtrie_t *) * _count);
|
||||
free (old_table);
|
||||
|
||||
_min = new_min;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
typename zmq::generic_mtrie_t<T>::rm_result
|
||||
zmq::generic_mtrie_t<T>::rm (prefix_t prefix_, size_t size_, value_t *pipe_)
|
||||
@ -317,7 +338,7 @@ typename zmq::generic_mtrie_t<T>::rm_result zmq::generic_mtrie_t<T>::rm_helper (
|
||||
return (erased == 1) ? values_remain : not_found;
|
||||
}
|
||||
|
||||
unsigned char c = *prefix_;
|
||||
const unsigned char c = *prefix_;
|
||||
if (!_count || c < _min || c >= _min + _count)
|
||||
return not_found;
|
||||
|
||||
@ -327,7 +348,7 @@ typename zmq::generic_mtrie_t<T>::rm_result zmq::generic_mtrie_t<T>::rm_helper (
|
||||
if (!next_node)
|
||||
return not_found;
|
||||
|
||||
rm_result ret = next_node->rm_helper (prefix_ + 1, size_ - 1, pipe_);
|
||||
const rm_result ret = next_node->rm_helper (prefix_ + 1, size_ - 1, pipe_);
|
||||
|
||||
if (next_node->is_redundant ()) {
|
||||
LIBZMQ_DELETE (next_node);
|
||||
@ -370,8 +391,8 @@ typename zmq::generic_mtrie_t<T>::rm_result zmq::generic_mtrie_t<T>::rm_helper (
|
||||
_min += i;
|
||||
_count -= i;
|
||||
generic_mtrie_t **old_table = _next.table;
|
||||
_next.table = (generic_mtrie_t **) malloc (
|
||||
sizeof (generic_mtrie_t *) * _count);
|
||||
_next.table = static_cast<generic_mtrie_t **> (
|
||||
malloc (sizeof (generic_mtrie_t *) * _count));
|
||||
alloc_assert (_next.table);
|
||||
memmove (_next.table, old_table + i,
|
||||
sizeof (generic_mtrie_t *) * _count);
|
||||
@ -386,8 +407,8 @@ typename zmq::generic_mtrie_t<T>::rm_result zmq::generic_mtrie_t<T>::rm_helper (
|
||||
zmq_assert (i < _count);
|
||||
_count -= i;
|
||||
generic_mtrie_t **old_table = _next.table;
|
||||
_next.table = (generic_mtrie_t **) malloc (
|
||||
sizeof (generic_mtrie_t *) * _count);
|
||||
_next.table = static_cast<generic_mtrie_t **> (
|
||||
malloc (sizeof (generic_mtrie_t *) * _count));
|
||||
alloc_assert (_next.table);
|
||||
memmove (_next.table, old_table,
|
||||
sizeof (generic_mtrie_t *) * _count);
|
||||
|
@ -61,10 +61,11 @@ void zmq::mailbox_safe_t::add_signaler (signaler_t *signaler_)
|
||||
void zmq::mailbox_safe_t::remove_signaler (signaler_t *signaler_)
|
||||
{
|
||||
// TODO: make a copy of array and signal outside the lock
|
||||
const std::vector<zmq::signaler_t *>::iterator end = _signalers.end ();
|
||||
std::vector<signaler_t *>::iterator it =
|
||||
std::find (_signalers.begin (), _signalers.end (), signaler_);
|
||||
std::find (_signalers.begin (), end, signaler_);
|
||||
|
||||
if (it != _signalers.end ())
|
||||
if (it != end)
|
||||
_signalers.erase (it);
|
||||
}
|
||||
|
||||
@ -81,8 +82,10 @@ void zmq::mailbox_safe_t::send (const command_t &cmd_)
|
||||
|
||||
if (!ok) {
|
||||
_cond_var.broadcast ();
|
||||
for (std::vector<signaler_t *>::iterator it = _signalers.begin ();
|
||||
it != _signalers.end (); ++it) {
|
||||
|
||||
for (std::vector<signaler_t *>::iterator it = _signalers.begin (),
|
||||
end = _signalers.end ();
|
||||
it != end; ++it) {
|
||||
(*it)->send ();
|
||||
}
|
||||
}
|
||||
|
@ -178,12 +178,14 @@ size_t zmq::mechanism_t::add_basic_properties (unsigned char *ptr_,
|
||||
}
|
||||
|
||||
|
||||
for (std::map<std::string, std::string>::const_iterator it =
|
||||
options.app_metadata.begin ();
|
||||
it != options.app_metadata.end (); ++it)
|
||||
for (std::map<std::string, std::string>::const_iterator
|
||||
it = options.app_metadata.begin (),
|
||||
end = options.app_metadata.end ();
|
||||
it != end; ++it) {
|
||||
ptr +=
|
||||
add_property (ptr, ptr_capacity_ - (ptr - ptr_), it->first.c_str (),
|
||||
it->second.c_str (), strlen (it->second.c_str ()));
|
||||
}
|
||||
|
||||
return ptr - ptr_;
|
||||
}
|
||||
@ -193,9 +195,10 @@ size_t zmq::mechanism_t::basic_properties_len () const
|
||||
const char *socket_type = socket_type_string (options.type);
|
||||
size_t meta_len = 0;
|
||||
|
||||
for (std::map<std::string, std::string>::const_iterator it =
|
||||
options.app_metadata.begin ();
|
||||
it != options.app_metadata.end (); ++it) {
|
||||
for (std::map<std::string, std::string>::const_iterator
|
||||
it = options.app_metadata.begin (),
|
||||
end = options.app_metadata.end ();
|
||||
it != end; ++it) {
|
||||
meta_len +=
|
||||
property_len (it->first.c_str (), strlen (it->second.c_str ()));
|
||||
}
|
||||
|
@ -158,7 +158,8 @@ void zmq::own_t::process_term (int linger_)
|
||||
zmq_assert (!_terminating);
|
||||
|
||||
// Send termination request to all owned objects.
|
||||
for (owned_t::iterator it = _owned.begin (); it != _owned.end (); ++it)
|
||||
for (owned_t::iterator it = _owned.begin (), end = _owned.end (); it != end;
|
||||
++it)
|
||||
send_term (*it, linger_);
|
||||
register_term_acks (static_cast<int> (_owned.size ()));
|
||||
_owned.clear ();
|
||||
|
@ -86,7 +86,8 @@ void zmq::pgm_receiver_t::plug (io_thread_t *io_thread_,
|
||||
void zmq::pgm_receiver_t::unplug ()
|
||||
{
|
||||
// Delete decoders.
|
||||
for (peers_t::iterator it = peers.begin (); it != peers.end (); ++it) {
|
||||
for (peers_t::iterator it = peers.begin (), end = peers.end (); it != end;
|
||||
++it) {
|
||||
if (it->second.decoder != NULL) {
|
||||
LIBZMQ_DELETE (it->second.decoder);
|
||||
}
|
||||
|
@ -520,8 +520,8 @@ void zmq::pipe_t::hiccup ()
|
||||
|
||||
void zmq::pipe_t::set_hwms (int inhwm_, int outhwm_)
|
||||
{
|
||||
int in = inhwm_ + (_in_hwm_boost > 0 ? _in_hwm_boost : 0);
|
||||
int out = outhwm_ + (_out_hwm_boost > 0 ? _out_hwm_boost : 0);
|
||||
int in = inhwm_ + std::max (_in_hwm_boost, 0);
|
||||
int out = outhwm_ + std::max (_out_hwm_boost, 0);
|
||||
|
||||
// if either send or recv side has hwm <= 0 it means infinite so we should set hwms infinite
|
||||
if (inhwm_ <= 0 || _in_hwm_boost == 0)
|
||||
|
@ -65,7 +65,8 @@ void zmq::poller_base_t::add_timer (int timeout_, i_poll_events *sink_, int id_)
|
||||
void zmq::poller_base_t::cancel_timer (i_poll_events *sink_, int id_)
|
||||
{
|
||||
// Complexity of this operation is O(n). We assume it is rarely used.
|
||||
for (timers_t::iterator it = _timers.begin (); it != _timers.end (); ++it)
|
||||
for (timers_t::iterator it = _timers.begin (), end = _timers.end ();
|
||||
it != end; ++it)
|
||||
if (it->second.sink == sink_ && it->second.id == id_) {
|
||||
_timers.erase (it);
|
||||
return;
|
||||
|
@ -122,8 +122,9 @@ int zmq::radio_t::xsetsockopt (int option_,
|
||||
|
||||
void zmq::radio_t::xpipe_terminated (pipe_t *pipe_)
|
||||
{
|
||||
for (subscriptions_t::iterator it = _subscriptions.begin ();
|
||||
it != _subscriptions.end ();) {
|
||||
for (subscriptions_t::iterator it = _subscriptions.begin (),
|
||||
end = _subscriptions.end ();
|
||||
it != end;) {
|
||||
if (it->second == pipe_) {
|
||||
#if __cplusplus >= 201103L
|
||||
it = _subscriptions.erase (it);
|
||||
@ -135,10 +136,13 @@ void zmq::radio_t::xpipe_terminated (pipe_t *pipe_)
|
||||
}
|
||||
}
|
||||
|
||||
udp_pipes_t::iterator it =
|
||||
std::find (_udp_pipes.begin (), _udp_pipes.end (), pipe_);
|
||||
if (it != _udp_pipes.end ())
|
||||
_udp_pipes.erase (it);
|
||||
{
|
||||
const udp_pipes_t::iterator end = _udp_pipes.end ();
|
||||
const udp_pipes_t::iterator it =
|
||||
std::find (_udp_pipes.begin (), end, pipe_);
|
||||
if (it != end)
|
||||
_udp_pipes.erase (it);
|
||||
}
|
||||
|
||||
_dist.pipe_terminated (pipe_);
|
||||
}
|
||||
@ -159,8 +163,9 @@ int zmq::radio_t::xsend (msg_t *msg_)
|
||||
for (subscriptions_t::iterator it = range.first; it != range.second; ++it)
|
||||
_dist.match (it->second);
|
||||
|
||||
for (udp_pipes_t::iterator it = _udp_pipes.begin ();
|
||||
it != _udp_pipes.end (); ++it)
|
||||
for (udp_pipes_t::iterator it = _udp_pipes.begin (),
|
||||
end = _udp_pipes.end ();
|
||||
it != end; ++it)
|
||||
_dist.match (*it);
|
||||
|
||||
int rc = -1;
|
||||
|
@ -165,7 +165,7 @@ void zmq::router_t::xread_activated (pipe_t *pipe_)
|
||||
if (it == _anonymous_pipes.end ())
|
||||
_fq.activated (pipe_);
|
||||
else {
|
||||
bool routing_id_ok = identify_peer (pipe_, false);
|
||||
const bool routing_id_ok = identify_peer (pipe_, false);
|
||||
if (routing_id_ok) {
|
||||
_anonymous_pipes.erase (it);
|
||||
_fq.attach (pipe_);
|
||||
|
@ -86,8 +86,9 @@ void zmq::server_t::xread_activated (pipe_t *pipe_)
|
||||
|
||||
void zmq::server_t::xwrite_activated (pipe_t *pipe_)
|
||||
{
|
||||
const out_pipes_t::iterator end = _out_pipes.end ();
|
||||
out_pipes_t::iterator it;
|
||||
for (it = _out_pipes.begin (); it != _out_pipes.end (); ++it)
|
||||
for (it = _out_pipes.begin (); it != end; ++it)
|
||||
if (it->second.pipe == pipe_)
|
||||
break;
|
||||
|
||||
|
@ -543,8 +543,6 @@ int zmq::socket_base_t::bind (const char *addr_)
|
||||
session_base_t::create (io_thread, true, this, options, paddr);
|
||||
errno_assert (session);
|
||||
|
||||
pipe_t *newpipe = NULL;
|
||||
|
||||
// Create a bi-directional pipe.
|
||||
object_t *parents[2] = {this, session};
|
||||
pipe_t *new_pipes[2] = {NULL, NULL};
|
||||
@ -556,7 +554,7 @@ int zmq::socket_base_t::bind (const char *addr_)
|
||||
|
||||
// Attach local end of the pipe to the socket object.
|
||||
attach_pipe (new_pipes[0], true, true);
|
||||
newpipe = new_pipes[0];
|
||||
pipe_t *const newpipe = new_pipes[0];
|
||||
|
||||
// Attach remote end of the pipe to the session object later on.
|
||||
session->attach_pipe (new_pipes[1]);
|
||||
@ -564,7 +562,7 @@ int zmq::socket_base_t::bind (const char *addr_)
|
||||
// Save last endpoint URI
|
||||
paddr->to_string (_last_endpoint);
|
||||
|
||||
add_endpoint (addr_, (own_t *) session, newpipe);
|
||||
add_endpoint (addr_, static_cast<own_t *> (session), newpipe);
|
||||
|
||||
return 0;
|
||||
}
|
||||
@ -786,12 +784,11 @@ int zmq::socket_base_t::connect (const char *addr_)
|
||||
options.connected = true;
|
||||
return 0;
|
||||
}
|
||||
bool is_single_connect =
|
||||
const bool is_single_connect =
|
||||
(options.type == ZMQ_DEALER || options.type == ZMQ_SUB
|
||||
|| options.type == ZMQ_PUB || options.type == ZMQ_REQ);
|
||||
if (unlikely (is_single_connect)) {
|
||||
const endpoints_t::iterator it = _endpoints.find (addr_);
|
||||
if (it != _endpoints.end ()) {
|
||||
if (0 != _endpoints.count (addr_)) {
|
||||
// There is no valid use for multiple connects for SUB-PUB nor
|
||||
// DEALER-ROUTER nor REQ-REP. Multiple connects produces
|
||||
// nonsensical results.
|
||||
@ -1551,8 +1548,8 @@ void zmq::socket_base_t::pipe_terminated (pipe_t *pipe_)
|
||||
xpipe_terminated (pipe_);
|
||||
|
||||
// Remove pipe from inproc pipes
|
||||
for (inprocs_t::iterator it = _inprocs.begin (); it != _inprocs.end ();
|
||||
++it)
|
||||
for (inprocs_t::iterator it = _inprocs.begin (), end = _inprocs.end ();
|
||||
it != end; ++it)
|
||||
if (it->second == pipe_) {
|
||||
_inprocs.erase (it);
|
||||
break;
|
||||
@ -1790,12 +1787,13 @@ int zmq::routing_socket_base_t::xsetsockopt (int option_,
|
||||
|
||||
void zmq::routing_socket_base_t::xwrite_activated (pipe_t *pipe_)
|
||||
{
|
||||
const out_pipes_t::iterator end = _out_pipes.end ();
|
||||
out_pipes_t::iterator it;
|
||||
for (it = _out_pipes.begin (); it != _out_pipes.end (); ++it)
|
||||
for (it = _out_pipes.begin (); it != end; ++it)
|
||||
if (it->second.pipe == pipe_)
|
||||
break;
|
||||
|
||||
zmq_assert (it != _out_pipes.end ());
|
||||
zmq_assert (it != end);
|
||||
zmq_assert (!it->second.active);
|
||||
it->second.active = true;
|
||||
}
|
||||
|
@ -31,6 +31,7 @@
|
||||
#include "socket_poller.hpp"
|
||||
#include "err.hpp"
|
||||
#include "polling_util.hpp"
|
||||
#include "macros.hpp"
|
||||
|
||||
#include <limits.h>
|
||||
|
||||
@ -59,7 +60,8 @@ zmq::socket_poller_t::~socket_poller_t ()
|
||||
// Mark the socket_poller as dead
|
||||
_tag = 0xdeadbeef;
|
||||
|
||||
for (items_t::iterator it = _items.begin (); it != _items.end (); ++it) {
|
||||
for (items_t::iterator it = _items.begin (), end = _items.end (); it != end;
|
||||
++it) {
|
||||
// TODO shouldn't this zmq_assert (it->socket->check_tag ()) instead?
|
||||
if (it->socket && it->socket->check_tag ()
|
||||
&& is_thread_safe (*it->socket)) {
|
||||
@ -68,8 +70,7 @@ zmq::socket_poller_t::~socket_poller_t ()
|
||||
}
|
||||
|
||||
if (_signaler != NULL) {
|
||||
delete _signaler;
|
||||
_signaler = NULL;
|
||||
LIBZMQ_DELETE (_signaler);
|
||||
}
|
||||
|
||||
#if defined ZMQ_POLL_BASED_ON_POLL
|
||||
@ -89,7 +90,8 @@ int zmq::socket_poller_t::add (socket_base_t *socket_,
|
||||
void *user_data_,
|
||||
short events_)
|
||||
{
|
||||
for (items_t::iterator it = _items.begin (); it != _items.end (); ++it) {
|
||||
for (items_t::iterator it = _items.begin (), end = _items.end (); it != end;
|
||||
++it) {
|
||||
if (it->socket == socket_) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
@ -138,7 +140,8 @@ int zmq::socket_poller_t::add (socket_base_t *socket_,
|
||||
|
||||
int zmq::socket_poller_t::add_fd (fd_t fd_, void *user_data_, short events_)
|
||||
{
|
||||
for (items_t::iterator it = _items.begin (); it != _items.end (); ++it) {
|
||||
for (items_t::iterator it = _items.begin (), end = _items.end (); it != end;
|
||||
++it) {
|
||||
if (!it->socket && it->fd == fd_) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
@ -169,14 +172,15 @@ int zmq::socket_poller_t::add_fd (fd_t fd_, void *user_data_, short events_)
|
||||
|
||||
int zmq::socket_poller_t::modify (socket_base_t *socket_, short events_)
|
||||
{
|
||||
const items_t::iterator end = _items.end ();
|
||||
items_t::iterator it;
|
||||
|
||||
for (it = _items.begin (); it != _items.end (); ++it) {
|
||||
for (it = _items.begin (); it != end; ++it) {
|
||||
if (it->socket == socket_)
|
||||
break;
|
||||
}
|
||||
|
||||
if (it == _items.end ()) {
|
||||
if (it == end) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
@ -190,14 +194,15 @@ int zmq::socket_poller_t::modify (socket_base_t *socket_, short events_)
|
||||
|
||||
int zmq::socket_poller_t::modify_fd (fd_t fd_, short events_)
|
||||
{
|
||||
const items_t::iterator end = _items.end ();
|
||||
items_t::iterator it;
|
||||
|
||||
for (it = _items.begin (); it != _items.end (); ++it) {
|
||||
for (it = _items.begin (); it != end; ++it) {
|
||||
if (!it->socket && it->fd == fd_)
|
||||
break;
|
||||
}
|
||||
|
||||
if (it == _items.end ()) {
|
||||
if (it == end) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
@ -211,14 +216,15 @@ int zmq::socket_poller_t::modify_fd (fd_t fd_, short events_)
|
||||
|
||||
int zmq::socket_poller_t::remove (socket_base_t *socket_)
|
||||
{
|
||||
const items_t::iterator end = _items.end ();
|
||||
items_t::iterator it;
|
||||
|
||||
for (it = _items.begin (); it != _items.end (); ++it) {
|
||||
for (it = _items.begin (); it != end; ++it) {
|
||||
if (it->socket == socket_)
|
||||
break;
|
||||
}
|
||||
|
||||
if (it == _items.end ()) {
|
||||
if (it == end) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
@ -235,14 +241,15 @@ int zmq::socket_poller_t::remove (socket_base_t *socket_)
|
||||
|
||||
int zmq::socket_poller_t::remove_fd (fd_t fd_)
|
||||
{
|
||||
const items_t::iterator end = _items.end ();
|
||||
items_t::iterator it;
|
||||
|
||||
for (it = _items.begin (); it != _items.end (); ++it) {
|
||||
for (it = _items.begin (); it != end; ++it) {
|
||||
if (!it->socket && it->fd == fd_)
|
||||
break;
|
||||
}
|
||||
|
||||
if (it == _items.end ()) {
|
||||
if (it == end) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
@ -266,7 +273,8 @@ void zmq::socket_poller_t::rebuild ()
|
||||
_pollfds = NULL;
|
||||
}
|
||||
|
||||
for (items_t::iterator it = _items.begin (); it != _items.end (); ++it) {
|
||||
for (items_t::iterator it = _items.begin (), end = _items.end (); it != end;
|
||||
++it) {
|
||||
if (it->events) {
|
||||
if (it->socket && is_thread_safe (*it->socket)) {
|
||||
if (!_use_signaler) {
|
||||
@ -292,7 +300,8 @@ void zmq::socket_poller_t::rebuild ()
|
||||
_pollfds[0].events = POLLIN;
|
||||
}
|
||||
|
||||
for (items_t::iterator it = _items.begin (); it != _items.end (); ++it) {
|
||||
for (items_t::iterator it = _items.begin (), end = _items.end (); it != end;
|
||||
++it) {
|
||||
if (it->events) {
|
||||
if (it->socket) {
|
||||
if (!is_thread_safe (*it->socket)) {
|
||||
@ -330,7 +339,8 @@ void zmq::socket_poller_t::rebuild ()
|
||||
FD_ZERO (_pollset_out.get ());
|
||||
FD_ZERO (_pollset_err.get ());
|
||||
|
||||
for (items_t::iterator it = _items.begin (); it != _items.end (); ++it) {
|
||||
for (items_t::iterator it = _items.begin (), end = _items.end (); it != end;
|
||||
++it) {
|
||||
if (it->socket && is_thread_safe (*it->socket) && it->events) {
|
||||
_use_signaler = true;
|
||||
FD_SET (_signaler->get_fd (), _pollset_in.get ());
|
||||
@ -342,7 +352,8 @@ void zmq::socket_poller_t::rebuild ()
|
||||
_max_fd = 0;
|
||||
|
||||
// Build the fd_sets for passing to select ().
|
||||
for (items_t::iterator it = _items.begin (); it != _items.end (); ++it) {
|
||||
for (items_t::iterator it = _items.begin (), end = _items.end (); it != end;
|
||||
++it) {
|
||||
if (it->events) {
|
||||
// If the poll item is a 0MQ socket we are interested in input on the
|
||||
// notification file descriptor retrieved by the ZMQ_FD socket option.
|
||||
@ -404,8 +415,8 @@ int zmq::socket_poller_t::check_events (zmq::socket_poller_t::event_t *events_,
|
||||
#endif
|
||||
{
|
||||
int found = 0;
|
||||
for (items_t::iterator it = _items.begin ();
|
||||
it != _items.end () && found < n_events_; ++it) {
|
||||
for (items_t::iterator it = _items.begin (), end = _items.end ();
|
||||
it != end && found < n_events_; ++it) {
|
||||
// The poll item is a 0MQ socket. Retrieve pending events
|
||||
// using the ZMQ_EVENTS socket option.
|
||||
if (it->socket) {
|
||||
|
@ -105,7 +105,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_,
|
||||
// Put the socket into non-blocking mode.
|
||||
unblock_socket (_s);
|
||||
|
||||
int family = get_peer_ip_address (_s, _peer_address);
|
||||
const int family = get_peer_ip_address (_s, _peer_address);
|
||||
if (family == 0)
|
||||
_peer_address.clear ();
|
||||
#if defined ZMQ_HAVE_SO_PEERCRED
|
||||
@ -485,229 +485,23 @@ void zmq::stream_engine_t::restart_input ()
|
||||
}
|
||||
}
|
||||
|
||||
// Position of the revision field in the greeting.
|
||||
const size_t revision_pos = 10;
|
||||
|
||||
bool zmq::stream_engine_t::handshake ()
|
||||
{
|
||||
zmq_assert (_handshaking);
|
||||
zmq_assert (_greeting_bytes_read < _greeting_size);
|
||||
// Receive the greeting.
|
||||
while (_greeting_bytes_read < _greeting_size) {
|
||||
const int n = tcp_read (_s, _greeting_recv + _greeting_bytes_read,
|
||||
_greeting_size - _greeting_bytes_read);
|
||||
if (n == 0) {
|
||||
errno = EPIPE;
|
||||
error (connection_error);
|
||||
return false;
|
||||
}
|
||||
if (n == -1) {
|
||||
if (errno != EAGAIN)
|
||||
error (connection_error);
|
||||
return false;
|
||||
}
|
||||
const int rc = receive_greeting ();
|
||||
if (rc == -1)
|
||||
return false;
|
||||
const bool unversioned = rc != 0;
|
||||
|
||||
_greeting_bytes_read += n;
|
||||
|
||||
// We have received at least one byte from the peer.
|
||||
// If the first byte is not 0xff, we know that the
|
||||
// peer is using unversioned protocol.
|
||||
if (_greeting_recv[0] != 0xff)
|
||||
break;
|
||||
|
||||
if (_greeting_bytes_read < signature_size)
|
||||
continue;
|
||||
|
||||
// Inspect the right-most bit of the 10th byte (which coincides
|
||||
// with the 'flags' field if a regular message was sent).
|
||||
// Zero indicates this is a header of a routing id message
|
||||
// (i.e. the peer is using the unversioned protocol).
|
||||
if (!(_greeting_recv[9] & 0x01))
|
||||
break;
|
||||
|
||||
// The peer is using versioned protocol.
|
||||
// Send the major version number.
|
||||
if (_outpos + _outsize == _greeting_send + signature_size) {
|
||||
if (_outsize == 0)
|
||||
set_pollout (_handle);
|
||||
_outpos[_outsize++] = 3; // Major version number
|
||||
}
|
||||
|
||||
if (_greeting_bytes_read > signature_size) {
|
||||
if (_outpos + _outsize == _greeting_send + signature_size + 1) {
|
||||
if (_outsize == 0)
|
||||
set_pollout (_handle);
|
||||
|
||||
// Use ZMTP/2.0 to talk to older peers.
|
||||
if (_greeting_recv[10] == ZMTP_1_0
|
||||
|| _greeting_recv[10] == ZMTP_2_0)
|
||||
_outpos[_outsize++] = _options.type;
|
||||
else {
|
||||
_outpos[_outsize++] = 0; // Minor version number
|
||||
memset (_outpos + _outsize, 0, 20);
|
||||
|
||||
zmq_assert (_options.mechanism == ZMQ_NULL
|
||||
|| _options.mechanism == ZMQ_PLAIN
|
||||
|| _options.mechanism == ZMQ_CURVE
|
||||
|| _options.mechanism == ZMQ_GSSAPI);
|
||||
|
||||
if (_options.mechanism == ZMQ_NULL)
|
||||
memcpy (_outpos + _outsize, "NULL", 4);
|
||||
else if (_options.mechanism == ZMQ_PLAIN)
|
||||
memcpy (_outpos + _outsize, "PLAIN", 5);
|
||||
else if (_options.mechanism == ZMQ_GSSAPI)
|
||||
memcpy (_outpos + _outsize, "GSSAPI", 6);
|
||||
else if (_options.mechanism == ZMQ_CURVE)
|
||||
memcpy (_outpos + _outsize, "CURVE", 5);
|
||||
_outsize += 20;
|
||||
memset (_outpos + _outsize, 0, 32);
|
||||
_outsize += 32;
|
||||
_greeting_size = v3_greeting_size;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Position of the revision field in the greeting.
|
||||
const size_t revision_pos = 10;
|
||||
|
||||
// Is the peer using ZMTP/1.0 with no revision number?
|
||||
// If so, we send and receive rest of routing id message
|
||||
if (_greeting_recv[0] != 0xff || !(_greeting_recv[9] & 0x01)) {
|
||||
if (_session->zap_enabled ()) {
|
||||
// reject ZMTP 1.0 connections if ZAP is enabled
|
||||
error (protocol_error);
|
||||
return false;
|
||||
}
|
||||
|
||||
_encoder = new (std::nothrow) v1_encoder_t (out_batch_size);
|
||||
alloc_assert (_encoder);
|
||||
|
||||
_decoder =
|
||||
new (std::nothrow) v1_decoder_t (in_batch_size, _options.maxmsgsize);
|
||||
alloc_assert (_decoder);
|
||||
|
||||
// We have already sent the message header.
|
||||
// Since there is no way to tell the encoder to
|
||||
// skip the message header, we simply throw that
|
||||
// header data away.
|
||||
const size_t header_size =
|
||||
_options.routing_id_size + 1 >= UCHAR_MAX ? 10 : 2;
|
||||
unsigned char tmp[10], *bufferp = tmp;
|
||||
|
||||
// Prepare the routing id message and load it into encoder.
|
||||
// Then consume bytes we have already sent to the peer.
|
||||
const int rc = _tx_msg.init_size (_options.routing_id_size);
|
||||
zmq_assert (rc == 0);
|
||||
memcpy (_tx_msg.data (), _options.routing_id, _options.routing_id_size);
|
||||
_encoder->load_msg (&_tx_msg);
|
||||
size_t buffer_size = _encoder->encode (&bufferp, header_size);
|
||||
zmq_assert (buffer_size == header_size);
|
||||
|
||||
// Make sure the decoder sees the data we have already received.
|
||||
_inpos = _greeting_recv;
|
||||
_insize = _greeting_bytes_read;
|
||||
|
||||
// To allow for interoperability with peers that do not forward
|
||||
// their subscriptions, we inject a phantom subscription message
|
||||
// message into the incoming message stream.
|
||||
if (_options.type == ZMQ_PUB || _options.type == ZMQ_XPUB)
|
||||
_subscription_required = true;
|
||||
|
||||
// We are sending our routing id now and the next message
|
||||
// will come from the socket.
|
||||
_next_msg = &stream_engine_t::pull_msg_from_session;
|
||||
|
||||
// We are expecting routing id message.
|
||||
_process_msg = &stream_engine_t::process_routing_id_msg;
|
||||
} else if (_greeting_recv[revision_pos] == ZMTP_1_0) {
|
||||
if (_session->zap_enabled ()) {
|
||||
// reject ZMTP 1.0 connections if ZAP is enabled
|
||||
error (protocol_error);
|
||||
return false;
|
||||
}
|
||||
|
||||
_encoder = new (std::nothrow) v1_encoder_t (out_batch_size);
|
||||
alloc_assert (_encoder);
|
||||
|
||||
_decoder =
|
||||
new (std::nothrow) v1_decoder_t (in_batch_size, _options.maxmsgsize);
|
||||
alloc_assert (_decoder);
|
||||
} else if (_greeting_recv[revision_pos] == ZMTP_2_0) {
|
||||
if (_session->zap_enabled ()) {
|
||||
// reject ZMTP 2.0 connections if ZAP is enabled
|
||||
error (protocol_error);
|
||||
return false;
|
||||
}
|
||||
|
||||
_encoder = new (std::nothrow) v2_encoder_t (out_batch_size);
|
||||
alloc_assert (_encoder);
|
||||
|
||||
_decoder = new (std::nothrow)
|
||||
v2_decoder_t (in_batch_size, _options.maxmsgsize, _options.zero_copy);
|
||||
alloc_assert (_decoder);
|
||||
} else {
|
||||
_encoder = new (std::nothrow) v2_encoder_t (out_batch_size);
|
||||
alloc_assert (_encoder);
|
||||
|
||||
_decoder = new (std::nothrow)
|
||||
v2_decoder_t (in_batch_size, _options.maxmsgsize, _options.zero_copy);
|
||||
alloc_assert (_decoder);
|
||||
|
||||
if (_options.mechanism == ZMQ_NULL
|
||||
&& memcmp (_greeting_recv + 12,
|
||||
"NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20)
|
||||
== 0) {
|
||||
_mechanism = new (std::nothrow)
|
||||
null_mechanism_t (_session, _peer_address, _options);
|
||||
alloc_assert (_mechanism);
|
||||
} else if (_options.mechanism == ZMQ_PLAIN
|
||||
&& memcmp (_greeting_recv + 12,
|
||||
"PLAIN\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20)
|
||||
== 0) {
|
||||
if (_options.as_server)
|
||||
_mechanism = new (std::nothrow)
|
||||
plain_server_t (_session, _peer_address, _options);
|
||||
else
|
||||
_mechanism =
|
||||
new (std::nothrow) plain_client_t (_session, _options);
|
||||
alloc_assert (_mechanism);
|
||||
}
|
||||
#ifdef ZMQ_HAVE_CURVE
|
||||
else if (_options.mechanism == ZMQ_CURVE
|
||||
&& memcmp (_greeting_recv + 12,
|
||||
"CURVE\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20)
|
||||
== 0) {
|
||||
if (_options.as_server)
|
||||
_mechanism = new (std::nothrow)
|
||||
curve_server_t (_session, _peer_address, _options);
|
||||
else
|
||||
_mechanism =
|
||||
new (std::nothrow) curve_client_t (_session, _options);
|
||||
alloc_assert (_mechanism);
|
||||
}
|
||||
#endif
|
||||
#ifdef HAVE_LIBGSSAPI_KRB5
|
||||
else if (_options.mechanism == ZMQ_GSSAPI
|
||||
&& memcmp (_greeting_recv + 12,
|
||||
"GSSAPI\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20)
|
||||
== 0) {
|
||||
if (_options.as_server)
|
||||
_mechanism = new (std::nothrow)
|
||||
gssapi_server_t (_session, _peer_address, _options);
|
||||
else
|
||||
_mechanism =
|
||||
new (std::nothrow) gssapi_client_t (_session, _options);
|
||||
alloc_assert (_mechanism);
|
||||
}
|
||||
#endif
|
||||
else {
|
||||
_session->get_socket ()->event_handshake_failed_protocol (
|
||||
_session->get_endpoint (),
|
||||
ZMQ_PROTOCOL_ERROR_ZMTP_MECHANISM_MISMATCH);
|
||||
error (protocol_error);
|
||||
return false;
|
||||
}
|
||||
_next_msg = &stream_engine_t::next_handshake_command;
|
||||
_process_msg = &stream_engine_t::process_handshake_command;
|
||||
}
|
||||
if (!(this
|
||||
->*select_handshake_fun (unversioned,
|
||||
_greeting_recv[revision_pos])) ())
|
||||
return false;
|
||||
|
||||
// Start polling for output if necessary.
|
||||
if (_outsize == 0)
|
||||
@ -725,6 +519,269 @@ bool zmq::stream_engine_t::handshake ()
|
||||
return true;
|
||||
}
|
||||
|
||||
int zmq::stream_engine_t::receive_greeting ()
|
||||
{
|
||||
bool unversioned = false;
|
||||
while (_greeting_bytes_read < _greeting_size) {
|
||||
const int n = tcp_read (_s, _greeting_recv + _greeting_bytes_read,
|
||||
_greeting_size - _greeting_bytes_read);
|
||||
if (n == 0) {
|
||||
errno = EPIPE;
|
||||
error (connection_error);
|
||||
return -1;
|
||||
}
|
||||
if (n == -1) {
|
||||
if (errno != EAGAIN)
|
||||
error (connection_error);
|
||||
return -1;
|
||||
}
|
||||
|
||||
_greeting_bytes_read += n;
|
||||
|
||||
// We have received at least one byte from the peer.
|
||||
// If the first byte is not 0xff, we know that the
|
||||
// peer is using unversioned protocol.
|
||||
if (_greeting_recv[0] != 0xff) {
|
||||
unversioned = true;
|
||||
break;
|
||||
}
|
||||
|
||||
if (_greeting_bytes_read < signature_size)
|
||||
continue;
|
||||
|
||||
// Inspect the right-most bit of the 10th byte (which coincides
|
||||
// with the 'flags' field if a regular message was sent).
|
||||
// Zero indicates this is a header of a routing id message
|
||||
// (i.e. the peer is using the unversioned protocol).
|
||||
if (!(_greeting_recv[9] & 0x01)) {
|
||||
unversioned = true;
|
||||
break;
|
||||
}
|
||||
|
||||
// The peer is using versioned protocol.
|
||||
receive_greeting_versioned ();
|
||||
}
|
||||
return unversioned ? 1 : 0;
|
||||
}
|
||||
|
||||
void zmq::stream_engine_t::receive_greeting_versioned ()
|
||||
{
|
||||
// Send the major version number.
|
||||
if (_outpos + _outsize == _greeting_send + signature_size) {
|
||||
if (_outsize == 0)
|
||||
set_pollout (_handle);
|
||||
_outpos[_outsize++] = 3; // Major version number
|
||||
}
|
||||
|
||||
if (_greeting_bytes_read > signature_size) {
|
||||
if (_outpos + _outsize == _greeting_send + signature_size + 1) {
|
||||
if (_outsize == 0)
|
||||
set_pollout (_handle);
|
||||
|
||||
// Use ZMTP/2.0 to talk to older peers.
|
||||
if (_greeting_recv[revision_pos] == ZMTP_1_0
|
||||
|| _greeting_recv[revision_pos] == ZMTP_2_0)
|
||||
_outpos[_outsize++] = _options.type;
|
||||
else {
|
||||
_outpos[_outsize++] = 0; // Minor version number
|
||||
memset (_outpos + _outsize, 0, 20);
|
||||
|
||||
zmq_assert (_options.mechanism == ZMQ_NULL
|
||||
|| _options.mechanism == ZMQ_PLAIN
|
||||
|| _options.mechanism == ZMQ_CURVE
|
||||
|| _options.mechanism == ZMQ_GSSAPI);
|
||||
|
||||
if (_options.mechanism == ZMQ_NULL)
|
||||
memcpy (_outpos + _outsize, "NULL", 4);
|
||||
else if (_options.mechanism == ZMQ_PLAIN)
|
||||
memcpy (_outpos + _outsize, "PLAIN", 5);
|
||||
else if (_options.mechanism == ZMQ_GSSAPI)
|
||||
memcpy (_outpos + _outsize, "GSSAPI", 6);
|
||||
else if (_options.mechanism == ZMQ_CURVE)
|
||||
memcpy (_outpos + _outsize, "CURVE", 5);
|
||||
_outsize += 20;
|
||||
memset (_outpos + _outsize, 0, 32);
|
||||
_outsize += 32;
|
||||
_greeting_size = v3_greeting_size;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
zmq::stream_engine_t::handshake_fun_t
|
||||
zmq::stream_engine_t::select_handshake_fun (bool unversioned,
|
||||
unsigned char revision)
|
||||
{
|
||||
// Is the peer using ZMTP/1.0 with no revision number?
|
||||
if (unversioned) {
|
||||
return &stream_engine_t::handshake_v1_0_unversioned;
|
||||
}
|
||||
switch (revision) {
|
||||
case ZMTP_1_0:
|
||||
return &stream_engine_t::handshake_v1_0;
|
||||
case ZMTP_2_0:
|
||||
return &stream_engine_t::handshake_v2_0;
|
||||
default:
|
||||
return &stream_engine_t::handshake_v3_0;
|
||||
}
|
||||
}
|
||||
|
||||
bool zmq::stream_engine_t::handshake_v1_0_unversioned ()
|
||||
{
|
||||
// We send and receive rest of routing id message
|
||||
if (_session->zap_enabled ()) {
|
||||
// reject ZMTP 1.0 connections if ZAP is enabled
|
||||
error (protocol_error);
|
||||
return false;
|
||||
}
|
||||
|
||||
_encoder = new (std::nothrow) v1_encoder_t (out_batch_size);
|
||||
alloc_assert (_encoder);
|
||||
|
||||
_decoder =
|
||||
new (std::nothrow) v1_decoder_t (in_batch_size, _options.maxmsgsize);
|
||||
alloc_assert (_decoder);
|
||||
|
||||
// We have already sent the message header.
|
||||
// Since there is no way to tell the encoder to
|
||||
// skip the message header, we simply throw that
|
||||
// header data away.
|
||||
const size_t header_size =
|
||||
_options.routing_id_size + 1 >= UCHAR_MAX ? 10 : 2;
|
||||
unsigned char tmp[10], *bufferp = tmp;
|
||||
|
||||
// Prepare the routing id message and load it into encoder.
|
||||
// Then consume bytes we have already sent to the peer.
|
||||
const int rc = _tx_msg.init_size (_options.routing_id_size);
|
||||
zmq_assert (rc == 0);
|
||||
memcpy (_tx_msg.data (), _options.routing_id, _options.routing_id_size);
|
||||
_encoder->load_msg (&_tx_msg);
|
||||
const size_t buffer_size = _encoder->encode (&bufferp, header_size);
|
||||
zmq_assert (buffer_size == header_size);
|
||||
|
||||
// Make sure the decoder sees the data we have already received.
|
||||
_inpos = _greeting_recv;
|
||||
_insize = _greeting_bytes_read;
|
||||
|
||||
// To allow for interoperability with peers that do not forward
|
||||
// their subscriptions, we inject a phantom subscription message
|
||||
// message into the incoming message stream.
|
||||
if (_options.type == ZMQ_PUB || _options.type == ZMQ_XPUB)
|
||||
_subscription_required = true;
|
||||
|
||||
// We are sending our routing id now and the next message
|
||||
// will come from the socket.
|
||||
_next_msg = &stream_engine_t::pull_msg_from_session;
|
||||
|
||||
// We are expecting routing id message.
|
||||
_process_msg = &stream_engine_t::process_routing_id_msg;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool zmq::stream_engine_t::handshake_v1_0 ()
|
||||
{
|
||||
if (_session->zap_enabled ()) {
|
||||
// reject ZMTP 1.0 connections if ZAP is enabled
|
||||
error (protocol_error);
|
||||
return false;
|
||||
}
|
||||
|
||||
_encoder = new (std::nothrow) v1_encoder_t (out_batch_size);
|
||||
alloc_assert (_encoder);
|
||||
|
||||
_decoder =
|
||||
new (std::nothrow) v1_decoder_t (in_batch_size, _options.maxmsgsize);
|
||||
alloc_assert (_decoder);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool zmq::stream_engine_t::handshake_v2_0 ()
|
||||
{
|
||||
if (_session->zap_enabled ()) {
|
||||
// reject ZMTP 2.0 connections if ZAP is enabled
|
||||
error (protocol_error);
|
||||
return false;
|
||||
}
|
||||
|
||||
_encoder = new (std::nothrow) v2_encoder_t (out_batch_size);
|
||||
alloc_assert (_encoder);
|
||||
|
||||
_decoder = new (std::nothrow)
|
||||
v2_decoder_t (in_batch_size, _options.maxmsgsize, _options.zero_copy);
|
||||
alloc_assert (_decoder);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool zmq::stream_engine_t::handshake_v3_0 ()
|
||||
{
|
||||
_encoder = new (std::nothrow) v2_encoder_t (out_batch_size);
|
||||
alloc_assert (_encoder);
|
||||
|
||||
_decoder = new (std::nothrow)
|
||||
v2_decoder_t (in_batch_size, _options.maxmsgsize, _options.zero_copy);
|
||||
alloc_assert (_decoder);
|
||||
|
||||
if (_options.mechanism == ZMQ_NULL
|
||||
&& memcmp (_greeting_recv + 12, "NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0",
|
||||
20)
|
||||
== 0) {
|
||||
_mechanism = new (std::nothrow)
|
||||
null_mechanism_t (_session, _peer_address, _options);
|
||||
alloc_assert (_mechanism);
|
||||
} else if (_options.mechanism == ZMQ_PLAIN
|
||||
&& memcmp (_greeting_recv + 12,
|
||||
"PLAIN\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20)
|
||||
== 0) {
|
||||
if (_options.as_server)
|
||||
_mechanism = new (std::nothrow)
|
||||
plain_server_t (_session, _peer_address, _options);
|
||||
else
|
||||
_mechanism = new (std::nothrow) plain_client_t (_session, _options);
|
||||
alloc_assert (_mechanism);
|
||||
}
|
||||
#ifdef ZMQ_HAVE_CURVE
|
||||
else if (_options.mechanism == ZMQ_CURVE
|
||||
&& memcmp (_greeting_recv + 12,
|
||||
"CURVE\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20)
|
||||
== 0) {
|
||||
if (_options.as_server)
|
||||
_mechanism = new (std::nothrow)
|
||||
curve_server_t (_session, _peer_address, _options);
|
||||
else
|
||||
_mechanism = new (std::nothrow) curve_client_t (_session, _options);
|
||||
alloc_assert (_mechanism);
|
||||
}
|
||||
#endif
|
||||
#ifdef HAVE_LIBGSSAPI_KRB5
|
||||
else if (_options.mechanism == ZMQ_GSSAPI
|
||||
&& memcmp (_greeting_recv + 12,
|
||||
"GSSAPI\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20)
|
||||
== 0) {
|
||||
if (_options.as_server)
|
||||
_mechanism = new (std::nothrow)
|
||||
gssapi_server_t (_session, _peer_address, _options);
|
||||
else
|
||||
_mechanism =
|
||||
new (std::nothrow) gssapi_client_t (_session, _options);
|
||||
alloc_assert (_mechanism);
|
||||
}
|
||||
#endif
|
||||
else {
|
||||
_session->get_socket ()->event_handshake_failed_protocol (
|
||||
_session->get_endpoint (),
|
||||
ZMQ_PROTOCOL_ERROR_ZMTP_MECHANISM_MISMATCH);
|
||||
error (protocol_error);
|
||||
return false;
|
||||
}
|
||||
_next_msg = &stream_engine_t::next_handshake_command;
|
||||
_process_msg = &stream_engine_t::process_handshake_command;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
int zmq::stream_engine_t::routing_id_msg (msg_t *msg_)
|
||||
{
|
||||
int rc = msg_->init_size (_options.routing_id_size);
|
||||
@ -1037,20 +1094,18 @@ void zmq::stream_engine_t::timer_event (int id_)
|
||||
|
||||
int zmq::stream_engine_t::produce_ping_message (msg_t *msg_)
|
||||
{
|
||||
int rc = 0;
|
||||
// 16-bit TTL + \4PING == 7
|
||||
const size_t ping_ttl_len = msg_t::ping_cmd_name_size + 2;
|
||||
zmq_assert (_mechanism != NULL);
|
||||
|
||||
rc = msg_->init_size (ping_ttl_len);
|
||||
int rc = msg_->init_size (ping_ttl_len);
|
||||
errno_assert (rc == 0);
|
||||
msg_->set_flags (msg_t::command);
|
||||
// Copy in the command message
|
||||
memcpy (msg_->data (), "\4PING", msg_t::ping_cmd_name_size);
|
||||
|
||||
uint16_t ttl_val = htons (_options.heartbeat_ttl);
|
||||
memcpy ((static_cast<uint8_t *> (msg_->data ()))
|
||||
+ msg_t::ping_cmd_name_size,
|
||||
memcpy (static_cast<uint8_t *> (msg_->data ()) + msg_t::ping_cmd_name_size,
|
||||
&ttl_val, sizeof (ttl_val));
|
||||
|
||||
rc = _mechanism->encode (msg_);
|
||||
@ -1064,10 +1119,9 @@ int zmq::stream_engine_t::produce_ping_message (msg_t *msg_)
|
||||
|
||||
int zmq::stream_engine_t::produce_pong_message (msg_t *msg_)
|
||||
{
|
||||
int rc = 0;
|
||||
zmq_assert (_mechanism != NULL);
|
||||
|
||||
rc = msg_->move (_pong_msg);
|
||||
int rc = msg_->move (_pong_msg);
|
||||
errno_assert (rc == 0);
|
||||
|
||||
rc = _mechanism->encode (msg_);
|
||||
@ -1103,17 +1157,17 @@ int zmq::stream_engine_t::process_heartbeat_message (msg_t *msg_)
|
||||
// here and store it. Truncate it if it's too long.
|
||||
// Given the engine goes straight to out_event, sequential PINGs will
|
||||
// not be a problem.
|
||||
size_t context_len = msg_->size () - ping_ttl_len > ping_max_ctx_len
|
||||
? ping_max_ctx_len
|
||||
: msg_->size () - ping_ttl_len;
|
||||
int rc = _pong_msg.init_size (msg_t::ping_cmd_name_size + context_len);
|
||||
const size_t context_len =
|
||||
std::min (msg_->size () - ping_ttl_len, ping_max_ctx_len);
|
||||
const int rc =
|
||||
_pong_msg.init_size (msg_t::ping_cmd_name_size + context_len);
|
||||
errno_assert (rc == 0);
|
||||
_pong_msg.set_flags (msg_t::command);
|
||||
memcpy (_pong_msg.data (), "\4PONG", msg_t::ping_cmd_name_size);
|
||||
if (context_len > 0)
|
||||
memcpy ((static_cast<uint8_t *> (_pong_msg.data ()))
|
||||
memcpy (static_cast<uint8_t *> (_pong_msg.data ())
|
||||
+ msg_t::ping_cmd_name_size,
|
||||
(static_cast<uint8_t *> (msg_->data ())) + ping_ttl_len,
|
||||
static_cast<uint8_t *> (msg_->data ()) + ping_ttl_len,
|
||||
context_len);
|
||||
|
||||
_next_msg = &stream_engine_t::produce_pong_message;
|
||||
|
@ -93,12 +93,22 @@ class stream_engine_t : public io_object_t, public i_engine
|
||||
// Function to handle network disconnections.
|
||||
void error (error_reason_t reason_);
|
||||
|
||||
// Receives the greeting message from the peer.
|
||||
int receive_greeting ();
|
||||
|
||||
// Detects the protocol used by the peer.
|
||||
bool handshake ();
|
||||
|
||||
// Receive the greeting from the peer.
|
||||
int receive_greeting ();
|
||||
void receive_greeting_versioned ();
|
||||
|
||||
typedef bool (stream_engine_t::*handshake_fun_t) ();
|
||||
static handshake_fun_t select_handshake_fun (bool unversioned,
|
||||
unsigned char revision);
|
||||
|
||||
bool handshake_v1_0_unversioned ();
|
||||
bool handshake_v1_0 ();
|
||||
bool handshake_v2_0 ();
|
||||
bool handshake_v3_0 ();
|
||||
|
||||
int routing_id_msg (msg_t *msg_);
|
||||
int process_routing_id_msg (msg_t *msg_);
|
||||
|
||||
|
@ -144,7 +144,7 @@ long zmq::timers_t::timeout ()
|
||||
for (; it != end; ++it) {
|
||||
if (0 == _cancelled_timers.erase (it->second.timer_id)) {
|
||||
// Live timer, lets return the timeout
|
||||
res = it->first > now ? static_cast<long> (it->first - now) : 0;
|
||||
res = std::max (static_cast<long> (it->first - now), 0l);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user