diff --git a/Makefile.am b/Makefile.am index bae8424b..40c6cccd 100644 --- a/Makefile.am +++ b/Makefile.am @@ -593,7 +593,8 @@ tests_test_conflate_LDADD = src/libzmq.la ${UNITY_LIBS} tests_test_conflate_CPPFLAGS = ${UNITY_CPPFLAGS} tests_test_inproc_connect_SOURCES = tests/test_inproc_connect.cpp -tests_test_inproc_connect_LDADD = src/libzmq.la +tests_test_inproc_connect_LDADD = src/libzmq.la ${UNITY_LIBS} +tests_test_inproc_connect_CPPFLAGS = ${UNITY_CPPFLAGS} tests_test_issue_566_SOURCES = tests/test_issue_566.cpp tests_test_issue_566_LDADD = src/libzmq.la diff --git a/tests/test_inproc_connect.cpp b/tests/test_inproc_connect.cpp index 9bd69291..868911a5 100644 --- a/tests/test_inproc_connect.cpp +++ b/tests/test_inproc_connect.cpp @@ -28,294 +28,194 @@ */ #include "testutil.hpp" +#include "testutil_unity.hpp" -static void pusher (void *ctx) +void setUp () +{ + setup_test_context (); +} + +void tearDown () +{ + teardown_test_context (); +} + +static void pusher (void * /*unused*/) { // Connect first - void *connectSocket = zmq_socket (ctx, ZMQ_PAIR); - assert (connectSocket); - int rc = zmq_connect (connectSocket, "inproc://sink"); - assert (rc == 0); + void *connectSocket = test_context_socket (ZMQ_PAIR); + + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connectSocket, "inproc://sink")); // Queue up some data - rc = zmq_send_const (connectSocket, "foobar", 6, 0); - assert (rc == 6); + send_string_expect_success (connectSocket, "foobar", 0); // Cleanup - rc = zmq_close (connectSocket); - assert (rc == 0); + test_context_socket_close (connectSocket); } -static void simult_conn (void *payload) +static void simult_conn (void *endpt_) { - // Pull out arguments - context followed by endpoint string - void *ctx = (void *) ((void **) payload)[0]; - char *endpt = (char *) ((void **) payload)[1]; + // Pull out arguments - endpoint string + const char *endpt = static_cast (endpt_); // Connect - void *connectSocket = zmq_socket (ctx, ZMQ_SUB); - assert (connectSocket); - int rc = zmq_connect (connectSocket, endpt); - assert (rc == 0); + void *connectSocket = test_context_socket (ZMQ_SUB); + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connectSocket, endpt)); // Cleanup - rc = zmq_close (connectSocket); - assert (rc == 0); + test_context_socket_close (connectSocket); } -static void simult_bind (void *payload) +static void simult_bind (void *endpt_) { // Pull out arguments - context followed by endpoint string - void *ctx = (void *) ((void **) payload)[0]; - char *endpt = (char *) ((void **) payload)[1]; + const char *endpt = static_cast (endpt_); // Bind - void *bindSocket = zmq_socket (ctx, ZMQ_PUB); - assert (bindSocket); - int rc = zmq_bind (bindSocket, endpt); - assert (rc == 0); + void *bindSocket = test_context_socket (ZMQ_PUB); + TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bindSocket, endpt)); // Cleanup - rc = zmq_close (bindSocket); - assert (rc == 0); + test_context_socket_close (bindSocket); } void test_bind_before_connect () { - void *ctx = zmq_ctx_new (); - assert (ctx); - // Bind first - void *bindSocket = zmq_socket (ctx, ZMQ_PAIR); - assert (bindSocket); - int rc = zmq_bind (bindSocket, "inproc://bbc"); - assert (rc == 0); + void *bindSocket = test_context_socket (ZMQ_PAIR); + TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bindSocket, "inproc://bbc")); // Now connect - void *connectSocket = zmq_socket (ctx, ZMQ_PAIR); - assert (connectSocket); - rc = zmq_connect (connectSocket, "inproc://bbc"); - assert (rc == 0); + void *connectSocket = test_context_socket (ZMQ_PAIR); + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connectSocket, "inproc://bbc")); // Queue up some data - rc = zmq_send_const (connectSocket, "foobar", 6, 0); - assert (rc == 6); + send_string_expect_success (connectSocket, "foobar", 0); // Read pending message - zmq_msg_t msg; - rc = zmq_msg_init (&msg); - assert (rc == 0); - rc = zmq_msg_recv (&msg, bindSocket, 0); - assert (rc == 6); - void *data = zmq_msg_data (&msg); - assert (memcmp ("foobar", data, 6) == 0); + recv_string_expect_success (bindSocket, "foobar", 0); // Cleanup - rc = zmq_close (connectSocket); - assert (rc == 0); - - rc = zmq_close (bindSocket); - assert (rc == 0); - - rc = zmq_ctx_term (ctx); - assert (rc == 0); + test_context_socket_close (connectSocket); + test_context_socket_close (bindSocket); } void test_connect_before_bind () { - void *ctx = zmq_ctx_new (); - assert (ctx); - // Connect first - void *connectSocket = zmq_socket (ctx, ZMQ_PAIR); - assert (connectSocket); - int rc = zmq_connect (connectSocket, "inproc://cbb"); - assert (rc == 0); + void *connectSocket = test_context_socket (ZMQ_PAIR); + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connectSocket, "inproc://cbb")); // Queue up some data - rc = zmq_send_const (connectSocket, "foobar", 6, 0); - assert (rc == 6); + send_string_expect_success (connectSocket, "foobar", 0); // Now bind - void *bindSocket = zmq_socket (ctx, ZMQ_PAIR); - assert (bindSocket); - rc = zmq_bind (bindSocket, "inproc://cbb"); - assert (rc == 0); + void *bindSocket = test_context_socket (ZMQ_PAIR); + TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bindSocket, "inproc://cbb")); // Read pending message - zmq_msg_t msg; - rc = zmq_msg_init (&msg); - assert (rc == 0); - rc = zmq_msg_recv (&msg, bindSocket, 0); - assert (rc == 6); - void *data = zmq_msg_data (&msg); - assert (memcmp ("foobar", data, 6) == 0); + recv_string_expect_success (bindSocket, "foobar", 0); // Cleanup - rc = zmq_close (connectSocket); - assert (rc == 0); - - rc = zmq_close (bindSocket); - assert (rc == 0); - - rc = zmq_ctx_term (ctx); - assert (rc == 0); + test_context_socket_close (connectSocket); + test_context_socket_close (bindSocket); } void test_connect_before_bind_pub_sub () { - void *ctx = zmq_ctx_new (); - assert (ctx); - // Connect first - void *connectSocket = zmq_socket (ctx, ZMQ_PUB); - assert (connectSocket); - int rc = zmq_connect (connectSocket, "inproc://cbbps"); - assert (rc == 0); + void *connectSocket = test_context_socket (ZMQ_PUB); + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connectSocket, "inproc://cbbps")); // Queue up some data, this will be dropped - rc = zmq_send_const (connectSocket, "before", 6, 0); - assert (rc == 6); + send_string_expect_success (connectSocket, "before", 0); // Now bind - void *bindSocket = zmq_socket (ctx, ZMQ_SUB); - assert (bindSocket); - rc = zmq_setsockopt (bindSocket, ZMQ_SUBSCRIBE, "", 0); - assert (rc == 0); - rc = zmq_bind (bindSocket, "inproc://cbbps"); - assert (rc == 0); + void *bindSocket = test_context_socket (ZMQ_SUB); + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (bindSocket, ZMQ_SUBSCRIBE, "", 0)); + TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bindSocket, "inproc://cbbps")); // Wait for pub-sub connection to happen msleep (SETTLE_TIME); // Queue up some data, this not will be dropped - rc = zmq_send_const (connectSocket, "after", 6, 0); - assert (rc == 6); + send_string_expect_success (connectSocket, "after", 0); // Read pending message - zmq_msg_t msg; - rc = zmq_msg_init (&msg); - assert (rc == 0); - rc = zmq_msg_recv (&msg, bindSocket, 0); - assert (rc == 6); - void *data = zmq_msg_data (&msg); - assert (memcmp ("after", data, 5) == 0); + recv_string_expect_success (bindSocket, "after", 0); // Cleanup - rc = zmq_close (connectSocket); - assert (rc == 0); - - rc = zmq_close (bindSocket); - assert (rc == 0); - - rc = zmq_ctx_term (ctx); - assert (rc == 0); + test_context_socket_close (connectSocket); + test_context_socket_close (bindSocket); } void test_connect_before_bind_ctx_term () { - void *ctx = zmq_ctx_new (); - assert (ctx); - for (int i = 0; i < 20; ++i) { // Connect first - void *connectSocket = zmq_socket (ctx, ZMQ_ROUTER); - assert (connectSocket); + void *connectSocket = test_context_socket (ZMQ_ROUTER); char ep[20]; sprintf (ep, "inproc://cbbrr%d", i); - int rc = zmq_connect (connectSocket, ep); - assert (rc == 0); + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connectSocket, ep)); // Cleanup - rc = zmq_close (connectSocket); - assert (rc == 0); + test_context_socket_close (connectSocket); } - - int rc = zmq_ctx_term (ctx); - assert (rc == 0); } void test_multiple_connects () { const unsigned int no_of_connects = 10; - void *ctx = zmq_ctx_new (); - assert (ctx); - int rc; void *connectSocket[no_of_connects]; // Connect first for (unsigned int i = 0; i < no_of_connects; ++i) { - connectSocket[i] = zmq_socket (ctx, ZMQ_PUSH); - assert (connectSocket[i]); - rc = zmq_connect (connectSocket[i], "inproc://multiple"); - assert (rc == 0); + connectSocket[i] = test_context_socket (ZMQ_PUSH); + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_connect (connectSocket[i], "inproc://multiple")); // Queue up some data - rc = zmq_send_const (connectSocket[i], "foobar", 6, 0); - assert (rc == 6); + send_string_expect_success (connectSocket[i], "foobar", 0); } // Now bind - void *bindSocket = zmq_socket (ctx, ZMQ_PULL); - assert (bindSocket); - rc = zmq_bind (bindSocket, "inproc://multiple"); - assert (rc == 0); + void *bindSocket = test_context_socket (ZMQ_PULL); + TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bindSocket, "inproc://multiple")); for (unsigned int i = 0; i < no_of_connects; ++i) { - // Read pending message - zmq_msg_t msg; - rc = zmq_msg_init (&msg); - assert (rc == 0); - rc = zmq_msg_recv (&msg, bindSocket, 0); - assert (rc == 6); - void *data = zmq_msg_data (&msg); - assert (memcmp ("foobar", data, 6) == 0); + recv_string_expect_success (bindSocket, "foobar", 0); } // Cleanup for (unsigned int i = 0; i < no_of_connects; ++i) { - rc = zmq_close (connectSocket[i]); - assert (rc == 0); + test_context_socket_close (connectSocket[i]); } - rc = zmq_close (bindSocket); - assert (rc == 0); - - rc = zmq_ctx_term (ctx); - assert (rc == 0); + test_context_socket_close (bindSocket); } void test_multiple_threads () { const unsigned int no_of_threads = 30; - void *ctx = zmq_ctx_new (); - assert (ctx); - int rc; void *threads[no_of_threads]; // Connect first for (unsigned int i = 0; i < no_of_threads; ++i) { - threads[i] = zmq_threadstart (&pusher, ctx); + threads[i] = zmq_threadstart (&pusher, NULL); } // Now bind - void *bindSocket = zmq_socket (ctx, ZMQ_PULL); - assert (bindSocket); - rc = zmq_bind (bindSocket, "inproc://sink"); - assert (rc == 0); + void *bindSocket = test_context_socket (ZMQ_PULL); + TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bindSocket, "inproc://sink")); for (unsigned int i = 0; i < no_of_threads; ++i) { // Read pending message - zmq_msg_t msg; - rc = zmq_msg_init (&msg); - assert (rc == 0); - rc = zmq_msg_recv (&msg, bindSocket, 0); - assert (rc == 6); - void *data = zmq_msg_data (&msg); - assert (memcmp ("foobar", data, 6) == 0); + recv_string_expect_success (bindSocket, "foobar", 0); } // Cleanup @@ -323,27 +223,19 @@ void test_multiple_threads () zmq_threadclose (threads[i]); } - rc = zmq_close (bindSocket); - assert (rc == 0); - - rc = zmq_ctx_term (ctx); - assert (rc == 0); + test_context_socket_close (bindSocket); } void test_simultaneous_connect_bind_threads () { const unsigned int no_of_times = 50; - void *ctx = zmq_ctx_new (); - assert (ctx); - void *threads[no_of_times * 2]; - void *thr_args[no_of_times][2]; + void *thr_args[no_of_times]; char endpts[no_of_times][20]; // Set up thread arguments: context followed by endpoint string for (unsigned int i = 0; i < no_of_times; ++i) { - thr_args[i][0] = (void *) ctx; - thr_args[i][1] = (void *) endpts[i]; + thr_args[i] = (void *) endpts[i]; sprintf (endpts[i], "inproc://foo_%d", i); } @@ -360,170 +252,109 @@ void test_simultaneous_connect_bind_threads () zmq_threadclose (threads[i * 2 + 0]); zmq_threadclose (threads[i * 2 + 1]); } - - int rc = zmq_ctx_term (ctx); - assert (rc == 0); } void test_routing_id () { // Create the infrastructure - void *ctx = zmq_ctx_new (); - assert (ctx); + void *sc = test_context_socket (ZMQ_DEALER); - void *sc = zmq_socket (ctx, ZMQ_DEALER); - assert (sc); + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sc, "inproc://routing_id")); - int rc = zmq_connect (sc, "inproc://routing_id"); - assert (rc == 0); + void *sb = test_context_socket (ZMQ_ROUTER); - void *sb = zmq_socket (ctx, ZMQ_ROUTER); - assert (sb); - - rc = zmq_bind (sb, "inproc://routing_id"); - assert (rc == 0); + TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (sb, "inproc://routing_id")); // Send 2-part message. - rc = zmq_send (sc, "A", 1, ZMQ_SNDMORE); - assert (rc == 1); - rc = zmq_send (sc, "B", 1, 0); - assert (rc == 1); + TEST_ASSERT_EQUAL_INT ( + 1, TEST_ASSERT_SUCCESS_ERRNO (zmq_send (sc, "A", 1, ZMQ_SNDMORE))); + TEST_ASSERT_EQUAL_INT ( + 1, TEST_ASSERT_SUCCESS_ERRNO (zmq_send (sc, "B", 1, 0))); // Routing id comes first. zmq_msg_t msg; - rc = zmq_msg_init (&msg); - assert (rc == 0); - rc = zmq_msg_recv (&msg, sb, 0); - assert (rc >= 0); - int more = zmq_msg_more (&msg); - assert (more == 1); + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg)); + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, sb, 0)); + TEST_ASSERT_EQUAL_INT (1, zmq_msg_more (&msg)); // Then the first part of the message body. - rc = zmq_msg_recv (&msg, sb, 0); - assert (rc == 1); - more = zmq_msg_more (&msg); - assert (more == 1); + TEST_ASSERT_EQUAL_INT ( + 1, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, sb, 0))); + TEST_ASSERT_EQUAL_INT (1, zmq_msg_more (&msg)); // And finally, the second part of the message body. - rc = zmq_msg_recv (&msg, sb, 0); - assert (rc == 1); - more = zmq_msg_more (&msg); - assert (more == 0); + TEST_ASSERT_EQUAL_INT ( + 1, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, sb, 0))); + TEST_ASSERT_EQUAL_INT (0, zmq_msg_more (&msg)); // Deallocate the infrastructure. - rc = zmq_close (sc); - assert (rc == 0); - - rc = zmq_close (sb); - assert (rc == 0); - - rc = zmq_ctx_term (ctx); - assert (rc == 0); + test_context_socket_close (sc); + test_context_socket_close (sb); } void test_connect_only () { - void *ctx = zmq_ctx_new (); - assert (ctx); + void *connectSocket = test_context_socket (ZMQ_PUSH); + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connectSocket, "inproc://a")); - void *connectSocket = zmq_socket (ctx, ZMQ_PUSH); - assert (connectSocket); - int rc = zmq_connect (connectSocket, "inproc://a"); - assert (rc == 0); - - rc = zmq_close (connectSocket); - assert (rc == 0); - - rc = zmq_ctx_term (ctx); - assert (rc == 0); + test_context_socket_close (connectSocket); } void test_unbind () { - void *ctx = zmq_ctx_new (); - assert (ctx); - // Bind and unbind socket 1 - void *bindSocket1 = zmq_socket (ctx, ZMQ_PAIR); - assert (bindSocket1); - int rc = zmq_bind (bindSocket1, "inproc://unbind"); - assert (rc == 0); - zmq_unbind (bindSocket1, "inproc://unbind"); - assert (rc == 0); + void *bindSocket1 = test_context_socket (ZMQ_PAIR); + TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bindSocket1, "inproc://unbind")); + TEST_ASSERT_SUCCESS_ERRNO (zmq_unbind (bindSocket1, "inproc://unbind")); // Bind socket 2 - void *bindSocket2 = zmq_socket (ctx, ZMQ_PAIR); - assert (bindSocket2); - rc = zmq_bind (bindSocket2, "inproc://unbind"); - assert (rc == 0); + void *bindSocket2 = test_context_socket (ZMQ_PAIR); + TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (bindSocket2, "inproc://unbind")); // Now connect - void *connectSocket = zmq_socket (ctx, ZMQ_PAIR); - assert (connectSocket); - rc = zmq_connect (connectSocket, "inproc://unbind"); - assert (rc == 0); + void *connectSocket = test_context_socket (ZMQ_PAIR); + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connectSocket, "inproc://unbind")); // Queue up some data - rc = zmq_send_const (connectSocket, "foobar", 6, 0); - assert (rc == 6); + send_string_expect_success (connectSocket, "foobar", 0); // Read pending message - zmq_msg_t msg; - rc = zmq_msg_init (&msg); - assert (rc == 0); - rc = zmq_msg_recv (&msg, bindSocket2, 0); - assert (rc == 6); - void *data = zmq_msg_data (&msg); - assert (memcmp ("foobar", data, 6) == 0); + recv_string_expect_success (bindSocket2, "foobar", 0); // Cleanup - rc = zmq_close (connectSocket); - assert (rc == 0); - rc = zmq_close (bindSocket1); - assert (rc == 0); - rc = zmq_close (bindSocket2); - assert (rc == 0); - rc = zmq_ctx_term (ctx); - assert (rc == 0); + test_context_socket_close (connectSocket); + test_context_socket_close (bindSocket1); + test_context_socket_close (bindSocket2); } void test_shutdown_during_pend () { - void *ctx = zmq_ctx_new (); - assert (ctx); - // Connect first - void *connectSocket = zmq_socket (ctx, ZMQ_PAIR); - assert (connectSocket); - int rc = zmq_connect (connectSocket, "inproc://cbb"); - assert (rc == 0); + void *connectSocket = test_context_socket (ZMQ_PAIR); + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (connectSocket, "inproc://cbb")); - zmq_ctx_shutdown (ctx); + zmq_ctx_shutdown (get_test_context ()); // Cleanup - rc = zmq_close (connectSocket); - assert (rc == 0); - - rc = zmq_ctx_term (ctx); - assert (rc == 0); + test_context_socket_close (connectSocket); } int main (void) { setup_test_environment (); - test_bind_before_connect (); - test_connect_before_bind (); - test_connect_before_bind_pub_sub (); - test_connect_before_bind_ctx_term (); - test_multiple_connects (); - test_multiple_threads (); - test_simultaneous_connect_bind_threads (); - test_routing_id (); - test_connect_only (); - test_unbind (); - test_shutdown_during_pend (); - - return 0; + UNITY_BEGIN (); + RUN_TEST (test_bind_before_connect); + RUN_TEST (test_connect_before_bind); + RUN_TEST (test_connect_before_bind_pub_sub); + RUN_TEST (test_connect_before_bind_ctx_term); + RUN_TEST (test_multiple_connects); + RUN_TEST (test_multiple_threads); + RUN_TEST (test_simultaneous_connect_bind_threads); + RUN_TEST (test_routing_id); + RUN_TEST (test_connect_only); + RUN_TEST (test_unbind); + RUN_TEST (test_shutdown_during_pend); + return UNITY_END (); }