Implementations of TCP and IPC transports separated

Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
This commit is contained in:
Martin Sustrik
2011-07-28 13:19:55 +02:00
parent 6e987428d4
commit 5ac63140b0
11 changed files with 1008 additions and 45 deletions

View File

@@ -47,7 +47,7 @@
zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_,
class session_t *session_, const options_t &options_,
const char *protocol_, const char *address_, bool wait_) :
const char *address_, bool wait_) :
own_t (io_thread_, options_),
io_object_t (io_thread_),
s (retired_fd),
@@ -59,8 +59,10 @@ zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_,
memset (&addr, 0, sizeof (addr));
addr_len = 0;
int rc = set_address (protocol_, address_);
zmq_assert (rc == 0); //TODO: take care ENOMEM, EINVAL
// TODO: set_addess should be called separately, so that the error
// can be propagated.
int rc = set_address (address_);
zmq_assert (rc == 0);
}
zmq::tcp_connecter_t::~tcp_connecter_t ()
@@ -104,6 +106,26 @@ void zmq::tcp_connecter_t::out_event ()
return;
}
// Disable Nagle's algorithm. We are doing data batching on 0MQ level,
// so using Nagle wouldn't improve throughput in anyway, but it would
// hurt latency.
int nodelay = 1;
int rc = setsockopt (fd, IPPROTO_TCP, TCP_NODELAY, (char*) &nodelay,
sizeof (int));
#ifdef ZMQ_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR);
#else
errno_assert (rc == 0);
#endif
#ifdef ZMQ_HAVE_OPENVMS
// Disable delayed acknowledgements as they hurt latency is serious manner.
int nodelack = 1;
rc = setsockopt (fd, IPPROTO_TCP, TCP_NODELACK, (char*) &nodelack,
sizeof (int));
errno_assert (rc != SOCKET_ERROR);
#endif
// Create the engine object for this connection.
tcp_engine_t *engine = new (std::nothrow) tcp_engine_t (fd, options);
alloc_assert (engine);
@@ -175,13 +197,9 @@ int zmq::tcp_connecter_t::get_new_reconnect_ivl ()
#ifdef ZMQ_HAVE_WINDOWS
int zmq::tcp_connecter_t::set_address (const char *protocol_, const char *addr_)
int zmq::tcp_connecter_t::set_address (const char *addr_)
{
if (strcmp (protocol_, "tcp") == 0)
return resolve_ip_hostname (&addr, &addr_len, addr_);
errno = EPROTONOSUPPORT;
return -1;
return resolve_ip_hostname (&addr, &addr_len, addr_);
}
int zmq::tcp_connecter_t::open ()
@@ -254,15 +272,9 @@ zmq::fd_t zmq::tcp_connecter_t::connect ()
#else
int zmq::tcp_connecter_t::set_address (const char *protocol_, const char *addr_)
int zmq::tcp_connecter_t::set_address (const char *addr_)
{
if (strcmp (protocol_, "tcp") == 0)
return resolve_ip_hostname (&addr, &addr_len, addr_);
else if (strcmp (protocol_, "ipc") == 0)
return resolve_local_path (&addr, &addr_len, addr_);
errno = EPROTONOSUPPORT;
return -1;
return resolve_ip_hostname (&addr, &addr_len, addr_);
}
int zmq::tcp_connecter_t::open ()