EINTR returned from the blocking functions

This commit is contained in:
Martin Sustrik 2010-09-08 08:39:27 +02:00
parent f374431ebe
commit 91ea204644
13 changed files with 93 additions and 84 deletions

View File

@ -216,6 +216,8 @@ option value.
The 0MQ 'context' associated with the specified 'socket' was terminated. The 0MQ 'context' associated with the specified 'socket' was terminated.
*EFAULT*:: *EFAULT*::
The provided 'socket' was not valid (NULL). The provided 'socket' was not valid (NULL).
*EINTR*::
The operation was interrupted by delivery of a signal.
EXAMPLE EXAMPLE

View File

@ -98,6 +98,8 @@ At least one of the members of the 'items' array refers to a 'socket' whose
associated 0MQ 'context' was terminated. associated 0MQ 'context' was terminated.
*EFAULT*:: *EFAULT*::
The provided 'items' was not valid (NULL). The provided 'items' was not valid (NULL).
*EINTR*::
The poll was interrupted by delivery of a signal before any event was available.
EXAMPLE EXAMPLE

View File

@ -65,6 +65,9 @@ _messaging patterns_ section of linkzmq:zmq_socket[3] for more information.
The 0MQ 'context' associated with the specified 'socket' was terminated. The 0MQ 'context' associated with the specified 'socket' was terminated.
*EFAULT*:: *EFAULT*::
The provided 'socket' was not valid (NULL). The provided 'socket' was not valid (NULL).
*EINTR*::
The operation was interrupted by delivery of a signal before a message was
available.
EXAMPLE EXAMPLE

View File

@ -71,6 +71,9 @@ _messaging patterns_ section of linkzmq:zmq_socket[3] for more information.
The 0MQ 'context' associated with the specified 'socket' was terminated. The 0MQ 'context' associated with the specified 'socket' was terminated.
*EFAULT*:: *EFAULT*::
The provided 'context' was not valid (NULL). The provided 'context' was not valid (NULL).
*EINTR*::
The operation was interrupted by delivery of a signal before the message was
sent.
EXAMPLE EXAMPLE

View File

@ -231,6 +231,8 @@ _option_value_ is invalid.
The 0MQ 'context' associated with the specified 'socket' was terminated. The 0MQ 'context' associated with the specified 'socket' was terminated.
*EFAULT*:: *EFAULT*::
The provided 'socket' was not valid (NULL). The provided 'socket' was not valid (NULL).
*EINTR*::
The operation was interrupted by delivery of a signal.
EXAMPLE EXAMPLE

View File

