diff --git a/Makefile.am b/Makefile.am index bd8ddef5..9a586200 100644 --- a/Makefile.am +++ b/Makefile.am @@ -598,7 +598,8 @@ tests_test_spec_router_LDADD = src/libzmq.la ${UNITY_LIBS} tests_test_spec_router_CPPFLAGS = ${UNITY_CPPFLAGS} tests_test_spec_pushpull_SOURCES = tests/test_spec_pushpull.cpp -tests_test_spec_pushpull_LDADD = src/libzmq.la +tests_test_spec_pushpull_LDADD = src/libzmq.la ${UNITY_LIBS} +tests_test_spec_pushpull_CPPFLAGS = ${UNITY_CPPFLAGS} tests_test_req_correlate_SOURCES = tests/test_req_correlate.cpp tests_test_req_correlate_LDADD = src/libzmq.la ${UNITY_LIBS} diff --git a/tests/test_spec_pushpull.cpp b/tests/test_spec_pushpull.cpp index d06b349b..9844eb22 100644 --- a/tests/test_spec_pushpull.cpp +++ b/tests/test_spec_pushpull.cpp @@ -28,33 +28,40 @@ */ #include "testutil.hpp" +#include "testutil_unity.hpp" + +void setUp () +{ + setup_test_context (); +} + +void tearDown () +{ + teardown_test_context (); +} -const char *bind_address = 0; char connect_address[MAX_SOCKET_STRING]; -void test_push_round_robin_out (void *ctx_) +// PUSH: SHALL route outgoing messages to connected peers using a +// round-robin strategy. +void test_push_round_robin_out (const char *bind_address_) { - void *push = zmq_socket (ctx_, ZMQ_PUSH); - assert (push); + void *push = test_context_socket (ZMQ_PUSH); - int rc = zmq_bind (push, bind_address); - assert (rc == 0); + TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (push, bind_address_)); size_t len = MAX_SOCKET_STRING; - rc = zmq_getsockopt (push, ZMQ_LAST_ENDPOINT, connect_address, &len); - assert (rc == 0); + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_getsockopt (push, ZMQ_LAST_ENDPOINT, connect_address, &len)); const size_t services = 5; void *pulls[services]; for (size_t peer = 0; peer < services; ++peer) { - pulls[peer] = zmq_socket (ctx_, ZMQ_PULL); - assert (pulls[peer]); + pulls[peer] = test_context_socket (ZMQ_PULL); int timeout = 250; - rc = zmq_setsockopt (pulls[peer], ZMQ_RCVTIMEO, &timeout, sizeof (int)); - assert (rc == 0); - - rc = zmq_connect (pulls[peer], connect_address); - assert (rc == 0); + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (pulls[peer], ZMQ_RCVTIMEO, &timeout, sizeof (int))); + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (pulls[peer], connect_address)); } // Wait for connections. @@ -72,34 +79,29 @@ void test_push_round_robin_out (void *ctx_) s_recv_seq (pulls[peer], "DEF", SEQ_END); } - close_zero_linger (push); + test_context_socket_close_zero_linger (push); for (size_t peer = 0; peer < services; ++peer) - close_zero_linger (pulls[peer]); - - // Wait for disconnects. - msleep (SETTLE_TIME); + test_context_socket_close_zero_linger (pulls[peer]); } -void test_pull_fair_queue_in (void *ctx_) +// PULL: SHALL receive incoming messages from its peers using a fair-queuing +// strategy. +void test_pull_fair_queue_in (const char *bind_address_) { - void *pull = zmq_socket (ctx_, ZMQ_PULL); - assert (pull); + void *pull = test_context_socket (ZMQ_PULL); - int rc = zmq_bind (pull, bind_address); - assert (rc == 0); + TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pull, bind_address_)); size_t len = MAX_SOCKET_STRING; - rc = zmq_getsockopt (pull, ZMQ_LAST_ENDPOINT, connect_address, &len); - assert (rc == 0); + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_getsockopt (pull, ZMQ_LAST_ENDPOINT, connect_address, &len)); const unsigned char services = 5; void *pushs[services]; for (unsigned char peer = 0; peer < services; ++peer) { - pushs[peer] = zmq_socket (ctx_, ZMQ_PUSH); - assert (pushs[peer]); + pushs[peer] = test_context_socket (ZMQ_PUSH); - rc = zmq_connect (pushs[peer], connect_address); - assert (rc == 0); + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (pushs[peer], connect_address)); } // Wait for connections. @@ -127,83 +129,72 @@ void test_pull_fair_queue_in (void *ctx_) msleep (SETTLE_TIME); zmq_msg_t msg; - rc = zmq_msg_init (&msg); - assert (rc == 0); + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg)); // Expect to pull one from each first for (size_t peer = 0; peer < services; ++peer) { - rc = zmq_msg_recv (&msg, pull, 0); - assert (rc == 2); + TEST_ASSERT_EQUAL_INT ( + 2, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, pull, 0))); const char *str = (const char *) zmq_msg_data (&msg); first_half -= str[0]; } - assert (first_half == 0); + TEST_ASSERT_EQUAL_INT (0, first_half); // And then get the second batch for (size_t peer = 0; peer < services; ++peer) { - rc = zmq_msg_recv (&msg, pull, 0); - assert (rc == 2); + TEST_ASSERT_EQUAL_INT ( + 2, TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, pull, 0))); const char *str = (const char *) zmq_msg_data (&msg); second_half -= str[0]; } - assert (second_half == 0); + TEST_ASSERT_EQUAL_INT (0, second_half); - rc = zmq_msg_close (&msg); - assert (rc == 0); + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg)); - close_zero_linger (pull); + test_context_socket_close_zero_linger (pull); for (size_t peer = 0; peer < services; ++peer) - close_zero_linger (pushs[peer]); - - // Wait for disconnects. - msleep (SETTLE_TIME); + test_context_socket_close_zero_linger (pushs[peer]); } -void test_push_block_on_send_no_peers (void *ctx_) +// PUSH: SHALL block on sending, or return a suitable error, when it has no +// available peers. +void test_push_block_on_send_no_peers (const char *bind_address_) { - void *sc = zmq_socket (ctx_, ZMQ_PUSH); - assert (sc); + void *sc = test_context_socket (ZMQ_PUSH); int timeout = 250; - int rc = zmq_setsockopt (sc, ZMQ_SNDTIMEO, &timeout, sizeof (timeout)); - assert (rc == 0); + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (sc, ZMQ_SNDTIMEO, &timeout, sizeof (timeout))); - rc = zmq_send (sc, 0, 0, ZMQ_DONTWAIT); - assert (rc == -1); - assert (errno == EAGAIN); + TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_send (sc, 0, 0, ZMQ_DONTWAIT)); + TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_send (sc, 0, 0, 0)); - rc = zmq_send (sc, 0, 0, 0); - assert (rc == -1); - assert (errno == EAGAIN); - - rc = zmq_close (sc); - assert (rc == 0); + test_context_socket_close (sc); } -void test_destroy_queue_on_disconnect (void *ctx_) +// PUSH and PULL: SHALL create this queue when a peer connects to it. If +// this peer disconnects, the socket SHALL destroy its queue and SHALL +// discard any messages it contains. +void test_destroy_queue_on_disconnect (const char *bind_address_) { - void *a = zmq_socket (ctx_, ZMQ_PUSH); - assert (a); + void *a = test_context_socket (ZMQ_PUSH); int hwm = 1; - int rc = zmq_setsockopt (a, ZMQ_SNDHWM, &hwm, sizeof (hwm)); - assert (rc == 0); + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (a, ZMQ_SNDHWM, &hwm, sizeof (hwm))); - rc = zmq_bind (a, bind_address); - assert (rc == 0); + TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (a, bind_address_)); size_t len = MAX_SOCKET_STRING; - rc = zmq_getsockopt (a, ZMQ_LAST_ENDPOINT, connect_address, &len); - assert (rc == 0); + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_getsockopt (a, ZMQ_LAST_ENDPOINT, connect_address, &len)); - void *b = zmq_socket (ctx_, ZMQ_PULL); - assert (b); + void *b = test_context_socket (ZMQ_PULL); - rc = zmq_setsockopt (b, ZMQ_RCVHWM, &hwm, sizeof (hwm)); - assert (rc == 0); + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (b, ZMQ_RCVHWM, &hwm, sizeof (hwm))); - rc = zmq_connect (b, connect_address); - assert (rc == 0); + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (b, connect_address)); // Send two messages, one should be stuck in A's outgoing queue, the other // arrives at B. @@ -211,93 +202,79 @@ void test_destroy_queue_on_disconnect (void *ctx_) s_send_seq (a, "DEF", SEQ_END); // Both queues should now be full, indicated by A blocking on send. - rc = zmq_send (a, 0, 0, ZMQ_DONTWAIT); - assert (rc == -1); - assert (errno == EAGAIN); + TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_send (a, 0, 0, ZMQ_DONTWAIT)); - rc = zmq_disconnect (b, connect_address); - assert (rc == 0); + TEST_ASSERT_SUCCESS_ERRNO (zmq_disconnect (b, connect_address)); // Disconnect may take time and need command processing. zmq_pollitem_t poller[2] = {{a, 0, 0, 0}, {b, 0, 0, 0}}; - rc = zmq_poll (poller, 2, 100); - assert (rc == 0); - rc = zmq_poll (poller, 2, 100); - assert (rc == 0); + TEST_ASSERT_EQUAL_INT ( + 0, TEST_ASSERT_SUCCESS_ERRNO (zmq_poll (poller, 2, 100))); + TEST_ASSERT_EQUAL_INT ( + 0, TEST_ASSERT_SUCCESS_ERRNO (zmq_poll (poller, 2, 100))); zmq_msg_t msg; - rc = zmq_msg_init (&msg); - assert (rc == 0); + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg)); // Can't receive old data on B. - rc = zmq_msg_recv (&msg, b, ZMQ_DONTWAIT); - assert (rc == -1); - assert (errno == EAGAIN); + TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_msg_recv (&msg, b, ZMQ_DONTWAIT)); // Sending fails. - rc = zmq_send (a, 0, 0, ZMQ_DONTWAIT); - assert (rc == -1); - assert (errno == EAGAIN); + TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_send (a, 0, 0, ZMQ_DONTWAIT)); // Reconnect B - rc = zmq_connect (b, connect_address); - assert (rc == 0); + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (b, connect_address)); // Still can't receive old data on B. - rc = zmq_msg_recv (&msg, b, ZMQ_DONTWAIT); - assert (rc == -1); - assert (errno == EAGAIN); + TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_msg_recv (&msg, b, ZMQ_DONTWAIT)); // two messages should be sendable before the queues are filled up. s_send_seq (a, "ABC", SEQ_END); s_send_seq (a, "DEF", SEQ_END); - rc = zmq_send (a, 0, 0, ZMQ_DONTWAIT); - assert (rc == -1); - assert (errno == EAGAIN); + TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_send (a, 0, 0, ZMQ_DONTWAIT)); - rc = zmq_msg_close (&msg); - assert (rc == 0); + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg)); - close_zero_linger (a); - close_zero_linger (b); - - // Wait for disconnects. - msleep (SETTLE_TIME); + test_context_socket_close_zero_linger (a); + test_context_socket_close_zero_linger (b); } -int main (void) -{ - setup_test_environment (); - void *ctx = zmq_ctx_new (); - assert (ctx); - - const char *binds[] = {"inproc://a", "tcp://127.0.0.1:*"}; - - for (int transport = 0; transport < 2; ++transport) { - bind_address = binds[transport]; - - // PUSH: SHALL route outgoing messages to connected peers using a - // round-robin strategy. - test_push_round_robin_out (ctx); - - // PULL: SHALL receive incoming messages from its peers using a fair-queuing - // strategy. - test_pull_fair_queue_in (ctx); - - // PUSH: SHALL block on sending, or return a suitable error, when it has no - // available peers. - test_push_block_on_send_no_peers (ctx); - - // PUSH and PULL: SHALL create this queue when a peer connects to it. If - // this peer disconnects, the socket SHALL destroy its queue and SHALL - // discard any messages it contains. - // *** Test disabled until libzmq does this properly *** - // test_destroy_queue_on_disconnect (ctx); +#define def_test_spec_pushpull(name, bind_address_) \ + void test_spec_pushpull_##name##_push_round_robin_out () \ + { \ + test_push_round_robin_out (bind_address_); \ + } \ + void test_spec_pushpull_##name##_pull_fair_queue_in () \ + { \ + test_pull_fair_queue_in (bind_address_); \ + } \ + void test_spec_pushpull_##name##_push_block_on_send_no_peers () \ + { \ + test_push_block_on_send_no_peers (bind_address_); \ + } \ + void test_spec_pushpull_##name##_destroy_queue_on_disconnect () \ + { \ + test_destroy_queue_on_disconnect (bind_address_); \ } - int rc = zmq_ctx_term (ctx); - assert (rc == 0); +def_test_spec_pushpull (inproc, "inproc://a") - return 0; + def_test_spec_pushpull (tcp, "tcp://127.0.0.1:*") + + int main () +{ + setup_test_environment (); + + UNITY_BEGIN (); + RUN_TEST (test_spec_pushpull_inproc_push_round_robin_out); + RUN_TEST (test_spec_pushpull_tcp_push_round_robin_out); + RUN_TEST (test_spec_pushpull_inproc_pull_fair_queue_in); + RUN_TEST (test_spec_pushpull_tcp_pull_fair_queue_in); + RUN_TEST (test_spec_pushpull_inproc_push_block_on_send_no_peers); + RUN_TEST (test_spec_pushpull_tcp_push_block_on_send_no_peers); + // TODO Tests disabled until libzmq does this properly + //RUN_TEST (test_spec_pushpull_inproc_destroy_queue_on_disconnect); + //RUN_TEST (test_spec_pushpull_tcp_destroy_queue_on_disconnect); + return UNITY_END (); }