From 4f49e7420b11c4230c6951b610c7cb6cc520c2c0 Mon Sep 17 00:00:00 2001 From: Francesco Montorsi Date: Fri, 21 Sep 2018 11:19:50 +0200 Subject: [PATCH] Remove race condition from XPUB/SUB test in test_blocking() --- tests/test_hwm_pubsub.cpp | 36 +++++++++++++++++++++++++++++++----- 1 file changed, 31 insertions(+), 5 deletions(-) diff --git a/tests/test_hwm_pubsub.cpp b/tests/test_hwm_pubsub.cpp index 60fa0cb3..90dd75ee 100644 --- a/tests/test_hwm_pubsub.cpp +++ b/tests/test_hwm_pubsub.cpp @@ -95,12 +95,21 @@ int test_defaults (int send_hwm_, int msg_cnt_, const char *endpoint) return recv_count; } -int receive (void *socket_) +int receive (void *socket_, int *is_termination) { int recv_count = 0; + *is_termination = 0; + // Now receive all sent messages - while (0 == zmq_recv (socket_, NULL, 0, 0)) { + char buffer[255]; + int len; + while ((len = zmq_recv (socket_, buffer, sizeof (buffer), 0)) >= 0) { ++recv_count; + + if (len == 3 && strncmp (buffer, "end", len) == 0) { + *is_termination = 1; + return recv_count; + } } return recv_count; @@ -141,19 +150,36 @@ int test_blocking (int send_hwm_, int msg_cnt_, const char *endpoint) // Send until we block int send_count = 0; int recv_count = 0; + int blocked_count = 0; + int is_termination = 0; while (send_count < msg_cnt_) { const int rc = zmq_send (pub_socket, NULL, 0, ZMQ_DONTWAIT); if (rc == 0) { ++send_count; } else if (-1 == rc) { // if the PUB socket blocks due to HWM, errno should be EAGAIN: + blocked_count++; TEST_ASSERT_EQUAL_INT (EAGAIN, errno); - recv_count += receive (sub_socket); + recv_count += receive (sub_socket, &is_termination); } } - msleep (2 * SETTLE_TIME); // required for TCP transport - recv_count += receive (sub_socket); + // if send_hwm_ < msg_cnt_, we should block at least once: + TEST_ASSERT (blocked_count > 0); + + // dequeue SUB socket again, to make sure XPUB has space to send the termination message + recv_count += receive (sub_socket, &is_termination); + + // send termination message + send_string_expect_success (pub_socket, "end", 0); + + // now block on the SUB side till we get the termination message + while (is_termination == 0) + recv_count += receive (sub_socket, &is_termination); + + // remove termination message from the count: + recv_count--; + TEST_ASSERT_EQUAL_INT (send_count, recv_count); // Clean up