/* Copyright (c) 2009-2011 250bpm s.r.o. Copyright (c) 2007-2009 iMatix Corporation Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file This file is part of 0MQ. 0MQ is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. 0MQ is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this program. If not, see . */ #include "platform.hpp" #if defined ZMQ_HAVE_WINDOWS #include "windows.hpp" #else #include #include #include #include #include #include #include #endif #include #include #include "stream_engine.hpp" #include "io_thread.hpp" #include "session_base.hpp" #include "config.hpp" #include "err.hpp" #include "ip.hpp" zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_) : s (fd_), inpos (NULL), insize (0), decoder (in_batch_size, options_.maxmsgsize), input_error (false), outpos (NULL), outsize (0), encoder (out_batch_size), session (NULL), options (options_), plugged (false) { // Put the socket into non-blocking mode. unblock_socket (s); // Set the socket buffer limits for the underlying socket. if (options.sndbuf) { int rc = setsockopt (s, SOL_SOCKET, SO_SNDBUF, (char*) &options.sndbuf, sizeof (int)); #ifdef ZMQ_HAVE_WINDOWS wsa_assert (rc != SOCKET_ERROR); #else errno_assert (rc == 0); #endif } if (options.rcvbuf) { int rc = setsockopt (s, SOL_SOCKET, SO_RCVBUF, (char*) &options.rcvbuf, sizeof (int)); #ifdef ZMQ_HAVE_WINDOWS wsa_assert (rc != SOCKET_ERROR); #else errno_assert (rc == 0); #endif } #if defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_FREEBSD // Make sure that SIGPIPE signal is not generated when writing to a // connection that was already closed by the peer. int set = 1; int rc = setsockopt (s, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof (int)); errno_assert (rc == 0); #endif } zmq::stream_engine_t::~stream_engine_t () { zmq_assert (!plugged); if (s != retired_fd) { #ifdef ZMQ_HAVE_WINDOWS int rc = closesocket (s); wsa_assert (rc != SOCKET_ERROR); #else int rc = close (s); errno_assert (rc == 0); #endif s = retired_fd; } } void zmq::stream_engine_t::plug (io_thread_t *io_thread_, session_base_t *session_) { zmq_assert (!plugged); plugged = true; // Connect to session object. zmq_assert (!session); zmq_assert (session_); encoder.set_session (session_); decoder.set_session (session_); session = session_; session->get_address (endpoint); // Connect to I/O threads poller object. io_object_t::plug (io_thread_); handle = add_fd (s); set_pollin (handle); set_pollout (handle); // Flush all the data that may have been already received downstream. in_event (); } void zmq::stream_engine_t::unplug () { zmq_assert (plugged); plugged = false; // Cancel all fd subscriptions. rm_fd (handle); // Disconnect from I/O threads poller object. io_object_t::unplug (); // Disconnect from session object. encoder.set_session (NULL); decoder.set_session (NULL); session = NULL; endpoint.clear(); } void zmq::stream_engine_t::terminate () { unplug (); delete this; } void zmq::stream_engine_t::in_event () { bool disconnection = false; // If there's no data to process in the buffer... if (!insize) { // Retrieve the buffer and read as much data as possible. // Note that buffer can be arbitrarily large. However, we assume // the underlying TCP layer has fixed buffer size and thus the // number of bytes read will be always limited. decoder.get_buffer (&inpos, &insize); insize = read (inpos, insize); // Check whether the peer has closed the connection. if (insize == (size_t) -1) { insize = 0; disconnection = true; } } // Push the data to the decoder. size_t processed = decoder.process_buffer (inpos, insize); if (unlikely (processed == (size_t) -1)) { disconnection = true; } else { // Stop polling for input if we got stuck. if (processed < insize) reset_pollin (handle); // Adjust the buffer. inpos += processed; insize -= processed; } // Flush all messages the decoder may have produced. session->flush (); // Input error has occurred. If the last decoded // message has already been accepted, we terminate // the engine immediately. Otherwise, we stop // waiting for input events and postpone the termination // until after the session has accepted the message. if (disconnection) { input_error = true; if (decoder.stalled ()) reset_pollin (handle); else error (); } } void zmq::stream_engine_t::out_event () { // If write buffer is empty, try to read new data from the encoder. if (!outsize) { outpos = NULL; encoder.get_data (&outpos, &outsize); // If there is no data to send, stop polling for output. if (outsize == 0) { reset_pollout (handle); return; } } // If there are any data to write in write buffer, write as much as // possible to the socket. Note that amount of data to write can be // arbitratily large. However, we assume that underlying TCP layer has // limited transmission buffer and thus the actual number of bytes // written should be reasonably modest. int nbytes = write (outpos, outsize); // IO error has occurred. We stop waiting for output events. // The engine is not terminated until we detect input error; // this is necessary to prevent losing incomming messages. if (nbytes == -1) { reset_pollout (handle); return; } outpos += nbytes; outsize -= nbytes; } void zmq::stream_engine_t::activate_out () { set_pollout (handle); // Speculative write: The assumption is that at the moment new message // was sent by the user the socket is probably available for writing. // Thus we try to write the data to socket avoiding polling for POLLOUT. // Consequently, the latency should be better in request/reply scenarios. out_event (); } void zmq::stream_engine_t::activate_in () { if (input_error) { // There was an input error but the engine could not // be terminated (due to the stalled decoder). // Flush the pending message and terminate the engine now. decoder.process_buffer (inpos, 0); zmq_assert (!decoder.stalled ()); session->flush (); error (); return; } set_pollin (handle); // Speculative read. in_event (); } void zmq::stream_engine_t::error () { zmq_assert (session); session->monitor_event (ZMQ_EVENT_DISCONNECTED, endpoint.c_str(), s); session->detach (); unplug (); delete this; } int zmq::stream_engine_t::write (const void *data_, size_t size_) { #ifdef ZMQ_HAVE_WINDOWS int nbytes = send (s, (char*) data_, (int) size_, 0); // If not a single byte can be written to the socket in non-blocking mode // we'll get an error (this may happen during the speculative write). if (nbytes == SOCKET_ERROR && WSAGetLastError () == WSAEWOULDBLOCK) return 0; // Signalise peer failure. if (nbytes == -1 && ( WSAGetLastError () == WSAENETDOWN || WSAGetLastError () == WSAENETRESET || WSAGetLastError () == WSAEHOSTUNREACH || WSAGetLastError () == WSAECONNABORTED || WSAGetLastError () == WSAETIMEDOUT || WSAGetLastError () == WSAECONNRESET)) return -1; wsa_assert (nbytes != SOCKET_ERROR); return nbytes; #else ssize_t nbytes = send (s, data_, size_, 0); // Several errors are OK. When speculative write is being done we may not // be able to write a single byte from the socket. Also, SIGSTOP issued // by a debugging tool can result in EINTR error. if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) return 0; // Signalise peer failure. if (nbytes == -1 && (errno == ECONNRESET || errno == EPIPE || errno == ETIMEDOUT)) return -1; errno_assert (nbytes != -1); return (size_t) nbytes; #endif } int zmq::stream_engine_t::read (void *data_, size_t size_) { #ifdef ZMQ_HAVE_WINDOWS int nbytes = recv (s, (char*) data_, (int) size_, 0); // If not a single byte can be read from the socket in non-blocking mode // we'll get an error (this may happen during the speculative read). if (nbytes == SOCKET_ERROR && WSAGetLastError () == WSAEWOULDBLOCK) return 0; // Connection failure. if (nbytes == -1 && ( WSAGetLastError () == WSAENETDOWN || WSAGetLastError () == WSAENETRESET || WSAGetLastError () == WSAECONNABORTED || WSAGetLastError () == WSAETIMEDOUT || WSAGetLastError () == WSAECONNRESET || WSAGetLastError () == WSAECONNREFUSED || WSAGetLastError () == WSAENOTCONN)) return -1; wsa_assert (nbytes != SOCKET_ERROR); // Orderly shutdown by the other peer. if (nbytes == 0) return -1; return nbytes; #else ssize_t nbytes = recv (s, data_, size_, 0); // Several errors are OK. When speculative read is being done we may not // be able to read a single byte from the socket. Also, SIGSTOP issued // by a debugging tool can result in EINTR error. if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)) return 0; // Signalise peer failure. if (nbytes == -1 && (errno == ECONNRESET || errno == ECONNREFUSED || errno == ETIMEDOUT || errno == EHOSTUNREACH || errno == ENOTCONN)) return -1; errno_assert (nbytes != -1); // Orderly shutdown by the peer. if (nbytes == 0) return -1; return (size_t) nbytes; #endif }