diff --git a/src/pipe.cpp b/src/pipe.cpp index b39c5e92..c8ce4cda 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -553,3 +553,13 @@ void zmq::pipe_t::send_hwms_to_peer (int inhwm_, int outhwm_) { send_pipe_hwm (_peer, inhwm_, outhwm_); } + +void zmq::pipe_t::set_endpoint_uri (const char *name_) +{ + _endpoint_uri = name_; +} + +std::string &zmq::pipe_t::get_endpoint_uri () +{ + return _endpoint_uri; +} diff --git a/src/pipe.hpp b/src/pipe.hpp index dbe021a7..49e800d3 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -141,6 +141,9 @@ class pipe_t : public object_t, // Returns true if HWM is not reached bool check_hwm () const; + void set_endpoint_uri (const char *name_); + std::string &get_endpoint_uri (); + private: // Type of the underlying lock-free pipe. typedef ypipe_base_t upipe_t; @@ -244,6 +247,10 @@ class pipe_t : public object_t, const bool _conflate; + // If the pipe belongs to socket's endpoint the endpoint's name is stored here. + // Otherwise this is empty. + std::string _endpoint_uri; + // Disable copying. pipe_t (const pipe_t &); const pipe_t &operator= (const pipe_t &); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index b8525b36..2b5290c4 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -1009,6 +1009,9 @@ void zmq::socket_base_t::add_endpoint (const char *endpoint_uri_, launch_child (endpoint_); _endpoints.ZMQ_MAP_INSERT_OR_EMPLACE (std::string (endpoint_uri_), endpoint_pipe_t (endpoint_, pipe_)); + + if (pipe_ != NULL) + pipe_->set_endpoint_uri (endpoint_uri_); } int zmq::socket_base_t::term_endpoint (const char *endpoint_uri_) @@ -1543,6 +1546,20 @@ void zmq::socket_base_t::pipe_terminated (pipe_t *pipe_) // Remove the pipe from the list of attached pipes and confirm its // termination if we are already shutting down. _pipes.erase (pipe_); + + // Remove the pipe from _endpoints (set it to NULL). + if (!pipe_->get_endpoint_uri ().empty ()) { + std::pair range; + range = _endpoints.equal_range (pipe_->get_endpoint_uri ()); + + for (endpoints_t::iterator it = range.first; it != range.second; ++it) { + if (it->second.second == pipe_) { + it->second.second = NULL; + break; + } + } + } + if (is_terminating ()) unregister_term_ack (); }