diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index 330a7c2f..fdc4686c 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -667,11 +667,9 @@ ZMQ_REQ_RELAXED: relax strict alternation between request and reply ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ By default, a REQ socket does not allow initiating a new request with _zmq_send(3)_ until the reply to the previous one has been received. -When set to 1, sending another message is allowed and has the effect of -disconnecting the underlying connection to the peer from which the reply was -expected, triggering a reconnection attempt on transports that support it. -The request-reply state machine is reset and a new request is sent to the -next available peer. +When set to 1, sending another message is allowed and previous replies will +be discarded if any. The request-reply state machine is reset and a new +request is sent to the next available peer. If set to 1, also enable ZMQ_REQ_CORRELATE to ensure correct matching of requests and replies. Otherwise a late reply to an aborted request can be diff --git a/src/req.cpp b/src/req.cpp index dfc6611d..23263f07 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -35,13 +35,21 @@ #include "random.hpp" #include "likely.hpp" +extern "C" +{ + static void free_id (void *data, void *hint) + { + free (data); + } +} + zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_, int sid_) : dealer_t (parent_, tid_, sid_), receiving_reply (false), message_begins (true), reply_pipe (NULL), request_id_frames_enabled (false), - request_id (generate_random()), + request_id (generate_random ()), strict (true) { options.type = ZMQ_REQ; @@ -72,8 +80,13 @@ int zmq::req_t::xsend (msg_t *msg_) if (request_id_frames_enabled) { request_id++; + // Copy request id before sending (see issue #1695 for details). + uint32_t *request_id_copy = (uint32_t *) malloc (sizeof (uint32_t)); + *request_id_copy = request_id; + msg_t id; - int rc = id.init_data (&request_id, sizeof (request_id), NULL, NULL); + int rc = id.init_data (request_id_copy, sizeof (uint32_t), + free_id, NULL); errno_assert (rc == 0); id.set_flags (msg_t::more); @@ -206,7 +219,8 @@ int zmq::req_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_ { bool is_int = (optvallen_ == sizeof (int)); int value = 0; - if (is_int) memcpy(&value, optval_, sizeof (int)); + if (is_int) + memcpy (&value, optval_, sizeof (int)); switch (option_) { case ZMQ_REQ_CORRELATE: diff --git a/tests/test_req_relaxed.cpp b/tests/test_req_relaxed.cpp index 86d99b4e..3b5c1cb4 100644 --- a/tests/test_req_relaxed.cpp +++ b/tests/test_req_relaxed.cpp @@ -29,6 +29,31 @@ #include "testutil.hpp" +static void bounce (void *socket) +{ + int more; + size_t more_size = sizeof(more); + do { + zmq_msg_t recv_part, sent_part; + int rc = zmq_msg_init (&recv_part); + assert (rc == 0); + + rc = zmq_msg_recv (&recv_part, socket, 0); + assert (rc != -1); + + rc = zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size); + assert (rc == 0); + + zmq_msg_init (&sent_part); + zmq_msg_copy (&sent_part, &recv_part); + + rc = zmq_msg_send (&sent_part, socket, more ? ZMQ_SNDMORE : 0); + assert (rc != -1); + + zmq_msg_close(&recv_part); + } while (more); +} + int main (void) { setup_test_environment(); @@ -58,7 +83,7 @@ int main (void) rep [peer] = zmq_socket (ctx, ZMQ_REP); assert (rep [peer]); - int timeout = 250; + int timeout = 500; rc = zmq_setsockopt (rep [peer], ZMQ_RCVTIMEO, &timeout, sizeof (int)); assert (rc == 0); @@ -113,19 +138,65 @@ int main (void) s_send_seq (rep [4], "GOOD", SEQ_END); s_recv_seq (req, "GOOD", SEQ_END); - // Case 3: Check issue #1690. Two send() in a row should not close the - // communication pipes. For example pipe from req to rep[0] should not be - // closed after executing Case 1. So rep[0] should be the next to receive, - // not rep[1]. - s_send_seq(req, "J", SEQ_END); - s_recv_seq(rep [0], "J", SEQ_END); - + // Case 3: Check issue #1690. Two send() in a row should not close the + // communication pipes. For example pipe from req to rep[0] should not be + // closed after executing Case 1. So rep[0] should be the next to receive, + // not rep[1]. + s_send_seq(req, "J", SEQ_END); + s_recv_seq(rep [0], "J", SEQ_END); close_zero_linger (req); for (size_t peer = 0; peer < services; peer++) close_zero_linger (rep [peer]); - // Wait for disconnects. + // Wait for disconnects. + msleep (SETTLE_TIME); + + // Case 4: Check issue #1695. As messages may pile up before a responder + // is available, we check that responses to messages other than the last + // sent one are correctly discarded by the REQ pipe + + // Setup REQ socket as client + req = zmq_socket (ctx, ZMQ_REQ); + assert (req); + + rc = zmq_setsockopt (req, ZMQ_REQ_RELAXED, &enabled, sizeof (int)); + assert (rc == 0); + + rc = zmq_setsockopt (req, ZMQ_REQ_CORRELATE, &enabled, sizeof (int)); + assert (rc == 0); + + rc = zmq_connect (req, "tcp://localhost:5555"); + assert (rc == 0); + + // Setup ROUTER socket as server but do not bind it just yet + void *router = zmq_socket (ctx, ZMQ_ROUTER); + assert(router); + + int timeout = 1000; + rc = zmq_setsockopt (router, ZMQ_RCVTIMEO, &timeout, sizeof(int)); + assert (rc == 0); + + // Send two requests + s_send_seq (req, "TO_BE_DISCARDED", SEQ_END); + s_send_seq (req, "TO_BE_ANSWERED", SEQ_END); + + // Bind server allowing it to receive messages + rc = zmq_bind(router, "tcp://127.0.0.1:5555"); + assert (rc == 0); + + // Read the two messages and send them back as is + bounce (router); + bounce (router); + + // Read the expected correlated reply. As the ZMQ_REQ_CORRELATE is active, + // the expected answer is "TO_BE_ANSWERED", not "TO_BE_DISCARDED". + s_recv_seq (req, "TO_BE_ANSWERED", SEQ_END); + + close_zero_linger (req); + close_zero_linger (router); + + // Wait for disconnects. msleep (SETTLE_TIME); rc = zmq_ctx_term (ctx);