From 394a24857905c2489b555803170a0d640f38b4a8 Mon Sep 17 00:00:00 2001 From: Martin Hurton Date: Sat, 28 Apr 2012 16:28:12 +0200 Subject: [PATCH 1/5] socket_base: process_unplug () is not used, remove it --- src/socket_base.cpp | 4 ---- src/socket_base.hpp | 1 - 2 files changed, 5 deletions(-) diff --git a/src/socket_base.cpp b/src/socket_base.cpp index b8796571..e82c193d 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -837,10 +837,6 @@ void zmq::socket_base_t::process_bind (pipe_t *pipe_) attach_pipe (pipe_); } -void zmq::socket_base_t::process_unplug () -{ -} - void zmq::socket_base_t::process_term (int linger_) { // Unregister all inproc endpoints associated with this socket. diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 1788cd49..1abee52f 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -180,7 +180,6 @@ namespace zmq // Handlers for incoming commands. void process_stop (); void process_bind (zmq::pipe_t *pipe_); - void process_unplug (); void process_term (int linger_); // Socket's mailbox object. From 776563fcffe975774c713ade357ea2b83d22da7c Mon Sep 17 00:00:00 2001 From: Martin Hurton Date: Sun, 29 Apr 2012 17:13:18 +0200 Subject: [PATCH 2/5] Fix issue #264 Before this patch, the stream engine terminated itself whenever it had detected an IO error. If this happened when sending a message, the engine lost all in-flight messages, messages waiting to be decoded, and the last decoded message that had not been accepted, if there was one. The new behaviour is to terminate the engine only after the input error has been detected and the last decoded --- src/decoder.cpp | 5 +++++ src/decoder.hpp | 16 ++++++++++------ src/stream_engine.cpp | 34 +++++++++++++++++++++++++++++----- src/stream_engine.hpp | 1 + 4 files changed, 45 insertions(+), 11 deletions(-) diff --git a/src/decoder.cpp b/src/decoder.cpp index 48f457f8..c8279ba8 100644 --- a/src/decoder.cpp +++ b/src/decoder.cpp @@ -51,6 +51,11 @@ void zmq::decoder_t::set_session (session_base_t *session_) session = session_; } +bool zmq::decoder_t::stalled () const +{ + return next == &decoder_t::message_ready; +} + bool zmq::decoder_t::one_byte_size_ready () { // First byte of size is read. If it is 0xff read 8-byte size. diff --git a/src/decoder.hpp b/src/decoder.hpp index 4afd0185..d648cda8 100644 --- a/src/decoder.hpp +++ b/src/decoder.hpp @@ -52,9 +52,9 @@ namespace zmq public: inline decoder_base_t (size_t bufsize_) : + next (NULL), read_pos (NULL), to_read (0), - next (NULL), bufsize (bufsize_) { buf = (unsigned char*) malloc (bufsize_); @@ -165,6 +165,11 @@ namespace zmq next = NULL; } + // Next step. If set to NULL, it means that associated data stream + // is dead. Note that there can be still data in the process in such + // case. + step_t next; + private: // Where to store the read data. @@ -173,11 +178,6 @@ namespace zmq // How much data to read before taking next step. size_t to_read; - // Next step. If set to NULL, it means that associated data stream - // is dead. Note that there can be still data in the process in such - // case. - step_t next; - // The duffer for data to decode. size_t bufsize; unsigned char *buf; @@ -197,6 +197,10 @@ namespace zmq void set_session (zmq::session_base_t *session_); + // Returns true if there is a decoded message + // waiting to be delivered to the session. + bool stalled () const; + private: bool one_byte_size_ready (); diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 1771990b..b25ff4bb 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -47,6 +47,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_) : inpos (NULL), insize (0), decoder (in_batch_size, options_.maxmsgsize), + input_error (false), outpos (NULL), outsize (0), encoder (out_batch_size), @@ -55,7 +56,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_) : options (options_), plugged (false) { - // Get the socket into non-blocking mode. + // Put the socket into non-blocking mode. unblock_socket (s); // Set the socket buffer limits for the underlying socket. @@ -202,8 +203,18 @@ void zmq::stream_engine_t::in_event () session->flush (); } - if (session && disconnection) - error (); + // Input error has occurred. If the last decoded + // message has already been accepted, we terminate + // the engine immediately. Otherwise, we stop + // waiting for input events and postpone the termination + // until after the session has accepted the message. + if (session != NULL && disconnection) { + input_error = true; + if (decoder.stalled ()) + reset_pollin (handle); + else + error (); + } } void zmq::stream_engine_t::out_event () @@ -235,9 +246,11 @@ void zmq::stream_engine_t::out_event () // written should be reasonably modest. int nbytes = write (outpos, outsize); - // Handle problems with the connection. + // IO error has occurred. We stop waiting for output events. + // The engine is not terminated until we detect input error; + // this is necessary to prevent losing incomming messages. if (nbytes == -1) { - error (); + reset_pollout (handle); return; } @@ -258,6 +271,17 @@ void zmq::stream_engine_t::activate_out () void zmq::stream_engine_t::activate_in () { + if (input_error) { + // There was an input error but the engine could not + // be terminated (due to the stalled decoder). + // Flush the pending message and terminate the engine now. + decoder.process_buffer (inpos, 0); + zmq_assert (!decoder.stalled ()); + session->flush (); + error (); + return; + } + set_pollin (handle); // Speculative read. diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp index 30b190b0..68a5c2ef 100644 --- a/src/stream_engine.hpp +++ b/src/stream_engine.hpp @@ -83,6 +83,7 @@ namespace zmq unsigned char *inpos; size_t insize; decoder_t decoder; + bool input_error; unsigned char *outpos; size_t outsize; From 5227f676f4259050f067386c58e6bcf7f148c333 Mon Sep 17 00:00:00 2001 From: Martin Hurton Date: Sun, 29 Apr 2012 15:27:41 +0200 Subject: [PATCH 3/5] Fix decoder to properly handle large messages The decoder did not properly decode large messages on systems where sizeof (size_t) < sizeof (uint64_t). --- src/decoder.cpp | 41 +++++++++++++++++++++++++---------------- 1 file changed, 25 insertions(+), 16 deletions(-) diff --git a/src/decoder.cpp b/src/decoder.cpp index 48f457f8..fc46e29f 100644 --- a/src/decoder.cpp +++ b/src/decoder.cpp @@ -21,6 +21,7 @@ #include #include +#include #include "decoder.hpp" #include "session_base.hpp" @@ -91,33 +92,41 @@ bool zmq::decoder_t::one_byte_size_ready () bool zmq::decoder_t::eight_byte_size_ready () { - // 8-byte size is read. Allocate the buffer for message body and - // read the message data into it. - size_t size = (size_t) get_uint64 (tmpbuf); + // 8-byte payload length is read. Allocate the buffer + // for message body and read the message data into it. + const uint64_t payload_length = get_uint64 (tmpbuf); // There has to be at least one byte (the flags) in the message). - if (!size) { + if (payload_length == 0) { decoding_error (); return false; } - // in_progress is initialised at this point so in theory we should - // close it before calling zmq_msg_init_size, however, it's a 0-byte - // message and thus we can treat it as uninitialised... - int rc; - if (maxmsgsize >= 0 && (int64_t) (size - 1) > maxmsgsize) { - rc = -1; - errno = ENOMEM; + // Message size must not exceed the maximum allowed size. + if (maxmsgsize >= 0 && payload_length - 1 > (uint64_t) maxmsgsize) { + decoding_error (); + return false; } - else - rc = in_progress.init_size (size - 1); - if (rc != 0 && errno == ENOMEM) { + + // Message size must fit within range of size_t data type. + if (payload_length - 1 > std::numeric_limits ::max ()) { + decoding_error (); + return false; + } + + const size_t msg_size = static_cast (payload_length - 1); + + // in_progress is initialised at this point so in theory we should + // close it before calling init_size, however, it's a 0-byte + // message and thus we can treat it as uninitialised... + int rc = in_progress.init_size (msg_size); + if (rc != 0) { + errno_assert (errno == ENOMEM); rc = in_progress.init (); errno_assert (rc == 0); decoding_error (); return false; } - errno_assert (rc == 0); next_step (tmpbuf, 1, &decoder_t::flags_ready); return true; @@ -130,7 +139,7 @@ bool zmq::decoder_t::flags_ready () next_step (in_progress.data (), in_progress.size (), &decoder_t::message_ready); - + return true; } From d84709497edc9eba051374874e9318b2e1bacb95 Mon Sep 17 00:00:00 2001 From: Martin Hurton Date: Mon, 30 Apr 2012 00:48:07 +0200 Subject: [PATCH 4/5] Do not crash when multiple peers connect to PAIR socket When more then one peer connected to a ZMQ_PAIR socket, an application aborted due to assertion failure. This patch changes the ZMQ_PAIR socket behaviour so that it rejects any further connection requests. --- src/pair.cpp | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/src/pair.cpp b/src/pair.cpp index 6b17b906..3be247ab 100644 --- a/src/pair.cpp +++ b/src/pair.cpp @@ -38,14 +38,20 @@ zmq::pair_t::~pair_t () void zmq::pair_t::xattach_pipe (pipe_t *pipe_, bool icanhasall_) { - zmq_assert (!pipe); - pipe = pipe_; + zmq_assert (pipe_ != NULL); + + // ZMQ_PAIR socket can only be connected to a single peer. + // The socket rejects any further connection requests. + if (pipe == NULL) + pipe = pipe_; + else + pipe_->terminate (false); } void zmq::pair_t::xterminated (pipe_t *pipe_) { - zmq_assert (pipe_ == pipe); - pipe = NULL; + if (pipe_ == pipe) + pipe = NULL; } void zmq::pair_t::xread_activated (pipe_t *pipe_) From 1075005b500b86aa5ca828198bfedcd0067946fd Mon Sep 17 00:00:00 2001 From: Ian Barber Date: Thu, 3 May 2012 13:24:12 +0100 Subject: [PATCH 5/5] Patch from Mato that fixes a subtle connect bug: EAGAIN was being used as a translation value for EINPROGRESS, thus shadowing a real EAGAIN return value from the OS. This caused later assertions of "Invalid argument" in stream_engine.cpp when it attempted to use a socket which was not connected. I also add EINTR to mean EINPROGRESS, as per the POSIX and FreeBSD documentation which specifies that a connect() call interrupted due to a signal will complete asynchronously. Signed-off-by: Martin Lucina --- src/ipc_connecter.cpp | 7 +++++++ src/tcp_connecter.cpp | 11 ++++++----- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/src/ipc_connecter.cpp b/src/ipc_connecter.cpp index 2312866d..7c43424e 100644 --- a/src/ipc_connecter.cpp +++ b/src/ipc_connecter.cpp @@ -186,6 +186,13 @@ int zmq::ipc_connecter_t::open () // Connect was successfull immediately. if (rc == 0) return 0; + + // Translate other error codes indicating asynchronous connect has been + // launched to a uniform EINPROGRESS. + if (rc == -1 && errno == EINTR) { + errno = EINPROGRESS; + return -1; + } // Forward the error. return -1; diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index a91c4145..0b57903e 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -140,7 +140,7 @@ void zmq::tcp_connecter_t::start_connecting () } // Connection establishment may be delayed. Poll for its completion. - else if (rc == -1 && errno == EAGAIN) { + else if (rc == -1 && errno == EINPROGRESS) { handle = add_fd (s); handle_valid = true; set_pollout (handle); @@ -211,17 +211,18 @@ int zmq::tcp_connecter_t::open () if (rc == 0) return 0; - // Asynchronous connect was launched. + // Translate other error codes indicating asynchronous connect has been + // launched to a uniform EINPROGRESS. #ifdef ZMQ_HAVE_WINDOWS if (rc == SOCKET_ERROR && (WSAGetLastError () == WSAEINPROGRESS || WSAGetLastError () == WSAEWOULDBLOCK)) { - errno = EAGAIN; + errno = EINPROGRESS; return -1; } wsa_error_to_errno (); #else - if (rc == -1 && errno == EINPROGRESS) { - errno = EAGAIN; + if (rc == -1 && errno == EINTR) { + errno = EINPROGRESS; return -1; } #endif