Merge branch 'master' into events

This commit is contained in:
Lourens Naudé 2012-05-04 02:35:22 +01:00
commit c38aecdc50
9 changed files with 94 additions and 42 deletions

View File

@ -21,6 +21,7 @@
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <limits>
#include "decoder.hpp" #include "decoder.hpp"
#include "session_base.hpp" #include "session_base.hpp"
@ -51,6 +52,11 @@ void zmq::decoder_t::set_session (session_base_t *session_)
session = session_; session = session_;
} }
bool zmq::decoder_t::stalled () const
{
return next == &decoder_t::message_ready;
}
bool zmq::decoder_t::one_byte_size_ready () bool zmq::decoder_t::one_byte_size_ready ()
{ {
// First byte of size is read. If it is 0xff read 8-byte size. // First byte of size is read. If it is 0xff read 8-byte size.
@ -91,33 +97,41 @@ bool zmq::decoder_t::one_byte_size_ready ()
bool zmq::decoder_t::eight_byte_size_ready () bool zmq::decoder_t::eight_byte_size_ready ()
{ {
// 8-byte size is read. Allocate the buffer for message body and // 8-byte payload length is read. Allocate the buffer
// read the message data into it. // for message body and read the message data into it.
size_t size = (size_t) get_uint64 (tmpbuf); const uint64_t payload_length = get_uint64 (tmpbuf);
// There has to be at least one byte (the flags) in the message). // There has to be at least one byte (the flags) in the message).
if (!size) { if (payload_length == 0) {
decoding_error (); decoding_error ();
return false; return false;
} }
// in_progress is initialised at this point so in theory we should // Message size must not exceed the maximum allowed size.
// close it before calling zmq_msg_init_size, however, it's a 0-byte if (maxmsgsize >= 0 && payload_length - 1 > (uint64_t) maxmsgsize) {
// message and thus we can treat it as uninitialised... decoding_error ();
int rc; return false;
if (maxmsgsize >= 0 && (int64_t) (size - 1) > maxmsgsize) {
rc = -1;
errno = ENOMEM;
} }
else
rc = in_progress.init_size (size - 1); // Message size must fit within range of size_t data type.
if (rc != 0 && errno == ENOMEM) { if (payload_length - 1 > std::numeric_limits <size_t>::max ()) {
decoding_error ();
return false;
}
const size_t msg_size = static_cast <size_t> (payload_length - 1);
// in_progress is initialised at this point so in theory we should
// close it before calling init_size, however, it's a 0-byte
// message and thus we can treat it as uninitialised...
int rc = in_progress.init_size (msg_size);
if (rc != 0) {
errno_assert (errno == ENOMEM);
rc = in_progress.init (); rc = in_progress.init ();
errno_assert (rc == 0); errno_assert (rc == 0);
decoding_error (); decoding_error ();
return false; return false;
} }
errno_assert (rc == 0);
next_step (tmpbuf, 1, &decoder_t::flags_ready); next_step (tmpbuf, 1, &decoder_t::flags_ready);
return true; return true;

View File

@ -52,9 +52,9 @@ namespace zmq
public: public:
inline decoder_base_t (size_t bufsize_) : inline decoder_base_t (size_t bufsize_) :
next (NULL),
read_pos (NULL), read_pos (NULL),
to_read (0), to_read (0),
next (NULL),
bufsize (bufsize_) bufsize (bufsize_)
{ {
buf = (unsigned char*) malloc (bufsize_); buf = (unsigned char*) malloc (bufsize_);
@ -165,6 +165,11 @@ namespace zmq
next = NULL; next = NULL;
} }
// Next step. If set to NULL, it means that associated data stream
// is dead. Note that there can be still data in the process in such
// case.
step_t next;
private: private:
// Where to store the read data. // Where to store the read data.
@ -173,11 +178,6 @@ namespace zmq
// How much data to read before taking next step. // How much data to read before taking next step.
size_t to_read; size_t to_read;
// Next step. If set to NULL, it means that associated data stream
// is dead. Note that there can be still data in the process in such
// case.
step_t next;
// The duffer for data to decode. // The duffer for data to decode.
size_t bufsize; size_t bufsize;
unsigned char *buf; unsigned char *buf;
@ -197,6 +197,10 @@ namespace zmq
void set_session (zmq::session_base_t *session_); void set_session (zmq::session_base_t *session_);
// Returns true if there is a decoded message
// waiting to be delivered to the session.
bool stalled () const;
private: private:
bool one_byte_size_ready (); bool one_byte_size_ready ();

View File

@ -194,6 +194,13 @@ int zmq::ipc_connecter_t::open ()
if (rc == 0) if (rc == 0)
return 0; return 0;
// Translate other error codes indicating asynchronous connect has been
// launched to a uniform EINPROGRESS.
if (rc == -1 && errno == EINTR) {
errno = EINPROGRESS;
return -1;
}
// Forward the error. // Forward the error.
return -1; return -1;
} }

View File

@ -38,13 +38,19 @@ zmq::pair_t::~pair_t ()
void zmq::pair_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) void zmq::pair_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_)
{ {
zmq_assert (!pipe); zmq_assert (pipe_ != NULL);
// ZMQ_PAIR socket can only be connected to a single peer.
// The socket rejects any further connection requests.
if (pipe == NULL)
pipe = pipe_; pipe = pipe_;
else
pipe_->terminate (false);
} }
void zmq::pair_t::xterminated (pipe_t *pipe_) void zmq::pair_t::xterminated (pipe_t *pipe_)
{ {
zmq_assert (pipe_ == pipe); if (pipe_ == pipe)
pipe = NULL; pipe = NULL;
} }

