mirror of
https://github.com/zeromq/libzmq.git
synced 2025-01-31 14:39:55 +01:00
Problem: tcp_connecter_t and tcp_listener_t duplicate code around opening and configuring a TCP socket
Solution: extract common parts into tcp_open_socket function
This commit is contained in:
parent
3f4e64edc0
commit
68d520ef68
56
src/tcp.cpp
56
src/tcp.cpp
@ -32,6 +32,7 @@
|
|||||||
#include "ip.hpp"
|
#include "ip.hpp"
|
||||||
#include "tcp.hpp"
|
#include "tcp.hpp"
|
||||||
#include "err.hpp"
|
#include "err.hpp"
|
||||||
|
#include "options.hpp"
|
||||||
|
|
||||||
#if !defined ZMQ_HAVE_WINDOWS
|
#if !defined ZMQ_HAVE_WINDOWS
|
||||||
#include <fcntl.h>
|
#include <fcntl.h>
|
||||||
@ -381,3 +382,58 @@ void zmq::tcp_tune_loopback_fast_path (const fd_t socket_)
|
|||||||
LIBZMQ_UNUSED (socket_);
|
LIBZMQ_UNUSED (socket_);
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
zmq::fd_t zmq::tcp_open_socket (const char *address_,
|
||||||
|
const zmq::options_t &options_,
|
||||||
|
zmq::tcp_address_t *out_tcp_addr_)
|
||||||
|
{
|
||||||
|
// Convert the textual address into address structure.
|
||||||
|
int rc = out_tcp_addr_->resolve (address_, true, options_.ipv6);
|
||||||
|
if (rc != 0)
|
||||||
|
return retired_fd;
|
||||||
|
|
||||||
|
// Create the socket.
|
||||||
|
fd_t s = open_socket (out_tcp_addr_->family (), SOCK_STREAM, IPPROTO_TCP);
|
||||||
|
|
||||||
|
// IPv6 address family not supported, try automatic downgrade to IPv4.
|
||||||
|
if (s == retired_fd && out_tcp_addr_->family () == AF_INET6
|
||||||
|
&& errno == EAFNOSUPPORT && options_.ipv6) {
|
||||||
|
rc = out_tcp_addr_->resolve (address_, false, false);
|
||||||
|
if (rc != 0) {
|
||||||
|
return retired_fd;
|
||||||
|
}
|
||||||
|
s = open_socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (s == retired_fd) {
|
||||||
|
return retired_fd;
|
||||||
|
}
|
||||||
|
|
||||||
|
// On some systems, IPv4 mapping in IPv6 sockets is disabled by default.
|
||||||
|
// Switch it on in such cases.
|
||||||
|
if (out_tcp_addr_->family () == AF_INET6)
|
||||||
|
enable_ipv4_mapping (s);
|
||||||
|
|
||||||
|
// Set the IP Type-Of-Service priority for this socket
|
||||||
|
if (options_.tos != 0)
|
||||||
|
set_ip_type_of_service (s, options_.tos);
|
||||||
|
|
||||||
|
// Set the socket to loopback fastpath if configured.
|
||||||
|
if (options_.loopback_fastpath)
|
||||||
|
tcp_tune_loopback_fast_path (s);
|
||||||
|
|
||||||
|
// Bind the socket to a device if applicable
|
||||||
|
if (!options_.bound_device.empty ())
|
||||||
|
bind_to_device (s, options_.bound_device);
|
||||||
|
|
||||||
|
// Set the socket to non-blocking mode so that we get async connect().
|
||||||
|
unblock_socket (s);
|
||||||
|
|
||||||
|
// Set the socket buffer limits for the underlying socket.
|
||||||
|
if (options_.sndbuf >= 0)
|
||||||
|
set_tcp_send_buffer (s, options_.sndbuf);
|
||||||
|
if (options_.rcvbuf >= 0)
|
||||||
|
set_tcp_receive_buffer (s, options_.rcvbuf);
|
||||||
|
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
12
src/tcp.hpp
12
src/tcp.hpp
@ -34,6 +34,9 @@
|
|||||||
|
|
||||||
namespace zmq
|
namespace zmq
|
||||||
{
|
{
|
||||||
|
class tcp_address_t;
|
||||||
|
struct options_t;
|
||||||
|
|
||||||
// Tunes the supplied TCP socket for the best latency.
|
// Tunes the supplied TCP socket for the best latency.
|
||||||
int tune_tcp_socket (fd_t s_);
|
int tune_tcp_socket (fd_t s_);
|
||||||
|
|
||||||
@ -68,6 +71,15 @@ int tcp_read (fd_t s_, void *data_, size_t size_);
|
|||||||
void tcp_assert_tuning_error (fd_t s_, int rc_);
|
void tcp_assert_tuning_error (fd_t s_, int rc_);
|
||||||
|
|
||||||
void tcp_tune_loopback_fast_path (const fd_t socket_);
|
void tcp_tune_loopback_fast_path (const fd_t socket_);
|
||||||
|
|
||||||
|
// Resolves the given address_ string, opens a socket and sets socket options
|
||||||
|
// according to the passed options_. On success, returns the socket
|
||||||
|
// descriptor and assigns the resolved address to out_tcp_addr_. In case of
|
||||||
|
// an error, retired_fd is returned, and the value of out_tcp_addr_ is undefined.
|
||||||
|
// errno is set to an error code describing the cause of the error.
|
||||||
|
fd_t tcp_open_socket (const char *address_,
|
||||||
|
const options_t &options_,
|
||||||
|
tcp_address_t *out_tcp_addr_);
|
||||||
}
|
}
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
@ -174,63 +174,17 @@ int zmq::tcp_connecter_t::open ()
|
|||||||
|
|
||||||
_addr->resolved.tcp_addr = new (std::nothrow) tcp_address_t ();
|
_addr->resolved.tcp_addr = new (std::nothrow) tcp_address_t ();
|
||||||
alloc_assert (_addr->resolved.tcp_addr);
|
alloc_assert (_addr->resolved.tcp_addr);
|
||||||
int rc = _addr->resolved.tcp_addr->resolve (_addr->address.c_str (), false,
|
_s = tcp_open_socket (_addr->address.c_str (), options,
|
||||||
options.ipv6);
|
_addr->resolved.tcp_addr);
|
||||||
if (rc != 0) {
|
if (_s == retired_fd) {
|
||||||
LIBZMQ_DELETE (_addr->resolved.tcp_addr);
|
LIBZMQ_DELETE (_addr->resolved.tcp_addr);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
zmq_assert (_addr->resolved.tcp_addr != NULL);
|
zmq_assert (_addr->resolved.tcp_addr != NULL);
|
||||||
|
|
||||||
const tcp_address_t *const tcp_addr = _addr->resolved.tcp_addr;
|
const tcp_address_t *const tcp_addr = _addr->resolved.tcp_addr;
|
||||||
|
|
||||||
// Create the socket.
|
int rc;
|
||||||
_s = open_socket (tcp_addr->family (), SOCK_STREAM, IPPROTO_TCP);
|
|
||||||
|
|
||||||
// IPv6 address family not supported, try automatic downgrade to IPv4.
|
|
||||||
if (_s == zmq::retired_fd && tcp_addr->family () == AF_INET6
|
|
||||||
&& errno == EAFNOSUPPORT && options.ipv6) {
|
|
||||||
rc = _addr->resolved.tcp_addr->resolve (_addr->address.c_str (), false,
|
|
||||||
false);
|
|
||||||
if (rc != 0) {
|
|
||||||
LIBZMQ_DELETE (_addr->resolved.tcp_addr);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
_s = open_socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (_s == retired_fd) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
// On some systems, IPv4 mapping in IPv6 sockets is disabled by default.
|
|
||||||
// Switch it on in such cases.
|
|
||||||
if (tcp_addr->family () == AF_INET6)
|
|
||||||
enable_ipv4_mapping (_s);
|
|
||||||
|
|
||||||
// Set the IP Type-Of-Service priority for this socket
|
|
||||||
if (options.tos != 0)
|
|
||||||
set_ip_type_of_service (_s, options.tos);
|
|
||||||
|
|
||||||
// Bind the socket to a device if applicable
|
|
||||||
if (!options.bound_device.empty ())
|
|
||||||
bind_to_device (_s, options.bound_device);
|
|
||||||
|
|
||||||
// Set the socket to non-blocking mode so that we get async connect().
|
|
||||||
unblock_socket (_s);
|
|
||||||
|
|
||||||
// Set the socket to loopback fastpath if configured.
|
|
||||||
if (options.loopback_fastpath)
|
|
||||||
tcp_tune_loopback_fast_path (_s);
|
|
||||||
|
|
||||||
// Set the socket buffer limits for the underlying socket.
|
|
||||||
if (options.sndbuf >= 0)
|
|
||||||
set_tcp_send_buffer (_s, options.sndbuf);
|
|
||||||
if (options.rcvbuf >= 0)
|
|
||||||
set_tcp_receive_buffer (_s, options.rcvbuf);
|
|
||||||
|
|
||||||
// Set the IP Type-Of-Service for the underlying socket
|
|
||||||
if (options.tos != 0)
|
|
||||||
set_ip_type_of_service (_s, options.tos);
|
|
||||||
|
|
||||||
// Set a source address for conversations
|
// Set a source address for conversations
|
||||||
if (tcp_addr->has_src_addr ()) {
|
if (tcp_addr->has_src_addr ()) {
|
||||||
|
@ -103,54 +103,23 @@ zmq::tcp_listener_t::get_socket_name (zmq::fd_t fd_,
|
|||||||
|
|
||||||
int zmq::tcp_listener_t::create_socket (const char *addr_)
|
int zmq::tcp_listener_t::create_socket (const char *addr_)
|
||||||
{
|
{
|
||||||
// Convert the textual address into address structure.
|
_s = tcp_open_socket (addr_, options, &_address);
|
||||||
int rc = _address.resolve (addr_, true, options.ipv6);
|
|
||||||
if (rc != 0)
|
|
||||||
return -1;
|
|
||||||
|
|
||||||
// Create a listening socket.
|
|
||||||
_s = open_socket (_address.family (), SOCK_STREAM, IPPROTO_TCP);
|
|
||||||
|
|
||||||
// IPv6 address family not supported, try automatic downgrade to IPv4.
|
|
||||||
if (_s == zmq::retired_fd && _address.family () == AF_INET6
|
|
||||||
&& errno == EAFNOSUPPORT && options.ipv6) {
|
|
||||||
rc = _address.resolve (addr_, true, false);
|
|
||||||
if (rc != 0)
|
|
||||||
return rc;
|
|
||||||
_s = open_socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (_s == retired_fd) {
|
if (_s == retired_fd) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO why is this only done for the listener?
|
||||||
make_socket_noninheritable (_s);
|
make_socket_noninheritable (_s);
|
||||||
|
|
||||||
// On some systems, IPv4 mapping in IPv6 sockets is disabled by default.
|
|
||||||
// Switch it on in such cases.
|
|
||||||
if (_address.family () == AF_INET6)
|
|
||||||
enable_ipv4_mapping (_s);
|
|
||||||
|
|
||||||
// Set the IP Type-Of-Service for the underlying socket
|
|
||||||
if (options.tos != 0)
|
|
||||||
set_ip_type_of_service (_s, options.tos);
|
|
||||||
|
|
||||||
// Set the socket to loopback fastpath if configured.
|
|
||||||
if (options.loopback_fastpath)
|
|
||||||
tcp_tune_loopback_fast_path (_s);
|
|
||||||
|
|
||||||
// Bind the socket to a device if applicable
|
|
||||||
if (!options.bound_device.empty ())
|
|
||||||
bind_to_device (_s, options.bound_device);
|
|
||||||
|
|
||||||
// Set the socket buffer limits for the underlying socket.
|
|
||||||
if (options.sndbuf >= 0)
|
|
||||||
set_tcp_send_buffer (_s, options.sndbuf);
|
|
||||||
if (options.rcvbuf >= 0)
|
|
||||||
set_tcp_receive_buffer (_s, options.rcvbuf);
|
|
||||||
|
|
||||||
// Allow reusing of the address.
|
// Allow reusing of the address.
|
||||||
int flag = 1;
|
int flag = 1;
|
||||||
|
int rc;
|
||||||
#ifdef ZMQ_HAVE_WINDOWS
|
#ifdef ZMQ_HAVE_WINDOWS
|
||||||
|
// TODO this was changed for Windows from SO_REUSEADDRE to
|
||||||
|
// SE_EXCLUSIVEADDRUSE by 0ab65324195ad70205514d465b03d851a6de051c,
|
||||||
|
// so the comment above is no longer correct; also, now the settings are
|
||||||
|
// different between listener and connecter with a src address.
|
||||||
|
// is this intentional?
|
||||||
rc = setsockopt (_s, SOL_SOCKET, SO_EXCLUSIVEADDRUSE,
|
rc = setsockopt (_s, SOL_SOCKET, SO_EXCLUSIVEADDRUSE,
|
||||||
reinterpret_cast<const char *> (&flag), sizeof (int));
|
reinterpret_cast<const char *> (&flag), sizeof (int));
|
||||||
wsa_assert (rc != SOCKET_ERROR);
|
wsa_assert (rc != SOCKET_ERROR);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user