/* SPDX-License-Identifier: MPL-2.0 */ #include "testutil.hpp" #include "testutil_unity.hpp" #include const size_t services = 5; void *req; void *rep[services]; void setUp () { setup_test_context (); char my_endpoint[MAX_SOCKET_STRING]; req = test_context_socket (ZMQ_REQ); int enabled = 1; TEST_ASSERT_SUCCESS_ERRNO ( zmq_setsockopt (req, ZMQ_REQ_RELAXED, &enabled, sizeof (int))); TEST_ASSERT_SUCCESS_ERRNO ( zmq_setsockopt (req, ZMQ_REQ_CORRELATE, &enabled, sizeof (int))); bind_loopback_ipv4 (req, my_endpoint, sizeof (my_endpoint)); for (size_t peer = 0; peer < services; peer++) { rep[peer] = test_context_socket (ZMQ_REP); TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (rep[peer], my_endpoint)); // These tests require strict ordering, so wait for the connections to // happen before opening the next, so that messages flow in the // expected direction msleep (SETTLE_TIME); } } void tearDown () { test_context_socket_close_zero_linger (req); for (size_t peer = 0; peer < services; peer++) test_context_socket_close_zero_linger (rep[peer]); teardown_test_context (); } static void bounce (void *socket_) { int more; size_t more_size = sizeof (more); do { zmq_msg_t recv_part, sent_part; TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&recv_part)); TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&recv_part, socket_, 0)); TEST_ASSERT_SUCCESS_ERRNO ( zmq_getsockopt (socket_, ZMQ_RCVMORE, &more, &more_size)); zmq_msg_init (&sent_part); zmq_msg_copy (&sent_part, &recv_part); TEST_ASSERT_SUCCESS_ERRNO ( zmq_msg_send (&sent_part, socket_, more ? ZMQ_SNDMORE : 0)); zmq_msg_close (&recv_part); } while (more); } static int get_events (void *socket_) { int events; size_t events_size = sizeof (events); TEST_ASSERT_SUCCESS_ERRNO ( zmq_getsockopt (socket_, ZMQ_EVENTS, &events, &events_size)); return events; } void test_case_1 () { // Case 1: Second send() before a reply arrives in a pipe. int events = get_events (req); TEST_ASSERT_EQUAL_INT (ZMQ_POLLOUT, events); // Send a request, ensure it arrives, don't send a reply s_send_seq (req, "A", "B", SEQ_END); s_recv_seq (rep[0], "A", "B", SEQ_END); events = get_events (req); TEST_ASSERT_EQUAL_INT (ZMQ_POLLOUT, events); // Send another request on the REQ socket s_send_seq (req, "C", "D", SEQ_END); s_recv_seq (rep[1], "C", "D", SEQ_END); events = get_events (req); TEST_ASSERT_EQUAL_INT (ZMQ_POLLOUT, events); // Send a reply to the first request - that should be discarded by the REQ s_send_seq (rep[0], "WRONG", SEQ_END); // Send the expected reply s_send_seq (rep[1], "OK", SEQ_END); s_recv_seq (req, "OK", SEQ_END); // Another standard req-rep cycle, just to check s_send_seq (req, "E", SEQ_END); s_recv_seq (rep[2], "E", SEQ_END); s_send_seq (rep[2], "F", "G", SEQ_END); s_recv_seq (req, "F", "G", SEQ_END); } void test_case_2 () { // Case 2: Second send() after a reply is already in a pipe on the REQ. // TODO instead of rerunning the previous test cases, only do the relevant parts (or change the peer) test_case_1 (); // Send a request, ensure it arrives, send a reply s_send_seq (req, "H", SEQ_END); s_recv_seq (rep[3], "H", SEQ_END); s_send_seq (rep[3], "BAD", SEQ_END); // Wait for message to be there. msleep (SETTLE_TIME); // Without receiving that reply, send another request on the REQ socket s_send_seq (req, "I", SEQ_END); s_recv_seq (rep[4], "I", SEQ_END); // Send the expected reply s_send_seq (rep[4], "GOOD", SEQ_END); s_recv_seq (req, "GOOD", SEQ_END); } void test_case_3 () { // Case 3: Check issue #1690. Two send() in a row should not close the // communication pipes. For example pipe from req to rep[0] should not be // closed after executing Case 1. So rep[0] should be the next to receive, // not rep[1]. // TODO instead of rerunning the previous test cases, only do the relevant parts (or change the peer) test_case_2 (); s_send_seq (req, "J", SEQ_END); s_recv_seq (rep[0], "J", SEQ_END); } void test_case_4 () { // TODO this test case does not use the sockets from setUp // Case 4: Check issue #1695. As messages may pile up before a responder // is available, we check that responses to messages other than the last // sent one are correctly discarded by the REQ pipe // Setup REQ socket as client void *req = test_context_socket (ZMQ_REQ); int enabled = 1; TEST_ASSERT_SUCCESS_ERRNO ( zmq_setsockopt (req, ZMQ_REQ_RELAXED, &enabled, sizeof (int))); TEST_ASSERT_SUCCESS_ERRNO ( zmq_setsockopt (req, ZMQ_REQ_CORRELATE, &enabled, sizeof (int))); TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (req, ENDPOINT_0)); // Setup ROUTER socket as server but do not bind it just yet void *router = test_context_socket (ZMQ_ROUTER); // Send two requests s_send_seq (req, "TO_BE_DISCARDED", SEQ_END); s_send_seq (req, "TO_BE_ANSWERED", SEQ_END); // Bind server allowing it to receive messages TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (router, ENDPOINT_0)); // Read the two messages and send them back as is bounce (router); bounce (router); // Read the expected correlated reply. As the ZMQ_REQ_CORRELATE is active, // the expected answer is "TO_BE_ANSWERED", not "TO_BE_DISCARDED". s_recv_seq (req, "TO_BE_ANSWERED", SEQ_END); test_context_socket_close_zero_linger (req); test_context_socket_close_zero_linger (router); } int main () { setup_test_environment (); UNITY_BEGIN (); RUN_TEST (test_case_1); RUN_TEST (test_case_2); RUN_TEST (test_case_3); RUN_TEST (test_case_4); return UNITY_END (); }