mirror of
https://github.com/zeromq/libzmq.git
synced 2025-10-30 13:47:13 +01:00
Clean-up of session termination process
Specifically, shutdown of child objects is initiated *before* termination handshake with socket object. Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
This commit is contained in:
@@ -73,6 +73,10 @@ void zmq::fq_t::terminated (reader_t *pipe_)
|
||||
sink->unregister_term_ack ();
|
||||
}
|
||||
|
||||
void zmq::fq_t::delimited (reader_t *pipe_)
|
||||
{
|
||||
}
|
||||
|
||||
void zmq::fq_t::terminate ()
|
||||
{
|
||||
zmq_assert (!terminating);
|
||||
|
||||
@@ -45,6 +45,7 @@ namespace zmq
|
||||
// i_reader_events implementation.
|
||||
void activated (reader_t *pipe_);
|
||||
void terminated (reader_t *pipe_);
|
||||
void delimited (reader_t *pipe_);
|
||||
|
||||
private:
|
||||
|
||||
|
||||
12
src/pair.cpp
12
src/pair.cpp
@@ -42,8 +42,8 @@ zmq::pair_t::~pair_t ()
|
||||
zmq_assert (!outpipe);
|
||||
}
|
||||
|
||||
void zmq::pair_t::xattach_pipes (class reader_t *inpipe_,
|
||||
class writer_t *outpipe_, const blob_t &peer_identity_)
|
||||
void zmq::pair_t::xattach_pipes (reader_t *inpipe_, writer_t *outpipe_,
|
||||
const blob_t &peer_identity_)
|
||||
{
|
||||
zmq_assert (!inpipe && !outpipe);
|
||||
|
||||
@@ -62,7 +62,7 @@ void zmq::pair_t::xattach_pipes (class reader_t *inpipe_,
|
||||
}
|
||||
}
|
||||
|
||||
void zmq::pair_t::terminated (class reader_t *pipe_)
|
||||
void zmq::pair_t::terminated (reader_t *pipe_)
|
||||
{
|
||||
zmq_assert (pipe_ == inpipe);
|
||||
inpipe = NULL;
|
||||
@@ -72,7 +72,7 @@ void zmq::pair_t::terminated (class reader_t *pipe_)
|
||||
unregister_term_ack ();
|
||||
}
|
||||
|
||||
void zmq::pair_t::terminated (class writer_t *pipe_)
|
||||
void zmq::pair_t::terminated (writer_t *pipe_)
|
||||
{
|
||||
zmq_assert (pipe_ == outpipe);
|
||||
outpipe = NULL;
|
||||
@@ -82,6 +82,10 @@ void zmq::pair_t::terminated (class writer_t *pipe_)
|
||||
unregister_term_ack ();
|
||||
}
|
||||
|
||||
void zmq::pair_t::delimited (reader_t *pipe_)
|
||||
{
|
||||
}
|
||||
|
||||
void zmq::pair_t::process_term ()
|
||||
{
|
||||
terminating = true;
|
||||
|
||||
@@ -47,6 +47,7 @@ namespace zmq
|
||||
// i_reader_events interface implementation.
|
||||
void activated (class reader_t *pipe_);
|
||||
void terminated (class reader_t *pipe_);
|
||||
void delimited (class reader_t *pipe_);
|
||||
|
||||
// i_writer_events interface implementation.
|
||||
void activated (class writer_t *pipe_);
|
||||
|
||||
@@ -89,6 +89,11 @@ bool zmq::reader_t::check_read ()
|
||||
// If the next item in the pipe is message delimiter,
|
||||
// initiate its termination.
|
||||
if (pipe->probe (is_delimiter)) {
|
||||
zmq_msg_t msg;
|
||||
bool ok = pipe->read (&msg);
|
||||
zmq_assert (ok);
|
||||
if (sink)
|
||||
sink->delimited (this);
|
||||
terminate ();
|
||||
return false;
|
||||
}
|
||||
@@ -109,6 +114,8 @@ bool zmq::reader_t::read (zmq_msg_t *msg_)
|
||||
// If delimiter was read, start termination process of the pipe.
|
||||
unsigned char *offset = 0;
|
||||
if (msg_->content == (void*) (offset + ZMQ_DELIMITER)) {
|
||||
if (sink)
|
||||
sink->delimited (this);
|
||||
terminate ();
|
||||
return false;
|
||||
}
|
||||
|
||||
@@ -52,6 +52,7 @@ namespace zmq
|
||||
|
||||
virtual void terminated (class reader_t *pipe_) = 0;
|
||||
virtual void activated (class reader_t *pipe_) = 0;
|
||||
virtual void delimited (class reader_t *pipe_) = 0;
|
||||
};
|
||||
|
||||
class reader_t : public object_t, public array_item_t
|
||||
|
||||
111
src/session.cpp
111
src/session.cpp
@@ -35,9 +35,9 @@ zmq::session_t::session_t (class io_thread_t *io_thread_,
|
||||
engine (NULL),
|
||||
socket (socket_),
|
||||
io_thread (io_thread_),
|
||||
attach_processed (false),
|
||||
term_processed (false),
|
||||
finalised (false)
|
||||
delimiter_processed (false),
|
||||
force_terminate (false),
|
||||
state (active)
|
||||
{
|
||||
}
|
||||
|
||||
@@ -50,14 +50,26 @@ zmq::session_t::~session_t ()
|
||||
engine->terminate ();
|
||||
}
|
||||
|
||||
void zmq::session_t::terminate ()
|
||||
void zmq::session_t::proceed_with_term ()
|
||||
{
|
||||
if (in_pipe)
|
||||
if (state == terminating)
|
||||
return;
|
||||
|
||||
zmq_assert (state == pending);
|
||||
state = terminating;
|
||||
|
||||
if (in_pipe) {
|
||||
register_term_acks (1);
|
||||
in_pipe->terminate ();
|
||||
if (out_pipe)
|
||||
}
|
||||
if (out_pipe) {
|
||||
register_term_acks (1);
|
||||
out_pipe->terminate ();
|
||||
}
|
||||
|
||||
own_t::process_term ();
|
||||
}
|
||||
|
||||
bool zmq::session_t::read (::zmq_msg_t *msg_)
|
||||
{
|
||||
if (!in_pipe)
|
||||
@@ -125,46 +137,44 @@ void zmq::session_t::attach_pipes (class reader_t *inpipe_,
|
||||
}
|
||||
|
||||
// If we are already terminating, terminate the pipes straight away.
|
||||
if (finalised) {
|
||||
if (state == terminating) {
|
||||
if (in_pipe) {
|
||||
register_term_acks (1);
|
||||
in_pipe->terminate ();
|
||||
register_term_acks (1);
|
||||
}
|
||||
if (out_pipe) {
|
||||
register_term_acks (1);
|
||||
out_pipe->terminate ();
|
||||
register_term_acks (1);
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
attach_processed = true;
|
||||
finalise ();
|
||||
void zmq::session_t::delimited (reader_t *pipe_)
|
||||
{
|
||||
zmq_assert (in_pipe == pipe_);
|
||||
zmq_assert (!delimiter_processed);
|
||||
delimiter_processed = true;
|
||||
|
||||
// If we are in process of being closed, but still waiting for all
|
||||
// pending messeges being sent, we can terminate here.
|
||||
if (state == pending)
|
||||
proceed_with_term ();
|
||||
}
|
||||
|
||||
void zmq::session_t::terminated (reader_t *pipe_)
|
||||
{
|
||||
zmq_assert (in_pipe == pipe_);
|
||||
in_pipe = NULL;
|
||||
|
||||
if (finalised) {
|
||||
if (state == terminating)
|
||||
unregister_term_ack ();
|
||||
return;
|
||||
}
|
||||
|
||||
finalise ();
|
||||
}
|
||||
|
||||
void zmq::session_t::terminated (writer_t *pipe_)
|
||||
{
|
||||
zmq_assert (out_pipe == pipe_);
|
||||
out_pipe = NULL;
|
||||
|
||||
if (finalised) {
|
||||
if (state == terminating)
|
||||
unregister_term_ack ();
|
||||
return;
|
||||
}
|
||||
|
||||
finalise ();
|
||||
}
|
||||
|
||||
void zmq::session_t::activated (reader_t *pipe_)
|
||||
@@ -186,27 +196,6 @@ void zmq::session_t::process_plug ()
|
||||
{
|
||||
}
|
||||
|
||||
void zmq::session_t::finalise ()
|
||||
{
|
||||
// There may be delimiter waiting in the inbound pipe, never to be read
|
||||
// because the connection cannot be established. In order to terminate
|
||||
// decently in such case, do check_read which will in turn start the pipe
|
||||
// termination process if there's delimiter in it.
|
||||
if (in_pipe)
|
||||
in_pipe->check_read ();
|
||||
|
||||
// If all conditions are met, proceed with termination:
|
||||
// 1. Owner object already asked us to terminate.
|
||||
// 2. The pipes were already attached to the session.
|
||||
// 3. Both pipes have already terminated. Note that inbound pipe
|
||||
// is terminated after delimiter is read, i.e. all messages
|
||||
// were already sent to the wire.
|
||||
if (term_processed && attach_processed && !in_pipe && !out_pipe) {
|
||||
finalised = true;
|
||||
own_t::process_term ();
|
||||
}
|
||||
}
|
||||
|
||||
void zmq::session_t::process_attach (i_engine *engine_,
|
||||
const blob_t &peer_identity_)
|
||||
{
|
||||
@@ -219,7 +208,7 @@ void zmq::session_t::process_attach (i_engine *engine_,
|
||||
}
|
||||
|
||||
// If we are already terminating, we destroy the engine straight away.
|
||||
if (finalised) {
|
||||
if (state == terminating) {
|
||||
delete engine;
|
||||
return;
|
||||
}
|
||||
@@ -229,18 +218,19 @@ void zmq::session_t::process_attach (i_engine *engine_,
|
||||
reader_t *socket_reader = NULL;
|
||||
writer_t *socket_writer = NULL;
|
||||
|
||||
// Create the pipes, if required.
|
||||
if (options.requires_in && !out_pipe) {
|
||||
create_pipe (socket, this, options.hwm, options.swap, &socket_reader,
|
||||
&out_pipe);
|
||||
out_pipe->set_event_sink (this);
|
||||
}
|
||||
|
||||
if (options.requires_out && !in_pipe) {
|
||||
create_pipe (this, socket, options.hwm, options.swap, &in_pipe,
|
||||
&socket_writer);
|
||||
in_pipe->set_event_sink (this);
|
||||
}
|
||||
|
||||
// Bind the pipes to the socket object.
|
||||
if (socket_reader || socket_writer)
|
||||
send_bind (socket, socket_reader, socket_writer, peer_identity_);
|
||||
|
||||
@@ -250,8 +240,6 @@ void zmq::session_t::process_attach (i_engine *engine_,
|
||||
engine = engine_;
|
||||
engine->plug (io_thread, this);
|
||||
|
||||
attach_processed = true;
|
||||
|
||||
// Trigger the notfication about the attachment.
|
||||
attached (peer_identity_);
|
||||
}
|
||||
@@ -266,11 +254,21 @@ void zmq::session_t::detach ()
|
||||
|
||||
void zmq::session_t::process_term ()
|
||||
{
|
||||
// Here we are pugging into the own_t's termination mechanism.
|
||||
// The goal is to postpone the termination till all the pending messages
|
||||
// are sent to the peer.
|
||||
term_processed = true;
|
||||
finalise ();
|
||||
zmq_assert (state == active);
|
||||
state = pending;
|
||||
|
||||
// If there's no engine and there's only delimiter in the pipe it wouldn't
|
||||
// be ever read. Thus we check for it explicitly.
|
||||
if (in_pipe)
|
||||
in_pipe->check_read ();
|
||||
|
||||
// If there's no in pipe there are no pending messages to send.
|
||||
// We can proceed with the shutdown straight away. Also, if there is
|
||||
// inbound pipe, but the delimiter was already processed, we can
|
||||
// terminate immediately. Alternatively, if the derived session type have
|
||||
// called 'terminate' we'll finish straight away.
|
||||
if (!options.requires_out || delimiter_processed || force_terminate)
|
||||
proceed_with_term ();
|
||||
}
|
||||
|
||||
bool zmq::session_t::register_session (const blob_t &name_, session_t *session_)
|
||||
@@ -291,3 +289,8 @@ void zmq::session_t::detached ()
|
||||
{
|
||||
}
|
||||
|
||||
void zmq::session_t::terminate ()
|
||||
{
|
||||
force_terminate = true;
|
||||
own_t::terminate ();
|
||||
}
|
||||
|
||||
@@ -54,6 +54,7 @@ namespace zmq
|
||||
// i_reader_events interface implementation.
|
||||
void activated (class reader_t *pipe_);
|
||||
void terminated (class reader_t *pipe_);
|
||||
void delimited (class reader_t *pipe_);
|
||||
|
||||
// i_writer_events interface implementation.
|
||||
void activated (class writer_t *pipe_);
|
||||
@@ -61,8 +62,8 @@ namespace zmq
|
||||
|
||||
protected:
|
||||
|
||||
// Forcefully close this session (without sending
|
||||
// outbound messages to the wire).
|
||||
// This function allows to shut down the session even though
|
||||
// there are pending messages in the inbound pipe.
|
||||
void terminate ();
|
||||
|
||||
// Two events for the derived session type. Attached is triggered
|
||||
@@ -93,9 +94,8 @@ namespace zmq
|
||||
const blob_t &peer_identity_);
|
||||
void process_term ();
|
||||
|
||||
// Check whether object is ready for termination. If so proceed
|
||||
// with closing child objects.
|
||||
void finalise ();
|
||||
// Call this function to move on with the delayed process_term.
|
||||
void proceed_with_term ();
|
||||
|
||||
// Inbound pipe, i.e. one the session is getting messages from.
|
||||
class reader_t *in_pipe;
|
||||
@@ -117,13 +117,18 @@ namespace zmq
|
||||
// the engines into the same thread.
|
||||
class io_thread_t *io_thread;
|
||||
|
||||
// True if pipes were already attached.
|
||||
bool attach_processed;
|
||||
// If true, delimiter was already read from the inbound pipe.
|
||||
bool delimiter_processed;
|
||||
|
||||
// True if term command was already processed.
|
||||
bool term_processed;
|
||||
// If true, we should terminate the session even though there are
|
||||
// pending messages in the inbound pipe.
|
||||
bool force_terminate;
|
||||
|
||||
bool finalised;
|
||||
enum {
|
||||
active,
|
||||
pending,
|
||||
terminating
|
||||
} state;
|
||||
|
||||
session_t (const session_t&);
|
||||
void operator = (const session_t&);
|
||||
|
||||
@@ -135,6 +135,10 @@ void zmq::xrep_t::terminated (writer_t *pipe_)
|
||||
zmq_assert (false);
|
||||
}
|
||||
|
||||
void zmq::xrep_t::delimited (reader_t *pipe_)
|
||||
{
|
||||
}
|
||||
|
||||
void zmq::xrep_t::activated (reader_t *pipe_)
|
||||
{
|
||||
for (inpipes_t::iterator it = inpipes.begin (); it != inpipes.end ();
|
||||
|
||||
@@ -57,6 +57,7 @@ namespace zmq
|
||||
// i_reader_events interface implementation.
|
||||
void activated (reader_t *pipe_);
|
||||
void terminated (reader_t *pipe_);
|
||||
void delimited (reader_t *pipe_);
|
||||
|
||||
// i_writer_events interface implementation.
|
||||
void activated (writer_t *pipe_);
|
||||
|
||||
Reference in New Issue
Block a user