@ -219,11 +219,6 @@ void zmq::ctx_t::send_command (uint32_t slot_, const command_t &command_)
slots [slot_]->send (command_); slots [slot_]->send (command_);
} }
bool zmq::ctx_t::recv_command (uint32_t slot_, command_t *command_, bool block_)
{
return slots [slot_]->recv (command_, block_);
}
zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_) zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_)
{ {
// Find the I/O thread with minimum load. // Find the I/O thread with minimum load.

View File

@ -64,9 +64,6 @@ namespace zmq
// Send command to the destination slot. // Send command to the destination slot.
void send_command (uint32_t slot_, const command_t &command_); void send_command (uint32_t slot_, const command_t &command_);
// Receive command from the source slot.
bool recv_command (uint32_t slot_, command_t *command_, bool block_);
// Returns the I/O thread that is the least busy at the moment. // Returns the I/O thread that is the least busy at the moment.
// Taskset specifies which I/O threads are eligible (0 = all). // Taskset specifies which I/O threads are eligible (0 = all).
class io_thread_t *choose_io_thread (uint64_t taskset_); class io_thread_t *choose_io_thread (uint64_t taskset_);

View File

@ -71,8 +71,12 @@ void zmq::io_thread_t::in_event ()
// Get the next command. If there is none, exit. // Get the next command. If there is none, exit.
command_t cmd; command_t cmd;
if (!signaler.recv (&cmd, false)) int rc = signaler.recv (&cmd, false);
if (rc != 0 && errno == EINTR)
continue;
if (rc != 0 && errno == EAGAIN)
break; break;
errno_assert (rc == 0);
// Process the command. // Process the command.
cmd.destination->process_command (cmd); cmd.destination->process_command (cmd);

View File

@ -112,7 +112,7 @@ void zmq::signaler_t::send (const command_t &cmd_)
zmq_assert (rc == sizeof (command_t)); zmq_assert (rc == sizeof (command_t));
} }
bool zmq::signaler_t::recv (command_t *cmd_, bool block_) int zmq::signaler_t::recv (command_t *cmd_, bool block_)
{ {
if (block_) { if (block_) {
@ -122,10 +122,12 @@ bool zmq::signaler_t::recv (command_t *cmd_, bool block_)
wsa_assert (rc != SOCKET_ERROR); wsa_assert (rc != SOCKET_ERROR);
} }
bool result; int err;
int result;
int nbytes = ::recv (r, (char*) cmd_, sizeof (command_t), 0); int nbytes = ::recv (r, (char*) cmd_, sizeof (command_t), 0);
if (nbytes == -1 && WSAGetLastError () == WSAEWOULDBLOCK) { if (nbytes == -1 && WSAGetLastError () == WSAEWOULDBLOCK) {
result = false; err = EAGAIN;
result = -1;
} }
else { else {
wsa_assert (nbytes != -1); wsa_assert (nbytes != -1);
@ -133,7 +135,7 @@ bool zmq::signaler_t::recv (command_t *cmd_, bool block_)
// Check whether we haven't got half of a signal. // Check whether we haven't got half of a signal.
zmq_assert (nbytes % sizeof (uint32_t) == 0); zmq_assert (nbytes % sizeof (uint32_t) == 0);
result = true; result = 0;
} }
if (block_) { if (block_) {
@ -144,6 +146,8 @@ bool zmq::signaler_t::recv (command_t *cmd_, bool block_)
wsa_assert (rc != SOCKET_ERROR); wsa_assert (rc != SOCKET_ERROR);
} }
if (result == -1)
errno = err;
return result; return result;
} }
@ -184,7 +188,7 @@ void zmq::signaler_t::send (const command_t &cmd_)
zmq_assert (nbytes == sizeof (command_t)); zmq_assert (nbytes == sizeof (command_t));
} }
bool zmq::signaler_t::recv (command_t *cmd_, bool block_) int zmq::signaler_t::recv (command_t *cmd_, bool block_)
{ {
if (block_) { if (block_) {
@ -196,13 +200,12 @@ bool zmq::signaler_t::recv (command_t *cmd_, bool block_)
errno_assert (rc != -1); errno_assert (rc != -1);
} }
bool result; int err;
ssize_t nbytes; int result;
do { ssize_t nbytes = ::recv (r, (char*) cmd_, sizeof (command_t), 0);
nbytes = ::recv (r, (char*) cmd_, sizeof (command_t), 0); if (nbytes == -1 && (errno == EAGAIN || errno == EINTR)) {
} while (nbytes == -1 && errno == EINTR); err = errno;
if (nbytes == -1 && errno == EAGAIN) { result = -1;
result = false;
} }
else { else {
zmq_assert (nbytes != -1); zmq_assert (nbytes != -1);
@ -210,7 +213,7 @@ bool zmq::signaler_t::recv (command_t *cmd_, bool block_)
// Check whether we haven't got half of command. // Check whether we haven't got half of command.
zmq_assert (nbytes == sizeof (command_t)); zmq_assert (nbytes == sizeof (command_t));
result = true; result = 0;
} }
if (block_) { if (block_) {
@ -223,6 +226,8 @@ bool zmq::signaler_t::recv (command_t *cmd_, bool block_)
errno_assert (rc != -1); errno_assert (rc != -1);
} }
if (result == -1)
errno = err;
return result; return result;
} }
@ -266,24 +271,18 @@ void zmq::signaler_t::send (const command_t &cmd_)
zmq_assert (nbytes == sizeof (command_t)); zmq_assert (nbytes == sizeof (command_t));
} }
bool zmq::signaler_t::recv (command_t *cmd_, bool block_) int zmq::signaler_t::recv (command_t *cmd_, bool block_)
{ {
ssize_t nbytes; ssize_t nbytes;
do { nbytes = ::recv (r, cmd_, sizeof (command_t), block_ ? 0 : MSG_DONTWAIT);
nbytes = ::recv (r, cmd_, sizeof (command_t), if (nbytes == -1 && (errno == EAGAIN || errno == EINTR))
block_ ? 0 : MSG_DONTWAIT); return -1;
} while (nbytes == -1 && errno == EINTR);
// If there's no signal available return false.
if (nbytes == -1 && errno == EAGAIN)
return false;
errno_assert (nbytes != -1); errno_assert (nbytes != -1);
// Check whether we haven't got half of command. // Check whether we haven't got half of command.
zmq_assert (nbytes == sizeof (command_t)); zmq_assert (nbytes == sizeof (command_t));
return true; return 0;
} }
#endif #endif