View File

@ -839,10 +839,6 @@ void zmq::socket_base_t::process_bind (pipe_t *pipe_)
attach_pipe (pipe_); attach_pipe (pipe_);
} }
void zmq::socket_base_t::process_unplug ()
{
}
void zmq::socket_base_t::process_term (int linger_) void zmq::socket_base_t::process_term (int linger_)
{ {
// Unregister all inproc endpoints associated with this socket. // Unregister all inproc endpoints associated with this socket.

View File

@ -183,7 +183,6 @@ namespace zmq
// Handlers for incoming commands. // Handlers for incoming commands.
void process_stop (); void process_stop ();
void process_bind (zmq::pipe_t *pipe_); void process_bind (zmq::pipe_t *pipe_);
void process_unplug ();
void process_term (int linger_); void process_term (int linger_);
// Socket's mailbox object. // Socket's mailbox object.

View File

@ -47,6 +47,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_) :
inpos (NULL), inpos (NULL),
insize (0), insize (0),
decoder (in_batch_size, options_.maxmsgsize), decoder (in_batch_size, options_.maxmsgsize),
input_error (false),
outpos (NULL), outpos (NULL),
outsize (0), outsize (0),
encoder (out_batch_size), encoder (out_batch_size),
@ -55,7 +56,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_) :
options (options_), options (options_),
plugged (false) plugged (false)
{ {
// Get the socket into non-blocking mode. // Put the socket into non-blocking mode.
unblock_socket (s); unblock_socket (s);
// Set the socket buffer limits for the underlying socket. // Set the socket buffer limits for the underlying socket.
@ -205,8 +206,18 @@ void zmq::stream_engine_t::in_event ()
session->flush (); session->flush ();
} }
if (session && disconnection) // Input error has occurred. If the last decoded
// message has already been accepted, we terminate
// the engine immediately. Otherwise, we stop
// waiting for input events and postpone the termination
// until after the session has accepted the message.
if (session != NULL && disconnection) {
input_error = true;
if (decoder.stalled ())
reset_pollin (handle);
else
error (); error ();
}
} }
void zmq::stream_engine_t::out_event () void zmq::stream_engine_t::out_event ()
@ -238,9 +249,11 @@ void zmq::stream_engine_t::out_event ()
// written should be reasonably modest. // written should be reasonably modest.
int nbytes = write (outpos, outsize); int nbytes = write (outpos, outsize);
// Handle problems with the connection. // IO error has occurred. We stop waiting for output events.
// The engine is not terminated until we detect input error;
// this is necessary to prevent losing incomming messages.
if (nbytes == -1) { if (nbytes == -1) {
error (); reset_pollout (handle);
return; return;
} }
@ -261,6 +274,17 @@ void zmq::stream_engine_t::activate_out ()
void zmq::stream_engine_t::activate_in () void zmq::stream_engine_t::activate_in ()
{ {
if (input_error) {
// There was an input error but the engine could not
// be terminated (due to the stalled decoder).
// Flush the pending message and terminate the engine now.
decoder.process_buffer (inpos, 0);
zmq_assert (!decoder.stalled ());
session->flush ();
error ();
return;
}
set_pollin (handle); set_pollin (handle);
// Speculative read. // Speculative read.

View File

@ -84,6 +84,7 @@ namespace zmq
unsigned char *inpos; unsigned char *inpos;
size_t insize; size_t insize;
decoder_t decoder; decoder_t decoder;
bool input_error;
unsigned char *outpos; unsigned char *outpos;
size_t outsize; size_t outsize;

View File

@ -144,7 +144,7 @@ void zmq::tcp_connecter_t::start_connecting ()
} }
// Connection establishment may be delayed. Poll for its completion. // Connection establishment may be delayed. Poll for its completion.
else if (rc == -1 && errno == EAGAIN) { else if (rc == -1 && errno == EINPROGRESS) {
handle = add_fd (s); handle = add_fd (s);
handle_valid = true; handle_valid = true;
set_pollout (handle); set_pollout (handle);
@ -218,17 +218,18 @@ int zmq::tcp_connecter_t::open ()
if (rc == 0) if (rc == 0)
return 0; return 0;
// Asynchronous connect was launched. // Translate other error codes indicating asynchronous connect has been
// launched to a uniform EINPROGRESS.
#ifdef ZMQ_HAVE_WINDOWS #ifdef ZMQ_HAVE_WINDOWS
if (rc == SOCKET_ERROR && (WSAGetLastError () == WSAEINPROGRESS || if (rc == SOCKET_ERROR && (WSAGetLastError () == WSAEINPROGRESS ||
WSAGetLastError () == WSAEWOULDBLOCK)) { WSAGetLastError () == WSAEWOULDBLOCK)) {
errno = EAGAIN; errno = EINPROGRESS;
return -1; return -1;
} }
wsa_error_to_errno (); wsa_error_to_errno ();
#else #else
if (rc == -1 && errno == EINPROGRESS) { if (rc == -1 && errno == EINTR) {
errno = EAGAIN; errno = EINPROGRESS;
return -1; return -1;
} }
#endif #endif