Before this patch, the stream engine terminated itself
whenever it had detected an IO error. If this happened
when sending a message, the engine lost all
in-flight messages, messages waiting to be decoded,
and the last decoded message that had not been accepted,
if there was one.

The new behaviour is to terminate the engine only after
the input error has been detected and the last decoded
This commit is contained in:
Martin Hurton
2012-04-29 17:13:18 +02:00
parent 16ec2868c5
commit 776563fcff
4 changed files with 45 additions and 11 deletions

View File

@@ -47,6 +47,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_) :
inpos (NULL),
insize (0),
decoder (in_batch_size, options_.maxmsgsize),
input_error (false),
outpos (NULL),
outsize (0),
encoder (out_batch_size),
@@ -55,7 +56,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_) :
options (options_),
plugged (false)
{
// Get the socket into non-blocking mode.
// Put the socket into non-blocking mode.
unblock_socket (s);
// Set the socket buffer limits for the underlying socket.
@@ -202,8 +203,18 @@ void zmq::stream_engine_t::in_event ()
session->flush ();
}
if (session && disconnection)
error ();
// Input error has occurred. If the last decoded
// message has already been accepted, we terminate
// the engine immediately. Otherwise, we stop
// waiting for input events and postpone the termination
// until after the session has accepted the message.
if (session != NULL && disconnection) {
input_error = true;
if (decoder.stalled ())
reset_pollin (handle);
else
error ();
}
}
void zmq::stream_engine_t::out_event ()
@@ -235,9 +246,11 @@ void zmq::stream_engine_t::out_event ()
// written should be reasonably modest.
int nbytes = write (outpos, outsize);
// Handle problems with the connection.
// IO error has occurred. We stop waiting for output events.
// The engine is not terminated until we detect input error;
// this is necessary to prevent losing incomming messages.
if (nbytes == -1) {
error ();
reset_pollout (handle);
return;
}
@@ -258,6 +271,17 @@ void zmq::stream_engine_t::activate_out ()
void zmq::stream_engine_t::activate_in ()
{
if (input_error) {
// There was an input error but the engine could not
// be terminated (due to the stalled decoder).
// Flush the pending message and terminate the engine now.
decoder.process_buffer (inpos, 0);
zmq_assert (!decoder.stalled ());
session->flush ();
error ();
return;
}
set_pollin (handle);
// Speculative read.