Add ZMQ_REQ_REQUEST_IDS option.

* Documentation:
The default behavior of REQ sockets is to rely on the ordering of messages
to match requests and responses and that is usually sufficient. When this option
is set to 1, the REQ socket will prefix outgoing messages with an extra frame
containing a request id. That means the full message is (request id, 0,
user frames...). The REQ socket will discard all incoming messages that don't
begin with these two frames.

* Behavior change: When a REQ socket gets an invalid reply, it used to
  discard the message and return EAGAIN. REQ sockets still discard
  invalid messages, but keep looking at the next one automatically
  until a good one is found or there are no more messages.
* Add test_req_request_ids.
This commit is contained in:
Christian Kamm
2013-07-26 21:13:43 +02:00
parent 6473dfd8f4
commit b9646f2aac
7 changed files with 277 additions and 14 deletions

View File

@@ -28,7 +28,9 @@ zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
dealer_t (parent_, tid_, sid_),
receiving_reply (false),
message_begins (true),
reply_pipe (NULL)
reply_pipe (NULL),
request_id_frames_enabled (false),
request_id (generate_random())
{
options.type = ZMQ_REQ;
}
@@ -48,16 +50,31 @@ int zmq::req_t::xsend (msg_t *msg_)
// First part of the request is the request identity.
if (message_begins) {
reply_pipe = NULL;
if (request_id_frames_enabled) {
request_id++;
msg_t id;
int rc = id.init_data (&request_id, sizeof (request_id), NULL, NULL);
errno_assert (rc == 0);
id.set_flags (msg_t::more);
rc = dealer_t::sendpipe (&id, &reply_pipe);
if (rc != 0)
return -1;
}
msg_t bottom;
int rc = bottom.init ();
errno_assert (rc == 0);
bottom.set_flags (msg_t::more);
reply_pipe = NULL;
rc = dealer_t::sendpipe (&bottom, &reply_pipe);
if (rc != 0)
return -1;
assert (reply_pipe);
message_begins = false;
// Eat all currently avaliable messages before the request is fully
@@ -99,24 +116,39 @@ int zmq::req_t::xrecv (msg_t *msg_)
return -1;
}
// First part of the reply should be the original request ID.
if (message_begins) {
// Skip messages until one with the right first frames is found.
while (message_begins) {
// If enabled, the first frame must have the correct request_id.
if (request_id_frames_enabled) {
int rc = recv_reply_pipe (msg_);
if (rc != 0)
return rc;
if (unlikely (!(msg_->flags () & msg_t::more) ||
msg_->size () != sizeof (request_id) ||
*static_cast<uint32_t *> (msg_->data ()) != request_id)) {
// Skip the remaining frames and try the next message
while (msg_->flags () & msg_t::more) {
rc = recv_reply_pipe (msg_);
errno_assert (rc == 0);
}
continue;
}
}
// The next frame must be 0.
// TODO: Failing this check should also close the connection with the peer!
int rc = recv_reply_pipe (msg_);
if (rc != 0)
return rc;
// TODO: This should also close the connection with the peer!
if (unlikely (!(msg_->flags () & msg_t::more) || msg_->size () != 0)) {
while (true) {
int rc = dealer_t::xrecv (msg_);
// Skip the remaining frames and try the next message
while (msg_->flags () & msg_t::more) {
rc = recv_reply_pipe (msg_);
errno_assert (rc == 0);
if (!(msg_->flags () & msg_t::more))
break;
}
msg_->close ();
msg_->init ();
errno = EAGAIN;
return -1;
continue;
}
message_begins = false;
@@ -153,6 +185,25 @@ bool zmq::req_t::xhas_out ()
return dealer_t::xhas_out ();
}
int zmq::req_t::xsetsockopt(int option_, const void *optval_, size_t optvallen_)
{
bool is_int = (optvallen_ == sizeof (int));
int value = is_int? *((int *) optval_): 0;
switch (option_) {
case ZMQ_REQ_REQUEST_IDS:
if (is_int && value >= 0) {
request_id_frames_enabled = value;
return 0;
}
break;
default:
break;
}
return dealer_t::xsetsockopt(option_, optval_, optvallen_);
}
int zmq::req_t::recv_reply_pipe (msg_t *msg_)
{
while (true) {