LIBZMQ-497 there could be unsent bytes in encoder

When we send a large message, the message can be splitted into two chunks.
One is in the encoder buffer and the other is the zero-copy pointer.
The session could get the term before the last chunk is sent.
This commit is contained in:
Min(Dongmin Yu) 2013-02-01 17:32:28 +09:00
parent a3ae0d4c16
commit 2c1a3c55f7
4 changed files with 19 additions and 0 deletions

View File

@ -126,6 +126,11 @@ namespace zmq
*size_ = pos;
}
inline bool has_data ()
{
return to_write > 0;
}
protected:
// Prototype of state machine action.

View File

@ -47,6 +47,7 @@ namespace zmq
virtual void get_data (unsigned char **data_, size_t *size_,
int *offset_ = NULL) = 0;
virtual bool has_data () = 0;
};
}

View File

@ -65,6 +65,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, cons
options (options_),
endpoint (endpoint_),
plugged (false),
terminating (false),
socket (NULL)
{
// Put the socket into non-blocking mode.
@ -188,6 +189,11 @@ void zmq::stream_engine_t::unplug ()
void zmq::stream_engine_t::terminate ()
{
if (!terminating && encoder && encoder->has_data ()) {
// Give io_thread a chance to send in the buffer
terminating = true;
return;
}
unplug ();
delete this;
}
@ -298,6 +304,8 @@ void zmq::stream_engine_t::out_event ()
// this is necessary to prevent losing incomming messages.
if (nbytes == -1) {
reset_pollout (handle);
if (unlikely (terminating))
terminate ();
return;
}
@ -309,6 +317,10 @@ void zmq::stream_engine_t::out_event ()
if (unlikely (handshaking))
if (outsize == 0)
reset_pollout (handle);
if (unlikely (terminating))
if (outsize == 0)
terminate ();
}
void zmq::stream_engine_t::activate_out ()

View File

@ -135,6 +135,7 @@ namespace zmq
std::string endpoint;
bool plugged;
bool terminating;
// Socket
zmq::socket_base_t *socket;