diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index 73e71f06..bcaeff59 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -67,7 +67,6 @@ zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_, addr (addr_), s (retired_fd), handle ((handle_t) NULL), - handle_valid (false), delayed_start (delayed_start_), connect_timer_started (false), reconnect_timer_started (false), @@ -87,7 +86,7 @@ zmq::tcp_connecter_t::~tcp_connecter_t () { zmq_assert (!connect_timer_started); zmq_assert (!reconnect_timer_started); - zmq_assert (!handle_valid); + zmq_assert (!handle); zmq_assert (s == retired_fd); } @@ -111,9 +110,8 @@ void zmq::tcp_connecter_t::process_term (int linger_) reconnect_timer_started = false; } - if (handle_valid) { - rm_fd (handle); - handle_valid = false; + if (handle) { + rm_handle (); } if (s != retired_fd) @@ -137,25 +135,12 @@ void zmq::tcp_connecter_t::out_event () connect_timer_started = false; } - rm_fd (handle); - handle_valid = false; + rm_handle (); const fd_t fd = connect (); // Handle the error condition by attempt to reconnect. - if (fd == retired_fd) { - close (); - add_reconnect_timer (); - return; - } - - int rc = tune_tcp_socket (fd); - rc = rc - | tune_tcp_keepalives ( - fd, options.tcp_keepalive, options.tcp_keepalive_cnt, - options.tcp_keepalive_idle, options.tcp_keepalive_intvl); - rc = rc | tune_tcp_maxrt (fd, options.tcp_maxrt); - if (rc != 0) { + if (fd == retired_fd || !tune_socket (fd)) { close (); add_reconnect_timer (); return; @@ -175,15 +160,18 @@ void zmq::tcp_connecter_t::out_event () socket->event_connected (endpoint, (int) fd); } +void zmq::tcp_connecter_t::rm_handle () +{ + rm_fd (handle); + handle = (handle_t) NULL; +} + void zmq::tcp_connecter_t::timer_event (int id_) { zmq_assert (id_ == reconnect_timer_id || id_ == connect_timer_id); if (id_ == connect_timer_id) { connect_timer_started = false; - - rm_fd (handle); - handle_valid = false; - + rm_handle (); close (); add_reconnect_timer (); } else if (id_ == reconnect_timer_id) { @@ -200,14 +188,12 @@ void zmq::tcp_connecter_t::start_connecting () // Connect may succeed in synchronous manner. if (rc == 0) { handle = add_fd (s); - handle_valid = true; out_event (); } // Connection establishment may be delayed. Poll for its completion. else if (rc == -1 && errno == EINPROGRESS) { handle = add_fd (s); - handle_valid = true; set_pollout (handle); socket->event_connect_delayed (endpoint, zmq_errno ()); @@ -349,11 +335,12 @@ int zmq::tcp_connecter_t::open () rc = ::connect (s, tcp_addr->addr (), tcp_addr->addrlen ()); // Connect was successful immediately. - if (rc == 0) + if (rc == 0) { return 0; + } - // Translate error codes indicating asynchronous connect has been - // launched to a uniform EINPROGRESS. + // Translate error codes indicating asynchronous connect has been + // launched to a uniform EINPROGRESS. #ifdef ZMQ_HAVE_WINDOWS const int last_error = WSAGetLastError (); if (last_error == WSAEINPROGRESS || last_error == WSAEWOULDBLOCK) @@ -409,6 +396,16 @@ zmq::fd_t zmq::tcp_connecter_t::connect () return result; } +bool zmq::tcp_connecter_t::tune_socket (const fd_t fd) +{ + const int rc = tune_tcp_socket (fd) + | tune_tcp_keepalives ( + fd, options.tcp_keepalive, options.tcp_keepalive_cnt, + options.tcp_keepalive_idle, options.tcp_keepalive_intvl) + | tune_tcp_maxrt (fd, options.tcp_maxrt); + return rc == 0; +} + void zmq::tcp_connecter_t::close () { zmq_assert (s != retired_fd); diff --git a/src/tcp_connecter.hpp b/src/tcp_connecter.hpp index 2910b6e7..b88035bd 100644 --- a/src/tcp_connecter.hpp +++ b/src/tcp_connecter.hpp @@ -70,6 +70,9 @@ class tcp_connecter_t : public own_t, public io_object_t void out_event (); void timer_event (int id_); + // Removes the handle from the poller. + void rm_handle (); + // Internal function to start the actual connection establishment. void start_connecting (); @@ -96,19 +99,19 @@ class tcp_connecter_t : public own_t, public io_object_t // retired_fd if the connection was unsuccessful. fd_t connect (); + // Tunes a connected socket. + bool tune_socket (fd_t fd); + // Address to connect to. Owned by session_base_t. address_t *const addr; // Underlying socket. fd_t s; - // Handle corresponding to the listening socket. + // Handle corresponding to the listening socket, if file descriptor is + // registered with the poller, or NULL. handle_t handle; - // If true file descriptor is registered with the poller and 'handle' - // contains valid value. - bool handle_valid; - // If true, connecter is waiting a while before trying to connect. const bool delayed_start; diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp index cbddc724..e446fa98 100644 --- a/src/tcp_listener.cpp +++ b/src/tcp_listener.cpp @@ -71,6 +71,7 @@ zmq::tcp_listener_t::tcp_listener_t (io_thread_t *io_thread_, zmq::tcp_listener_t::~tcp_listener_t () { zmq_assert (s == retired_fd); + zmq_assert (!handle); } void zmq::tcp_listener_t::process_plug () @@ -83,6 +84,7 @@ void zmq::tcp_listener_t::process_plug () void zmq::tcp_listener_t::process_term (int linger_) { rm_fd (handle); + handle = (handle_t) NULL; close (); own_t::process_term (linger_); }