Merge pull request #2924 from sigiesec/tcp-cleanup

Problem: duplicated code, redundant member handle_valid, asymmetry between tcp_connecter and tcp_listener
This commit is contained in:
Luca Boccassi
2018-02-08 21:57:46 +00:00
committed by GitHub
3 changed files with 36 additions and 34 deletions

View File

@@ -67,7 +67,6 @@ zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_,
addr (addr_),
s (retired_fd),
handle ((handle_t) NULL),
handle_valid (false),
delayed_start (delayed_start_),
connect_timer_started (false),
reconnect_timer_started (false),
@@ -87,7 +86,7 @@ zmq::tcp_connecter_t::~tcp_connecter_t ()
{
zmq_assert (!connect_timer_started);
zmq_assert (!reconnect_timer_started);
zmq_assert (!handle_valid);
zmq_assert (!handle);
zmq_assert (s == retired_fd);
}
@@ -111,9 +110,8 @@ void zmq::tcp_connecter_t::process_term (int linger_)
reconnect_timer_started = false;
}
if (handle_valid) {
rm_fd (handle);
handle_valid = false;
if (handle) {
rm_handle ();
}
if (s != retired_fd)
@@ -137,25 +135,12 @@ void zmq::tcp_connecter_t::out_event ()
connect_timer_started = false;
}
rm_fd (handle);
handle_valid = false;
rm_handle ();
const fd_t fd = connect ();
// Handle the error condition by attempt to reconnect.
if (fd == retired_fd) {
close ();
add_reconnect_timer ();
return;
}
int rc = tune_tcp_socket (fd);
rc = rc
| tune_tcp_keepalives (
fd, options.tcp_keepalive, options.tcp_keepalive_cnt,
options.tcp_keepalive_idle, options.tcp_keepalive_intvl);
rc = rc | tune_tcp_maxrt (fd, options.tcp_maxrt);
if (rc != 0) {
if (fd == retired_fd || !tune_socket (fd)) {
close ();
add_reconnect_timer ();
return;
@@ -175,15 +160,18 @@ void zmq::tcp_connecter_t::out_event ()
socket->event_connected (endpoint, (int) fd);
}
void zmq::tcp_connecter_t::rm_handle ()
{
rm_fd (handle);
handle = (handle_t) NULL;
}
void zmq::tcp_connecter_t::timer_event (int id_)
{
zmq_assert (id_ == reconnect_timer_id || id_ == connect_timer_id);
if (id_ == connect_timer_id) {
connect_timer_started = false;
rm_fd (handle);
handle_valid = false;
rm_handle ();
close ();
add_reconnect_timer ();
} else if (id_ == reconnect_timer_id) {
@@ -200,14 +188,12 @@ void zmq::tcp_connecter_t::start_connecting ()
// Connect may succeed in synchronous manner.
if (rc == 0) {
handle = add_fd (s);
handle_valid = true;
out_event ();
}
// Connection establishment may be delayed. Poll for its completion.
else if (rc == -1 && errno == EINPROGRESS) {
handle = add_fd (s);
handle_valid = true;
set_pollout (handle);
socket->event_connect_delayed (endpoint, zmq_errno ());
@@ -349,11 +335,12 @@ int zmq::tcp_connecter_t::open ()
rc = ::connect (s, tcp_addr->addr (), tcp_addr->addrlen ());
// Connect was successful immediately.
if (rc == 0)
if (rc == 0) {
return 0;
}
// Translate error codes indicating asynchronous connect has been
// launched to a uniform EINPROGRESS.
// Translate error codes indicating asynchronous connect has been
// launched to a uniform EINPROGRESS.
#ifdef ZMQ_HAVE_WINDOWS
const int last_error = WSAGetLastError ();
if (last_error == WSAEINPROGRESS || last_error == WSAEWOULDBLOCK)
@@ -409,6 +396,16 @@ zmq::fd_t zmq::tcp_connecter_t::connect ()
return result;
}
bool zmq::tcp_connecter_t::tune_socket (const fd_t fd)
{
const int rc = tune_tcp_socket (fd)
| tune_tcp_keepalives (
fd, options.tcp_keepalive, options.tcp_keepalive_cnt,
options.tcp_keepalive_idle, options.tcp_keepalive_intvl)
| tune_tcp_maxrt (fd, options.tcp_maxrt);
return rc == 0;
}
void zmq::tcp_connecter_t::close ()
{
zmq_assert (s != retired_fd);