mirror of
https://github.com/zeromq/libzmq.git
synced 2025-03-04 19:13:35 +01:00
commit
ac46e6da96
@ -100,20 +100,15 @@ static int close_wait_ms (int fd_, unsigned int max_ms_ = 2000)
|
|||||||
unsigned int step_ms = max_ms_ / 10;
|
unsigned int step_ms = max_ms_ / 10;
|
||||||
if (step_ms < 1)
|
if (step_ms < 1)
|
||||||
step_ms = 1;
|
step_ms = 1;
|
||||||
|
|
||||||
if (step_ms > 100)
|
if (step_ms > 100)
|
||||||
step_ms = 100;
|
step_ms = 100;
|
||||||
|
|
||||||
int rc = 0; // do not sleep on first attempt
|
int rc = 0; // do not sleep on first attempt
|
||||||
|
do {
|
||||||
do
|
if (rc == -1 && errno == EAGAIN) {
|
||||||
{
|
|
||||||
if (rc == -1 && errno == EAGAIN)
|
|
||||||
{
|
|
||||||
sleep_ms (step_ms);
|
sleep_ms (step_ms);
|
||||||
ms_so_far += step_ms;
|
ms_so_far += step_ms;
|
||||||
}
|
}
|
||||||
|
|
||||||
rc = close (fd_);
|
rc = close (fd_);
|
||||||
} while (ms_so_far < max_ms_ && rc == -1 && errno == EAGAIN);
|
} while (ms_so_far < max_ms_ && rc == -1 && errno == EAGAIN);
|
||||||
|
|
||||||
@ -187,7 +182,7 @@ void zmq::signaler_t::send ()
|
|||||||
errno_assert (sz == sizeof (inc));
|
errno_assert (sz == sizeof (inc));
|
||||||
#elif defined ZMQ_HAVE_WINDOWS
|
#elif defined ZMQ_HAVE_WINDOWS
|
||||||
unsigned char dummy = 0;
|
unsigned char dummy = 0;
|
||||||
int nbytes = ::send (w, (char*) &dummy, sizeof (dummy), 0);
|
int nbytes = ::send (w, (char *) &dummy, sizeof (dummy), 0);
|
||||||
wsa_assert (nbytes != SOCKET_ERROR);
|
wsa_assert (nbytes != SOCKET_ERROR);
|
||||||
zmq_assert (nbytes == sizeof (dummy));
|
zmq_assert (nbytes == sizeof (dummy));
|
||||||
#else
|
#else
|
||||||
@ -304,7 +299,7 @@ void zmq::signaler_t::recv ()
|
|||||||
#else
|
#else
|
||||||
unsigned char dummy;
|
unsigned char dummy;
|
||||||
#if defined ZMQ_HAVE_WINDOWS
|
#if defined ZMQ_HAVE_WINDOWS
|
||||||
int nbytes = ::recv (r, (char*) &dummy, sizeof (dummy), 0);
|
int nbytes = ::recv (r, (char *) &dummy, sizeof (dummy), 0);
|
||||||
wsa_assert (nbytes != SOCKET_ERROR);
|
wsa_assert (nbytes != SOCKET_ERROR);
|
||||||
#else
|
#else
|
||||||
ssize_t nbytes = ::recv (r, &dummy, sizeof (dummy), 0);
|
ssize_t nbytes = ::recv (r, &dummy, sizeof (dummy), 0);
|
||||||
@ -342,7 +337,7 @@ int zmq::signaler_t::recv_failable ()
|
|||||||
#else
|
#else
|
||||||
unsigned char dummy;
|
unsigned char dummy;
|
||||||
#if defined ZMQ_HAVE_WINDOWS
|
#if defined ZMQ_HAVE_WINDOWS
|
||||||
int nbytes = ::recv (r, (char*) &dummy, sizeof (dummy), 0);
|
int nbytes = ::recv (r, (char *) &dummy, sizeof (dummy), 0);
|
||||||
if (nbytes == SOCKET_ERROR) {
|
if (nbytes == SOCKET_ERROR) {
|
||||||
const int last_error = WSAGetLastError();
|
const int last_error = WSAGetLastError();
|
||||||
if (last_error == WSAEWOULDBLOCK) {
|
if (last_error == WSAEWOULDBLOCK) {
|
||||||
@ -466,11 +461,11 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
|
|||||||
// Set SO_REUSEADDR and TCP_NODELAY on listening socket.
|
// Set SO_REUSEADDR and TCP_NODELAY on listening socket.
|
||||||
BOOL so_reuseaddr = 1;
|
BOOL so_reuseaddr = 1;
|
||||||
int rc = setsockopt (listener, SOL_SOCKET, SO_REUSEADDR,
|
int rc = setsockopt (listener, SOL_SOCKET, SO_REUSEADDR,
|
||||||
(char *)&so_reuseaddr, sizeof so_reuseaddr);
|
(char *) &so_reuseaddr, sizeof so_reuseaddr);
|
||||||
wsa_assert (rc != SOCKET_ERROR);
|
wsa_assert (rc != SOCKET_ERROR);
|
||||||
BOOL tcp_nodelay = 1;
|
BOOL tcp_nodelay = 1;
|
||||||
rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY,
|
rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELAY,
|
||||||
(char *)&tcp_nodelay, sizeof tcp_nodelay);
|
(char *) &tcp_nodelay, sizeof tcp_nodelay);
|
||||||
wsa_assert (rc != SOCKET_ERROR);
|
wsa_assert (rc != SOCKET_ERROR);
|
||||||
|
|
||||||
// Init sockaddr to signaler port.
|
// Init sockaddr to signaler port.
|
||||||
@ -496,12 +491,12 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Bind listening socket to signaler port.
|
// Bind listening socket to signaler port.
|
||||||
rc = bind (listener, (const struct sockaddr*) &addr, sizeof addr);
|
rc = bind (listener, (const struct sockaddr *) &addr, sizeof addr);
|
||||||
|
|
||||||
if (rc != SOCKET_ERROR && signaler_port == 0) {
|
if (rc != SOCKET_ERROR && signaler_port == 0) {
|
||||||
// Retrieve ephemeral port number
|
// Retrieve ephemeral port number
|
||||||
int addrlen = sizeof addr;
|
int addrlen = sizeof addr;
|
||||||
rc = getsockname (listener, (struct sockaddr*) &addr, &addrlen);
|
rc = getsockname (listener, (struct sockaddr *) &addr, &addrlen);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Listen for incoming connections.
|
// Listen for incoming connections.
|
||||||
@ -510,7 +505,7 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
|
|||||||
|
|
||||||
// Connect writer to the listener.
|
// Connect writer to the listener.
|
||||||
if (rc != SOCKET_ERROR)
|
if (rc != SOCKET_ERROR)
|
||||||
rc = connect (*w_, (struct sockaddr*) &addr, sizeof addr);
|
rc = connect (*w_, (struct sockaddr *) &addr, sizeof addr);
|
||||||
|
|
||||||
// Accept connection from writer.
|
// Accept connection from writer.
|
||||||
if (rc != SOCKET_ERROR)
|
if (rc != SOCKET_ERROR)
|
||||||
@ -582,12 +577,12 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
|
|||||||
rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on, sizeof on);
|
rc = setsockopt (listener, IPPROTO_TCP, TCP_NODELACK, &on, sizeof on);
|
||||||
errno_assert (rc != -1);
|
errno_assert (rc != -1);
|
||||||
|
|
||||||
rc = bind (listener, (struct sockaddr*) &lcladdr, sizeof lcladdr);
|
rc = bind (listener, (struct sockaddr *) &lcladdr, sizeof lcladdr);
|
||||||
errno_assert (rc != -1);
|
errno_assert (rc != -1);
|
||||||
|
|
||||||
socklen_t lcladdr_len = sizeof lcladdr;
|
socklen_t lcladdr_len = sizeof lcladdr;
|
||||||
|
|
||||||
rc = getsockname (listener, (struct sockaddr*) &lcladdr, &lcladdr_len);
|
rc = getsockname (listener, (struct sockaddr *) &lcladdr, &lcladdr_len);
|
||||||
errno_assert (rc != -1);
|
errno_assert (rc != -1);
|
||||||
|
|
||||||
rc = listen (listener, 1);
|
rc = listen (listener, 1);
|
||||||
@ -602,7 +597,7 @@ int zmq::signaler_t::make_fdpair (fd_t *r_, fd_t *w_)
|
|||||||
rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELACK, &on, sizeof on);
|
rc = setsockopt (*w_, IPPROTO_TCP, TCP_NODELACK, &on, sizeof on);
|
||||||
errno_assert (rc != -1);
|
errno_assert (rc != -1);
|
||||||
|
|
||||||
rc = connect (*w_, (struct sockaddr*) &lcladdr, sizeof lcladdr);
|
rc = connect (*w_, (struct sockaddr *) &lcladdr, sizeof lcladdr);
|
||||||
errno_assert (rc != -1);
|
errno_assert (rc != -1);
|
||||||
|
|
||||||
*r_ = accept (listener, NULL, NULL);
|
*r_ = accept (listener, NULL, NULL);
|
||||||
|
32
src/tcp.cpp
32
src/tcp.cpp
@ -218,7 +218,6 @@ void zmq::tune_tcp_retransmit_timeout (fd_t sockfd_, int timeout_)
|
|||||||
&& errno != EBADF
|
&& errno != EBADF
|
||||||
&& errno != EDESTADDRREQ
|
&& errno != EDESTADDRREQ
|
||||||
&& errno != EFAULT
|
&& errno != EFAULT
|
||||||
&& errno != EINVAL
|
|
||||||
&& errno != EISCONN
|
&& errno != EISCONN
|
||||||
&& errno != EMSGSIZE
|
&& errno != EMSGSIZE
|
||||||
&& errno != ENOMEM
|
&& errno != ENOMEM
|
||||||
@ -240,21 +239,21 @@ int zmq::tcp_read (fd_t s_, void *data_, size_t size_)
|
|||||||
|
|
||||||
// If not a single byte can be read from the socket in non-blocking mode
|
// If not a single byte can be read from the socket in non-blocking mode
|
||||||
// we'll get an error (this may happen during the speculative read).
|
// we'll get an error (this may happen during the speculative read).
|
||||||
if (rc == SOCKET_ERROR) {
|
if (rc == SOCKET_ERROR) {
|
||||||
const int last_error = WSAGetLastError();
|
const int last_error = WSAGetLastError();
|
||||||
if (last_error == WSAEWOULDBLOCK) {
|
if (last_error == WSAEWOULDBLOCK) {
|
||||||
errno = EAGAIN;
|
errno = EAGAIN;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
wsa_assert (last_error == WSAENETDOWN ||
|
wsa_assert (last_error == WSAENETDOWN ||
|
||||||
last_error == WSAENETRESET ||
|
last_error == WSAENETRESET ||
|
||||||
last_error == WSAECONNABORTED ||
|
last_error == WSAECONNABORTED ||
|
||||||
last_error == WSAETIMEDOUT ||
|
last_error == WSAETIMEDOUT ||
|
||||||
last_error == WSAECONNRESET ||
|
last_error == WSAECONNRESET ||
|
||||||
last_error == WSAECONNREFUSED ||
|
last_error == WSAECONNREFUSED ||
|
||||||
last_error == WSAENOTCONN);
|
last_error == WSAENOTCONN);
|
||||||
errno = wsa_error_to_errno (last_error);
|
errno = wsa_error_to_errno (last_error);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return rc == SOCKET_ERROR ? -1 : rc;
|
return rc == SOCKET_ERROR ? -1 : rc;
|
||||||
@ -269,7 +268,6 @@ int zmq::tcp_read (fd_t s_, void *data_, size_t size_)
|
|||||||
if (rc == -1) {
|
if (rc == -1) {
|
||||||
errno_assert (errno != EBADF
|
errno_assert (errno != EBADF
|
||||||
&& errno != EFAULT
|
&& errno != EFAULT
|
||||||
&& errno != EINVAL
|
|
||||||
&& errno != ENOMEM
|
&& errno != ENOMEM
|
||||||
&& errno != ENOTSOCK);
|
&& errno != ENOTSOCK);
|
||||||
if (errno == EWOULDBLOCK || errno == EINTR)
|
if (errno == EWOULDBLOCK || errno == EINTR)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user