Problem: duplicated code in stream_t & router_t

Solution: pulled up to routing_socket_base_t
This commit is contained in:
Simon Giesecke 2018-05-29 12:20:49 +02:00
parent 09fab930b3
commit ab3895a470
4 changed files with 20 additions and 10 deletions

View File

@ -152,9 +152,7 @@ void zmq::router_t::xpipe_terminated (pipe_t *pipe_)
if (it != _anonymous_pipes.end ()) if (it != _anonymous_pipes.end ())
_anonymous_pipes.erase (it); _anonymous_pipes.erase (it);
else { else {
out_pipes_t::iterator iter = _out_pipes.find (pipe_->get_routing_id ()); erase_out_pipe (pipe_);
zmq_assert (iter != _out_pipes.end ());
_out_pipes.erase (iter);
_fq.pipe_terminated (pipe_); _fq.pipe_terminated (pipe_);
pipe_->rollback (); pipe_->rollback ();
if (pipe_ == _current_out) if (pipe_ == _current_out)

View File

@ -1770,6 +1770,11 @@ zmq::routing_socket_base_t::routing_socket_base_t (class ctx_t *parent_,
{ {
} }
zmq::routing_socket_base_t::~routing_socket_base_t ()
{
zmq_assert (_out_pipes.empty ());
}
int zmq::routing_socket_base_t::xsetsockopt (int option_, int zmq::routing_socket_base_t::xsetsockopt (int option_,
const void *optval_, const void *optval_,
size_t optvallen_) size_t optvallen_)
@ -1807,3 +1812,10 @@ std::string zmq::routing_socket_base_t::extract_connect_routing_id ()
_connect_routing_id.clear (); _connect_routing_id.clear ();
return res; return res;
} }
void zmq::routing_socket_base_t::erase_out_pipe (pipe_t *pipe_)
{
out_pipes_t::iterator it = _out_pipes.find (pipe_->get_routing_id ());
zmq_assert (it != _out_pipes.end ());
_out_pipes.erase (it);
}

View File

@ -302,15 +302,18 @@ class routing_socket_base_t : public socket_base_t
{ {
protected: protected:
routing_socket_base_t (class ctx_t *parent_, uint32_t tid_, int sid_); routing_socket_base_t (class ctx_t *parent_, uint32_t tid_, int sid_);
~routing_socket_base_t ();
// methods from socket_base_t
virtual int virtual int
xsetsockopt (int option_, const void *optval_, size_t optvallen_); xsetsockopt (int option_, const void *optval_, size_t optvallen_);
virtual void xwrite_activated (pipe_t *pipe_);
void xwrite_activated (pipe_t *pipe_); // own methods
std::string extract_connect_routing_id (); std::string extract_connect_routing_id ();
void erase_out_pipe (pipe_t *pipe_);
struct out_pipe_t struct out_pipe_t
{ {
pipe_t *pipe; pipe_t *pipe;

View File

@ -53,7 +53,6 @@ zmq::stream_t::stream_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
zmq::stream_t::~stream_t () zmq::stream_t::~stream_t ()
{ {
zmq_assert (_out_pipes.empty ());
_prefetched_routing_id.close (); _prefetched_routing_id.close ();
_prefetched_msg.close (); _prefetched_msg.close ();
} }
@ -70,9 +69,7 @@ void zmq::stream_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
void zmq::stream_t::xpipe_terminated (pipe_t *pipe_) void zmq::stream_t::xpipe_terminated (pipe_t *pipe_)
{ {
out_pipes_t::iterator it = _out_pipes.find (pipe_->get_routing_id ()); erase_out_pipe (pipe_);
zmq_assert (it != _out_pipes.end ());
_out_pipes.erase (it);
_fq.pipe_terminated (pipe_); _fq.pipe_terminated (pipe_);
// TODO router_t calls pipe_->rollback() here; should this be done here as // TODO router_t calls pipe_->rollback() here; should this be done here as
// well? then xpipe_terminated could be pulled up to routing_socket_base_t // well? then xpipe_terminated could be pulled up to routing_socket_base_t