View File

@ -40,7 +40,7 @@ namespace zmq
fd_t get_fd (); fd_t get_fd ();
void send (const command_t &cmd_); void send (const command_t &cmd_);
bool recv (command_t *cmd_, bool block_); int recv (command_t *cmd_, bool block_);
private: private:

View File

@ -244,7 +244,10 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
errno = EINVAL; errno = EINVAL;
return -1; return -1;
} }
process_commands (false, false); int rc = process_commands (false, false);
if (rc != 0 && errno == EINTR)
return -1;
errno_assert (rc == 0);
*((uint32_t*) optval_) = 0; *((uint32_t*) optval_) = 0;
if (has_out ()) if (has_out ())
*((uint32_t*) optval_) |= ZMQ_POLLOUT; *((uint32_t*) optval_) |= ZMQ_POLLOUT;
@ -420,18 +423,16 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
} }
// Process pending commands, if any. // Process pending commands, if any.
process_commands (false, true); int rc = process_commands (false, true);
if (unlikely (zombie)) { if (unlikely (rc != 0))
errno = ETERM;
return -1; return -1;
}
// At this point we impose the MORE flag on the message. // At this point we impose the MORE flag on the message.
if (flags_ & ZMQ_SNDMORE) if (flags_ & ZMQ_SNDMORE)
msg_->flags |= ZMQ_MSG_MORE; msg_->flags |= ZMQ_MSG_MORE;
// Try to send the message. // Try to send the message.
int rc = xsend (msg_, flags_); rc = xsend (msg_, flags_);
if (rc == 0) if (rc == 0)
return 0; return 0;
@ -445,11 +446,8 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
while (rc != 0) { while (rc != 0) {
if (errno != EAGAIN) if (errno != EAGAIN)
return -1; return -1;
process_commands (true, false); if (unlikely (process_commands (true, false) != 0))
if (unlikely (zombie)) {
errno = ETERM;
return -1; return -1;
}
rc = xsend (msg_, flags_); rc = xsend (msg_, flags_);
} }
return 0; return 0;
@ -475,11 +473,8 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
// described above) from the one used by 'send'. This is because counting // described above) from the one used by 'send'. This is because counting
// ticks is more efficient than doing rdtsc all the time. // ticks is more efficient than doing rdtsc all the time.
if (++ticks == inbound_poll_rate) { if (++ticks == inbound_poll_rate) {
process_commands (false, false); if (unlikely (process_commands (false, false) != 0))
if (unlikely (zombie)) {
errno = ETERM;
return -1; return -1;
}
ticks = 0; ticks = 0;
} }
@ -501,11 +496,8 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
if (flags_ & ZMQ_NOBLOCK) { if (flags_ & ZMQ_NOBLOCK) {
if (errno != EAGAIN) if (errno != EAGAIN)
return -1; return -1;
process_commands (false, false); if (unlikely (process_commands (false, false) != 0))
if (unlikely (zombie)) {
errno = ETERM;
return -1; return -1;
}
ticks = 0; ticks = 0;
rc = xrecv (msg_, flags_); rc = xrecv (msg_, flags_);
@ -522,11 +514,8 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
while (rc != 0) { while (rc != 0) {
if (errno != EAGAIN) if (errno != EAGAIN)
return -1; return -1;
process_commands (true, false); if (unlikely (process_commands (true, false) != 0))
if (unlikely (zombie)) {
errno = ETERM;
return -1; return -1;
}
rc = xrecv (msg_, flags_); rc = xrecv (msg_, flags_);
ticks = 0; ticks = 0;
} }
@ -619,13 +608,15 @@ bool zmq::socket_base_t::dezombify ()
return false; return false;
} }
void zmq::socket_base_t::process_commands (bool block_, bool throttle_) int zmq::socket_base_t::process_commands (bool block_, bool throttle_)
{ {
bool received; int rc;
command_t cmd; command_t cmd;
if (block_) { if (block_) {
received = signaler.recv (&cmd, true); rc = signaler.recv (&cmd, true);
zmq_assert (received); if (rc == -1 && errno == EINTR)
return -1;
errno_assert (rc == 0);
} }
else { else {
@ -649,24 +640,40 @@ void zmq::socket_base_t::process_commands (bool block_, bool throttle_)
#else #else
#error #error
#endif #endif
// Check whether certain time have elapsed since last command // Check whether certain time have elapsed since last command
// processing. // processing.
if (current_time - last_processing_time <= max_command_delay) if (current_time - last_processing_time <= max_command_delay) {
return;
// No command was processed, so the socket should
// not get into the zombie state.
zmq_assert (!zombie);
return 0;
}
last_processing_time = current_time; last_processing_time = current_time;
} }
#endif #endif
// Check whether there are any commands pending for this thread. // Check whether there are any commands pending for this thread.
received = signaler.recv (&cmd, false); rc = signaler.recv (&cmd, false);
} }
// Process all the commands available at the moment. // Process all the commands available at the moment.
while (received) { while (true) {
if (rc == -1 && errno == EAGAIN)
break;
if (rc == -1 && errno == EINTR)
return -1;
errno_assert (rc == 0);
cmd.destination->process_command (cmd); cmd.destination->process_command (cmd);
received = signaler.recv (&cmd, false); rc = signaler.recv (&cmd, false);
} }
if (zombie) {
errno = ETERM;
return -1;
}
return 0;
} }
void zmq::socket_base_t::process_stop () void zmq::socket_base_t::process_stop ()

