diff --git a/src/config.hpp b/src/config.hpp index fb1b0d46..1db3bb68 100644 --- a/src/config.hpp +++ b/src/config.hpp @@ -35,10 +35,6 @@ namespace zmq // memory allocation by approximately 99.6% message_pipe_granularity = 256, - // Socketpair send buffer size used by signaler. The default value of - // zero means leave it at the system default. - signaler_sndbuf_size = 0, - // Determines how often does socket poll for new commands when it // still has unprocessed messages to handle. Thus, if it is set to 100, // socket will process 100 inbound messages before doing the poll. diff --git a/src/signaler.cpp b/src/signaler.cpp index 08edb1ef..a6920782 100644 --- a/src/signaler.cpp +++ b/src/signaler.cpp @@ -23,15 +23,16 @@ #include "fd.hpp" #include "ip.hpp" -#if defined ZMQ_HAVE_OPENVMS -#include -#include -#elif defined ZMQ_HAVE_WINDOWS +#if defined ZMQ_HAVE_WINDOWS #include "windows.hpp" #else #include #include #include +#include +#include +#include +#include #endif zmq::fd_t zmq::signaler_t::get_fd () @@ -43,62 +44,18 @@ zmq::fd_t zmq::signaler_t::get_fd () zmq::signaler_t::signaler_t () { - // Windows have no 'socketpair' function. CreatePipe is no good as pipe - // handles cannot be polled on. Here we create the socketpair by hand. + // Create the socketpair for signalling. + int rc = make_socketpair (&r, &w); + errno_assert (rc == 0); - struct sockaddr_in addr; - SOCKET listener; - int addrlen = sizeof (addr); - - w = INVALID_SOCKET; - r = INVALID_SOCKET; - - fd_t rcs = (listener = socket (AF_INET, SOCK_STREAM, 0)); - wsa_assert (rcs != INVALID_SOCKET); - - memset (&addr, 0, sizeof (addr)); - addr.sin_family = AF_INET; - addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK); - addr.sin_port = 0; - - int rc = bind (listener, (const struct sockaddr*) &addr, sizeof (addr)); - wsa_assert (rc != SOCKET_ERROR); - - rc = getsockname (listener, (struct sockaddr*) &addr, &addrlen); - wsa_assert (rc != SOCKET_ERROR); - - // Listen for incomming connections. - rc = listen (listener, 1); - wsa_assert (rc != SOCKET_ERROR); - - // Create the socket. - w = WSASocket (AF_INET, SOCK_STREAM, 0, NULL, 0, 0); - wsa_assert (w != INVALID_SOCKET); - - // Increase signaler SNDBUF if requested in config.hpp. - if (signaler_sndbuf_size) { - int sndbuf = signaler_sndbuf_size; - socklen_t sndbuf_size = sizeof sndbuf; - rc = setsockopt (w, SOL_SOCKET, SO_SNDBUF, (const char *)&sndbuf, - sndbuf_size); - errno_assert (rc == 0); - } - - // Connect to the remote peer. - rc = connect (w, (sockaddr *) &addr, sizeof (addr)); - wsa_assert (rc != SOCKET_ERROR); - - // Accept connection from w. - r = accept (listener, NULL, NULL); - wsa_assert (r != INVALID_SOCKET); - - // Set the read site of the pair to non-blocking mode. + // Set the writer to non-blocking mode. unsigned long argp = 1; - rc = ioctlsocket (r, FIONBIO, &argp); + rc = ioctlsocket (w, FIONBIO, &argp); wsa_assert (rc != SOCKET_ERROR); - // We don't need the listening socket anymore. Close it. - rc = closesocket (listener); + // Set the reader to non-blocking mode. + argp = 1; + rc = ioctlsocket (r, FIONBIO, &argp); wsa_assert (rc != SOCKET_ERROR); } @@ -113,166 +70,77 @@ zmq::signaler_t::~signaler_t () void zmq::signaler_t::send (const command_t &cmd_) { - // TODO: Note that send is a blocking operation. - // How should we behave if the signal cannot be written to the signaler? - // Even worse: What if half of a command is written? - int rc = ::send (w, (char*) &cmd_, sizeof (command_t), 0); - win_assert (rc != SOCKET_ERROR); - zmq_assert (rc == sizeof (command_t)); -} - -int zmq::signaler_t::recv (command_t *cmd_, bool block_) -{ - if (block_) { - - // Switch to blocking mode. - unsigned long argp = 0; - int rc = ioctlsocket (r, FIONBIO, &argp); - wsa_assert (rc != SOCKET_ERROR); - } - - int err; - int result; - int nbytes = ::recv (r, (char*) cmd_, sizeof (command_t), 0); - if (nbytes == -1 && WSAGetLastError () == WSAEWOULDBLOCK) { - err = EAGAIN; - result = -1; - } - else { - wsa_assert (nbytes != -1); - - // Check whether we haven't got half of a signal. - zmq_assert (nbytes % sizeof (uint32_t) == 0); - - result = 0; - } - - if (block_) { - - // Switch back to non-blocking mode. - unsigned long argp = 1; - int rc = ioctlsocket (r, FIONBIO, &argp); - wsa_assert (rc != SOCKET_ERROR); - } - - if (result == -1) - errno = err; - return result; -} - -#elif defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_AIX - -#include -#include - -zmq::signaler_t::signaler_t () -{ - int sv [2]; - int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv); - errno_assert (rc == 0); - w = sv [0]; - r = sv [1]; - - // Set the reader to non-blocking mode. - int flags = fcntl (r, F_GETFL, 0); - if (flags == -1) - flags = 0; - rc = fcntl (r, F_SETFL, flags | O_NONBLOCK); - errno_assert (rc != -1); - - // Increase signaler SNDBUF if requested in config.hpp. - if (signaler_sndbuf_size) { - int sndbuf = signaler_sndbuf_size; - socklen_t sndbuf_size = sizeof sndbuf; - rc = setsockopt (w, SOL_SOCKET, SO_SNDBUF, &sndbuf, sndbuf_size); - errno_assert (rc == 0); - } -} - -zmq::signaler_t::~signaler_t () -{ - close (w); - close (r); -} - -void zmq::signaler_t::send (const command_t &cmd_) -{ - ssize_t nbytes; - do { - nbytes = ::send (w, &cmd_, sizeof (command_t), 0); - } while (nbytes == -1 && errno == EINTR); - errno_assert (nbytes != -1); + // TODO: Implement SNDBUF auto-resizing as for POSIX platforms. + // In the mean time, the following code with assert if the send() + // call would block. + int nbytes = ::send (w, (char *)&cmd_, sizeof (command_t), 0); + wsa_assert (nbytes != SOCKET_ERROR); zmq_assert (nbytes == sizeof (command_t)); } int zmq::signaler_t::recv (command_t *cmd_, bool block_) { if (block_) { - // Set the reader to blocking mode. - int flags = fcntl (r, F_GETFL, 0); - if (flags == -1) - flags = 0; - int rc = fcntl (r, F_SETFL, flags & ~O_NONBLOCK); - errno_assert (rc != -1); + unsigned long argp = 0; + int rc = ioctlsocket (r, FIONBIO, &argp); + wsa_assert (rc != SOCKET_ERROR); } - - int err; - int result; - ssize_t nbytes = ::recv (r, (char*) cmd_, sizeof (command_t), 0); - if (nbytes == -1 && (errno == EAGAIN || errno == EINTR)) { - err = errno; - result = -1; + // Attempt to read an entire command. Returns EAGAIN if non-blocking + // and a command is not available. + int err = 0; + int nbytes = ::recv (r, (char *)cmd_, sizeof (command_t), 0); + if (nbytes == -1 && WSAGetLastError () == WSAEWOULDBLOCK) { + // Save value of errno if we wish to pass it to caller. + err = EAGAIN; } - else { - zmq_assert (nbytes != -1); - - // Check whether we haven't got half of command. - zmq_assert (nbytes == sizeof (command_t)); - - result = 0; + if (block_) { + // Re-set the reader to non-blocking mode. + unsigned long argp = 1; + int rc = ioctlsocket (r, FIONBIO, &argp); + wsa_assert (rc != SOCKET_ERROR); } - - if (block_) { - - // Set the reader to non-blocking mode. - int flags = fcntl (r, F_GETFL, 0); - if (flags == -1) - flags = 0; - int rc = fcntl (r, F_SETFL, flags | O_NONBLOCK); - errno_assert (rc != -1); - } - - if (result == -1) + // If the recv failed, return with the saved errno if set. + if (err != 0) { errno = err; - return result; + return -1; + } + // Sanity check for success. + wsa_assert (nbytes != SOCKET_ERROR); + + // Check whether we haven't got half of command. + zmq_assert (nbytes == sizeof (command_t)); + return 0; } -#else - -#include -#include +#else // !ZMQ_HAVE_WINDOWS zmq::signaler_t::signaler_t () { +#ifdef PIPE_BUF // Make sure that command can be written to the socket in atomic fashion. // If this wasn't guaranteed, commands from different threads would be // interleaved. zmq_assert (sizeof (command_t) <= PIPE_BUF); +#endif - int sv [2]; - int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv); + // Create the socketpair for signalling. + int rc = make_socketpair (&r, &w); errno_assert (rc == 0); - w = sv [0]; - r = sv [1]; - // Increase signaler SNDBUF if requested in config.hpp. - if (signaler_sndbuf_size) { - int sndbuf = signaler_sndbuf_size; - socklen_t sndbuf_size = sizeof sndbuf; - rc = setsockopt (w, SOL_SOCKET, SO_SNDBUF, &sndbuf, sndbuf_size); - errno_assert (rc == 0); - } + // Set the writer to non-blocking mode. + int flags = fcntl (w, F_GETFL, 0); + errno_assert (flags >= 0); + rc = fcntl (w, F_SETFL, flags | O_NONBLOCK); + errno_assert (rc == 0); + +#ifndef MSG_DONTWAIT + // Set the reader to non-blocking mode. + flags = fcntl (r, F_GETFL, 0); + errno_assert (flags >= 0); + rc = fcntl (r, F_SETFL, flags | O_NONBLOCK); + errno_assert (rc == 0); +#endif } zmq::signaler_t::~signaler_t () @@ -283,12 +151,32 @@ zmq::signaler_t::~signaler_t () void zmq::signaler_t::send (const command_t &cmd_) { - // TODO: Note that send is a blocking operation. - // How should we behave if the command cannot be written to the signaler? + // Attempt to write an entire command without blocking. ssize_t nbytes; do { nbytes = ::send (w, &cmd_, sizeof (command_t), 0); } while (nbytes == -1 && errno == EINTR); + // Attempt to increase signaler SNDBUF if the send failed. + if (nbytes == -1 && errno == EAGAIN) { + int old_sndbuf, new_sndbuf; + socklen_t sndbuf_size = sizeof old_sndbuf; + // Retrieve current send buffer size. + int rc = getsockopt (w, SOL_SOCKET, SO_SNDBUF, &old_sndbuf, + &sndbuf_size); + errno_assert (rc == 0); + new_sndbuf = old_sndbuf * 2; + // Double the new send buffer size. + rc = setsockopt (w, SOL_SOCKET, SO_SNDBUF, &new_sndbuf, sndbuf_size); + errno_assert (rc == 0); + // Verify that the OS actually honored the request. + rc = getsockopt (w, SOL_SOCKET, SO_SNDBUF, &new_sndbuf, &sndbuf_size); + errno_assert (rc == 0); + zmq_assert (new_sndbuf > old_sndbuf); + // Retry the sending operation; at this point it must succeed. + do { + nbytes = ::send (w, &cmd_, sizeof (command_t), 0); + } while (nbytes == -1 && errno == EINTR); + } errno_assert (nbytes != -1); // This should never happen as we've already checked that command size is @@ -298,10 +186,43 @@ void zmq::signaler_t::send (const command_t &cmd_) int zmq::signaler_t::recv (command_t *cmd_, bool block_) { - ssize_t nbytes; - nbytes = ::recv (r, cmd_, sizeof (command_t), block_ ? 0 : MSG_DONTWAIT); +#ifdef MSG_DONTWAIT + // Attempt to read an entire command. Returns EAGAIN if non-blocking + // mode is requested and a command is not available. + ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), + block_ ? 0 : MSG_DONTWAIT); if (nbytes == -1 && (errno == EAGAIN || errno == EINTR)) return -1; +#else + if (block_) { + // Set the reader to blocking mode. + int flags = fcntl (r, F_GETFL, 0); + errno_assert (flags >= 0); + int rc = fcntl (r, F_SETFL, flags & ~O_NONBLOCK); + errno_assert (rc == 0); + } + // Attempt to read an entire command. Returns EAGAIN if non-blocking + // and a command is not available. + int err = 0; + ssize_t nbytes = ::recv (r, cmd_, sizeof (command_t), 0); + if (nbytes == -1 && (errno == EAGAIN || errno == EINTR)) { + // Save value of errno if we wish to pass it to caller. + err = errno; + } + if (block_) { + // Re-set the reader to non-blocking mode. + int flags = fcntl (r, F_GETFL, 0); + errno_assert (flags >= 0); + int rc = fcntl (r, F_SETFL, flags | O_NONBLOCK); + errno_assert (rc == 0); + } + // If the recv failed, return with the saved errno if set. + if (err != 0) { + errno = err; + return -1; + } +#endif + // Sanity check for success. errno_assert (nbytes != -1); // Check whether we haven't got half of command. @@ -312,29 +233,90 @@ int zmq::signaler_t::recv (command_t *cmd_, bool block_) #endif -#if defined ZMQ_HAVE_OPENVMS - -int zmq::signaler_t::socketpair (int domain_, int type_, int protocol_, - int sv_ [2]) +int zmq::signaler_t::make_socketpair (fd_t *r_, fd_t *w_) { - int listener; +#if defined ZMQ_HAVE_WINDOWS + + // Windows has no 'socketpair' function. CreatePipe is no good as pipe + // handles cannot be polled on. Here we create the socketpair by hand. + *w_ = INVALID_SOCKET; + *r_ = INVALID_SOCKET; + + // Create listening socket. + SOCKET listener; + listener = socket (AF_INET, SOCK_STREAM, 0); + wsa_assert (listener != INVALID_SOCKET); + + // Set SO_REUSEADDR and TCP_NODELAY on listening socket. + BOOL so_reuseaddr = 1; + int rc = setsockopt (listener, SOL_SOCKET, SO_REUSEADDR, + (char *)&so_reuseaddr, sizeof (so_reuseaddr)); + wsa_assert (rc != SOCKET_ERROR); + BOOL tcp_nodelay = 1; + rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, + (char *)&tcp_nodelay, sizeof (tcp_nodelay)); + wsa_assert (rc != SOCKET_ERROR); + + // Bind listening socket to any free local port. + struct sockaddr_in addr; + memset (&addr, 0, sizeof (addr)); + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK); + addr.sin_port = 0; + rc = bind (listener, (const struct sockaddr*) &addr, sizeof (addr)); + wsa_assert (rc != SOCKET_ERROR); + + // Retrieve local port listener is bound to (into addr). + int addrlen = sizeof (addr); + rc = getsockname (listener, (struct sockaddr*) &addr, &addrlen); + wsa_assert (rc != SOCKET_ERROR); + + // Listen for incomming connections. + rc = listen (listener, 1); + wsa_assert (rc != SOCKET_ERROR); + + // Create the writer socket. + *w_ = WSASocket (AF_INET, SOCK_STREAM, 0, NULL, 0, 0); + wsa_assert (*w_ != INVALID_SOCKET); + + // Set TCP_NODELAY on writer socket. + rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, + (char *)&tcp_nodelay, sizeof (tcp_nodelay)); + wsa_assert (rc != SOCKET_ERROR); + + // Connect writer to the listener. + rc = connect (*w_, (sockaddr *) &addr, sizeof (addr)); + wsa_assert (rc != SOCKET_ERROR); + + // Accept connection from writer. + *r_ = accept (listener, NULL, NULL); + wsa_assert (*r_ != INVALID_SOCKET); + + // We don't need the listening socket anymore. Close it. + rc = closesocket (listener); + wsa_assert (rc != SOCKET_ERROR); + + return 0; + +#elif defined ZMQ_HAVE_OPENVMS + + // Whilst OpenVMS supports socketpair - it maps to AF_INET only. Further, + // it does not set the socket options TCP_NODELAY and TCP_NODELACK which + // can lead to performance problems. + // + // The bug will be fixed in V5.6 ECO4 and beyond. In the meantime, we'll + // create the socket pair manually. sockaddr_in lcladdr; - socklen_t lcladdr_len; - int rc; - int on = 1; - - zmq_assert (type_ == SOCK_STREAM); - - // Fill in the localhost address (127.0.0.1). memset (&lcladdr, 0, sizeof (lcladdr)); lcladdr.sin_family = AF_INET; lcladdr.sin_addr.s_addr = htonl (INADDR_LOOPBACK); lcladdr.sin_port = 0; - listener = socket (AF_INET, SOCK_STREAM, 0); + int listener = socket (AF_INET, SOCK_STREAM, 0); errno_assert (listener != -1); - rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on)); + int on = 1; + int rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on)); errno_assert (rc != -1); rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on)); @@ -342,8 +324,8 @@ int zmq::signaler_t::socketpair (int domain_, int type_, int protocol_, rc = bind(listener, (struct sockaddr*) &lcladdr, sizeof (lcladdr)); errno_assert (rc != -1); - - lcladdr_len = sizeof (lcladdr); + + socklen_t lcladdr_len = sizeof (lcladdr); rc = getsockname (listener, (struct sockaddr*) &lcladdr, &lcladdr_len); errno_assert (rc != -1); @@ -351,25 +333,34 @@ int zmq::signaler_t::socketpair (int domain_, int type_, int protocol_, rc = listen (listener, 1); errno_assert (rc != -1); - sv_ [0] = socket (AF_INET, SOCK_STREAM, 0); + *w_ = socket (AF_INET, SOCK_STREAM, 0); + errno_assert (*w_ != -1); + + rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on)); errno_assert (rc != -1); - rc = setsockopt (sv_ [0], IPPROTO_TCP, TCP_NODELAY, &on, sizeof (on)); + rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on)); errno_assert (rc != -1); - rc = setsockopt (sv_ [0], IPPROTO_TCP, TCP_NODELACK, &on, sizeof (on)); + rc = connect (*w_, (struct sockaddr*) &lcladdr, sizeof (lcladdr)); errno_assert (rc != -1); - rc = connect (sv_ [0], (struct sockaddr*) &lcladdr, sizeof (lcladdr)); - errno_assert (rc != -1); - - sv_ [1] = accept (listener, NULL, NULL); - errno_assert (sv_ [1] != -1); + *r_ = accept (listener, NULL, NULL); + errno_assert (*r_ != -1); close (listener); return 0; -} + +#else // All other implementations support socketpair() + + int sv [2]; + int rc = socketpair (AF_UNIX, SOCK_STREAM, 0, sv); + errno_assert (rc == 0); + *w_ = sv [0]; + *r_ = sv [1]; + return 0; #endif +} diff --git a/src/signaler.hpp b/src/signaler.hpp index 3e5ff135..faf3f1f3 100644 --- a/src/signaler.hpp +++ b/src/signaler.hpp @@ -44,24 +44,14 @@ namespace zmq private: -#if defined ZMQ_HAVE_OPENVMS - - // Whilst OpenVMS supports socketpair - it maps to AF_INET only. - // Further, it does not set the socket options TCP_NODELAY and - // TCP_NODELACK which can lead to performance problems. We'll - // overload the socketpair function for this class. - // - // The bug will be fixed in V5.6 ECO4 and beyond. In the - // meantime, we'll create the socket pair manually. - static int socketpair (int domain_, int type_, int protocol_, - int sv_ [2]); -#endif - // Write & read end of the socketpair. fd_t w; fd_t r; - // Disable copying of fd_signeler object. + // Platform-dependent function to create a socketpair. + static int make_socketpair (fd_t *r_, fd_t *w_); + + // Disable copying of signaler_t object. signaler_t (const signaler_t&); void operator = (const signaler_t&); };