diff --git a/src/decoder.cpp b/src/decoder.cpp index 48f457f8..c8279ba8 100644 --- a/src/decoder.cpp +++ b/src/decoder.cpp @@ -51,6 +51,11 @@ void zmq::decoder_t::set_session (session_base_t *session_) session = session_; } +bool zmq::decoder_t::stalled () const +{ + return next == &decoder_t::message_ready; +} + bool zmq::decoder_t::one_byte_size_ready () { // First byte of size is read. If it is 0xff read 8-byte size. diff --git a/src/decoder.hpp b/src/decoder.hpp index 4afd0185..d648cda8 100644 --- a/src/decoder.hpp +++ b/src/decoder.hpp @@ -52,9 +52,9 @@ namespace zmq public: inline decoder_base_t (size_t bufsize_) : + next (NULL), read_pos (NULL), to_read (0), - next (NULL), bufsize (bufsize_) { buf = (unsigned char*) malloc (bufsize_); @@ -165,6 +165,11 @@ namespace zmq next = NULL; } + // Next step. If set to NULL, it means that associated data stream + // is dead. Note that there can be still data in the process in such + // case. + step_t next; + private: // Where to store the read data. @@ -173,11 +178,6 @@ namespace zmq // How much data to read before taking next step. size_t to_read; - // Next step. If set to NULL, it means that associated data stream - // is dead. Note that there can be still data in the process in such - // case. - step_t next; - // The duffer for data to decode. size_t bufsize; unsigned char *buf; @@ -197,6 +197,10 @@ namespace zmq void set_session (zmq::session_base_t *session_); + // Returns true if there is a decoded message + // waiting to be delivered to the session. + bool stalled () const; + private: bool one_byte_size_ready (); diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 1771990b..b25ff4bb 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -47,6 +47,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_) : inpos (NULL), insize (0), decoder (in_batch_size, options_.maxmsgsize), + input_error (false), outpos (NULL), outsize (0), encoder (out_batch_size), @@ -55,7 +56,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_) : options (options_), plugged (false) { - // Get the socket into non-blocking mode. + // Put the socket into non-blocking mode. unblock_socket (s); // Set the socket buffer limits for the underlying socket. @@ -202,8 +203,18 @@ void zmq::stream_engine_t::in_event () session->flush (); } - if (session && disconnection) - error (); + // Input error has occurred. If the last decoded + // message has already been accepted, we terminate + // the engine immediately. Otherwise, we stop + // waiting for input events and postpone the termination + // until after the session has accepted the message. + if (session != NULL && disconnection) { + input_error = true; + if (decoder.stalled ()) + reset_pollin (handle); + else + error (); + } } void zmq::stream_engine_t::out_event () @@ -235,9 +246,11 @@ void zmq::stream_engine_t::out_event () // written should be reasonably modest. int nbytes = write (outpos, outsize); - // Handle problems with the connection. + // IO error has occurred. We stop waiting for output events. + // The engine is not terminated until we detect input error; + // this is necessary to prevent losing incomming messages. if (nbytes == -1) { - error (); + reset_pollout (handle); return; } @@ -258,6 +271,17 @@ void zmq::stream_engine_t::activate_out () void zmq::stream_engine_t::activate_in () { + if (input_error) { + // There was an input error but the engine could not + // be terminated (due to the stalled decoder). + // Flush the pending message and terminate the engine now. + decoder.process_buffer (inpos, 0); + zmq_assert (!decoder.stalled ()); + session->flush (); + error (); + return; + } + set_pollin (handle); // Speculative read. diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp index 30b190b0..68a5c2ef 100644 --- a/src/stream_engine.hpp +++ b/src/stream_engine.hpp @@ -83,6 +83,7 @@ namespace zmq unsigned char *inpos; size_t insize; decoder_t decoder; + bool input_error; unsigned char *outpos; size_t outsize;