From 2c1a3c55f73a504b82c32b75790c59426cde76e9 Mon Sep 17 00:00:00 2001 From: "Min(Dongmin Yu)" Date: Fri, 1 Feb 2013 17:32:28 +0900 Subject: [PATCH] LIBZMQ-497 there could be unsent bytes in encoder When we send a large message, the message can be splitted into two chunks. One is in the encoder buffer and the other is the zero-copy pointer. The session could get the term before the last chunk is sent. --- src/encoder.hpp | 5 +++++ src/i_encoder.hpp | 1 + src/stream_engine.cpp | 12 ++++++++++++ src/stream_engine.hpp | 1 + 4 files changed, 19 insertions(+) diff --git a/src/encoder.hpp b/src/encoder.hpp index 4c299f1b..e30f7d9c 100644 --- a/src/encoder.hpp +++ b/src/encoder.hpp @@ -126,6 +126,11 @@ namespace zmq *size_ = pos; } + inline bool has_data () + { + return to_write > 0; + } + protected: // Prototype of state machine action. diff --git a/src/i_encoder.hpp b/src/i_encoder.hpp index a2551cb6..ae491e25 100644 --- a/src/i_encoder.hpp +++ b/src/i_encoder.hpp @@ -47,6 +47,7 @@ namespace zmq virtual void get_data (unsigned char **data_, size_t *size_, int *offset_ = NULL) = 0; + virtual bool has_data () = 0; }; } diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 80b2afd3..566181b5 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -65,6 +65,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, cons options (options_), endpoint (endpoint_), plugged (false), + terminating (false), socket (NULL) { // Put the socket into non-blocking mode. @@ -188,6 +189,11 @@ void zmq::stream_engine_t::unplug () void zmq::stream_engine_t::terminate () { + if (!terminating && encoder && encoder->has_data ()) { + // Give io_thread a chance to send in the buffer + terminating = true; + return; + } unplug (); delete this; } @@ -298,6 +304,8 @@ void zmq::stream_engine_t::out_event () // this is necessary to prevent losing incomming messages. if (nbytes == -1) { reset_pollout (handle); + if (unlikely (terminating)) + terminate (); return; } @@ -309,6 +317,10 @@ void zmq::stream_engine_t::out_event () if (unlikely (handshaking)) if (outsize == 0) reset_pollout (handle); + + if (unlikely (terminating)) + if (outsize == 0) + terminate (); } void zmq::stream_engine_t::activate_out () diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp index d7505331..4d2da628 100644 --- a/src/stream_engine.hpp +++ b/src/stream_engine.hpp @@ -135,6 +135,7 @@ namespace zmq std::string endpoint; bool plugged; + bool terminating; // Socket zmq::socket_base_t *socket;