/* SPDX-License-Identifier: MPL-2.0 */ #include "testutil.hpp" #include "testutil_unity.hpp" SETUP_TEARDOWN_TESTCONTEXT static void pusher (void * /*unused*/) { // Connect first // do not use test_context_socket here, as it is not thread-safe void *connect_socket = zmq_socket (get_test_context (), ZMQ_PAIR); TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, "inproc://sink")); // Queue up some data send_string_expect_success (connect_socket, "foobar", 0); // Cleanup TEST_ASSERT_SUCCESS_ERRNO (zmq_close (connect_socket)); } static void simult_conn (void *endpt_) { // Pull out arguments - endpoint string const char *endpt = static_cast (endpt_); // Connect // do not use test_context_socket here, as it is not thread-safe void *connect_socket = zmq_socket (get_test_context (), ZMQ_PAIR); TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, endpt)); recv_string_expect_success (connect_socket, "foobar", 0); // Cleanup TEST_ASSERT_SUCCESS_ERRNO (zmq_close (connect_socket)); } static void simult_bind (void *endpt_) { // Pull out arguments - context followed by endpoint string const char *endpt = static_cast (endpt_); // Bind // do not use test_context_socket here, as it is not thread-safe void *bind_socket = zmq_socket (get_test_context (), ZMQ_PAIR); TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket, endpt)); send_string_expect_success (bind_socket, "foobar", 0); // Cleanup TEST_ASSERT_SUCCESS_ERRNO (zmq_close (bind_socket)); } void test_bind_before_connect () { // Bind first void *bind_socket = test_context_socket (ZMQ_PAIR); TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket, "inproc://bbc")); // Now connect void *connect_socket = test_context_socket (ZMQ_PAIR); TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, "inproc://bbc")); // Queue up some data send_string_expect_success (connect_socket, "foobar", 0); // Read pending message recv_string_expect_success (bind_socket, "foobar", 0); // Cleanup test_context_socket_close (connect_socket); test_context_socket_close (bind_socket); } void test_connect_before_bind () { // Connect first void *connect_socket = test_context_socket (ZMQ_PAIR); TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, "inproc://cbb")); // Queue up some data send_string_expect_success (connect_socket, "foobar", 0); // Now bind void *bind_socket = test_context_socket (ZMQ_PAIR); TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket, "inproc://cbb")); // Read pending message recv_string_expect_success (bind_socket, "foobar", 0); // Cleanup test_context_socket_close (connect_socket); test_context_socket_close (bind_socket); } void test_connect_before_bind_pub_sub () { // Connect first void *connect_socket = test_context_socket (ZMQ_PUB); TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, "inproc://cbbps")); // Queue up some data, this will be dropped send_string_expect_success (connect_socket, "before", 0); // Now bind void *bind_socket = test_context_socket (ZMQ_SUB); TEST_ASSERT_SUCCESS_ERRNO ( zmq_setsockopt (bind_socket, ZMQ_SUBSCRIBE, "", 0)); TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket, "inproc://cbbps")); // Wait for pub-sub connection to happen msleep (SETTLE_TIME); // Queue up some data, this not will be dropped send_string_expect_success (connect_socket, "after", 0); // Read pending message recv_string_expect_success (bind_socket, "after", 0); // Cleanup test_context_socket_close (connect_socket); test_context_socket_close (bind_socket); } void test_connect_before_bind_ctx_term () { for (int i = 0; i < 20; ++i) { // Connect first void *connect_socket = test_context_socket (ZMQ_ROUTER); char ep[32]; snprintf (ep, 32 * sizeof (char), "inproc://cbbrr%d", i); TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, ep)); // Cleanup test_context_socket_close (connect_socket); } } void test_multiple_connects () { const unsigned int no_of_connects = 10; void *connect_socket[no_of_connects]; // Connect first for (unsigned int i = 0; i < no_of_connects; ++i) { connect_socket[i] = test_context_socket (ZMQ_PUSH); TEST_ASSERT_SUCCESS_ERRNO ( zmq_connect (connect_socket[i], "inproc://multiple")); // Queue up some data send_string_expect_success (connect_socket[i], "foobar", 0); } // Now bind void *bind_socket = test_context_socket (ZMQ_PULL); TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket, "inproc://multiple")); for (unsigned int i = 0; i < no_of_connects; ++i) { recv_string_expect_success (bind_socket, "foobar", 0); } // Cleanup for (unsigned int i = 0; i < no_of_connects; ++i) { test_context_socket_close (connect_socket[i]); } test_context_socket_close (bind_socket); } void test_multiple_threads () { const unsigned int no_of_threads = 30; void *threads[no_of_threads]; // Connect first for (unsigned int i = 0; i < no_of_threads; ++i) { threads[i] = zmq_threadstart (&pusher, NULL); } // Now bind void *bind_socket = test_context_socket (ZMQ_PULL); TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket, "inproc://sink")); for (unsigned int i = 0; i < no_of_threads; ++i) { // Read pending message recv_string_expect_success (bind_socket, "foobar", 0); } // Cleanup for (unsigned int i = 0; i < no_of_threads; ++i) { zmq_threadclose (threads[i]); } test_context_socket_close (bind_socket); } void test_simultaneous_connect_bind_threads () { const unsigned int no_of_times = 50; void *threads[no_of_times * 2]; void *thr_args[no_of_times]; char endpts[no_of_times][20]; // Set up thread arguments: context followed by endpoint string for (unsigned int i = 0; i < no_of_times; ++i) { thr_args[i] = (void *) endpts[i]; snprintf (endpts[i], 20 * sizeof (char), "inproc://foo_%d", i); } // Spawn all threads as simultaneously as possible for (unsigned int i = 0; i < no_of_times; ++i) { threads[i * 2 + 0] = zmq_threadstart (&simult_conn, thr_args[i]); threads[i * 2 + 1] = zmq_threadstart (&simult_bind, thr_args[i]); } // Close all threads for (unsigned int i = 0; i < no_of_times; ++i) { zmq_threadclose (threads[i * 2 + 0]); zmq_threadclose (threads[i * 2 + 1]); } } void test_routing_id () { // Create the infrastructure void *sc = test_context_socket (ZMQ_DEALER); TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sc, "inproc://routing_id")); void *sb = test_context_socket (ZMQ_ROUTER); TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (sb, "inproc://routing_id")); // Send 2-part message. TEST_ASSERT_EQUAL_INT ( 1, TEST_ASSERT_SUCCESS_ERRNO (zmq_send (sc, "A", 1, ZMQ_SNDMORE))); TEST_ASSERT_EQUAL_INT ( 1, TEST_ASSERT_SUCCESS_ERRNO (zmq_send (sc, "B", 1, 0))); // Routing id comes first. zmq_msg_t msg; TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg)); TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, sb, 0)); TEST_ASSERT_EQUAL_INT (1, zmq_msg_more (&msg)); // Then the first part of the message body. TEST_ASSERT_EQUAL_INT ( 1, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, sb, 0))); TEST_ASSERT_EQUAL_INT (1, zmq_msg_more (&msg)); // And finally, the second part of the message body. TEST_ASSERT_EQUAL_INT ( 1, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, sb, 0))); TEST_ASSERT_EQUAL_INT (0, zmq_msg_more (&msg)); // Deallocate the infrastructure. test_context_socket_close (sc); test_context_socket_close (sb); } void test_connect_only () { void *connect_socket = test_context_socket (ZMQ_PUSH); TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, "inproc://a")); test_context_socket_close (connect_socket); } void test_unbind () { // Bind and unbind socket 1 void *bind_socket1 = test_context_socket (ZMQ_PAIR); TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket1, "inproc://unbind")); TEST_ASSERT_SUCCESS_ERRNO (zmq_unbind (bind_socket1, "inproc://unbind")); // Bind socket 2 void *bind_socket2 = test_context_socket (ZMQ_PAIR); TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bind_socket2, "inproc://unbind")); // Now connect void *connect_socket = test_context_socket (ZMQ_PAIR); TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, "inproc://unbind")); // Queue up some data send_string_expect_success (connect_socket, "foobar", 0); // Read pending message recv_string_expect_success (bind_socket2, "foobar", 0); // Cleanup test_context_socket_close (connect_socket); test_context_socket_close (bind_socket1); test_context_socket_close (bind_socket2); } void test_shutdown_during_pend () { // Connect first void *connect_socket = test_context_socket (ZMQ_PAIR); TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connect_socket, "inproc://cbb")); zmq_ctx_shutdown (get_test_context ()); // Cleanup test_context_socket_close (connect_socket); } int main (void) { setup_test_environment (); UNITY_BEGIN (); RUN_TEST (test_bind_before_connect); RUN_TEST (test_connect_before_bind); RUN_TEST (test_connect_before_bind_pub_sub); RUN_TEST (test_connect_before_bind_ctx_term); RUN_TEST (test_multiple_connects); RUN_TEST (test_multiple_threads); RUN_TEST (test_simultaneous_connect_bind_threads); RUN_TEST (test_routing_id); RUN_TEST (test_connect_only); RUN_TEST (test_unbind); RUN_TEST (test_shutdown_during_pend); return UNITY_END (); }