View File

@ -124,7 +124,6 @@ namespace zmq
private: private:
// TODO: Check whether we still need this flag...
// If true, socket was already closed but not yet deallocated // If true, socket was already closed but not yet deallocated
// because either shutdown is in process or there are still pipes // because either shutdown is in process or there are still pipes
// attached to the socket. // attached to the socket.
@ -147,7 +146,7 @@ namespace zmq
// set to true, returns only after at least one command was processed. // set to true, returns only after at least one command was processed.
// If throttle argument is true, commands are processed at most once // If throttle argument is true, commands are processed at most once
// in a predefined time period. // in a predefined time period.
void process_commands (bool block_, bool throttle_); int process_commands (bool block_, bool throttle_);
// Handlers for incoming commands. // Handlers for incoming commands.
void process_stop (); void process_stop ();

View File

@ -409,17 +409,12 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
while (true) { while (true) {
// Wait for events. Ignore interrupts if there's infinite timeout. // Wait for events.
while (true) { while (true) {
int rc = poll (pollfds, nitems_, first_pass ? 0 : timeout); int rc = poll (pollfds, nitems_, first_pass ? 0 : timeout);
if (rc == -1 && errno == EINTR) { if (rc == -1 && errno == EINTR) {
if (timeout_ < 0)
continue;
else {
// TODO: Calculate remaining timeout and restart poll ().
free (pollfds); free (pollfds);
return 0; return -1;
}
} }
errno_assert (rc >= 0); errno_assert (rc >= 0);
break; break;
@ -474,6 +469,9 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
if (timeout == -1 && nevents == 0) if (timeout == -1 && nevents == 0)
continue; continue;
// TODO: if nevents is zero recompute timeout and loop
// if it is not yet reached.
break; break;
} }
@ -544,13 +542,8 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
#if defined ZMQ_HAVE_WINDOWS #if defined ZMQ_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR); wsa_assert (rc != SOCKET_ERROR);
#else #else
if (rc == -1 && errno == EINTR) { if (rc == -1 && errno == EINTR)
if (timeout_ < 0) return -1;
continue;
else
// TODO: Calculate remaining timeout and restart select ().
return 0;
}
errno_assert (rc >= 0); errno_assert (rc >= 0);
#endif #endif
break; break;
@ -610,6 +603,9 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
if (timeout_ < 0 && nevents == 0) if (timeout_ < 0 && nevents == 0)
continue; continue;
// TODO: if nevents is zero recompute timeout and loop
// if it is not yet reached.
break; break;
} }