mirror of
https://github.com/zeromq/libzmq.git
synced 2025-01-19 00:46:05 +01:00
Fixed issue #1695 (ZMQ_REQ_CORRELATE)
Problem: when using ZMQ_REQ_RELAXED + ZMQ_REQ_CORRELATE and two 'send' are executed in a row and no server is available at the time of the sends, then the internal request_id used to identify messages gets corrupted and the two messages end up with the same request_id. The correlation no longer works in that case and you may end up with the wrong message. Solution: make a copy of the request_id instance member before sending it down the pipe.
This commit is contained in:
parent
98ab7f4164
commit
e45dfe3bc7
@ -667,11 +667,9 @@ ZMQ_REQ_RELAXED: relax strict alternation between request and reply
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
By default, a REQ socket does not allow initiating a new request with
|
||||
_zmq_send(3)_ until the reply to the previous one has been received.
|
||||
When set to 1, sending another message is allowed and has the effect of
|
||||
disconnecting the underlying connection to the peer from which the reply was
|
||||
expected, triggering a reconnection attempt on transports that support it.
|
||||
The request-reply state machine is reset and a new request is sent to the
|
||||
next available peer.
|
||||
When set to 1, sending another message is allowed and previous replies will
|
||||
be discarded if any. The request-reply state machine is reset and a new
|
||||
request is sent to the next available peer.
|
||||
|
||||
If set to 1, also enable ZMQ_REQ_CORRELATE to ensure correct matching of
|
||||
requests and replies. Otherwise a late reply to an aborted request can be
|
||||
|
20
src/req.cpp
20
src/req.cpp
@ -35,13 +35,21 @@
|
||||
#include "random.hpp"
|
||||
#include "likely.hpp"
|
||||
|
||||
extern "C"
|
||||
{
|
||||
static void free_id (void *data, void *hint)
|
||||
{
|
||||
free (data);
|
||||
}
|
||||
}
|
||||
|
||||
zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
|
||||
dealer_t (parent_, tid_, sid_),
|
||||
receiving_reply (false),
|
||||
message_begins (true),
|
||||
reply_pipe (NULL),
|
||||
request_id_frames_enabled (false),
|
||||
request_id (generate_random()),
|
||||
request_id (generate_random ()),
|
||||
strict (true)
|
||||
{
|
||||
options.type = ZMQ_REQ;
|
||||
@ -72,8 +80,13 @@ int zmq::req_t::xsend (msg_t *msg_)
|
||||
if (request_id_frames_enabled) {
|
||||
request_id++;
|
||||
|
||||
// Copy request id before sending (see issue #1695 for details).
|
||||
uint32_t *request_id_copy = (uint32_t *) malloc (sizeof (uint32_t));
|
||||
*request_id_copy = request_id;
|
||||
|
||||
msg_t id;
|
||||
int rc = id.init_data (&request_id, sizeof (request_id), NULL, NULL);
|
||||
int rc = id.init_data (request_id_copy, sizeof (uint32_t),
|
||||
free_id, NULL);
|
||||
errno_assert (rc == 0);
|
||||
id.set_flags (msg_t::more);
|
||||
|
||||
@ -206,7 +219,8 @@ int zmq::req_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_
|
||||
{
|
||||
bool is_int = (optvallen_ == sizeof (int));
|
||||
int value = 0;
|
||||
if (is_int) memcpy(&value, optval_, sizeof (int));
|
||||
if (is_int)
|
||||
memcpy (&value, optval_, sizeof (int));
|
||||
|
||||
switch (option_) {
|
||||
case ZMQ_REQ_CORRELATE:
|
||||
|
@ -29,6 +29,31 @@
|
||||
|
||||
#include "testutil.hpp"
|
||||
|
||||
static void bounce (void *socket)
|
||||
{
|
||||
int more;
|
||||
size_t more_size = sizeof(more);
|
||||
do {
|
||||
zmq_msg_t recv_part, sent_part;
|
||||
int rc = zmq_msg_init (&recv_part);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_msg_recv (&recv_part, socket, 0);
|
||||
assert (rc != -1);
|
||||
|
||||
rc = zmq_getsockopt (socket, ZMQ_RCVMORE, &more, &more_size);
|
||||
assert (rc == 0);
|
||||
|
||||
zmq_msg_init (&sent_part);
|
||||
zmq_msg_copy (&sent_part, &recv_part);
|
||||
|
||||
rc = zmq_msg_send (&sent_part, socket, more ? ZMQ_SNDMORE : 0);
|
||||
assert (rc != -1);
|
||||
|
||||
zmq_msg_close(&recv_part);
|
||||
} while (more);
|
||||
}
|
||||
|
||||
int main (void)
|
||||
{
|
||||
setup_test_environment();
|
||||
@ -58,7 +83,7 @@ int main (void)
|
||||
rep [peer] = zmq_socket (ctx, ZMQ_REP);
|
||||
assert (rep [peer]);
|
||||
|
||||
int timeout = 250;
|
||||
int timeout = 500;
|
||||
rc = zmq_setsockopt (rep [peer], ZMQ_RCVTIMEO, &timeout, sizeof (int));
|
||||
assert (rc == 0);
|
||||
|
||||
@ -113,19 +138,65 @@ int main (void)
|
||||
s_send_seq (rep [4], "GOOD", SEQ_END);
|
||||
s_recv_seq (req, "GOOD", SEQ_END);
|
||||
|
||||
// Case 3: Check issue #1690. Two send() in a row should not close the
|
||||
// communication pipes. For example pipe from req to rep[0] should not be
|
||||
// closed after executing Case 1. So rep[0] should be the next to receive,
|
||||
// not rep[1].
|
||||
s_send_seq(req, "J", SEQ_END);
|
||||
s_recv_seq(rep [0], "J", SEQ_END);
|
||||
|
||||
// Case 3: Check issue #1690. Two send() in a row should not close the
|
||||
// communication pipes. For example pipe from req to rep[0] should not be
|
||||
// closed after executing Case 1. So rep[0] should be the next to receive,
|
||||
// not rep[1].
|
||||
s_send_seq(req, "J", SEQ_END);
|
||||
s_recv_seq(rep [0], "J", SEQ_END);
|
||||
|
||||
close_zero_linger (req);
|
||||
for (size_t peer = 0; peer < services; peer++)
|
||||
close_zero_linger (rep [peer]);
|
||||
|
||||
// Wait for disconnects.
|
||||
// Wait for disconnects.
|
||||
msleep (SETTLE_TIME);
|
||||
|
||||
// Case 4: Check issue #1695. As messages may pile up before a responder
|
||||
// is available, we check that responses to messages other than the last
|
||||
// sent one are correctly discarded by the REQ pipe
|
||||
|
||||
// Setup REQ socket as client
|
||||
req = zmq_socket (ctx, ZMQ_REQ);
|
||||
assert (req);
|
||||
|
||||
rc = zmq_setsockopt (req, ZMQ_REQ_RELAXED, &enabled, sizeof (int));
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_setsockopt (req, ZMQ_REQ_CORRELATE, &enabled, sizeof (int));
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_connect (req, "tcp://localhost:5555");
|
||||
assert (rc == 0);
|
||||
|
||||
// Setup ROUTER socket as server but do not bind it just yet
|
||||
void *router = zmq_socket (ctx, ZMQ_ROUTER);
|
||||
assert(router);
|
||||
|
||||
int timeout = 1000;
|
||||
rc = zmq_setsockopt (router, ZMQ_RCVTIMEO, &timeout, sizeof(int));
|
||||
assert (rc == 0);
|
||||
|
||||
// Send two requests
|
||||
s_send_seq (req, "TO_BE_DISCARDED", SEQ_END);
|
||||
s_send_seq (req, "TO_BE_ANSWERED", SEQ_END);
|
||||
|
||||
// Bind server allowing it to receive messages
|
||||
rc = zmq_bind(router, "tcp://127.0.0.1:5555");
|
||||
assert (rc == 0);
|
||||
|
||||
// Read the two messages and send them back as is
|
||||
bounce (router);
|
||||
bounce (router);
|
||||
|
||||
// Read the expected correlated reply. As the ZMQ_REQ_CORRELATE is active,
|
||||
// the expected answer is "TO_BE_ANSWERED", not "TO_BE_DISCARDED".
|
||||
s_recv_seq (req, "TO_BE_ANSWERED", SEQ_END);
|
||||
|
||||
close_zero_linger (req);
|
||||
close_zero_linger (router);
|
||||
|
||||
// Wait for disconnects.
|
||||
msleep (SETTLE_TIME);
|
||||
|
||||
rc = zmq_ctx_term (ctx);
|
||||
|
Loading…
x
Reference in New Issue
Block a user