mirror of
https://github.com/zeromq/libzmq.git
synced 2025-10-18 03:29:49 +02:00
Add asserts and rename pipe set
Rename the pipeset to terminating_pipes, as suggested by Martin H. Adds asserts to test the pipe is contained in the terminating set where appropriate.
This commit is contained in:
@@ -230,27 +230,29 @@ void zmq::session_base_t::clean_pipes ()
|
|||||||
void zmq::session_base_t::terminated (pipe_t *pipe_)
|
void zmq::session_base_t::terminated (pipe_t *pipe_)
|
||||||
{
|
{
|
||||||
// Drop the reference to the deallocated pipe if required.
|
// Drop the reference to the deallocated pipe if required.
|
||||||
zmq_assert (pipe == pipe_ || incomplete_pipes.size () > 0);
|
zmq_assert (pipe == pipe_ || terminating_pipes.count (pipe_) == 1);
|
||||||
|
|
||||||
if (pipe == pipe_)
|
if (pipe == pipe_)
|
||||||
// If this is our current pipe, remove it
|
// If this is our current pipe, remove it
|
||||||
pipe = NULL;
|
pipe = NULL;
|
||||||
else
|
else
|
||||||
// Remove the pipe from the detached pipes set
|
// Remove the pipe from the detached pipes set
|
||||||
incomplete_pipes.erase (pipe_);
|
terminating_pipes.erase (pipe_);
|
||||||
|
|
||||||
// If we are waiting for pending messages to be sent, at this point
|
// If we are waiting for pending messages to be sent, at this point
|
||||||
// we are sure that there will be no more messages and we can proceed
|
// we are sure that there will be no more messages and we can proceed
|
||||||
// with termination safely.
|
// with termination safely.
|
||||||
if (pending && !pipe && incomplete_pipes.size () == 0)
|
if (pending && !pipe && terminating_pipes.size () == 0)
|
||||||
proceed_with_term ();
|
proceed_with_term ();
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::session_base_t::read_activated (pipe_t *pipe_)
|
void zmq::session_base_t::read_activated (pipe_t *pipe_)
|
||||||
{
|
{
|
||||||
// Skip activating if we're detaching this pipe
|
// Skip activating if we're detaching this pipe
|
||||||
if (incomplete_pipes.size () > 0 && pipe_ != pipe)
|
if (pipe != pipe_) {
|
||||||
|
zmq_assert (terminating_pipes.count (pipe_) == 1);
|
||||||
return;
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (likely (engine != NULL))
|
if (likely (engine != NULL))
|
||||||
engine->activate_out ();
|
engine->activate_out ();
|
||||||
@@ -261,8 +263,10 @@ void zmq::session_base_t::read_activated (pipe_t *pipe_)
|
|||||||
void zmq::session_base_t::write_activated (pipe_t *pipe_)
|
void zmq::session_base_t::write_activated (pipe_t *pipe_)
|
||||||
{
|
{
|
||||||
// Skip activating if we're detaching this pipe
|
// Skip activating if we're detaching this pipe
|
||||||
if (incomplete_pipes.size () > 0 && pipe_ != pipe)
|
if (pipe != pipe_) {
|
||||||
|
zmq_assert (terminating_pipes.count (pipe_) == 1);
|
||||||
return;
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (engine)
|
if (engine)
|
||||||
engine->activate_in ();
|
engine->activate_in ();
|
||||||
@@ -411,7 +415,7 @@ void zmq::session_base_t::detached ()
|
|||||||
&& addr->protocol != "pgm" && addr->protocol != "epgm") {
|
&& addr->protocol != "pgm" && addr->protocol != "epgm") {
|
||||||
pipe->hiccup ();
|
pipe->hiccup ();
|
||||||
pipe->terminate (false);
|
pipe->terminate (false);
|
||||||
incomplete_pipes.insert (pipe);
|
terminating_pipes.insert (pipe);
|
||||||
pipe = NULL;
|
pipe = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -105,7 +105,7 @@ namespace zmq
|
|||||||
zmq::pipe_t *pipe;
|
zmq::pipe_t *pipe;
|
||||||
|
|
||||||
// This set is added to with pipes we are disconnecting, but haven't yet completed
|
// This set is added to with pipes we are disconnecting, but haven't yet completed
|
||||||
std::set<pipe_t *> incomplete_pipes;
|
std::set<pipe_t *> terminating_pipes;
|
||||||
|
|
||||||
// This flag is true if the remainder of the message being processed
|
// This flag is true if the remainder of the message being processed
|
||||||
// is still in the in pipe.
|
// is still in the in pipe.
|
||||||
|
Reference in New Issue
Block a user