diff --git a/src/Makefile.am b/src/Makefile.am index b02e3027..13541748 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -60,8 +60,8 @@ libzmq_la_SOURCES = \ stdint.hpp \ sub.hpp \ tcp_connecter.hpp \ + tcp_engine.hpp \ tcp_listener.hpp \ - tcp_socket.hpp \ thread.hpp \ trie.hpp \ windows.hpp \ @@ -72,9 +72,6 @@ libzmq_la_SOURCES = \ xsub.hpp \ ypipe.hpp \ yqueue.hpp \ - zmq_connecter.hpp \ - zmq_engine.hpp \ - zmq_listener.hpp \ clock.cpp \ ctx.cpp \ decoder.cpp \ @@ -116,8 +113,8 @@ libzmq_la_SOURCES = \ socket_base.cpp \ sub.cpp \ tcp_connecter.cpp \ + tcp_engine.cpp \ tcp_listener.cpp \ - tcp_socket.cpp \ thread.cpp \ trie.cpp \ xpub.cpp \ @@ -125,9 +122,6 @@ libzmq_la_SOURCES = \ xreq.cpp \ xsub.cpp \ zmq.cpp \ - zmq_connecter.cpp \ - zmq_engine.cpp \ - zmq_listener.cpp \ zmq_utils.cpp if ON_MINGW diff --git a/src/session.cpp b/src/session.cpp index ed9b44ed..d14b58c8 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -24,7 +24,7 @@ #include "err.hpp" #include "pipe.hpp" #include "likely.hpp" -#include "zmq_connecter.hpp" +#include "tcp_connecter.hpp" #include "pgm_sender.hpp" #include "pgm_receiver.hpp" @@ -307,7 +307,7 @@ void zmq::session_t::start_connecting (bool wait_) // Both TCP and IPC transports are using the same infrastructure. if (protocol == "tcp" || protocol == "ipc") { - zmq_connecter_t *connecter = new (std::nothrow) zmq_connecter_t ( + tcp_connecter_t *connecter = new (std::nothrow) tcp_connecter_t ( io_thread, this, options, protocol.c_str (), address.c_str (), wait_); alloc_assert (connecter); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 2ec3998d..b97e3a79 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -34,8 +34,8 @@ #endif #include "socket_base.hpp" -#include "zmq_listener.hpp" -#include "zmq_connecter.hpp" +#include "tcp_listener.hpp" +#include "tcp_connecter.hpp" #include "io_thread.hpp" #include "session.hpp" #include "config.hpp" @@ -348,7 +348,7 @@ int zmq::socket_base_t::bind (const char *addr_) } // Create and run the listener. - zmq_listener_t *listener = new (std::nothrow) zmq_listener_t ( + tcp_listener_t *listener = new (std::nothrow) tcp_listener_t ( io_thread, this, options); alloc_assert (listener); int rc = listener->set_address (protocol.c_str(), address.c_str ()); diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index 01d80cc3..678d4884 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -18,30 +18,168 @@ along with this program. If not, see . */ -#include - +#include #include #include "tcp_connecter.hpp" +#include "tcp_engine.hpp" +#include "io_thread.hpp" #include "platform.hpp" #include "ip.hpp" #include "err.hpp" -#ifdef ZMQ_HAVE_WINDOWS +#if defined ZMQ_HAVE_WINDOWS +#include "windows.hpp" +#else +#include +#include +#include +#include +#include +#include +#include +#include +#ifdef ZMQ_HAVE_OPENVMS +#include +#endif +#endif -zmq::tcp_connecter_t::tcp_connecter_t () : - s (retired_fd) +zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_, + class session_t *session_, const options_t &options_, + const char *protocol_, const char *address_, bool wait_) : + own_t (io_thread_, options_), + io_object_t (io_thread_), + s (retired_fd), + handle_valid (false), + wait (wait_), + session (session_), + current_reconnect_ivl(options.reconnect_ivl) { memset (&addr, 0, sizeof (addr)); addr_len = 0; + + int rc = set_address (protocol_, address_); + zmq_assert (rc == 0); //TODO: take care ENOMEM, EINVAL } zmq::tcp_connecter_t::~tcp_connecter_t () { + if (wait) + cancel_timer (reconnect_timer_id); + if (handle_valid) + rm_fd (handle); + if (s != retired_fd) close (); } +void zmq::tcp_connecter_t::process_plug () +{ + if (wait) + add_reconnect_timer(); + else + start_connecting (); +} + +void zmq::tcp_connecter_t::in_event () +{ + // We are not polling for incomming data, so we are actually called + // because of error here. However, we can get error on out event as well + // on some platforms, so we'll simply handle both events in the same way. + out_event (); +} + +void zmq::tcp_connecter_t::out_event () +{ + fd_t fd = connect (); + rm_fd (handle); + handle_valid = false; + + // Handle the error condition by attempt to reconnect. + if (fd == retired_fd) { + close (); + wait = true; + add_reconnect_timer(); + return; + } + + // Create the engine object for this connection. + tcp_engine_t *engine = new (std::nothrow) tcp_engine_t (fd, options); + alloc_assert (engine); + + // Attach the engine to the corresponding session object. + send_attach (session, engine); + + // Shut the connecter down. + terminate (); +} + +void zmq::tcp_connecter_t::timer_event (int id_) +{ + zmq_assert (id_ == reconnect_timer_id); + wait = false; + start_connecting (); +} + +void zmq::tcp_connecter_t::start_connecting () +{ + // Open the connecting socket. + int rc = open (); + + // Connect may succeed in synchronous manner. + if (rc == 0) { + handle = add_fd (s); + handle_valid = true; + out_event (); + return; + } + + // Connection establishment may be dealyed. Poll for its completion. + else if (rc == -1 && errno == EAGAIN) { + handle = add_fd (s); + handle_valid = true; + set_pollout (handle); + return; + } + + // Handle any other error condition by eventual reconnect. + wait = true; + add_reconnect_timer(); +} + +void zmq::tcp_connecter_t::add_reconnect_timer() +{ + add_timer (get_new_reconnect_ivl(), reconnect_timer_id); +} + +int zmq::tcp_connecter_t::get_new_reconnect_ivl () +{ +#if defined ZMQ_HAVE_WINDOWS + int pid = (int) GetCurrentProcessId (); +#else + int pid = (int) getpid (); +#endif + + // The new interval is the current interval + random value. + int this_interval = current_reconnect_ivl + + ((pid * 13) % options.reconnect_ivl); + + // Only change the current reconnect interval if the maximum reconnect + // interval was set and if it's larger than the reconnect interval. + if (options.reconnect_ivl_max > 0 && + options.reconnect_ivl_max > options.reconnect_ivl) { + + // Calculate the next interval + current_reconnect_ivl = current_reconnect_ivl * 2; + if(current_reconnect_ivl >= options.reconnect_ivl_max) { + current_reconnect_ivl = options.reconnect_ivl_max; + } + } + return this_interval; +} + +#ifdef ZMQ_HAVE_WINDOWS + int zmq::tcp_connecter_t::set_address (const char *protocol_, const char *addr_) { if (strcmp (protocol_, "tcp") == 0) @@ -100,11 +238,6 @@ int zmq::tcp_connecter_t::close () return 0; } -zmq::fd_t zmq::tcp_connecter_t::get_fd () -{ - return s; -} - zmq::fd_t zmq::tcp_connecter_t::connect () { // Nonblocking connect have finished. Check whether an error occured. @@ -132,30 +265,6 @@ zmq::fd_t zmq::tcp_connecter_t::connect () #else -#include -#include -#include -#include -#include -#include -#include - -#ifdef ZMQ_HAVE_OPENVMS -#include -#endif - -zmq::tcp_connecter_t::tcp_connecter_t () : - s (retired_fd) -{ - memset (&addr, 0, sizeof (addr)); -} - -zmq::tcp_connecter_t::~tcp_connecter_t () -{ - if (s != retired_fd) - close (); -} - int zmq::tcp_connecter_t::set_address (const char *protocol_, const char *addr_) { if (strcmp (protocol_, "tcp") == 0) @@ -271,11 +380,6 @@ int zmq::tcp_connecter_t::close () return 0; } -zmq::fd_t zmq::tcp_connecter_t::get_fd () -{ - return s; -} - zmq::fd_t zmq::tcp_connecter_t::connect () { // Following code should handle both Berkeley-derived socket diff --git a/src/tcp_connecter.hpp b/src/tcp_connecter.hpp index 06641e55..2168ca7b 100644 --- a/src/tcp_connecter.hpp +++ b/src/tcp_connecter.hpp @@ -21,28 +21,50 @@ #ifndef __ZMQ_TCP_CONNECTER_HPP_INCLUDED__ #define __ZMQ_TCP_CONNECTER_HPP_INCLUDED__ -#include "platform.hpp" #include "fd.hpp" - -#ifdef ZMQ_HAVE_WINDOWS -#include "windows.hpp" -#else -#include -#include -#endif +#include "ip.hpp" +#include "own.hpp" +#include "io_object.hpp" +#include "stdint.hpp" namespace zmq { - // The class encapsulating simple TCP listening socket. - - class tcp_connecter_t + class tcp_connecter_t : public own_t, public io_object_t { public: - tcp_connecter_t (); + // If 'wait' is true connecter first waits for a while, then starts + // connection process. + tcp_connecter_t (class io_thread_t *io_thread_, + class session_t *session_, const options_t &options_, + const char *protocol_, const char *address_, bool delay_); ~tcp_connecter_t (); + private: + + // ID of the timer used to delay the reconnection. + enum {reconnect_timer_id = 1}; + + // Handlers for incoming commands. + void process_plug (); + + // Handlers for I/O events. + void in_event (); + void out_event (); + void timer_event (int id_); + + // Internal function to start the actual connection establishment. + void start_connecting (); + + // Internal function to add a reconnect timer + void add_reconnect_timer(); + + // Internal function to return a reconnect backoff delay. + // Will modify the current_reconnect_ivl used for next call + // Returns the currently used interval + int get_new_reconnect_ivl (); + // Set address to connect to. int set_address (const char *protocol, const char *addr_); @@ -55,16 +77,10 @@ namespace zmq // Close the connecting socket. int close (); - // Get the file descriptor to poll on to get notified about - // connection success. - fd_t get_fd (); - // Get the file descriptor of newly created connection. Returns // retired_fd if the connection was unsuccessfull. fd_t connect (); - private: - // Address to connect to. sockaddr_storage addr; socklen_t addr_len; @@ -72,6 +88,22 @@ namespace zmq // Underlying socket. fd_t s; + // Handle corresponding to the listening socket. + handle_t handle; + + // If true file descriptor is registered with the poller and 'handle' + // contains valid value. + bool handle_valid; + + // If true, connecter is waiting a while before trying to connect. + bool wait; + + // Reference to the session we belong to. + class session_t *session; + + // Current reconnect ivl, updated for backoff strategy + int current_reconnect_ivl; + tcp_connecter_t (const tcp_connecter_t&); const tcp_connecter_t &operator = (const tcp_connecter_t&); }; diff --git a/src/tcp_engine.cpp b/src/tcp_engine.cpp new file mode 100644 index 00000000..1972809a --- /dev/null +++ b/src/tcp_engine.cpp @@ -0,0 +1,401 @@ +/* + Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "platform.hpp" +#if defined ZMQ_HAVE_WINDOWS +#include "windows.hpp" +#else +#include +#include +#include +#include +#include +#include +#include +#endif + +#include +#include + +#include "tcp_engine.hpp" +#include "io_thread.hpp" +#include "session.hpp" +#include "config.hpp" +#include "err.hpp" + +zmq::tcp_engine_t::tcp_engine_t (fd_t fd_, const options_t &options_) : + s (retired_fd), + inpos (NULL), + insize (0), + decoder (in_batch_size, options_.maxmsgsize), + outpos (NULL), + outsize (0), + encoder (out_batch_size), + session (NULL), + leftover_session (NULL), + options (options_), + plugged (false) +{ + // Initialise the underlying socket. + int rc = open (fd_, options.sndbuf, options.rcvbuf); + zmq_assert (rc == 0); +} + +zmq::tcp_engine_t::~tcp_engine_t () +{ + zmq_assert (!plugged); + + if (s != retired_fd) + close (); +} + +void zmq::tcp_engine_t::plug (io_thread_t *io_thread_, session_t *session_) +{ + zmq_assert (!plugged); + plugged = true; + leftover_session = NULL; + + // Connect to session object. + zmq_assert (!session); + zmq_assert (session_); + encoder.set_session (session_); + decoder.set_session (session_); + session = session_; + + // Connect to I/O threads poller object. + io_object_t::plug (io_thread_); + handle = add_fd (s); + set_pollin (handle); + set_pollout (handle); + + // Flush all the data that may have been already received downstream. + in_event (); +} + +void zmq::tcp_engine_t::unplug () +{ + zmq_assert (plugged); + plugged = false; + + // Cancel all fd subscriptions. + rm_fd (handle); + + // Disconnect from I/O threads poller object. + io_object_t::unplug (); + + // Disconnect from session object. + encoder.set_session (NULL); + decoder.set_session (NULL); + leftover_session = session; + session = NULL; +} + +void zmq::tcp_engine_t::terminate () +{ + unplug (); + delete this; +} + +void zmq::tcp_engine_t::in_event () +{ + bool disconnection = false; + + // If there's no data to process in the buffer... + if (!insize) { + + // Retrieve the buffer and read as much data as possible. + // Note that buffer can be arbitrarily large. However, we assume + // the underlying TCP layer has fixed buffer size and thus the + // number of bytes read will be always limited. + decoder.get_buffer (&inpos, &insize); + insize = read (inpos, insize); + + // Check whether the peer has closed the connection. + if (insize == (size_t) -1) { + insize = 0; + disconnection = true; + } + } + + // Push the data to the decoder. + size_t processed = decoder.process_buffer (inpos, insize); + + if (unlikely (processed == (size_t) -1)) { + disconnection = true; + } + else { + + // Stop polling for input if we got stuck. + if (processed < insize) { + + // This may happen if queue limits are in effect. + if (plugged) + reset_pollin (handle); + } + + // Adjust the buffer. + inpos += processed; + insize -= processed; + } + + // Flush all messages the decoder may have produced. + // If IO handler has unplugged engine, flush transient IO handler. + if (unlikely (!plugged)) { + zmq_assert (leftover_session); + leftover_session->flush (); + } else { + session->flush (); + } + + if (session && disconnection) + error (); +} + +void zmq::tcp_engine_t::out_event () +{ + // If write buffer is empty, try to read new data from the encoder. + if (!outsize) { + + outpos = NULL; + encoder.get_data (&outpos, &outsize); + + // If IO handler has unplugged engine, flush transient IO handler. + if (unlikely (!plugged)) { + zmq_assert (leftover_session); + leftover_session->flush (); + return; + } + + // If there is no data to send, stop polling for output. + if (outsize == 0) { + reset_pollout (handle); + return; + } + } + + // If there are any data to write in write buffer, write as much as + // possible to the socket. Note that amount of data to write can be + // arbitratily large. However, we assume that underlying TCP layer has + // limited transmission buffer and thus the actual number of bytes + // written should be reasonably modest. + int nbytes = write (outpos, outsize); + + // Handle problems with the connection. + if (nbytes == -1) { + error (); + return; + } + + outpos += nbytes; + outsize -= nbytes; +} + +void zmq::tcp_engine_t::activate_out () +{ + set_pollout (handle); + + // Speculative write: The assumption is that at the moment new message + // was sent by the user the socket is probably available for writing. + // Thus we try to write the data to socket avoiding polling for POLLOUT. + // Consequently, the latency should be better in request/reply scenarios. + out_event (); +} + +void zmq::tcp_engine_t::activate_in () +{ + set_pollin (handle); + + // Speculative read. + in_event (); +} + +void zmq::tcp_engine_t::error () +{ + zmq_assert (session); + session->detach (); + unplug (); + delete this; +} + +#ifdef ZMQ_HAVE_WINDOWS + +int zmq::tcp_engine_t::open (fd_t fd_, int sndbuf_, int rcvbuf_) +{ + zmq_assert (s == retired_fd); + s = fd_; + + if (sndbuf_) { + int rc = setsockopt (s, SOL_SOCKET, SO_SNDBUF, + (char*) &sndbuf_, sizeof (int)); + errno_assert (rc == 0); + } + + if (rcvbuf_) { + int rc = setsockopt (s, SOL_SOCKET, SO_RCVBUF, + (char*) &rcvbuf_, sizeof (int)); + errno_assert (rc == 0); + } + + return 0; +} + +int zmq::tcp_engine_t::close () +{ + zmq_assert (s != retired_fd); + int rc = closesocket (s); + wsa_assert (rc != SOCKET_ERROR); + s = retired_fd; + return 0; +} + +int zmq::tcp_engine_t::write (const void *data_, size_t size_) +{ + int nbytes = send (s, (char*) data_, (int) size_, 0); + + // If not a single byte can be written to the socket in non-blocking mode + // we'll get an error (this may happen during the speculative write). + if (nbytes == SOCKET_ERROR && WSAGetLastError () == WSAEWOULDBLOCK) + return 0; + + // Signalise peer failure. + if (nbytes == -1 && ( + WSAGetLastError () == WSAENETDOWN || + WSAGetLastError () == WSAENETRESET || + WSAGetLastError () == WSAEHOSTUNREACH || + WSAGetLastError () == WSAECONNABORTED || + WSAGetLastError () == WSAETIMEDOUT || + WSAGetLastError () == WSAECONNRESET)) + return -1; + + wsa_assert (nbytes != SOCKET_ERROR); + + return (size_t) nbytes; +} + +int zmq::tcp_engine_t::read (void *data_, size_t size_) +{ + int nbytes = recv (s, (char*) data_, (int) size_, 0); + + // If not a single byte can be read from the socket in non-blocking mode + // we'll get an error (this may happen during the speculative read). + if (nbytes == SOCKET_ERROR && WSAGetLastError () == WSAEWOULDBLOCK) + return 0; + + // Connection failure. + if (nbytes == -1 && ( + WSAGetLastError () == WSAENETDOWN || + WSAGetLastError () == WSAENETRESET || + WSAGetLastError () == WSAECONNABORTED || + WSAGetLastError () == WSAETIMEDOUT || + WSAGetLastError () == WSAECONNRESET || + WSAGetLastError () == WSAECONNREFUSED || + WSAGetLastError () == WSAENOTCONN)) + return -1; + + wsa_assert (nbytes != SOCKET_ERROR); + + // Orderly shutdown by the other peer. + if (nbytes == 0) + return -1; + + return (size_t) nbytes; +} + +#else + +int zmq::tcp_engine_t::open (fd_t fd_, int sndbuf_, int rcvbuf_) +{ + assert (s == retired_fd); + s = fd_; + + if (sndbuf_) { + int rc = setsockopt (s, SOL_SOCKET, SO_SNDBUF, &sndbuf_, sizeof (int)); + errno_assert (rc == 0); + } + + if (rcvbuf_) { + int rc = setsockopt (s, SOL_SOCKET, SO_RCVBUF, &rcvbuf_, sizeof (int)); + errno_assert (rc == 0); + } + +#if defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_FREEBSD + int set = 1; + int rc = setsockopt (s, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof (int)); + errno_assert (rc == 0); +#endif + + return 0; +} + +int zmq::tcp_engine_t::close () +{ + zmq_assert (s != retired_fd); + int rc = ::close (s); + if (rc != 0) + return -1; + s = retired_fd; + return 0; +} + +int zmq::tcp_engine_t::write (const void *data_, size_t size_) +{ + ssize_t nbytes = send (s, data_, size_, 0); + + // Several errors are OK. When speculative write is being done we may not + // be able to write a single byte to the socket. Also, SIGSTOP issued + // by a debugging tool can result in EINTR error. + if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK || + errno == EINTR)) + return 0; + + // Signalise peer failure. + if (nbytes == -1 && (errno == ECONNRESET || errno == EPIPE)) + return -1; + + errno_assert (nbytes != -1); + return (size_t) nbytes; +} + +int zmq::tcp_engine_t::read (void *data_, size_t size_) +{ + ssize_t nbytes = recv (s, data_, size_, 0); + + // Several errors are OK. When speculative read is being done we may not + // be able to read a single byte to the socket. Also, SIGSTOP issued + // by a debugging tool can result in EINTR error. + if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK || + errno == EINTR)) + return 0; + + // Signalise peer failure. + if (nbytes == -1 && (errno == ECONNRESET || errno == ECONNREFUSED || + errno == ETIMEDOUT || errno == EHOSTUNREACH)) + return -1; + + errno_assert (nbytes != -1); + + // Orderly shutdown by the peer. + if (nbytes == 0) + return -1; + + return (size_t) nbytes; +} + +#endif diff --git a/src/zmq_engine.hpp b/src/tcp_engine.hpp similarity index 61% rename from src/zmq_engine.hpp rename to src/tcp_engine.hpp index ad4bc8aa..db17122e 100644 --- a/src/zmq_engine.hpp +++ b/src/tcp_engine.hpp @@ -18,16 +18,14 @@ along with this program. If not, see . */ -#ifndef __ZMQ_ZMQ_ENGINE_HPP_INCLUDED__ -#define __ZMQ_ZMQ_ENGINE_HPP_INCLUDED__ +#ifndef __ZMQ_TCP_ENGINE_HPP_INCLUDED__ +#define __ZMQ_TCP_ENGINE_HPP_INCLUDED__ #include -#include - +#include "fd.hpp" #include "i_engine.hpp" #include "io_object.hpp" -#include "tcp_socket.hpp" #include "encoder.hpp" #include "decoder.hpp" #include "options.hpp" @@ -35,12 +33,12 @@ namespace zmq { - class zmq_engine_t : public io_object_t, public i_engine + class tcp_engine_t : public io_object_t, public i_engine { public: - zmq_engine_t (fd_t fd_, const options_t &options_); - ~zmq_engine_t (); + tcp_engine_t (fd_t fd_, const options_t &options_); + ~tcp_engine_t (); // i_engine interface implementation. void plug (class io_thread_t *io_thread_, class session_t *session_); @@ -58,7 +56,26 @@ namespace zmq // Function to handle network disconnections. void error (); - tcp_socket_t tcp_socket; + // Associates a socket with a native socket descriptor. + int open (fd_t fd_, int sndbuf_, int rcvbuf_); + + // Closes the underlying socket. + int close (); + + // Writes data to the socket. Returns the number of bytes actually + // written (even zero is to be considered to be a success). In case + // of error or orderly shutdown by the other peer -1 is returned. + int write (const void *data_, size_t size_); + + // Reads data from the socket (up to 'size' bytes). Returns the number + // of bytes actually read (even zero is to be considered to be + // a success). In case of error or orderly shutdown by the other + // peer -1 is returned. + int read (void *data_, size_t size_); + + // Underlying socket. + fd_t s; + handle_t handle; unsigned char *inpos; @@ -79,8 +96,8 @@ namespace zmq bool plugged; - zmq_engine_t (const zmq_engine_t&); - const zmq_engine_t &operator = (const zmq_engine_t&); + tcp_engine_t (const tcp_engine_t&); + const tcp_engine_t &operator = (const tcp_engine_t&); }; } diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp index f40b0fe5..b4098913 100644 --- a/src/tcp_listener.cpp +++ b/src/tcp_listener.cpp @@ -18,19 +18,42 @@ along with this program. If not, see . */ +#include + #include #include "tcp_listener.hpp" #include "platform.hpp" -#include "ip.hpp" +#include "tcp_engine.hpp" +#include "io_thread.hpp" +#include "session.hpp" #include "config.hpp" #include "err.hpp" #ifdef ZMQ_HAVE_WINDOWS +#include "windows.hpp" +#else +#include +#include +#include +#include +#include +#include +#include +#ifndef ZMQ_HAVE_OPENVMS +#include +#else +#include +#endif +#endif -zmq::tcp_listener_t::tcp_listener_t () : +zmq::tcp_listener_t::tcp_listener_t (io_thread_t *io_thread_, + socket_base_t *socket_, const options_t &options_) : + own_t (io_thread_, options_), + io_object_t (io_thread_), has_file (false), - s (retired_fd) + s (retired_fd), + socket (socket_) { memset (&addr, 0, sizeof (addr)); addr_len = 0; @@ -42,8 +65,48 @@ zmq::tcp_listener_t::~tcp_listener_t () close (); } -int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_, - int backlog_) +void zmq::tcp_listener_t::process_plug () +{ + // Start polling for incoming connections. + handle = add_fd (s); + set_pollin (handle); +} + +void zmq::tcp_listener_t::process_term (int linger_) +{ + rm_fd (handle); + own_t::process_term (linger_); +} + +void zmq::tcp_listener_t::in_event () +{ + fd_t fd = accept (); + + // If connection was reset by the peer in the meantime, just ignore it. + // TODO: Handle specific errors like ENFILE/EMFILE etc. + if (fd == retired_fd) + return; + // Create the engine object for this connection. + tcp_engine_t *engine = new (std::nothrow) tcp_engine_t (fd, options); + alloc_assert (engine); + + // Choose I/O thread to run connecter in. Given that we are already + // running in an I/O thread, there must be at least one available. + io_thread_t *io_thread = choose_io_thread (options.affinity); + zmq_assert (io_thread); + + // Create and launch a session object. + session_t *session = new (std::nothrow) + session_t (io_thread, false, socket, options, NULL, NULL); + alloc_assert (session); + session->inc_seqnum (); + launch_child (session); + send_attach (session, engine, false); +} + +#ifdef ZMQ_HAVE_WINDOWS + +int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_) { // IPC protocol is not supported on Windows platform. if (strcmp (protocol_, "tcp") != 0 ) { @@ -57,7 +120,7 @@ int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_, return rc; // Create a listening socket. - s = socket (addr.ss_family, SOCK_STREAM, IPPROTO_TCP); + s = ::socket (addr.ss_family, SOCK_STREAM, IPPROTO_TCP); if (s == INVALID_SOCKET) { wsa_error_to_errno (); return -1; @@ -82,7 +145,7 @@ int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_, } // Listen for incomming connections. - rc = listen (s, backlog_); + rc = listen (s, options.backlog); if (rc == SOCKET_ERROR) { wsa_error_to_errno (); return -1; @@ -100,11 +163,6 @@ int zmq::tcp_listener_t::close () return 0; } -zmq::fd_t zmq::tcp_listener_t::get_fd () -{ - return s; -} - zmq::fd_t zmq::tcp_listener_t::accept () { zmq_assert (s != retired_fd); @@ -134,37 +192,7 @@ zmq::fd_t zmq::tcp_listener_t::accept () #else -#include -#include -#include -#include -#include -#include -#include - -#ifndef ZMQ_HAVE_OPENVMS -#include -#endif - -#ifdef ZMQ_HAVE_OPENVMS -#include -#endif - -zmq::tcp_listener_t::tcp_listener_t () : - has_file (false), - s (retired_fd) -{ - memset (&addr, 0, sizeof (addr)); -} - -zmq::tcp_listener_t::~tcp_listener_t () -{ - if (s != retired_fd) - close (); -} - -int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_, - int backlog_) +int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_) { if (strcmp (protocol_, "tcp") == 0 ) { @@ -174,7 +202,7 @@ int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_, return -1; // Create a listening socket. - s = socket (addr.ss_family, SOCK_STREAM, IPPROTO_TCP); + s = ::socket (addr.ss_family, SOCK_STREAM, IPPROTO_TCP); if (s == -1) return -1; @@ -207,7 +235,7 @@ int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_, } // Listen for incomming connections. - rc = listen (s, backlog_); + rc = listen (s, options.backlog); if (rc != 0) { int err = errno; if (close () != 0) @@ -231,7 +259,7 @@ int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_, return -1; // Create a listening socket. - s = socket (AF_UNIX, SOCK_STREAM, 0); + s = ::socket (AF_UNIX, SOCK_STREAM, 0); if (s == -1) return -1; @@ -254,7 +282,7 @@ int zmq::tcp_listener_t::set_address (const char *protocol_, const char *addr_, has_file = true; // Listen for incomming connections. - rc = listen (s, backlog_); + rc = listen (s, options.backlog); if (rc != 0) { int err = errno; if (close () != 0) @@ -294,11 +322,6 @@ int zmq::tcp_listener_t::close () return 0; } -zmq::fd_t zmq::tcp_listener_t::get_fd () -{ - return s; -} - zmq::fd_t zmq::tcp_listener_t::accept () { zmq_assert (s != retired_fd); diff --git a/src/tcp_listener.hpp b/src/tcp_listener.hpp index 27e30921..2dd43ceb 100644 --- a/src/tcp_listener.hpp +++ b/src/tcp_listener.hpp @@ -23,37 +23,41 @@ #include "fd.hpp" #include "ip.hpp" +#include "own.hpp" +#include "io_object.hpp" +#include "stdint.hpp" namespace zmq { - // The class encapsulating simple TCP listening socket. - - class tcp_listener_t + class tcp_listener_t : public own_t, public io_object_t { public: - tcp_listener_t (); + tcp_listener_t (class io_thread_t *io_thread_, + class socket_base_t *socket_, const options_t &options_); ~tcp_listener_t (); - // Start listening on the interface. - int set_address (const char *protocol_, const char *addr_, - int backlog_); + // Set address to listen on. + int set_address (const char* protocol_, const char *addr_); + + private: + + // Handlers for incoming commands. + void process_plug (); + void process_term (int linger_); + + // Handlers for I/O events. + void in_event (); // Close the listening socket. int close (); - // Get the file descriptor to poll on to get notified about - // newly created connections. - fd_t get_fd (); - // Accept the new connection. Returns the file descriptor of the // newly created connection. The function may return retired_fd // if the connection was dropped while waiting in the listen backlog. fd_t accept (); - private: - // Address to listen on. sockaddr_storage addr; socklen_t addr_len; @@ -64,6 +68,12 @@ namespace zmq // Underlying socket. fd_t s; + // Handle corresponding to the listening socket. + handle_t handle; + + // Socket the listerner belongs to. + class socket_base_t *socket; + tcp_listener_t (const tcp_listener_t&); const tcp_listener_t &operator = (const tcp_listener_t&); }; diff --git a/src/tcp_socket.cpp b/src/tcp_socket.cpp deleted file mode 100644 index bd3f117f..00000000 --- a/src/tcp_socket.cpp +++ /dev/null @@ -1,230 +0,0 @@ -/* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#include "tcp_socket.hpp" -#include "platform.hpp" -#include "err.hpp" - -#ifdef ZMQ_HAVE_WINDOWS - -zmq::tcp_socket_t::tcp_socket_t () : - s (retired_fd) -{ -} - -zmq::tcp_socket_t::~tcp_socket_t () -{ - if (s != retired_fd) - close (); -} - -int zmq::tcp_socket_t::open (fd_t fd_, int sndbuf_, int rcvbuf_) -{ - zmq_assert (s == retired_fd); - s = fd_; - - if (sndbuf_) { - int rc = setsockopt (s, SOL_SOCKET, SO_SNDBUF, - (char*) &sndbuf_, sizeof (int)); - errno_assert (rc == 0); - } - - if (rcvbuf_) { - int rc = setsockopt (s, SOL_SOCKET, SO_RCVBUF, - (char*) &rcvbuf_, sizeof (int)); - errno_assert (rc == 0); - } - - return 0; -} - -int zmq::tcp_socket_t::close () -{ - zmq_assert (s != retired_fd); - int rc = closesocket (s); - wsa_assert (rc != SOCKET_ERROR); - s = retired_fd; - return 0; -} - -zmq::fd_t zmq::tcp_socket_t::get_fd () -{ - return s; -} - -int zmq::tcp_socket_t::write (const void *data_, size_t size_) -{ - int nbytes = send (s, (char*) data_, (int) size_, 0); - - // If not a single byte can be written to the socket in non-blocking mode - // we'll get an error (this may happen during the speculative write). - if (nbytes == SOCKET_ERROR && WSAGetLastError () == WSAEWOULDBLOCK) - return 0; - - // Signalise peer failure. - if (nbytes == -1 && ( - WSAGetLastError () == WSAENETDOWN || - WSAGetLastError () == WSAENETRESET || - WSAGetLastError () == WSAEHOSTUNREACH || - WSAGetLastError () == WSAECONNABORTED || - WSAGetLastError () == WSAETIMEDOUT || - WSAGetLastError () == WSAECONNRESET)) - return -1; - - wsa_assert (nbytes != SOCKET_ERROR); - - return (size_t) nbytes; -} - -int zmq::tcp_socket_t::read (void *data_, size_t size_) -{ - int nbytes = recv (s, (char*) data_, (int) size_, 0); - - // If not a single byte can be read from the socket in non-blocking mode - // we'll get an error (this may happen during the speculative read). - if (nbytes == SOCKET_ERROR && WSAGetLastError () == WSAEWOULDBLOCK) - return 0; - - // Connection failure. - if (nbytes == -1 && ( - WSAGetLastError () == WSAENETDOWN || - WSAGetLastError () == WSAENETRESET || - WSAGetLastError () == WSAECONNABORTED || - WSAGetLastError () == WSAETIMEDOUT || - WSAGetLastError () == WSAECONNRESET || - WSAGetLastError () == WSAECONNREFUSED || - WSAGetLastError () == WSAENOTCONN)) - return -1; - - wsa_assert (nbytes != SOCKET_ERROR); - - // Orderly shutdown by the other peer. - if (nbytes == 0) - return -1; - - return (size_t) nbytes; -} - -#else - -#include -#include -#include -#include -#include -#include -#include - -zmq::tcp_socket_t::tcp_socket_t () : - s (retired_fd) -{ -} - -zmq::tcp_socket_t::~tcp_socket_t () -{ - if (s != retired_fd) - close (); -} - -int zmq::tcp_socket_t::open (fd_t fd_, int sndbuf_, int rcvbuf_) -{ - assert (s == retired_fd); - s = fd_; - - if (sndbuf_) { - int rc = setsockopt (s, SOL_SOCKET, SO_SNDBUF, &sndbuf_, sizeof (int)); - errno_assert (rc == 0); - } - - if (rcvbuf_) { - int rc = setsockopt (s, SOL_SOCKET, SO_RCVBUF, &rcvbuf_, sizeof (int)); - errno_assert (rc == 0); - } - -#if defined ZMQ_HAVE_OSX || defined ZMQ_HAVE_FREEBSD - int set = 1; - int rc = setsockopt (s, SOL_SOCKET, SO_NOSIGPIPE, &set, sizeof (int)); - errno_assert (rc == 0); -#endif - - return 0; -} - -int zmq::tcp_socket_t::close () -{ - zmq_assert (s != retired_fd); - int rc = ::close (s); - if (rc != 0) - return -1; - s = retired_fd; - return 0; -} - -zmq::fd_t zmq::tcp_socket_t::get_fd () -{ - return s; -} - -int zmq::tcp_socket_t::write (const void *data_, size_t size_) -{ - ssize_t nbytes = send (s, data_, size_, 0); - - // Several errors are OK. When speculative write is being done we may not - // be able to write a single byte to the socket. Also, SIGSTOP issued - // by a debugging tool can result in EINTR error. - if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK || - errno == EINTR)) - return 0; - - // Signalise peer failure. - if (nbytes == -1 && (errno == ECONNRESET || errno == EPIPE)) - return -1; - - errno_assert (nbytes != -1); - return (size_t) nbytes; -} - -int zmq::tcp_socket_t::read (void *data_, size_t size_) -{ - ssize_t nbytes = recv (s, data_, size_, 0); - - // Several errors are OK. When speculative read is being done we may not - // be able to read a single byte to the socket. Also, SIGSTOP issued - // by a debugging tool can result in EINTR error. - if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK || - errno == EINTR)) - return 0; - - // Signalise peer failure. - if (nbytes == -1 && (errno == ECONNRESET || errno == ECONNREFUSED || - errno == ETIMEDOUT || errno == EHOSTUNREACH)) - return -1; - - errno_assert (nbytes != -1); - - // Orderly shutdown by the peer. - if (nbytes == 0) - return -1; - - return (size_t) nbytes; -} - -#endif - diff --git a/src/tcp_socket.hpp b/src/tcp_socket.hpp deleted file mode 100644 index 4540bf95..00000000 --- a/src/tcp_socket.hpp +++ /dev/null @@ -1,74 +0,0 @@ -/* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_TCP_SOCKET_HPP_INCLUDED__ -#define __ZMQ_TCP_SOCKET_HPP_INCLUDED__ - -#include - -#include "fd.hpp" -#include "stdint.hpp" - -namespace zmq -{ - - // The class encapsulating simple TCP read/write socket. - - class tcp_socket_t - { - public: - - tcp_socket_t (); - ~tcp_socket_t (); - - // Associates a socket with a native socket descriptor. - int open (fd_t fd_, int sndbuf_, int rcvbuf_); - - // Closes the underlying socket. - int close (); - - // Returns the underlying socket. Returns retired_fd when the socket - // is in the closed state. - fd_t get_fd (); - - // Writes data to the socket. Returns the number of bytes actually - // written (even zero is to be considered to be a success). In case - // of error or orderly shutdown by the other peer -1 is returned. - int write (const void *data_, size_t size_); - - // Reads data from the socket (up to 'size' bytes). Returns the number - // of bytes actually read (even zero is to be considered to be - // a success). In case of error or orderly shutdown by the other - // peer -1 is returned. - int read (void *data_, size_t size_); - - private: - - // Underlying socket. - fd_t s; - - // Disable copy construction of tcp_socket. - tcp_socket_t (const tcp_socket_t&); - const tcp_socket_t &operator = (const tcp_socket_t&); - }; - -} - -#endif diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp deleted file mode 100644 index 0512c3c5..00000000 --- a/src/zmq_connecter.cpp +++ /dev/null @@ -1,161 +0,0 @@ -/* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#include - -#include "platform.hpp" -#if defined ZMQ_HAVE_WINDOWS -#include "windows.hpp" -#else -#include -#include -#endif - -#include "zmq_connecter.hpp" -#include "zmq_engine.hpp" -#include "io_thread.hpp" -#include "err.hpp" - -zmq::zmq_connecter_t::zmq_connecter_t (class io_thread_t *io_thread_, - class session_t *session_, const options_t &options_, - const char *protocol_, const char *address_, bool wait_) : - own_t (io_thread_, options_), - io_object_t (io_thread_), - handle_valid (false), - wait (wait_), - session (session_), - current_reconnect_ivl(options.reconnect_ivl) -{ - int rc = tcp_connecter.set_address (protocol_, address_); - zmq_assert (rc == 0); //TODO: take care ENOMEM, EINVAL -} - -zmq::zmq_connecter_t::~zmq_connecter_t () -{ - if (wait) - cancel_timer (reconnect_timer_id); - if (handle_valid) - rm_fd (handle); -} - -void zmq::zmq_connecter_t::process_plug () -{ - if (wait) - add_reconnect_timer(); - else - start_connecting (); -} - -void zmq::zmq_connecter_t::in_event () -{ - // We are not polling for incomming data, so we are actually called - // because of error here. However, we can get error on out event as well - // on some platforms, so we'll simply handle both events in the same way. - out_event (); -} - -void zmq::zmq_connecter_t::out_event () -{ - fd_t fd = tcp_connecter.connect (); - rm_fd (handle); - handle_valid = false; - - // Handle the error condition by attempt to reconnect. - if (fd == retired_fd) { - tcp_connecter.close (); - wait = true; - add_reconnect_timer(); - return; - } - - // Create the engine object for this connection. - zmq_engine_t *engine = new (std::nothrow) zmq_engine_t (fd, options); - alloc_assert (engine); - - // Attach the engine to the corresponding session object. - send_attach (session, engine); - - // Shut the connecter down. - terminate (); -} - -void zmq::zmq_connecter_t::timer_event (int id_) -{ - zmq_assert (id_ == reconnect_timer_id); - wait = false; - start_connecting (); -} - -void zmq::zmq_connecter_t::start_connecting () -{ - // Open the connecting socket. - int rc = tcp_connecter.open (); - - // Connect may succeed in synchronous manner. - if (rc == 0) { - handle = add_fd (tcp_connecter.get_fd ()); - handle_valid = true; - out_event (); - return; - } - - // Connection establishment may be dealyed. Poll for its completion. - else if (rc == -1 && errno == EAGAIN) { - handle = add_fd (tcp_connecter.get_fd ()); - handle_valid = true; - set_pollout (handle); - return; - } - - // Handle any other error condition by eventual reconnect. - wait = true; - add_reconnect_timer(); -} - -void zmq::zmq_connecter_t::add_reconnect_timer() -{ - add_timer (get_new_reconnect_ivl(), reconnect_timer_id); -} - -int zmq::zmq_connecter_t::get_new_reconnect_ivl () -{ -#if defined ZMQ_HAVE_WINDOWS - int pid = (int) GetCurrentProcessId (); -#else - int pid = (int) getpid (); -#endif - - // The new interval is the current interval + random value. - int this_interval = current_reconnect_ivl + - ((pid * 13) % options.reconnect_ivl); - - // Only change the current reconnect interval if the maximum reconnect - // interval was set and if it's larger than the reconnect interval. - if (options.reconnect_ivl_max > 0 && - options.reconnect_ivl_max > options.reconnect_ivl) { - - // Calculate the next interval - current_reconnect_ivl = current_reconnect_ivl * 2; - if(current_reconnect_ivl >= options.reconnect_ivl_max) { - current_reconnect_ivl = options.reconnect_ivl_max; - } - } - return this_interval; -} diff --git a/src/zmq_connecter.hpp b/src/zmq_connecter.hpp deleted file mode 100644 index 31a6d9b4..00000000 --- a/src/zmq_connecter.hpp +++ /dev/null @@ -1,92 +0,0 @@ -/* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_ZMQ_CONNECTER_HPP_INCLUDED__ -#define __ZMQ_ZMQ_CONNECTER_HPP_INCLUDED__ - -#include "own.hpp" -#include "io_object.hpp" -#include "tcp_connecter.hpp" -#include "stdint.hpp" - -namespace zmq -{ - - class zmq_connecter_t : public own_t, public io_object_t - { - public: - - // If 'wait' is true connecter first waits for a while, then starts - // connection process. - zmq_connecter_t (class io_thread_t *io_thread_, - class session_t *session_, const options_t &options_, - const char *protocol_, const char *address_, bool delay_); - ~zmq_connecter_t (); - - private: - - // ID of the timer used to delay the reconnection. - enum {reconnect_timer_id = 1}; - - // Handlers for incoming commands. - void process_plug (); - - // Handlers for I/O events. - void in_event (); - void out_event (); - void timer_event (int id_); - - // Internal function to start the actual connection establishment. - void start_connecting (); - - // Internal function to add a reconnect timer - void add_reconnect_timer(); - - // Internal function to return a reconnect backoff delay. - // Will modify the current_reconnect_ivl used for next call - // Returns the currently used interval - int get_new_reconnect_ivl (); - - // Actual connecting socket. - tcp_connecter_t tcp_connecter; - - // Handle corresponding to the listening socket. - handle_t handle; - - // If true file descriptor is registered with the poller and 'handle' - // contains valid value. - bool handle_valid; - - // If true, connecter is waiting a while before trying to connect. - bool wait; - - // Reference to the session we belong to. - class session_t *session; - - // Current reconnect ivl, updated for backoff strategy - int current_reconnect_ivl; - - zmq_connecter_t (const zmq_connecter_t&); - const zmq_connecter_t &operator = (const zmq_connecter_t&); - }; - -} - -#endif diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp deleted file mode 100644 index fa1bd452..00000000 --- a/src/zmq_engine.cpp +++ /dev/null @@ -1,224 +0,0 @@ -/* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#include "platform.hpp" -#if defined ZMQ_HAVE_WINDOWS -#include "windows.hpp" -#endif - -#include -#include - -#include "zmq_engine.hpp" -#include "zmq_connecter.hpp" -#include "io_thread.hpp" -#include "session.hpp" -#include "config.hpp" -#include "err.hpp" - -zmq::zmq_engine_t::zmq_engine_t (fd_t fd_, const options_t &options_) : - inpos (NULL), - insize (0), - decoder (in_batch_size, options_.maxmsgsize), - outpos (NULL), - outsize (0), - encoder (out_batch_size), - session (NULL), - leftover_session (NULL), - options (options_), - plugged (false) -{ - // Initialise the underlying socket. - int rc = tcp_socket.open (fd_, options.sndbuf, options.rcvbuf); - zmq_assert (rc == 0); -} - -zmq::zmq_engine_t::~zmq_engine_t () -{ - zmq_assert (!plugged); -} - -void zmq::zmq_engine_t::plug (io_thread_t *io_thread_, session_t *session_) -{ - zmq_assert (!plugged); - plugged = true; - leftover_session = NULL; - - // Connect to session object. - zmq_assert (!session); - zmq_assert (session_); - encoder.set_session (session_); - decoder.set_session (session_); - session = session_; - - // Connect to I/O threads poller object. - io_object_t::plug (io_thread_); - handle = add_fd (tcp_socket.get_fd ()); - set_pollin (handle); - set_pollout (handle); - - // Flush all the data that may have been already received downstream. - in_event (); -} - -void zmq::zmq_engine_t::unplug () -{ - zmq_assert (plugged); - plugged = false; - - // Cancel all fd subscriptions. - rm_fd (handle); - - // Disconnect from I/O threads poller object. - io_object_t::unplug (); - - // Disconnect from session object. - encoder.set_session (NULL); - decoder.set_session (NULL); - leftover_session = session; - session = NULL; -} - -void zmq::zmq_engine_t::terminate () -{ - unplug (); - delete this; -} - -void zmq::zmq_engine_t::in_event () -{ - bool disconnection = false; - - // If there's no data to process in the buffer... - if (!insize) { - - // Retrieve the buffer and read as much data as possible. - // Note that buffer can be arbitrarily large. However, we assume - // the underlying TCP layer has fixed buffer size and thus the - // number of bytes read will be always limited. - decoder.get_buffer (&inpos, &insize); - insize = tcp_socket.read (inpos, insize); - - // Check whether the peer has closed the connection. - if (insize == (size_t) -1) { - insize = 0; - disconnection = true; - } - } - - // Push the data to the decoder. - size_t processed = decoder.process_buffer (inpos, insize); - - if (unlikely (processed == (size_t) -1)) { - disconnection = true; - } - else { - - // Stop polling for input if we got stuck. - if (processed < insize) { - - // This may happen if queue limits are in effect. - if (plugged) - reset_pollin (handle); - } - - // Adjust the buffer. - inpos += processed; - insize -= processed; - } - - // Flush all messages the decoder may have produced. - // If IO handler has unplugged engine, flush transient IO handler. - if (unlikely (!plugged)) { - zmq_assert (leftover_session); - leftover_session->flush (); - } else { - session->flush (); - } - - if (session && disconnection) - error (); -} - -void zmq::zmq_engine_t::out_event () -{ - // If write buffer is empty, try to read new data from the encoder. - if (!outsize) { - - outpos = NULL; - encoder.get_data (&outpos, &outsize); - - // If IO handler has unplugged engine, flush transient IO handler. - if (unlikely (!plugged)) { - zmq_assert (leftover_session); - leftover_session->flush (); - return; - } - - // If there is no data to send, stop polling for output. - if (outsize == 0) { - reset_pollout (handle); - return; - } - } - - // If there are any data to write in write buffer, write as much as - // possible to the socket. Note that amount of data to write can be - // arbitratily large. However, we assume that underlying TCP layer has - // limited transmission buffer and thus the actual number of bytes - // written should be reasonably modest. - int nbytes = tcp_socket.write (outpos, outsize); - - // Handle problems with the connection. - if (nbytes == -1) { - error (); - return; - } - - outpos += nbytes; - outsize -= nbytes; -} - -void zmq::zmq_engine_t::activate_out () -{ - set_pollout (handle); - - // Speculative write: The assumption is that at the moment new message - // was sent by the user the socket is probably available for writing. - // Thus we try to write the data to socket avoiding polling for POLLOUT. - // Consequently, the latency should be better in request/reply scenarios. - out_event (); -} - -void zmq::zmq_engine_t::activate_in () -{ - set_pollin (handle); - - // Speculative read. - in_event (); -} - -void zmq::zmq_engine_t::error () -{ - zmq_assert (session); - session->detach (); - unplug (); - delete this; -} diff --git a/src/zmq_listener.cpp b/src/zmq_listener.cpp deleted file mode 100644 index cb73ad81..00000000 --- a/src/zmq_listener.cpp +++ /dev/null @@ -1,84 +0,0 @@ -/* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#include - -#include "zmq_listener.hpp" -#include "zmq_engine.hpp" -#include "io_thread.hpp" -#include "session.hpp" -#include "err.hpp" - -zmq::zmq_listener_t::zmq_listener_t (io_thread_t *io_thread_, - socket_base_t *socket_, const options_t &options_) : - own_t (io_thread_, options_), - io_object_t (io_thread_), - socket (socket_) -{ -} - -zmq::zmq_listener_t::~zmq_listener_t () -{ -} - -int zmq::zmq_listener_t::set_address (const char *protocol_, const char *addr_) -{ - return tcp_listener.set_address (protocol_, addr_, options.backlog); -} - -void zmq::zmq_listener_t::process_plug () -{ - // Start polling for incoming connections. - handle = add_fd (tcp_listener.get_fd ()); - set_pollin (handle); -} - -void zmq::zmq_listener_t::process_term (int linger_) -{ - rm_fd (handle); - own_t::process_term (linger_); -} - -void zmq::zmq_listener_t::in_event () -{ - fd_t fd = tcp_listener.accept (); - - // If connection was reset by the peer in the meantime, just ignore it. - // TODO: Handle specific errors like ENFILE/EMFILE etc. - if (fd == retired_fd) - return; - // Create the engine object for this connection. - zmq_engine_t *engine = new (std::nothrow) zmq_engine_t (fd, options); - alloc_assert (engine); - - // Choose I/O thread to run connecter in. Given that we are already - // running in an I/O thread, there must be at least one available. - io_thread_t *io_thread = choose_io_thread (options.affinity); - zmq_assert (io_thread); - - // Create and launch a session object. - session_t *session = new (std::nothrow) - session_t (io_thread, false, socket, options, NULL, NULL); - alloc_assert (session); - session->inc_seqnum (); - launch_child (session); - send_attach (session, engine, false); -} - diff --git a/src/zmq_listener.hpp b/src/zmq_listener.hpp deleted file mode 100644 index 82f45908..00000000 --- a/src/zmq_listener.hpp +++ /dev/null @@ -1,67 +0,0 @@ -/* - Copyright (c) 2007-2011 iMatix Corporation - Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#ifndef __ZMQ_ZMQ_LISTENER_HPP_INCLUDED__ -#define __ZMQ_ZMQ_LISTENER_HPP_INCLUDED__ - -#include "own.hpp" -#include "io_object.hpp" -#include "tcp_listener.hpp" -#include "stdint.hpp" - -namespace zmq -{ - - class zmq_listener_t : public own_t, public io_object_t - { - public: - - zmq_listener_t (class io_thread_t *io_thread_, - class socket_base_t *socket_, const options_t &options_); - ~zmq_listener_t (); - - // Set address to listen on. - int set_address (const char* protocol_, const char *addr_); - - private: - - // Handlers for incoming commands. - void process_plug (); - void process_term (int linger_); - - // Handlers for I/O events. - void in_event (); - - // Actual listening socket. - tcp_listener_t tcp_listener; - - // Handle corresponding to the listening socket. - handle_t handle; - - // Socket the listerner belongs to. - class socket_base_t *socket; - - zmq_listener_t (const zmq_listener_t&); - const zmq_listener_t &operator = (const zmq_listener_t&); - }; - -} - -#endif