mirror of
https://github.com/zeromq/libzmq.git
synced 2025-10-27 11:06:52 +01:00
Automatically resize signalling socket buffer if full
If the socketpair used by signaler_t fills up, this can lead to deadlock. This patch provides partial resolution by attempting to resize SO_SNDBUF on the writer side, and if that fails we shall at least assert rather than hang. I've also refactored the signaler_t code to make the platform-dependent parts clearer and have tested both the MSG_DONTWAIT and standard POSIX path in recv. The Win32 implementation currently does not implement resizing as I'm not convinced that it's safe, but it will also assert like other platforms if signaler_t::send() cannot succeed. The OpenVMS implementation has been carried forward but is untested. Signed-off-by: Martin Lucina <mato@kotelna.sk>
This commit is contained in:
committed by
Martin Sustrik
parent
756f7df8c8
commit
1b39bcd883
@@ -35,10 +35,6 @@ namespace zmq
|
|||||||
// memory allocation by approximately 99.6%
|
// memory allocation by approximately 99.6%
|
||||||
message_pipe_granularity = 256,
|
message_pipe_granularity = 256,
|
||||||
|
|
||||||
// Socketpair send buffer size used by signaler. The default value of
|
|
||||||
// zero means leave it at the system default.
|
|
||||||
signaler_sndbuf_size = 0,
|
|
||||||
|
|
||||||
// Determines how often does socket poll for new commands when it
|
// Determines how often does socket poll for new commands when it
|
||||||
// still has unprocessed messages to handle. Thus, if it is set to 100,
|
// still has unprocessed messages to handle. Thus, if it is set to 100,
|
||||||
// socket will process 100 inbound messages before doing the poll.
|
// socket will process 100 inbound messages before doing the poll.
|
||||||
|
|||||||
433
src/signaler.cpp
433
src/signaler.cpp
@@ -23,15 +23,16 @@
|
|||||||
#include "fd.hpp"
|
#include "fd.hpp"
|
||||||
#include "ip.hpp"
|
#include "ip.hpp"
|
||||||
|
|
||||||
#if defined ZMQ_HAVE_OPENVMS
|
#if defined ZMQ_HAVE_WINDOWS
|
||||||
#include <netinet/tcp.h>
|
|
||||||
#include <unistd.h>
|
|
||||||
#elif defined ZMQ_HAVE_WINDOWS
|
|
||||||
#include "windows.hpp"
|
#include "windows.hpp"
|
||||||
#else
|
#else
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
#include <fcntl.h>
|
#include <fcntl.h>
|
||||||
#include <limits.h>
|
#include <limits.h>
|
||||||
|
#include <netinet/tcp.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
#include <sys/types.h>
|
||||||
|
#include <sys/socket.h>
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
zmq::fd_t zmq::signaler_t::get_fd ()
|
zmq::fd_t zmq::signaler_t::get_fd ()
|
||||||
@@ -43,62 +44,18 @@ zmq::fd_t zmq::signaler_t::get_fd ()
|
|||||||
|
|
||||||
zmq::signaler_t::signaler_t ()
|
zmq::signaler_t::signaler_t ()
|
||||||
{
|
{
|
||||||
// Windows have no 'socketpair' function. CreatePipe is no good as pipe
|
// Create the socketpair for signalling.
|
||||||
// handles cannot be polled on. Here we create the socketpair by hand.
|
int rc = make_socketpair (&r, &w);
|
||||||
|
errno_assert (rc == 0);
|
||||||
|
|
||||||
struct sockaddr_in addr;
|
// Set the writer to non-blocking mode.
|
||||||
SOCKET listener;
|
|
||||||
int addrlen = sizeof (addr);
|
|
||||||
|
|
||||||
w = INVALID_SOCKET;
|
|
||||||
r = INVALID_SOCKET;
|
|
||||||
|
|
||||||
fd_t rcs = (listener = socket (AF_INET, SOCK_STREAM, 0));
|
|
||||||
wsa_assert (rcs != INVALID_SOCKET);
|
|
||||||
|
|
||||||
memset (&addr, 0, sizeof (addr));
|
|
||||||
addr.sin_family = AF_INET;
|
|
||||||
addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
|
|
||||||
addr.sin_port = 0;
|
|
||||||
|
|
||||||
int rc = bind (listener, (const struct sockaddr*) &addr, sizeof (addr));
|
|
||||||
wsa_assert (rc != SOCKET_ERROR);
|
|
||||||
|
|
||||||
rc = getsockname (listener, (struct sockaddr*) &addr, &addrlen);
|
|
||||||
wsa_assert (rc != SOCKET_ERROR);
|
|
||||||
|
|
||||||
// Listen for incomming connections.
|
|
||||||
rc = listen (listener, 1);
|
|
||||||
wsa_assert (rc != SOCKET_ERROR);
|
|
||||||
|
|
||||||
// Create the socket.
|
|
||||||
w = WSASocket (AF_INET, SOCK_STREAM, 0, NULL, 0, 0);
|
|
||||||
wsa_assert (w != INVALID_SOCKET);
|
|
||||||
|
|
||||||
// Increase signaler SNDBUF if requested in config.hpp.
|
|
||||||
if (signaler_sndbuf_size) {
|
|
||||||
int sndbuf = signaler_sndbuf_size;
|
|
||||||
socklen_t sndbuf_size = sizeof sndbuf;
|
|
||||||
rc = setsockopt (w, SOL_SOCKET, SO_SNDBUF, (const char *)&sndbuf,
|
|
||||||
sndbuf_size);
|
|
||||||
errno_assert (rc == 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Connect to the remote peer.
|
|
||||||
rc = connect (w, (sockaddr *) &addr, sizeof (addr));
|
|
||||||
wsa_assert (rc != SOCKET_ERROR);
|
|
||||||
|
|
||||||
// Accept connection from w.
|
|
||||||
r = accept (listener, NULL, NULL);
|
|
||||||
wsa_assert (r != INVALID_SOCKET);
|
|
||||||
|
|
||||||
// Set the read site of the pair to non-blocking mode.
|
|
||||||
unsigned long argp = 1;
|
unsigned long argp = 1;
|
||||||
rc = ioctlsocket (r, FIONBIO, &argp);
|
rc = ioctlsocket (w, FIONBIO, &argp);
|
||||||
wsa_assert (rc != SOCKET_ERROR);
|
wsa_assert (rc != SOCKET_ERROR);
|
||||||
|
|
||||||
// We don't need the listening socket anymore. Close it.
|
// Set the reader to non-blocking mode.
|
||||||
rc = closesocket (listener);
|
argp = 1;
|
||||||
|
rc = ioctlsocket (r, FIONBIO, &argp);
|
||||||
wsa_assert (rc != SOCKET_ERROR);
|
wsa_assert (rc != SOCKET_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -113,166 +70,77 @@ zmq::signaler_t::~signaler_t ()
|
|||||||
|
|
||||||
void zmq::signaler_t::send (const command_t &cmd_)
|
void zmq::signaler_t::send (const command_t &cmd_)
|
||||||
{
|
{
|
||||||
// TODO: Note that send is a blocking operation.
|
// TODO: Implement SNDBUF auto-resizing as for POSIX platforms.
|
||||||
// How should we behave if the signal cannot be written to the signaler?
|
// In the mean time, the following code with assert if the send()
|
||||||
// Even worse: What if half of a command is written?
|
// call would block.
|
||||||
int rc = ::send (w, (char*) &cmd_, sizeof (command_t), 0);
|
int nbytes = ::send (w, (char *)&cmd_, sizeof (command_t), 0);
|
||||||
win_assert (rc != SOCKET_ERROR);
|
wsa_assert (nbytes != SOCKET_ERROR);
|
||||||
zmq_assert (rc == sizeof (command_t));
|
|
||||||
}
|
|
||||||
|
|
||||||
int zmq::signaler_t::recv (command_t *cmd_, bool block_)
|
|
||||||
{
|
|
||||||
if (block_) {
|
|
||||||
|
|
||||||
// Switch to blocking mode.
|
|
||||||
unsigned long argp = 0;
|
|
||||||
int rc = ioctlsocket (r, FIONBIO, &argp);
|
|
||||||
wsa_assert (rc != SOCKET_ERROR);
|
|
||||||
}
|
|
||||||
|
|
||||||
int err;
|
|
||||||
int result;
|
|
||||||
int nbytes = ::recv (r, (char*) cmd_, sizeof (command_t), 0);
|
|
||||||
if (nbytes == -1 && WSAGetLastError () == WSAEWOULDBLOCK) {
|
|
||||||
err = EAGAIN;
|
|
||||||
result = -1;
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
wsa_assert (nbytes != -1);
|
|
||||||
|
|
||||||
// Check whether we haven't got half of a signal.
|
|
||||||
zmq_assert (nbytes % sizeof (uint32_t) == 0);
|
|
||||||
|
|
||||||
result = 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (block_) {
|
|
||||||
|
|
||||||
// Switch back to non-blocking mode.
|
|
||||||
unsigned long argp = 1;
|
|
||||||
int rc = ioctlsocket (r, FIONBIO, &argp);
|
|
||||||
wsa_assert (rc != SOCKET_ERROR);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (result == -1)
|
|
||||||
errno = err;
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
#elif defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX
|
|
||||||
|
|
||||||
#include <sys/types.h>
|
|
||||||
#include <sys/socket.h>
|
|
||||||
|
|
||||||
zmq::signaler_t::signaler_t ()
|
|
||||||
{
|
|
||||||
int sv [2];
|
|
||||||
int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv);
|
|
||||||
errno_assert (rc == 0);
|
|
||||||
w = sv [0];
|
|
||||||
r = sv [1];
|
|
||||||
|
|
||||||
// Set the reader to non-blocking mode.
|
|
||||||
int flags = fcntl (r, F_GETFL, 0);
|
|
||||||
if (flags == -1)
|
|
||||||
flags = 0;
|
|
||||||
rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);
|
|
||||||
errno_assert (rc != -1);
|
|
||||||
|
|
||||||
// Increase signaler SNDBUF if requested in config.hpp.
|
|
||||||
if (signaler_sndbuf_size) {
|
|
||||||
int sndbuf = signaler_sndbuf_size;
|
|
||||||
socklen_t sndbuf_size = sizeof sndbuf;
|
|
||||||
rc = setsockopt (w, SOL_SOCKET, SO_SNDBUF, &sndbuf, sndbuf_size);
|
|
||||||
errno_assert (rc == 0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
zmq::signaler_t::~signaler_t ()
|
|
||||||
{
|
|
||||||
close (w);
|
|
||||||
close (r);
|
|
||||||
}
|
|
||||||
|
|
||||||
void zmq::signaler_t::send (const command_t &cmd_)
|
|
||||||
{
|
|
||||||
ssize_t nbytes;
|
|
||||||
do {
|
|
||||||
nbytes = ::send (w, &cmd_, sizeof (command_t), 0);
|
|
||||||
} while (nbytes == -1 && errno == EINTR);
|
|
||||||
errno_assert (nbytes != -1);
|
|
||||||
zmq_assert (nbytes == sizeof (command_t));
|
zmq_assert (nbytes == sizeof (command_t));
|
||||||
}
|
}
|
||||||
|
|
||||||
int zmq::signaler_t::recv (command_t *cmd_, bool block_)
|
int zmq::signaler_t::recv (command_t *cmd_, bool block_)
|
||||||
{
|
{
|
||||||
if (block_) {
|
if (block_) {
|
||||||
|
|
||||||
// Set the reader to blocking mode.
|
// Set the reader to blocking mode.
|
||||||
int flags = fcntl (r, F_GETFL, 0);
|
unsigned long argp = 0;
|
||||||
if (flags == -1)
|
int rc = ioctlsocket (r, FIONBIO, &argp);
|
||||||
flags = 0;
|
wsa_assert (rc != SOCKET_ERROR);
|
||||||
int rc = fcntl (r, F_SETFL, flags & ~O_NONBLOCK);
|
|
||||||
errno_assert (rc != -1);
|
|
||||||
}
|
}
|
||||||
|
// Attempt to read an entire command. Returns EAGAIN if non-blocking
|
||||||
int err;
|
// and a command is not available.
|
||||||
int result;
|
int err = 0;
|
||||||
ssize_t nbytes = ::recv (r, (char*) cmd_, sizeof (command_t), 0);
|
int nbytes = ::recv (r, (char *)cmd_, sizeof (command_t), 0);
|
||||||
if (nbytes == -1 && (errno == EAGAIN || errno == EINTR)) {
|
if (nbytes == -1 && WSAGetLastError () == WSAEWOULDBLOCK) {
|
||||||
err = errno;
|
// Save value of errno if we wish to pass it to caller.
|
||||||
result = -1;
|
err = EAGAIN;
|
||||||
}
|
}
|
||||||
else {
|
if (block_) {
|
||||||
zmq_assert (nbytes != -1);
|
// Re-set the reader to non-blocking mode.
|
||||||
|
unsigned long argp = 1;
|
||||||
// Check whether we haven't got half of command.
|
int rc = ioctlsocket (r, FIONBIO, &argp);
|
||||||
zmq_assert (nbytes == sizeof (command_t));
|
wsa_assert (rc != SOCKET_ERROR);
|
||||||
|
|
||||||
result = 0;
|
|
||||||
}
|
}
|
||||||
|
// If the recv failed, return with the saved errno if set.
|
||||||
if (block_) {
|
if (err != 0) {
|
||||||
|
|
||||||
// Set the reader to non-blocking mode.
|
|
||||||
int flags = fcntl (r, F_GETFL, 0);
|
|
||||||
if (flags == -1)
|
|
||||||
flags = 0;
|
|
||||||
int rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);
|
|
||||||
errno_assert (rc != -1);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (result == -1)
|
|
||||||
errno = err;
|
errno = err;
|
||||||
return result;
|
return -1;
|
||||||
|
}
|
||||||
|
// Sanity check for success.
|
||||||
|
wsa_assert (nbytes != SOCKET_ERROR);
|
||||||
|
|
||||||
|
// Check whether we haven't got half of command.
|
||||||
|
zmq_assert (nbytes == sizeof (command_t));
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
#else
|
#else // !ZMQ_HAVE_WINDOWS
|
||||||
|
|
||||||
#include <sys/types.h>
|
|
||||||
#include <sys/socket.h>
|
|
||||||
|
|
||||||
zmq::signaler_t::signaler_t ()
|
zmq::signaler_t::signaler_t ()
|
||||||
{
|
{
|
||||||
|
#ifdef PIPE_BUF
|
||||||
// Make sure that command can be written to the socket in atomic fashion.
|
// Make sure that command can be written to the socket in atomic fashion.
|
||||||
// If this wasn't guaranteed, commands from different threads would be
|
// If this wasn't guaranteed, commands from different threads would be
|
||||||
// interleaved.
|
// interleaved.
|
||||||
zmq_assert (sizeof (command_t) <= PIPE_BUF);
|
zmq_assert (sizeof (command_t) <= PIPE_BUF);
|
||||||
|
#endif
|
||||||
|
|
||||||
int sv [2];
|
// Create the socketpair for signalling.
|
||||||
int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv);
|
int rc = make_socketpair (&r, &w);
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
w = sv [0];
|
|
||||||
r = sv [1];
|
|
||||||
|
|
||||||
// Increase signaler SNDBUF if requested in config.hpp.
|
// Set the writer to non-blocking mode.
|
||||||
if (signaler_sndbuf_size) {
|
int flags = fcntl (w, F_GETFL, 0);
|
||||||
int sndbuf = signaler_sndbuf_size;
|
errno_assert (flags >= 0);
|
||||||
socklen_t sndbuf_size = sizeof sndbuf;
|
rc = fcntl (w, F_SETFL, flags | O_NONBLOCK);
|
||||||
rc = setsockopt (w, SOL_SOCKET, SO_SNDBUF, &sndbuf, sndbuf_size);
|
errno_assert (rc == 0);
|
||||||
errno_assert (rc == 0);
|
|
||||||
}
|
#ifndef MSG_DONTWAIT
|
||||||
|
// Set the reader to non-blocking mode.
|
||||||
|
flags = fcntl (r, F_GETFL, 0);
|
||||||
|
errno_assert (flags >= 0);
|
||||||
|
rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);
|
||||||
|
errno_assert (rc == 0);
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
zmq::signaler_t::~signaler_t ()
|
zmq::signaler_t::~signaler_t ()
|
||||||
@@ -283,12 +151,32 @@ zmq::signaler_t::~signaler_t ()
|
|||||||
|
|
||||||
void zmq::signaler_t::send (const command_t &cmd_)
|
void zmq::signaler_t::send (const command_t &cmd_)
|
||||||
{
|
{
|
||||||
// TODO: Note that send is a blocking operation.
|
// Attempt to write an entire command without blocking.
|
||||||
// How should we behave if the command cannot be written to the signaler?
|
|
||||||
ssize_t nbytes;
|
ssize_t nbytes;
|
||||||
do {
|
do {
|
||||||
nbytes = ::send (w, &cmd_, sizeof (command_t), 0);
|
nbytes = ::send (w, &cmd_, sizeof (command_t), 0);
|
||||||
} while (nbytes == -1 && errno == EINTR);
|
} while (nbytes == -1 && errno == EINTR);
|
||||||
|
// Attempt to increase signaler SNDBUF if the send failed.
|
||||||
|
if (nbytes == -1 && errno == EAGAIN) {
|
||||||
|
int old_sndbuf, new_sndbuf;
|
||||||
|
socklen_t sndbuf_size = sizeof old_sndbuf;
|
||||||
|
// Retrieve current send buffer size.
|
||||||
|
int rc = getsockopt (w, SOL_SOCKET, SO_SNDBUF, &old_sndbuf,
|
||||||
|
&sndbuf_size);
|
||||||
|
errno_assert (rc == 0);
|
||||||
|
new_sndbuf = old_sndbuf * 2;
|
||||||
|
// Double the new send buffer size.
|
||||||
|
rc = setsockopt (w, SOL_SOCKET, SO_SNDBUF, &new_sndbuf, sndbuf_size);
|
||||||
|
errno_assert (rc == 0);
|
||||||
|
// Verify that the OS actually honored the request.
|
||||||
|
rc = getsockopt (w, SOL_SOCKET, SO_SNDBUF, &new_sndbuf, &sndbuf_size);
|
||||||
|
errno_assert (rc == 0);
|
||||||
|
zmq_assert (new_sndbuf > old_sndbuf);
|
||||||
|
// Retry the sending operation; at this point it must succeed.
|
||||||
|
do {
|
||||||
|
nbytes = ::send (w, &cmd_, sizeof (command_t), 0);
|
||||||
|
} while (nbytes == -1 && errno == EINTR);
|
||||||
|
}
|
||||||
errno_assert (nbytes != -1);
|
errno_assert (nbytes != -1);
|
||||||
|
|
||||||
// This should never happen as we've already checked that command size is
|
// This should never happen as we've already checked that command size is
|
||||||
@@ -298,10 +186,43 @@ void zmq::signaler_t::send (const command_t &cmd_)
|
|||||||
|
|
||||||
int zmq::signaler_t::recv (command_t *cmd_, bool block_)
|
int zmq::signaler_t::recv (command_t *cmd_, bool block_)
|
||||||
{
|
{
|
||||||
ssize_t nbytes;
|
#ifdef MSG_DONTWAIT
|
||||||
nbytes = ::recv (r, cmd_, sizeof (command_t), block_ ? 0 : MSG_DONTWAIT);
|
// Attempt to read an entire command. Returns EAGAIN if non-blocking
|
||||||
|
// mode is requested and a command is not available.
|
||||||
|
ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t),
|
||||||
|
block_ ? 0 : MSG_DONTWAIT);
|
||||||
if (nbytes == -1 && (errno == EAGAIN || errno == EINTR))
|
if (nbytes == -1 && (errno == EAGAIN || errno == EINTR))
|
||||||
return -1;
|
return -1;
|
||||||
|
#else
|
||||||
|
if (block_) {
|
||||||
|
// Set the reader to blocking mode.
|
||||||
|
int flags = fcntl (r, F_GETFL, 0);
|
||||||
|
errno_assert (flags >= 0);
|
||||||
|
int rc = fcntl (r, F_SETFL, flags & ~O_NONBLOCK);
|
||||||
|
errno_assert (rc == 0);
|
||||||
|
}
|
||||||
|
// Attempt to read an entire command. Returns EAGAIN if non-blocking
|
||||||
|
// and a command is not available.
|
||||||
|
int err = 0;
|
||||||
|
ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), 0);
|
||||||
|
if (nbytes == -1 && (errno == EAGAIN || errno == EINTR)) {
|
||||||
|
// Save value of errno if we wish to pass it to caller.
|
||||||
|
err = errno;
|
||||||
|
}
|
||||||
|
if (block_) {
|
||||||
|
// Re-set the reader to non-blocking mode.
|
||||||
|
int flags = fcntl (r, F_GETFL, 0);
|
||||||
|
errno_assert (flags >= 0);
|
||||||
|
int rc = fcntl (r, F_SETFL, flags | O_NONBLOCK);
|
||||||
|
errno_assert (rc == 0);
|
||||||
|
}
|
||||||
|
// If the recv failed, return with the saved errno if set.
|
||||||
|
if (err != 0) {
|
||||||
|
errno = err;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
// Sanity check for success.
|
||||||
errno_assert (nbytes != -1);
|
errno_assert (nbytes != -1);
|
||||||
|
|
||||||
// Check whether we haven't got half of command.
|
// Check whether we haven't got half of command.
|
||||||
@@ -312,29 +233,90 @@ int zmq::signaler_t::recv (command_t *cmd_, bool block_)
|
|||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#if defined ZMQ_HAVE_OPENVMS
|
int zmq::signaler_t::make_socketpair (fd_t *r_, fd_t *w_)
|
||||||
|
|
||||||
int zmq::signaler_t::socketpair (int domain_, int type_, int protocol_,
|
|
||||||
int sv_ [2])
|
|
||||||
{
|
{
|
||||||
int listener;
|
#if defined ZMQ_HAVE_WINDOWS
|
||||||
|
|
||||||
|
// Windows has no 'socketpair' function. CreatePipe is no good as pipe
|
||||||
|
// handles cannot be polled on. Here we create the socketpair by hand.
|
||||||
|
*w_ = INVALID_SOCKET;
|
||||||
|
*r_ = INVALID_SOCKET;
|
||||||
|
|
||||||
|
// Create listening socket.
|
||||||
|
SOCKET listener;
|
||||||
|
listener = socket (AF_INET, SOCK_STREAM, 0);
|
||||||
|
wsa_assert (listener != INVALID_SOCKET);
|
||||||
|
|
||||||
|
// Set SO_REUSEADDR and TCP_NODELAY on listening socket.
|
||||||
|
BOOL so_reuseaddr = 1;
|
||||||
|
int rc = setsockopt (listener, SOL_SOCKET, SO_REUSEADDR,
|
||||||
|
(char *)&so_reuseaddr, sizeof (so_reuseaddr));
|
||||||
|
wsa_assert (rc != SOCKET_ERROR);
|
||||||
|
BOOL tcp_nodelay = 1;
|
||||||
|
rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY,
|
||||||
|
(char *)&tcp_nodelay, sizeof (tcp_nodelay));
|
||||||
|
wsa_assert (rc != SOCKET_ERROR);
|
||||||
|
|
||||||
|
// Bind listening socket to any free local port.
|
||||||
|
struct sockaddr_in addr;
|
||||||
|
memset (&addr, 0, sizeof (addr));
|
||||||
|
addr.sin_family = AF_INET;
|
||||||
|
addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
|
||||||
|
addr.sin_port = 0;
|
||||||
|
rc = bind (listener, (const struct sockaddr*) &addr, sizeof (addr));
|
||||||
|
wsa_assert (rc != SOCKET_ERROR);
|
||||||
|
|
||||||
|
// Retrieve local port listener is bound to (into addr).
|
||||||
|
int addrlen = sizeof (addr);
|
||||||
|
rc = getsockname (listener, (struct sockaddr*) &addr, &addrlen);
|
||||||
|
wsa_assert (rc != SOCKET_ERROR);
|
||||||
|
|
||||||
|
// Listen for incomming connections.
|
||||||
|
rc = listen (listener, 1);
|
||||||
|
wsa_assert (rc != SOCKET_ERROR);
|
||||||
|
|
||||||
|
// Create the writer socket.
|
||||||
|
*w_ = WSASocket (AF_INET, SOCK_STREAM, 0, NULL, 0, 0);
|
||||||
|
wsa_assert (*w_ != INVALID_SOCKET);
|
||||||
|
|
||||||
|
// Set TCP_NODELAY on writer socket.
|
||||||
|
rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY,
|
||||||
|
(char *)&tcp_nodelay, sizeof (tcp_nodelay));
|
||||||
|
wsa_assert (rc != SOCKET_ERROR);
|
||||||
|
|
||||||
|
// Connect writer to the listener.
|
||||||
|
rc = connect (*w_, (sockaddr *) &addr, sizeof (addr));
|
||||||
|
wsa_assert (rc != SOCKET_ERROR);
|
||||||
|
|
||||||
|
// Accept connection from writer.
|
||||||
|
*r_ = accept (listener, NULL, NULL);
|
||||||
|
wsa_assert (*r_ != INVALID_SOCKET);
|
||||||
|
|
||||||
|
// We don't need the listening socket anymore. Close it.
|
||||||
|
rc = closesocket (listener);
|
||||||
|
wsa_assert (rc != SOCKET_ERROR);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
#elif defined ZMQ_HAVE_OPENVMS
|
||||||
|
|
||||||
|
// Whilst OpenVMS supports socketpair - it maps to AF_INET only. Further,
|
||||||
|
// it does not set the socket options TCP_NODELAY and TCP_NODELACK which
|
||||||
|
// can lead to performance problems.
|
||||||
|
//
|
||||||
|
// The bug will be fixed in V5.6 ECO4 and beyond. In the meantime, we'll
|
||||||
|
// create the socket pair manually.
|
||||||
sockaddr_in lcladdr;
|
sockaddr_in lcladdr;
|
||||||
socklen_t lcladdr_len;
|
|
||||||
int rc;
|
|
||||||
int on = 1;
|
|
||||||
|
|
||||||
zmq_assert (type_ == SOCK_STREAM);
|
|
||||||
|
|
||||||
// Fill in the localhost address (127.0.0.1).
|
|
||||||
memset (&lcladdr, 0, sizeof (lcladdr));
|
memset (&lcladdr, 0, sizeof (lcladdr));
|
||||||
lcladdr.sin_family = AF_INET;
|
lcladdr.sin_family = AF_INET;
|
||||||
lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
|
lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
|
||||||
lcladdr.sin_port = 0;
|
lcladdr.sin_port = 0;
|
||||||
|
|
||||||
listener = socket (AF_INET, SOCK_STREAM, 0);
|
int listener = socket (AF_INET, SOCK_STREAM, 0);
|
||||||
errno_assert (listener != -1);
|
errno_assert (listener != -1);
|
||||||
|
|
||||||
rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on));
|
int on = 1;
|
||||||
|
int rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on));
|
||||||
errno_assert (rc != -1);
|
errno_assert (rc != -1);
|
||||||
|
|
||||||
rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on));
|
rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on));
|
||||||
@@ -343,7 +325,7 @@ int zmq::signaler_t::socketpair (int domain_, int type_, int protocol_,
|
|||||||
rc = bind(listener, (struct sockaddr*) &lcladdr, sizeof (lcladdr));
|
rc = bind(listener, (struct sockaddr*) &lcladdr, sizeof (lcladdr));
|
||||||
errno_assert (rc != -1);
|
errno_assert (rc != -1);
|
||||||
|
|
||||||
lcladdr_len = sizeof (lcladdr);
|
socklen_t lcladdr_len = sizeof (lcladdr);
|
||||||
|
|
||||||
rc = getsockname (listener, (struct sockaddr*) &lcladdr, &lcladdr_len);
|
rc = getsockname (listener, (struct sockaddr*) &lcladdr, &lcladdr_len);
|
||||||
errno_assert (rc != -1);
|
errno_assert (rc != -1);
|
||||||
@@ -351,25 +333,34 @@ int zmq::signaler_t::socketpair (int domain_, int type_, int protocol_,
|
|||||||
rc = listen (listener, 1);
|
rc = listen (listener, 1);
|
||||||
errno_assert (rc != -1);
|
errno_assert (rc != -1);
|
||||||
|
|
||||||
sv_ [0] = socket (AF_INET, SOCK_STREAM, 0);
|
*w_ = socket (AF_INET, SOCK_STREAM, 0);
|
||||||
|
errno_assert (*w_ != -1);
|
||||||
|
|
||||||
|
rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on));
|
||||||
errno_assert (rc != -1);
|
errno_assert (rc != -1);
|
||||||
|
|
||||||
rc = setsockopt (sv_ [0], IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on));
|
rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on));
|
||||||
errno_assert (rc != -1);
|
errno_assert (rc != -1);
|
||||||
|
|
||||||
rc = setsockopt (sv_ [0], IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on));
|
rc = connect (*w_, (struct sockaddr*) &lcladdr, sizeof (lcladdr));
|
||||||
errno_assert (rc != -1);
|
errno_assert (rc != -1);
|
||||||
|
|
||||||
rc = connect (sv_ [0], (struct sockaddr*) &lcladdr, sizeof (lcladdr));
|
*r_ = accept (listener, NULL, NULL);
|
||||||
errno_assert (rc != -1);
|
errno_assert (*r_ != -1);
|
||||||
|
|
||||||
sv_ [1] = accept (listener, NULL, NULL);
|
|
||||||
errno_assert (sv_ [1] != -1);
|
|
||||||
|
|
||||||
close (listener);
|
close (listener);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
|
||||||
|
#else // All other implementations support socketpair()
|
||||||
|
|
||||||
|
int sv [2];
|
||||||
|
int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv);
|
||||||
|
errno_assert (rc == 0);
|
||||||
|
*w_ = sv [0];
|
||||||
|
*r_ = sv [1];
|
||||||
|
return 0;
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -44,24 +44,14 @@ namespace zmq
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
||||||
#if defined ZMQ_HAVE_OPENVMS
|
|
||||||
|
|
||||||
// Whilst OpenVMS supports socketpair - it maps to AF_INET only.
|
|
||||||
// Further, it does not set the socket options TCP_NODELAY and
|
|
||||||
// TCP_NODELACK which can lead to performance problems. We'll
|
|
||||||
// overload the socketpair function for this class.
|
|
||||||
//
|
|
||||||
// The bug will be fixed in V5.6 ECO4 and beyond. In the
|
|
||||||
// meantime, we'll create the socket pair manually.
|
|
||||||
static int socketpair (int domain_, int type_, int protocol_,
|
|
||||||
int sv_ [2]);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
// Write & read end of the socketpair.
|
// Write & read end of the socketpair.
|
||||||
fd_t w;
|
fd_t w;
|
||||||
fd_t r;
|
fd_t r;
|
||||||
|
|
||||||
// Disable copying of fd_signeler object.
|
// Platform-dependent function to create a socketpair.
|
||||||
|
static int make_socketpair (fd_t *r_, fd_t *w_);
|
||||||
|
|
||||||
|
// Disable copying of signaler_t object.
|
||||||
signaler_t (const signaler_t&);
|
signaler_t (const signaler_t&);
|
||||||
void operator = (const signaler_t&);
|
void operator = (const signaler_t&);
|
||||||
};
|
};
|
||||||
|
|||||||
Reference in New Issue
Block a user