mirror of
https://github.com/zeromq/libzmq.git
synced 2025-10-17 03:03:25 +02:00
Code cleanup
This commit is contained in:
@@ -110,10 +110,10 @@ void zmq::tcp_connecter_t::in_event ()
|
|||||||
|
|
||||||
void zmq::tcp_connecter_t::out_event ()
|
void zmq::tcp_connecter_t::out_event ()
|
||||||
{
|
{
|
||||||
fd_t fd = connect ();
|
|
||||||
rm_fd (handle);
|
rm_fd (handle);
|
||||||
handle_valid = false;
|
handle_valid = false;
|
||||||
|
|
||||||
|
const fd_t fd = connect ();
|
||||||
// Handle the error condition by attempt to reconnect.
|
// Handle the error condition by attempt to reconnect.
|
||||||
if (fd == retired_fd) {
|
if (fd == retired_fd) {
|
||||||
close ();
|
close ();
|
||||||
@@ -151,7 +151,7 @@ void zmq::tcp_connecter_t::timer_event (int id_)
|
|||||||
void zmq::tcp_connecter_t::start_connecting ()
|
void zmq::tcp_connecter_t::start_connecting ()
|
||||||
{
|
{
|
||||||
// Open the connecting socket.
|
// Open the connecting socket.
|
||||||
int rc = open ();
|
const int rc = open ();
|
||||||
|
|
||||||
// Connect may succeed in synchronous manner.
|
// Connect may succeed in synchronous manner.
|
||||||
if (rc == 0) {
|
if (rc == 0) {
|
||||||
@@ -179,30 +179,26 @@ void zmq::tcp_connecter_t::start_connecting ()
|
|||||||
|
|
||||||
void zmq::tcp_connecter_t::add_reconnect_timer ()
|
void zmq::tcp_connecter_t::add_reconnect_timer ()
|
||||||
{
|
{
|
||||||
int rc_ivl = get_new_reconnect_ivl();
|
const int interval = get_new_reconnect_ivl ();
|
||||||
add_timer (rc_ivl, reconnect_timer_id);
|
add_timer (interval, reconnect_timer_id);
|
||||||
socket->event_connect_retried (endpoint, rc_ivl);
|
socket->event_connect_retried (endpoint, interval);
|
||||||
timer_started = true;
|
timer_started = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
int zmq::tcp_connecter_t::get_new_reconnect_ivl ()
|
int zmq::tcp_connecter_t::get_new_reconnect_ivl ()
|
||||||
{
|
{
|
||||||
// The new interval is the current interval + random value.
|
// The new interval is the current interval + random value.
|
||||||
int this_interval = current_reconnect_ivl +
|
const int interval = current_reconnect_ivl +
|
||||||
(generate_random () % options.reconnect_ivl);
|
generate_random () % options.reconnect_ivl;
|
||||||
|
|
||||||
// Only change the current reconnect interval if the maximum reconnect
|
// Only change the current reconnect interval if the maximum reconnect
|
||||||
// interval was set and if it's larger than the reconnect interval.
|
// interval was set and if it's larger than the reconnect interval.
|
||||||
if (options.reconnect_ivl_max > 0 &&
|
if (options.reconnect_ivl_max > 0 &&
|
||||||
options.reconnect_ivl_max > options.reconnect_ivl) {
|
options.reconnect_ivl_max > options.reconnect_ivl)
|
||||||
|
|
||||||
// Calculate the next interval
|
// Calculate the next interval
|
||||||
current_reconnect_ivl = current_reconnect_ivl * 2;
|
current_reconnect_ivl =
|
||||||
if(current_reconnect_ivl >= options.reconnect_ivl_max) {
|
std::min (current_reconnect_ivl * 2, options.reconnect_ivl_max);
|
||||||
current_reconnect_ivl = options.reconnect_ivl_max;
|
return interval;
|
||||||
}
|
|
||||||
}
|
|
||||||
return this_interval;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int zmq::tcp_connecter_t::open ()
|
int zmq::tcp_connecter_t::open ()
|
||||||
@@ -214,7 +210,6 @@ int zmq::tcp_connecter_t::open ()
|
|||||||
delete addr->resolved.tcp_addr;
|
delete addr->resolved.tcp_addr;
|
||||||
addr->resolved.tcp_addr = NULL;
|
addr->resolved.tcp_addr = NULL;
|
||||||
}
|
}
|
||||||
zmq_assert (addr->resolved.tcp_addr == NULL);
|
|
||||||
|
|
||||||
addr->resolved.tcp_addr = new (std::nothrow) tcp_address_t ();
|
addr->resolved.tcp_addr = new (std::nothrow) tcp_address_t ();
|
||||||
alloc_assert (addr->resolved.tcp_addr);
|
alloc_assert (addr->resolved.tcp_addr);
|
||||||
@@ -226,9 +221,10 @@ int zmq::tcp_connecter_t::open ()
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
zmq_assert (addr->resolved.tcp_addr != NULL);
|
zmq_assert (addr->resolved.tcp_addr != NULL);
|
||||||
|
tcp_address_t * const tcp_addr = addr->resolved.tcp_addr;
|
||||||
|
|
||||||
// Create the socket.
|
// Create the socket.
|
||||||
s = open_socket (addr->resolved.tcp_addr->family (), SOCK_STREAM, IPPROTO_TCP);
|
s = open_socket (tcp_addr->family (), SOCK_STREAM, IPPROTO_TCP);
|
||||||
#ifdef ZMQ_HAVE_WINDOWS
|
#ifdef ZMQ_HAVE_WINDOWS
|
||||||
if (s == INVALID_SOCKET) {
|
if (s == INVALID_SOCKET) {
|
||||||
errno = wsa_error_to_errno (WSAGetLastError ());
|
errno = wsa_error_to_errno (WSAGetLastError ());
|
||||||
@@ -241,7 +237,7 @@ int zmq::tcp_connecter_t::open ()
|
|||||||
|
|
||||||
// On some systems, IPv4 mapping in IPv6 sockets is disabled by default.
|
// On some systems, IPv4 mapping in IPv6 sockets is disabled by default.
|
||||||
// Switch it on in such cases.
|
// Switch it on in such cases.
|
||||||
if (addr->resolved.tcp_addr->family () == AF_INET6)
|
if (tcp_addr->family () == AF_INET6)
|
||||||
enable_ipv4_mapping (s);
|
enable_ipv4_mapping (s);
|
||||||
|
|
||||||
// Set the IP Type-Of-Service priority for this socket
|
// Set the IP Type-Of-Service priority for this socket
|
||||||
@@ -262,18 +258,14 @@ int zmq::tcp_connecter_t::open ()
|
|||||||
set_ip_type_of_service (s, options.tos);
|
set_ip_type_of_service (s, options.tos);
|
||||||
|
|
||||||
// Set a source address for conversations
|
// Set a source address for conversations
|
||||||
if (addr->resolved.tcp_addr->has_src_addr ()) {
|
if (tcp_addr->has_src_addr ()) {
|
||||||
rc = ::bind (s, addr->resolved.tcp_addr->src_addr (), addr->resolved.tcp_addr->src_addrlen ());
|
rc = ::bind (s, tcp_addr->src_addr (), tcp_addr->src_addrlen ());
|
||||||
|
if (rc == -1)
|
||||||
if (rc == -1) {
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// Connect to the remote peer.
|
// Connect to the remote peer.
|
||||||
rc = ::connect (
|
rc = ::connect (s, tcp_addr->addr (), tcp_addr->addrlen ());
|
||||||
s, addr->resolved.tcp_addr->addr (),
|
|
||||||
addr->resolved.tcp_addr->addrlen ());
|
|
||||||
|
|
||||||
// Connect was successfull immediately.
|
// Connect was successfull immediately.
|
||||||
if (rc == 0)
|
if (rc == 0)
|
||||||
@@ -298,33 +290,31 @@ zmq::fd_t zmq::tcp_connecter_t::connect ()
|
|||||||
{
|
{
|
||||||
// Async connect has finished. Check whether an error occurred
|
// Async connect has finished. Check whether an error occurred
|
||||||
int err = 0;
|
int err = 0;
|
||||||
#if defined ZMQ_HAVE_HPUX
|
#ifdef ZMQ_HAVE_HPUX
|
||||||
int len = sizeof (err);
|
int len = sizeof err;
|
||||||
#else
|
#else
|
||||||
socklen_t len = sizeof (err);
|
socklen_t len = sizeof err;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
int rc = getsockopt (s, SOL_SOCKET, SO_ERROR, (char*) &err, &len);
|
const int rc = getsockopt (s, SOL_SOCKET, SO_ERROR, (char*) &err, &len);
|
||||||
|
|
||||||
// Assert if the error was caused by 0MQ bug.
|
// Assert if the error was caused by 0MQ bug.
|
||||||
// Networking problems are OK. No need to assert.
|
// Networking problems are OK. No need to assert.
|
||||||
#ifdef ZMQ_HAVE_WINDOWS
|
#ifdef ZMQ_HAVE_WINDOWS
|
||||||
zmq_assert (rc == 0);
|
zmq_assert (rc == 0);
|
||||||
if (err != 0) {
|
if (err != 0) {
|
||||||
if (err == WSAECONNREFUSED ||
|
wsa_assert (err == WSAECONNREFUSED
|
||||||
err == WSAETIMEDOUT ||
|
|| err == WSAETIMEDOUT
|
||||||
err == WSAECONNABORTED ||
|
|| err == WSAECONNABORTED
|
||||||
err == WSAEHOSTUNREACH ||
|
|| err == WSAEHOSTUNREACH
|
||||||
err == WSAENETUNREACH ||
|
|| err == WSAENETUNREACH
|
||||||
err == WSAENETDOWN ||
|
|| err == WSAENETDOWN
|
||||||
err == WSAEACCES ||
|
|| err == WSAEACCES
|
||||||
err == WSAEINVAL ||
|
|| err == WSAEINVAL
|
||||||
err == WSAEADDRINUSE)
|
|| err == WSAEADDRINUSE);
|
||||||
return retired_fd;
|
return retired_fd;
|
||||||
wsa_assert_no (err);
|
|
||||||
}
|
}
|
||||||
#else
|
#else
|
||||||
|
|
||||||
// Following code should handle both Berkeley-derived socket
|
// Following code should handle both Berkeley-derived socket
|
||||||
// implementations and Solaris.
|
// implementations and Solaris.
|
||||||
if (rc == -1)
|
if (rc == -1)
|
||||||
@@ -344,7 +334,7 @@ zmq::fd_t zmq::tcp_connecter_t::connect ()
|
|||||||
#endif
|
#endif
|
||||||
|
|
||||||
// Return the newly connected socket.
|
// Return the newly connected socket.
|
||||||
fd_t result = s;
|
const fd_t result = s;
|
||||||
s = retired_fd;
|
s = retired_fd;
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
@@ -353,10 +343,10 @@ void zmq::tcp_connecter_t::close ()
|
|||||||
{
|
{
|
||||||
zmq_assert (s != retired_fd);
|
zmq_assert (s != retired_fd);
|
||||||
#ifdef ZMQ_HAVE_WINDOWS
|
#ifdef ZMQ_HAVE_WINDOWS
|
||||||
int rc = closesocket (s);
|
const int rc = closesocket (s);
|
||||||
wsa_assert (rc != SOCKET_ERROR);
|
wsa_assert (rc != SOCKET_ERROR);
|
||||||
#else
|
#else
|
||||||
int rc = ::close (s);
|
const int rc = ::close (s);
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
#endif
|
#endif
|
||||||
socket->event_closed (endpoint, s);
|
socket->event_closed (endpoint, s);
|
||||||
|
Reference in New Issue
Block a user