mirror of
https://github.com/zeromq/libzmq.git
synced 2025-01-21 18:14:44 +01:00
Merge pull request #510 from miniway/master
LIBZMQ-497 there could be unsent bytes in encoder
This commit is contained in:
commit
f27eb67e1a
@ -126,6 +126,11 @@ namespace zmq
|
|||||||
*size_ = pos;
|
*size_ = pos;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inline bool has_data ()
|
||||||
|
{
|
||||||
|
return to_write > 0;
|
||||||
|
}
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
|
|
||||||
// Prototype of state machine action.
|
// Prototype of state machine action.
|
||||||
|
@ -47,6 +47,7 @@ namespace zmq
|
|||||||
virtual void get_data (unsigned char **data_, size_t *size_,
|
virtual void get_data (unsigned char **data_, size_t *size_,
|
||||||
int *offset_ = NULL) = 0;
|
int *offset_ = NULL) = 0;
|
||||||
|
|
||||||
|
virtual bool has_data () = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -65,6 +65,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, cons
|
|||||||
options (options_),
|
options (options_),
|
||||||
endpoint (endpoint_),
|
endpoint (endpoint_),
|
||||||
plugged (false),
|
plugged (false),
|
||||||
|
terminating (false),
|
||||||
socket (NULL)
|
socket (NULL)
|
||||||
{
|
{
|
||||||
// Put the socket into non-blocking mode.
|
// Put the socket into non-blocking mode.
|
||||||
@ -188,6 +189,11 @@ void zmq::stream_engine_t::unplug ()
|
|||||||
|
|
||||||
void zmq::stream_engine_t::terminate ()
|
void zmq::stream_engine_t::terminate ()
|
||||||
{
|
{
|
||||||
|
if (!terminating && encoder && encoder->has_data ()) {
|
||||||
|
// Give io_thread a chance to send in the buffer
|
||||||
|
terminating = true;
|
||||||
|
return;
|
||||||
|
}
|
||||||
unplug ();
|
unplug ();
|
||||||
delete this;
|
delete this;
|
||||||
}
|
}
|
||||||
@ -298,6 +304,8 @@ void zmq::stream_engine_t::out_event ()
|
|||||||
// this is necessary to prevent losing incomming messages.
|
// this is necessary to prevent losing incomming messages.
|
||||||
if (nbytes == -1) {
|
if (nbytes == -1) {
|
||||||
reset_pollout (handle);
|
reset_pollout (handle);
|
||||||
|
if (unlikely (terminating))
|
||||||
|
terminate ();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -309,6 +317,10 @@ void zmq::stream_engine_t::out_event ()
|
|||||||
if (unlikely (handshaking))
|
if (unlikely (handshaking))
|
||||||
if (outsize == 0)
|
if (outsize == 0)
|
||||||
reset_pollout (handle);
|
reset_pollout (handle);
|
||||||
|
|
||||||
|
if (unlikely (terminating))
|
||||||
|
if (outsize == 0)
|
||||||
|
terminate ();
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::stream_engine_t::activate_out ()
|
void zmq::stream_engine_t::activate_out ()
|
||||||
|
@ -135,6 +135,7 @@ namespace zmq
|
|||||||
std::string endpoint;
|
std::string endpoint;
|
||||||
|
|
||||||
bool plugged;
|
bool plugged;
|
||||||
|
bool terminating;
|
||||||
|
|
||||||
// Socket
|
// Socket
|
||||||
zmq::socket_base_t *socket;
|
zmq::socket_base_t *socket;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user