diff --git a/src/router.cpp b/src/router.cpp index 3830ba3e..241810d4 100644 --- a/src/router.cpp +++ b/src/router.cpp @@ -152,7 +152,7 @@ void zmq::router_t::xpipe_terminated (pipe_t *pipe_) if (it != _anonymous_pipes.end ()) _anonymous_pipes.erase (it); else { - outpipes_t::iterator iter = _out_pipes.find (pipe_->get_routing_id ()); + out_pipes_t::iterator iter = _out_pipes.find (pipe_->get_routing_id ()); zmq_assert (iter != _out_pipes.end ()); _out_pipes.erase (iter); _fq.pipe_terminated (pipe_); @@ -178,7 +178,7 @@ void zmq::router_t::xread_activated (pipe_t *pipe_) void zmq::router_t::xwrite_activated (pipe_t *pipe_) { - outpipes_t::iterator it; + out_pipes_t::iterator it; for (it = _out_pipes.begin (); it != _out_pipes.end (); ++it) if (it->second.pipe == pipe_) break; @@ -206,7 +206,7 @@ int zmq::router_t::xsend (msg_t *msg_) // router_mandatory is set. blob_t routing_id (static_cast (msg_->data ()), msg_->size (), zmq::reference_tag_t ()); - outpipes_t::iterator it = _out_pipes.find (routing_id); + out_pipes_t::iterator it = _out_pipes.find (routing_id); if (it != _out_pipes.end ()) { _current_out = it->second.pipe; @@ -422,7 +422,7 @@ bool zmq::router_t::xhas_out () return true; bool has_out = false; - outpipes_t::iterator it; + out_pipes_t::iterator it; for (it = _out_pipes.begin (); it != _out_pipes.end (); ++it) has_out |= it->second.pipe->check_hwm (); @@ -440,7 +440,7 @@ int zmq::router_t::get_peer_state (const void *routing_id_, int res = 0; blob_t routing_id_blob ((unsigned char *) routing_id_, routing_id_size_); - outpipes_t::const_iterator it = _out_pipes.find (routing_id_blob); + out_pipes_t::const_iterator it = _out_pipes.find (routing_id_blob); if (it == _out_pipes.end ()) { errno = EHOSTUNREACH; return -1; @@ -492,7 +492,7 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_) } else { routing_id.set (static_cast (msg.data ()), msg.size ()); - outpipes_t::iterator it = _out_pipes.find (routing_id); + out_pipes_t::iterator it = _out_pipes.find (routing_id); msg.close (); if (it != _out_pipes.end ()) { diff --git a/src/router.hpp b/src/router.hpp index 3ca1dd27..7ed44d6d 100644 --- a/src/router.hpp +++ b/src/router.hpp @@ -97,19 +97,9 @@ class router_t : public routing_socket_base_t // If true, more incoming message parts are expected. bool _more_in; - struct out_pipe_t - { - zmq::pipe_t *pipe; - bool active; - }; - // We keep a set of pipes that have not been identified yet. std::set _anonymous_pipes; - // Outbound pipes indexed by the peer IDs. - typedef std::map outpipes_t; - outpipes_t _out_pipes; - // The pipe we are currently writing to. zmq::pipe_t *_current_out; diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 0da3dd00..7c6dab03 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -308,6 +308,16 @@ class routing_socket_base_t : public socket_base_t std::string extract_connect_routing_id (); + struct out_pipe_t + { + zmq::pipe_t *pipe; + bool active; + }; + + // Outbound pipes indexed by the peer IDs. + typedef std::map out_pipes_t; + out_pipes_t _out_pipes; + private: // Next assigned name on a zmq_connect() call used by ROUTER and STREAM socket types std::string _connect_routing_id; diff --git a/src/stream.cpp b/src/stream.cpp index 841d40ea..c684e85c 100644 --- a/src/stream.cpp +++ b/src/stream.cpp @@ -53,7 +53,7 @@ zmq::stream_t::stream_t (class ctx_t *parent_, uint32_t tid_, int sid_) : zmq::stream_t::~stream_t () { - zmq_assert (_outpipes.empty ()); + zmq_assert (_out_pipes.empty ()); _prefetched_routing_id.close (); _prefetched_msg.close (); } @@ -70,9 +70,9 @@ void zmq::stream_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) void zmq::stream_t::xpipe_terminated (pipe_t *pipe_) { - outpipes_t::iterator it = _outpipes.find (pipe_->get_routing_id ()); - zmq_assert (it != _outpipes.end ()); - _outpipes.erase (it); + out_pipes_t::iterator it = _out_pipes.find (pipe_->get_routing_id ()); + zmq_assert (it != _out_pipes.end ()); + _out_pipes.erase (it); _fq.pipe_terminated (pipe_); if (pipe_ == _current_out) _current_out = NULL; @@ -85,12 +85,12 @@ void zmq::stream_t::xread_activated (pipe_t *pipe_) void zmq::stream_t::xwrite_activated (pipe_t *pipe_) { - outpipes_t::iterator it; - for (it = _outpipes.begin (); it != _outpipes.end (); ++it) + out_pipes_t::iterator it; + for (it = _out_pipes.begin (); it != _out_pipes.end (); ++it) if (it->second.pipe == pipe_) break; - zmq_assert (it != _outpipes.end ()); + zmq_assert (it != _out_pipes.end ()); zmq_assert (!it->second.active); it->second.active = true; } @@ -110,9 +110,9 @@ int zmq::stream_t::xsend (msg_t *msg_) // If there's no such pipe return an error blob_t routing_id (static_cast (msg_->data ()), msg_->size ()); - outpipes_t::iterator it = _outpipes.find (routing_id); + out_pipes_t::iterator it = _out_pipes.find (routing_id); - if (it != _outpipes.end ()) { + if (it != _out_pipes.end ()) { _current_out = it->second.pipe; if (!_current_out->check_write ()) { it->second.active = false; @@ -288,7 +288,7 @@ void zmq::stream_t::identify_peer (pipe_t *pipe_) reinterpret_cast (connect_routing_id.c_str ()), connect_routing_id.length ()); // Not allowed to duplicate an existing rid - zmq_assert (0 == _outpipes.count (routing_id)); + zmq_assert (0 == _out_pipes.count (routing_id)); } else { put_uint32 (buffer + 1, _next_integral_routing_id++); routing_id.set (buffer, sizeof buffer); @@ -298,9 +298,9 @@ void zmq::stream_t::identify_peer (pipe_t *pipe_) } pipe_->set_router_socket_routing_id (routing_id); // Add the record into output pipes lookup table - outpipe_t outpipe = {pipe_, true}; + out_pipe_t outpipe = {pipe_, true}; const bool ok = - _outpipes.ZMQ_MAP_INSERT_OR_EMPLACE (ZMQ_MOVE (routing_id), outpipe) + _out_pipes.ZMQ_MAP_INSERT_OR_EMPLACE (ZMQ_MOVE (routing_id), outpipe) .second; zmq_assert (ok); } diff --git a/src/stream.hpp b/src/stream.hpp index 33df16cf..4ff3eeda 100644 --- a/src/stream.hpp +++ b/src/stream.hpp @@ -76,16 +76,6 @@ class stream_t : public routing_socket_base_t // Holds the prefetched message. msg_t _prefetched_msg; - struct outpipe_t - { - zmq::pipe_t *pipe; - bool active; - }; - - // Outbound pipes indexed by the peer IDs. - typedef std::map outpipes_t; - outpipes_t _outpipes; - // The pipe we are currently writing to. zmq::pipe_t *_current_out;