diff --git a/tests/test_spec_dealer.cpp b/tests/test_spec_dealer.cpp index 1235f26b..fc7ef277 100644 --- a/tests/test_spec_dealer.cpp +++ b/tests/test_spec_dealer.cpp @@ -21,12 +21,15 @@ #include #include "testutil.hpp" +const char *bind_address = 0; +const char *connect_address = 0; + void test_round_robin_out (void *ctx) { void *dealer = zmq_socket (ctx, ZMQ_DEALER); assert (dealer); - int rc = zmq_bind (dealer, "inproc://b"); + int rc = zmq_bind (dealer, bind_address); assert (rc == 0); const size_t N = 5; @@ -40,10 +43,14 @@ void test_round_robin_out (void *ctx) rc = zmq_setsockopt (rep[i], ZMQ_RCVTIMEO, &timeout, sizeof(int)); assert (rc == 0); - rc = zmq_connect (rep[i], "inproc://b"); + rc = zmq_connect (rep[i], connect_address); assert (rc == 0); } + // Wait for connections. + rc = zmq_poll (0, 0, 100); + assert (rc == 0); + // Send N requests for (size_t i = 0; i < N; ++i) { @@ -62,14 +69,16 @@ void test_round_robin_out (void *ctx) rc = zmq_msg_close (&msg); assert (rc == 0); - rc = zmq_close (dealer); - assert (rc == 0); + close_zero_linger (dealer); for (size_t i = 0; i < N; ++i) { - rc = zmq_close (rep[i]); - assert (rc == 0); + close_zero_linger (rep[i]); } + + // Wait for disconnects. + rc = zmq_poll (0, 0, 100); + assert (rc == 0); } void test_fair_queue_in (void *ctx) @@ -81,7 +90,7 @@ void test_fair_queue_in (void *ctx) int rc = zmq_setsockopt (receiver, ZMQ_RCVTIMEO, &timeout, sizeof(int)); assert (rc == 0); - rc = zmq_bind (receiver, "inproc://a"); + rc = zmq_bind (receiver, bind_address); assert (rc == 0); const size_t N = 5; @@ -94,7 +103,7 @@ void test_fair_queue_in (void *ctx) rc = zmq_setsockopt (senders[i], ZMQ_RCVTIMEO, &timeout, sizeof(int)); assert (rc == 0); - rc = zmq_connect (senders[i], "inproc://a"); + rc = zmq_connect (senders[i], connect_address); assert (rc == 0); } @@ -111,32 +120,32 @@ void test_fair_queue_in (void *ctx) // send N requests for (size_t i = 0; i < N; ++i) { - char *str = strdup("A"); - str[0] += i; - s_send_seq (senders[i], str, SEQ_END); - free (str); + s_send_seq (senders[i], "B", SEQ_END); } + // Wait for data. + rc = zmq_poll (0, 0, 50); + assert (rc == 0); + // handle N requests for (size_t i = 0; i < N; ++i) { - char *str = strdup("A"); - str[0] += i; - s_recv_seq (receiver, str, SEQ_END); - free (str); + s_recv_seq (receiver, "B", SEQ_END); } rc = zmq_msg_close (&msg); assert (rc == 0); - rc = zmq_close (receiver); - assert (rc == 0); + close_zero_linger (receiver); for (size_t i = 0; i < N; ++i) { - rc = zmq_close (senders[i]); - assert (rc == 0); + close_zero_linger (senders[i]); } + + // Wait for disconnects. + rc = zmq_poll (0, 0, 100); + assert (rc == 0); } void test_destroy_queue_on_disconnect (void *ctx) @@ -144,26 +153,28 @@ void test_destroy_queue_on_disconnect (void *ctx) void *A = zmq_socket (ctx, ZMQ_DEALER); assert (A); - int rc = zmq_bind (A, "inproc://d"); + int rc = zmq_bind (A, bind_address); assert (rc == 0); void *B = zmq_socket (ctx, ZMQ_DEALER); assert (B); - rc = zmq_connect (B, "inproc://d"); + rc = zmq_connect (B, connect_address); assert (rc == 0); // Send a message in both directions s_send_seq (A, "ABC", SEQ_END); s_send_seq (B, "DEF", SEQ_END); - rc = zmq_disconnect (B, "inproc://d"); + rc = zmq_disconnect (B, connect_address); assert (rc == 0); // Disconnect may take time and need command processing. zmq_pollitem_t poller[2] = { { A, 0, 0, 0 }, { B, 0, 0, 0 } }; rc = zmq_poll (poller, 2, 100); assert (rc == 0); + rc = zmq_poll (poller, 2, 100); + assert (rc == 0); // No messages should be available, sending should fail. zmq_msg_t msg; @@ -178,7 +189,7 @@ void test_destroy_queue_on_disconnect (void *ctx) assert (errno == EAGAIN); // After a reconnect of B, the messages should still be gone - rc = zmq_connect (B, "inproc://d"); + rc = zmq_connect (B, connect_address); assert (rc == 0); rc = zmq_msg_recv (&msg, A, ZMQ_DONTWAIT); @@ -192,10 +203,11 @@ void test_destroy_queue_on_disconnect (void *ctx) rc = zmq_msg_close (&msg); assert (rc == 0); - rc = zmq_close (A); - assert (rc == 0); + close_zero_linger (A); + close_zero_linger (B); - rc = zmq_close (B); + // Wait for disconnects. + rc = zmq_poll (0, 0, 100); assert (rc == 0); } @@ -225,21 +237,29 @@ int main () void *ctx = zmq_ctx_new (); assert (ctx); - // SHALL route outgoing messages to available peers using a round-robin - // strategy. - test_round_robin_out (ctx); + const char *binds[] = { "inproc://a", "tcp://*:5555" }; + const char *connects[] = { "inproc://a", "tcp://localhost:5555" }; - // SHALL receive incoming messages from its peers using a fair-queuing - // strategy. - test_fair_queue_in (ctx); + for (int i = 0; i < 2; ++i) { + bind_address = binds[i]; + connect_address = connects[i]; - // SHALL block on sending, or return a suitable error, when it has no connected peers. - test_block_on_send_no_peers (ctx); + // SHALL route outgoing messages to available peers using a round-robin + // strategy. + test_round_robin_out (ctx); - // SHALL create a double queue when a peer connects to it. If this peer - // disconnects, the DEALER socket SHALL destroy its double queue and SHALL - // discard any messages it contains. - test_destroy_queue_on_disconnect (ctx); + // SHALL receive incoming messages from its peers using a fair-queuing + // strategy. + test_fair_queue_in (ctx); + + // SHALL block on sending, or return a suitable error, when it has no connected peers. + test_block_on_send_no_peers (ctx); + + // SHALL create a double queue when a peer connects to it. If this peer + // disconnects, the DEALER socket SHALL destroy its double queue and SHALL + // discard any messages it contains. + test_destroy_queue_on_disconnect (ctx); + } int rc = zmq_ctx_term (ctx); assert (rc == 0); diff --git a/tests/test_spec_pushpull.cpp b/tests/test_spec_pushpull.cpp index 764525cf..a91d3c74 100644 --- a/tests/test_spec_pushpull.cpp +++ b/tests/test_spec_pushpull.cpp @@ -21,12 +21,15 @@ #include #include "testutil.hpp" +const char *bind_address = 0; +const char *connect_address = 0; + void test_push_round_robin_out (void *ctx) { void *push = zmq_socket (ctx, ZMQ_PUSH); assert (push); - int rc = zmq_bind (push, "inproc://b"); + int rc = zmq_bind (push, bind_address); assert (rc == 0); const size_t N = 5; @@ -40,10 +43,14 @@ void test_push_round_robin_out (void *ctx) rc = zmq_setsockopt (pulls[i], ZMQ_RCVTIMEO, &timeout, sizeof(int)); assert (rc == 0); - rc = zmq_connect (pulls[i], "inproc://b"); + rc = zmq_connect (pulls[i], connect_address); assert (rc == 0); } + // Wait for connections. + rc = zmq_poll (0, 0, 100); + assert (rc == 0); + // Send 2N messages for (size_t i = 0; i < N; ++i) { @@ -61,14 +68,16 @@ void test_push_round_robin_out (void *ctx) s_recv_seq (pulls[i], "DEF", SEQ_END); } - rc = zmq_close (push); - assert (rc == 0); + close_zero_linger (push); for (size_t i = 0; i < N; ++i) { - rc = zmq_close (pulls[i]); - assert (rc == 0); + close_zero_linger (pulls[i]); } + + // Wait for disconnects. + rc = zmq_poll (0, 0, 100); + assert (rc == 0); } void test_pull_fair_queue_in (void *ctx) @@ -76,7 +85,7 @@ void test_pull_fair_queue_in (void *ctx) void *pull = zmq_socket (ctx, ZMQ_PULL); assert (pull); - int rc = zmq_bind (pull, "inproc://a"); + int rc = zmq_bind (pull, bind_address); assert (rc == 0); const size_t N = 5; @@ -86,38 +95,74 @@ void test_pull_fair_queue_in (void *ctx) pushs[i] = zmq_socket (ctx, ZMQ_PUSH); assert (pushs[i]); - rc = zmq_connect (pushs[i], "inproc://a"); + rc = zmq_connect (pushs[i], connect_address); assert (rc == 0); } + // Wait for connections. + rc = zmq_poll (0, 0, 100); + assert (rc == 0); + + int first_half = 0; + int second_half = 0; + // Send 2N messages for (size_t i = 0; i < N; ++i) { - char * str = strdup("A"); + char *str = strdup("A"); + str[0] += i; s_send_seq (pushs[i], str, SEQ_END); + first_half += str[0]; + str[0] += N; s_send_seq (pushs[i], str, SEQ_END); + second_half += str[0]; + free (str); } - // Expect to pull them in order - for (size_t i = 0; i < 2*N; ++i) - { - char * str = strdup("A"); - str[0] += i; - s_recv_seq (pull, str, SEQ_END); - free (str); - } - - rc = zmq_close (pull); + // Wait for data. + rc = zmq_poll (0, 0, 100); assert (rc == 0); + zmq_msg_t msg; + rc = zmq_msg_init (&msg); + assert (rc == 0); + + // Expect to pull one from each first + for (size_t i = 0; i < N; ++i) + { + rc = zmq_msg_recv (&msg, pull, 0); + assert (rc == 2); + const char *str = (const char *)zmq_msg_data (&msg); + first_half -= str[0]; + } + assert (first_half == 0); + + // And then get the second batch + for (size_t i = 0; i < N; ++i) + { + rc = zmq_msg_recv (&msg, pull, 0); + assert (rc == 2); + const char *str = (const char *)zmq_msg_data (&msg); + second_half -= str[0]; + } + assert (second_half == 0); + + rc = zmq_msg_close (&msg); + assert (rc == 0); + + close_zero_linger (pull); + for (size_t i = 0; i < N; ++i) { - rc = zmq_close (pushs[i]); - assert (rc == 0); + close_zero_linger (pushs[i]); } + + // Wait for disconnects. + rc = zmq_poll (0, 0, 100); + assert (rc == 0); } void test_push_block_on_send_no_peers (void *ctx) @@ -150,7 +195,7 @@ void test_destroy_queue_on_disconnect (void *ctx) int rc = zmq_setsockopt (A, ZMQ_SNDHWM, &hwm, sizeof(hwm)); assert (rc == 0); - rc = zmq_bind (A, "inproc://d"); + rc = zmq_bind (A, bind_address); assert (rc == 0); void *B = zmq_socket (ctx, ZMQ_PULL); @@ -159,7 +204,7 @@ void test_destroy_queue_on_disconnect (void *ctx) rc = zmq_setsockopt (B, ZMQ_RCVHWM, &hwm, sizeof(hwm)); assert (rc == 0); - rc = zmq_connect (B, "inproc://d"); + rc = zmq_connect (B, connect_address); assert (rc == 0); // Send two messages, one should be stuck in A's outgoing queue, the other @@ -172,13 +217,15 @@ void test_destroy_queue_on_disconnect (void *ctx) assert (rc == -1); assert (errno == EAGAIN); - rc = zmq_disconnect (B, "inproc://d"); + rc = zmq_disconnect (B, connect_address); assert (rc == 0); // Disconnect may take time and need command processing. zmq_pollitem_t poller[2] = { { A, 0, 0, 0 }, { B, 0, 0, 0 } }; rc = zmq_poll (poller, 2, 100); assert (rc == 0); + rc = zmq_poll (poller, 2, 100); + assert (rc == 0); zmq_msg_t msg; rc = zmq_msg_init (&msg); @@ -195,7 +242,7 @@ void test_destroy_queue_on_disconnect (void *ctx) assert (errno == EAGAIN); // Reconnect B - rc = zmq_connect (B, "inproc://d"); + rc = zmq_connect (B, connect_address); assert (rc == 0); // Still can't receive old data on B. @@ -214,10 +261,11 @@ void test_destroy_queue_on_disconnect (void *ctx) rc = zmq_msg_close (&msg); assert (rc == 0); - rc = zmq_close (A); - assert (rc == 0); + close_zero_linger (A); + close_zero_linger (B); - rc = zmq_close (B); + // Wait for disconnects. + rc = zmq_poll (0, 0, 100); assert (rc == 0); } @@ -226,22 +274,30 @@ int main () void *ctx = zmq_ctx_new (); assert (ctx); - // PUSH: SHALL route outgoing messages to connected peers using a - // round-robin strategy. - test_push_round_robin_out (ctx); + const char *binds[] = { "inproc://a", "tcp://*:5555" }; + const char *connects[] = { "inproc://a", "tcp://localhost:5555" }; - // PULL: SHALL receive incoming messages from its peers using a fair-queuing - // strategy. - test_pull_fair_queue_in (ctx); + for (int i = 0; i < 2; ++i) { + bind_address = binds[i]; + connect_address = connects[i]; - // PUSH: SHALL block on sending, or return a suitable error, when it has no - // available peers. - test_push_block_on_send_no_peers (ctx); + // PUSH: SHALL route outgoing messages to connected peers using a + // round-robin strategy. + test_push_round_robin_out (ctx); - // PUSH and PULL: SHALL create this queue when a peer connects to it. If - // this peer disconnects, the socket SHALL destroy its queue and SHALL - // discard any messages it contains. - test_destroy_queue_on_disconnect (ctx); + // PULL: SHALL receive incoming messages from its peers using a fair-queuing + // strategy. + test_pull_fair_queue_in (ctx); + + // PUSH: SHALL block on sending, or return a suitable error, when it has no + // available peers. + test_push_block_on_send_no_peers (ctx); + + // PUSH and PULL: SHALL create this queue when a peer connects to it. If + // this peer disconnects, the socket SHALL destroy its queue and SHALL + // discard any messages it contains. + test_destroy_queue_on_disconnect (ctx); + } int rc = zmq_ctx_term (ctx); assert (rc == 0); diff --git a/tests/test_spec_rep.cpp b/tests/test_spec_rep.cpp index 2f1424aa..53402e45 100644 --- a/tests/test_spec_rep.cpp +++ b/tests/test_spec_rep.cpp @@ -21,6 +21,9 @@ #include #include "testutil.hpp" +const char *bind_address = 0; +const char *connect_address = 0; + void test_fair_queue_in (void *ctx) { void *rep = zmq_socket (ctx, ZMQ_REP); @@ -30,7 +33,7 @@ void test_fair_queue_in (void *ctx) int rc = zmq_setsockopt (rep, ZMQ_RCVTIMEO, &timeout, sizeof(int)); assert (rc == 0); - rc = zmq_bind (rep, "inproc://a"); + rc = zmq_bind (rep, bind_address); assert (rc == 0); const size_t N = 5; @@ -43,7 +46,7 @@ void test_fair_queue_in (void *ctx) rc = zmq_setsockopt (reqs[i], ZMQ_RCVTIMEO, &timeout, sizeof(int)); assert (rc == 0); - rc = zmq_connect (reqs[i], "inproc://a"); + rc = zmq_connect (reqs[i], connect_address); assert (rc == 0); } @@ -77,14 +80,16 @@ void test_fair_queue_in (void *ctx) free (str); } - rc = zmq_close (rep); - assert (rc == 0); + close_zero_linger (rep); for (size_t i = 0; i < N; ++i) { - rc = zmq_close (reqs[i]); - assert (rc == 0); + close_zero_linger (reqs[i]); } + + // Wait for disconnects. + rc = zmq_poll (0, 0, 100); + assert (rc == 0); } void test_envelope (void *ctx) @@ -92,13 +97,13 @@ void test_envelope (void *ctx) void *rep = zmq_socket (ctx, ZMQ_REP); assert (rep); - int rc = zmq_bind (rep, "inproc://b"); + int rc = zmq_bind (rep, bind_address); assert (rc == 0); void *dealer = zmq_socket (ctx, ZMQ_DEALER); assert (dealer); - rc = zmq_connect (dealer, "inproc://b"); + rc = zmq_connect (dealer, connect_address); assert (rc == 0); // minimal envelope @@ -113,10 +118,11 @@ void test_envelope (void *ctx) s_send_seq (rep, "A", SEQ_END); s_recv_seq (dealer, "X", "Y", 0, "A", SEQ_END); - rc = zmq_close (rep); - assert (rc == 0); + close_zero_linger (rep); + close_zero_linger (dealer); - rc = zmq_close (dealer); + // Wait for disconnects. + rc = zmq_poll (0, 0, 100); assert (rc == 0); } @@ -125,17 +131,25 @@ int main () void *ctx = zmq_ctx_new (); assert (ctx); - // SHALL receive incoming messages from its peers using a fair-queuing - // strategy. - test_fair_queue_in (ctx); + const char *binds[] = { "inproc://a", "tcp://*:5555" }; + const char *connects[] = { "inproc://a", "tcp://localhost:5555" }; - // For an incoming message: - // SHALL remove and store the address envelope, including the delimiter. - // SHALL pass the remaining data frames to its calling application. - // SHALL wait for a single reply message from its calling application. - // SHALL prepend the address envelope and delimiter. - // SHALL deliver this message back to the originating peer. - test_envelope (ctx); + for (int i = 0; i < 2; ++i) { + bind_address = binds[i]; + connect_address = connects[i]; + + // SHALL receive incoming messages from its peers using a fair-queuing + // strategy. + test_fair_queue_in (ctx); + + // For an incoming message: + // SHALL remove and store the address envelope, including the delimiter. + // SHALL pass the remaining data frames to its calling application. + // SHALL wait for a single reply message from its calling application. + // SHALL prepend the address envelope and delimiter. + // SHALL deliver this message back to the originating peer. + test_envelope (ctx); + } int rc = zmq_ctx_term (ctx); assert (rc == 0); diff --git a/tests/test_spec_req.cpp b/tests/test_spec_req.cpp index e6f9d43f..5d5eade0 100644 --- a/tests/test_spec_req.cpp +++ b/tests/test_spec_req.cpp @@ -20,12 +20,15 @@ #include #include "testutil.hpp" +const char *bind_address = 0; +const char *connect_address = 0; + void test_round_robin_out (void *ctx) { void *req = zmq_socket (ctx, ZMQ_REQ); assert (req); - int rc = zmq_bind (req, "inproc://b"); + int rc = zmq_bind (req, bind_address); assert (rc == 0); const size_t N = 5; @@ -39,7 +42,7 @@ void test_round_robin_out (void *ctx) rc = zmq_setsockopt (rep[i], ZMQ_RCVTIMEO, &timeout, sizeof(int)); assert (rc == 0); - rc = zmq_connect (rep[i], "inproc://b"); + rc = zmq_connect (rep[i], connect_address); assert (rc == 0); } @@ -52,14 +55,16 @@ void test_round_robin_out (void *ctx) s_recv_seq (req, "DEF", SEQ_END); } - rc = zmq_close (req); - assert (rc == 0); + close_zero_linger (req); for (size_t i = 0; i < N; ++i) { - rc = zmq_close (rep[i]); - assert (rc == 0); + close_zero_linger (rep[i]); } + + // Wait for disconnects. + rc = zmq_poll (0, 0, 100); + assert (rc == 0); } void test_req_only_listens_to_current_peer (void *ctx) @@ -70,7 +75,7 @@ void test_req_only_listens_to_current_peer (void *ctx) int rc = zmq_setsockopt(req, ZMQ_IDENTITY, "A", 2); assert (rc == 0); - rc = zmq_bind (req, "inproc://c"); + rc = zmq_bind (req, bind_address); assert (rc == 0); const size_t N = 3; @@ -88,7 +93,7 @@ void test_req_only_listens_to_current_peer (void *ctx) rc = zmq_setsockopt (router[i], ZMQ_ROUTER_MANDATORY, &enabled, sizeof(enabled)); assert (rc == 0); - rc = zmq_connect (router[i], "inproc://c"); + rc = zmq_connect (router[i], connect_address); assert (rc == 0); } @@ -111,14 +116,16 @@ void test_req_only_listens_to_current_peer (void *ctx) s_recv_seq (req, "GOOD", SEQ_END); } - rc = zmq_close (req); - assert (rc == 0); + close_zero_linger (req); for (size_t i = 0; i < N; ++i) { - rc = zmq_close (router[i]); - assert (rc == 0); + close_zero_linger (router[i]); } + + // Wait for disconnects. + rc = zmq_poll (0, 0, 100); + assert (rc == 0); } void test_req_message_format (void *ctx) @@ -129,10 +136,10 @@ void test_req_message_format (void *ctx) void *router = zmq_socket (ctx, ZMQ_ROUTER); assert (router); - int rc = zmq_bind (req, "inproc://a"); + int rc = zmq_bind (req, bind_address); assert (rc == 0); - rc = zmq_connect (router, "inproc://a"); + rc = zmq_connect (router, connect_address); assert (rc == 0); // Send a multi-part request. @@ -172,10 +179,11 @@ void test_req_message_format (void *ctx) rc = zmq_msg_close (&peer_id_msg); assert (rc == 0); - rc = zmq_close (req); - assert (rc == 0); + close_zero_linger (req); + close_zero_linger (router); - rc = zmq_close (router); + // Wait for disconnects. + rc = zmq_poll (0, 0, 100); assert (rc == 0); } @@ -205,23 +213,31 @@ int main () void *ctx = zmq_ctx_new (); assert (ctx); - // SHALL route outgoing messages to connected peers using a round-robin - // strategy. - test_round_robin_out (ctx); + const char *binds[] = { "inproc://a", "tcp://*:5555" }; + const char *connects[] = { "inproc://a", "tcp://localhost:5555" }; - // The request and reply messages SHALL have this format on the wire: - // * A delimiter, consisting of an empty frame, added by the REQ socket. - // * One or more data frames, comprising the message visible to the - // application. - test_req_message_format (ctx); + for (int i = 0; i < 2; ++i) { + bind_address = binds[i]; + connect_address = connects[i]; - // SHALL block on sending, or return a suitable error, when it has no connected peers. - test_block_on_send_no_peers (ctx); + // SHALL route outgoing messages to connected peers using a round-robin + // strategy. + test_round_robin_out (ctx); - // SHALL accept an incoming message only from the last peer that it sent a - // request to. - // SHALL discard silently any messages received from other peers. - test_req_only_listens_to_current_peer (ctx); + // The request and reply messages SHALL have this format on the wire: + // * A delimiter, consisting of an empty frame, added by the REQ socket. + // * One or more data frames, comprising the message visible to the + // application. + test_req_message_format (ctx); + + // SHALL block on sending, or return a suitable error, when it has no connected peers. + test_block_on_send_no_peers (ctx); + + // SHALL accept an incoming message only from the last peer that it sent a + // request to. + // SHALL discard silently any messages received from other peers. + test_req_only_listens_to_current_peer (ctx); + } int rc = zmq_ctx_term (ctx); assert (rc == 0); diff --git a/tests/test_spec_router.cpp b/tests/test_spec_router.cpp index e5972e34..8de10d4c 100644 --- a/tests/test_spec_router.cpp +++ b/tests/test_spec_router.cpp @@ -21,6 +21,9 @@ #include #include "testutil.hpp" +const char *bind_address = 0; +const char *connect_address = 0; + void test_fair_queue_in (void *ctx) { void *receiver = zmq_socket (ctx, ZMQ_ROUTER); @@ -30,7 +33,7 @@ void test_fair_queue_in (void *ctx) int rc = zmq_setsockopt (receiver, ZMQ_RCVTIMEO, &timeout, sizeof(int)); assert (rc == 0); - rc = zmq_bind (receiver, "inproc://a"); + rc = zmq_bind (receiver, bind_address); assert (rc == 0); const size_t N = 5; @@ -49,7 +52,7 @@ void test_fair_queue_in (void *ctx) assert (rc == 0); free (str); - rc = zmq_connect (senders[i], "inproc://a"); + rc = zmq_connect (senders[i], connect_address); assert (rc == 0); } @@ -63,32 +66,43 @@ void test_fair_queue_in (void *ctx) s_send_seq (senders[0], "M", SEQ_END); s_recv_seq (receiver, "A", "M", SEQ_END); + int sum = 0; + // send N requests for (size_t i = 0; i < N; ++i) { s_send_seq (senders[i], "M", SEQ_END); + sum += 'A' + i; } + assert (sum == N*'A' + N*(N-1)/2); + // handle N requests for (size_t i = 0; i < N; ++i) { - char *str = strdup("A"); - str[0] += i; - s_recv_seq (receiver, str, "M", SEQ_END); - free (str); + rc = zmq_msg_recv (&msg, receiver, 0); + assert (rc == 2); + const char *id = (const char *)zmq_msg_data (&msg); + sum -= id[0]; + + s_recv_seq (receiver, "M", SEQ_END); } + assert (sum == 0); + rc = zmq_msg_close (&msg); assert (rc == 0); - rc = zmq_close (receiver); - assert (rc == 0); + close_zero_linger (receiver); for (size_t i = 0; i < N; ++i) { - rc = zmq_close (senders[i]); - assert (rc == 0); + close_zero_linger (senders[i]); } + + // Wait for disconnects. + rc = zmq_poll (0, 0, 100); + assert (rc == 0); } void test_destroy_queue_on_disconnect (void *ctx) @@ -100,7 +114,7 @@ void test_destroy_queue_on_disconnect (void *ctx) int rc = zmq_setsockopt (A, ZMQ_ROUTER_MANDATORY, &enabled, sizeof(enabled)); assert (rc == 0); - rc = zmq_bind (A, "inproc://d"); + rc = zmq_bind (A, bind_address); assert (rc == 0); void *B = zmq_socket (ctx, ZMQ_DEALER); @@ -109,20 +123,26 @@ void test_destroy_queue_on_disconnect (void *ctx) rc = zmq_setsockopt (B, ZMQ_IDENTITY, "B", 2); assert (rc == 0); - rc = zmq_connect (B, "inproc://d"); + rc = zmq_connect (B, connect_address); + assert (rc == 0); + + // Wait for connection. + rc = zmq_poll (0, 0, 100); assert (rc == 0); // Send a message in both directions s_send_seq (A, "B", "ABC", SEQ_END); s_send_seq (B, "DEF", SEQ_END); - rc = zmq_disconnect (B, "inproc://d"); + rc = zmq_disconnect (B, connect_address); assert (rc == 0); // Disconnect may take time and need command processing. zmq_pollitem_t poller[2] = { { A, 0, 0, 0 }, { B, 0, 0, 0 } }; rc = zmq_poll (poller, 2, 100); assert (rc == 0); + rc = zmq_poll (poller, 2, 100); + assert (rc == 0); // No messages should be available, sending should fail. zmq_msg_t msg; @@ -137,7 +157,7 @@ void test_destroy_queue_on_disconnect (void *ctx) assert (errno == EAGAIN); // After a reconnect of B, the messages should still be gone - rc = zmq_connect (B, "inproc://d"); + rc = zmq_connect (B, connect_address); assert (rc == 0); rc = zmq_msg_recv (&msg, A, ZMQ_DONTWAIT); @@ -151,10 +171,11 @@ void test_destroy_queue_on_disconnect (void *ctx) rc = zmq_msg_close (&msg); assert (rc == 0); - rc = zmq_close (A); - assert (rc == 0); + close_zero_linger (A); + close_zero_linger (B); - rc = zmq_close (B); + // Wait for disconnects. + rc = zmq_poll (0, 0, 100); assert (rc == 0); } @@ -164,14 +185,22 @@ int main () void *ctx = zmq_ctx_new (); assert (ctx); - // SHALL receive incoming messages from its peers using a fair-queuing - // strategy. - test_fair_queue_in (ctx); + const char *binds[] = { "inproc://a", "tcp://*:5555" }; + const char *connects[] = { "inproc://a", "tcp://localhost:5555" }; - // SHALL create a double queue when a peer connects to it. If this peer - // disconnects, the ROUTER socket SHALL destroy its double queue and SHALL - // discard any messages it contains. - test_destroy_queue_on_disconnect (ctx); + for (int i = 0; i < 2; ++i) { + bind_address = binds[i]; + connect_address = connects[i]; + + // SHALL receive incoming messages from its peers using a fair-queuing + // strategy. + test_fair_queue_in (ctx); + + // SHALL create a double queue when a peer connects to it. If this peer + // disconnects, the ROUTER socket SHALL destroy its double queue and SHALL + // discard any messages it contains. + test_destroy_queue_on_disconnect (ctx); + } int rc = zmq_ctx_term (ctx); assert (rc == 0); diff --git a/tests/testutil.hpp b/tests/testutil.hpp index eb4b4e48..376d85d9 100644 --- a/tests/testutil.hpp +++ b/tests/testutil.hpp @@ -186,4 +186,15 @@ void s_recv_seq (void *socket, ...) zmq_msg_close (&msg); } + +// Sets a zero linger period on a socket and closes it. +void close_zero_linger (void *socket) +{ + int linger = 0; + int rc = zmq_setsockopt (socket, ZMQ_LINGER, &linger, sizeof(linger)); + assert (rc == 0); + rc = zmq_close (socket); + assert (rc == 0); +} + #endif