From 68d520ef682a2f7335c9d5e5ea1f235eed928b1b Mon Sep 17 00:00:00 2001 From: Simon Giesecke Date: Mon, 4 Feb 2019 07:31:08 -0500 Subject: [PATCH] Problem: tcp_connecter_t and tcp_listener_t duplicate code around opening and configuring a TCP socket Solution: extract common parts into tcp_open_socket function --- src/tcp.cpp | 56 +++++++++++++++++++++++++++++++++++++++++++ src/tcp.hpp | 12 ++++++++++ src/tcp_connecter.cpp | 56 ++++--------------------------------------- src/tcp_listener.cpp | 49 +++++++------------------------------ 4 files changed, 82 insertions(+), 91 deletions(-) diff --git a/src/tcp.cpp b/src/tcp.cpp index 11467f2e..597eb7ec 100644 --- a/src/tcp.cpp +++ b/src/tcp.cpp @@ -32,6 +32,7 @@ #include "ip.hpp" #include "tcp.hpp" #include "err.hpp" +#include "options.hpp" #if !defined ZMQ_HAVE_WINDOWS #include @@ -381,3 +382,58 @@ void zmq::tcp_tune_loopback_fast_path (const fd_t socket_) LIBZMQ_UNUSED (socket_); #endif } + +zmq::fd_t zmq::tcp_open_socket (const char *address_, + const zmq::options_t &options_, + zmq::tcp_address_t *out_tcp_addr_) +{ + // Convert the textual address into address structure. + int rc = out_tcp_addr_->resolve (address_, true, options_.ipv6); + if (rc != 0) + return retired_fd; + + // Create the socket. + fd_t s = open_socket (out_tcp_addr_->family (), SOCK_STREAM, IPPROTO_TCP); + + // IPv6 address family not supported, try automatic downgrade to IPv4. + if (s == retired_fd && out_tcp_addr_->family () == AF_INET6 + && errno == EAFNOSUPPORT && options_.ipv6) { + rc = out_tcp_addr_->resolve (address_, false, false); + if (rc != 0) { + return retired_fd; + } + s = open_socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); + } + + if (s == retired_fd) { + return retired_fd; + } + + // On some systems, IPv4 mapping in IPv6 sockets is disabled by default. + // Switch it on in such cases. + if (out_tcp_addr_->family () == AF_INET6) + enable_ipv4_mapping (s); + + // Set the IP Type-Of-Service priority for this socket + if (options_.tos != 0) + set_ip_type_of_service (s, options_.tos); + + // Set the socket to loopback fastpath if configured. + if (options_.loopback_fastpath) + tcp_tune_loopback_fast_path (s); + + // Bind the socket to a device if applicable + if (!options_.bound_device.empty ()) + bind_to_device (s, options_.bound_device); + + // Set the socket to non-blocking mode so that we get async connect(). + unblock_socket (s); + + // Set the socket buffer limits for the underlying socket. + if (options_.sndbuf >= 0) + set_tcp_send_buffer (s, options_.sndbuf); + if (options_.rcvbuf >= 0) + set_tcp_receive_buffer (s, options_.rcvbuf); + + return s; +} diff --git a/src/tcp.hpp b/src/tcp.hpp index 07600d07..b9ed65fa 100644 --- a/src/tcp.hpp +++ b/src/tcp.hpp @@ -34,6 +34,9 @@ namespace zmq { +class tcp_address_t; +struct options_t; + // Tunes the supplied TCP socket for the best latency. int tune_tcp_socket (fd_t s_); @@ -68,6 +71,15 @@ int tcp_read (fd_t s_, void *data_, size_t size_); void tcp_assert_tuning_error (fd_t s_, int rc_); void tcp_tune_loopback_fast_path (const fd_t socket_); + +// Resolves the given address_ string, opens a socket and sets socket options +// according to the passed options_. On success, returns the socket +// descriptor and assigns the resolved address to out_tcp_addr_. In case of +// an error, retired_fd is returned, and the value of out_tcp_addr_ is undefined. +// errno is set to an error code describing the cause of the error. +fd_t tcp_open_socket (const char *address_, + const options_t &options_, + tcp_address_t *out_tcp_addr_); } #endif diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index 5cbbf436..2e627d5f 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -174,63 +174,17 @@ int zmq::tcp_connecter_t::open () _addr->resolved.tcp_addr = new (std::nothrow) tcp_address_t (); alloc_assert (_addr->resolved.tcp_addr); - int rc = _addr->resolved.tcp_addr->resolve (_addr->address.c_str (), false, - options.ipv6); - if (rc != 0) { + _s = tcp_open_socket (_addr->address.c_str (), options, + _addr->resolved.tcp_addr); + if (_s == retired_fd) { LIBZMQ_DELETE (_addr->resolved.tcp_addr); return -1; } zmq_assert (_addr->resolved.tcp_addr != NULL); + const tcp_address_t *const tcp_addr = _addr->resolved.tcp_addr; - // Create the socket. - _s = open_socket (tcp_addr->family (), SOCK_STREAM, IPPROTO_TCP); - - // IPv6 address family not supported, try automatic downgrade to IPv4. - if (_s == zmq::retired_fd && tcp_addr->family () == AF_INET6 - && errno == EAFNOSUPPORT && options.ipv6) { - rc = _addr->resolved.tcp_addr->resolve (_addr->address.c_str (), false, - false); - if (rc != 0) { - LIBZMQ_DELETE (_addr->resolved.tcp_addr); - return -1; - } - _s = open_socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); - } - - if (_s == retired_fd) { - return -1; - } - - // On some systems, IPv4 mapping in IPv6 sockets is disabled by default. - // Switch it on in such cases. - if (tcp_addr->family () == AF_INET6) - enable_ipv4_mapping (_s); - - // Set the IP Type-Of-Service priority for this socket - if (options.tos != 0) - set_ip_type_of_service (_s, options.tos); - - // Bind the socket to a device if applicable - if (!options.bound_device.empty ()) - bind_to_device (_s, options.bound_device); - - // Set the socket to non-blocking mode so that we get async connect(). - unblock_socket (_s); - - // Set the socket to loopback fastpath if configured. - if (options.loopback_fastpath) - tcp_tune_loopback_fast_path (_s); - - // Set the socket buffer limits for the underlying socket. - if (options.sndbuf >= 0) - set_tcp_send_buffer (_s, options.sndbuf); - if (options.rcvbuf >= 0) - set_tcp_receive_buffer (_s, options.rcvbuf); - - // Set the IP Type-Of-Service for the underlying socket - if (options.tos != 0) - set_ip_type_of_service (_s, options.tos); + int rc; // Set a source address for conversations if (tcp_addr->has_src_addr ()) { diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp index 1b51466f..85c4392b 100644 --- a/src/tcp_listener.cpp +++ b/src/tcp_listener.cpp @@ -103,54 +103,23 @@ zmq::tcp_listener_t::get_socket_name (zmq::fd_t fd_, int zmq::tcp_listener_t::create_socket (const char *addr_) { - // Convert the textual address into address structure. - int rc = _address.resolve (addr_, true, options.ipv6); - if (rc != 0) - return -1; - - // Create a listening socket. - _s = open_socket (_address.family (), SOCK_STREAM, IPPROTO_TCP); - - // IPv6 address family not supported, try automatic downgrade to IPv4. - if (_s == zmq::retired_fd && _address.family () == AF_INET6 - && errno == EAFNOSUPPORT && options.ipv6) { - rc = _address.resolve (addr_, true, false); - if (rc != 0) - return rc; - _s = open_socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); - } - + _s = tcp_open_socket (addr_, options, &_address); if (_s == retired_fd) { return -1; } + + // TODO why is this only done for the listener? make_socket_noninheritable (_s); - // On some systems, IPv4 mapping in IPv6 sockets is disabled by default. - // Switch it on in such cases. - if (_address.family () == AF_INET6) - enable_ipv4_mapping (_s); - - // Set the IP Type-Of-Service for the underlying socket - if (options.tos != 0) - set_ip_type_of_service (_s, options.tos); - - // Set the socket to loopback fastpath if configured. - if (options.loopback_fastpath) - tcp_tune_loopback_fast_path (_s); - - // Bind the socket to a device if applicable - if (!options.bound_device.empty ()) - bind_to_device (_s, options.bound_device); - - // Set the socket buffer limits for the underlying socket. - if (options.sndbuf >= 0) - set_tcp_send_buffer (_s, options.sndbuf); - if (options.rcvbuf >= 0) - set_tcp_receive_buffer (_s, options.rcvbuf); - // Allow reusing of the address. int flag = 1; + int rc; #ifdef ZMQ_HAVE_WINDOWS + // TODO this was changed for Windows from SO_REUSEADDRE to + // SE_EXCLUSIVEADDRUSE by 0ab65324195ad70205514d465b03d851a6de051c, + // so the comment above is no longer correct; also, now the settings are + // different between listener and connecter with a src address. + // is this intentional? rc = setsockopt (_s, SOL_SOCKET, SO_EXCLUSIVEADDRUSE, reinterpret_cast (&flag), sizeof (int)); wsa_assert (rc != SOCKET_ERROR);