diff --git a/tests/test_router_mandatory.cpp b/tests/test_router_mandatory.cpp index bc6677b3..93c7fe48 100644 --- a/tests/test_router_mandatory.cpp +++ b/tests/test_router_mandatory.cpp @@ -32,34 +32,44 @@ void test_get_peer_state () { #ifdef ZMQ_BUILD_DRAFT_API - size_t len = MAX_SOCKET_STRING; - char my_endpoint[MAX_SOCKET_STRING]; void *ctx = zmq_ctx_new (); assert (ctx); void *router = zmq_socket (ctx, ZMQ_ROUTER); assert (router); - int rc = zmq_bind (router, "tcp://127.0.0.1:*"); - assert (rc == 0); - - rc = zmq_getsockopt (router, ZMQ_LAST_ENDPOINT, my_endpoint, &len); - assert (rc == 0); - + int rc; int mandatory = 1; rc = zmq_setsockopt (router, ZMQ_ROUTER_MANDATORY, &mandatory, sizeof (mandatory)); - // Create dealer called "X" and connect it to our router + rc = zmq_bind (router, "tcp://127.0.0.1:*"); + assert (rc == 0); + + size_t my_endpoint_len = MAX_SOCKET_STRING; + char my_endpoint[MAX_SOCKET_STRING]; + rc = + zmq_getsockopt (router, ZMQ_LAST_ENDPOINT, my_endpoint, &my_endpoint_len); + assert (rc == 0); + void *dealer1 = zmq_socket (ctx, ZMQ_DEALER); assert (dealer1); + + void *dealer2 = zmq_socket (ctx, ZMQ_DEALER); + assert (dealer2); + + // Lower HWMs to allow doing the test with fewer messages + int hwm = 100; + zmq_setsockopt (router, ZMQ_SNDHWM, &hwm, sizeof (int)); + zmq_setsockopt (dealer1, ZMQ_RCVHWM, &hwm, sizeof (int)); + zmq_setsockopt (dealer2, ZMQ_RCVHWM, &hwm, sizeof (int)); + + // Name dealer1 "X" and connect it to our router rc = zmq_setsockopt (dealer1, ZMQ_IDENTITY, "X", 1); assert (rc == 0); rc = zmq_connect (dealer1, my_endpoint); assert (rc == 0); - // Create dealer called "Y" and connect it to our router - void *dealer2 = zmq_socket (ctx, ZMQ_DEALER); - assert (dealer2); + // Name dealer2 "Y" and connect it to our router rc = zmq_setsockopt (dealer2, ZMQ_IDENTITY, "Y", 1); assert (rc == 0); rc = zmq_connect (dealer2, my_endpoint); @@ -83,7 +93,6 @@ void test_get_peer_state () rc = zmq_recv (router, buffer, 255, 0); assert (rc == 5); - void *poller = zmq_poller_new (); assert (poller); @@ -95,41 +104,53 @@ void test_get_peer_state () const size_t count = 10000; const size_t event_size = 2; + bool dealer2_blocked = false; + size_t dealer1_sent = 0, dealer2_sent = 0, dealer1_received = 0; zmq_poller_event_t events[event_size]; - for (size_t iterations = 0; - iterations < count - && zmq_poller_wait_all (poller, events, event_size, -1) != -1; - ++iterations) { + for (size_t iterations = 0; iterations < count; ++iterations) { + rc = zmq_poller_wait_all (poller, events, event_size, -1); + assert (rc != -1); for (size_t i = 0; i < event_size; ++i) { - if (events[i].socket == router) { + if (events[i].socket == router && events[i].events & ZMQ_POLLOUT) { rc = zmq_socket_get_peer_state (router, "X", 1); if (rc == -1) printf ("zmq_socket_get_peer_state failed: %i\n", errno); assert (rc != -1); if (rc & ZMQ_POLLOUT) { - rc = zmq_send (router, "X", 1, ZMQ_SNDMORE); + rc = zmq_send (router, "X", 1, ZMQ_SNDMORE | ZMQ_DONTWAIT); assert (rc == 1); - rc = zmq_send (router, "Hello", 5, 0); + rc = zmq_send (router, "Hello", 5, ZMQ_DONTWAIT); assert (rc == 5); + + ++dealer1_sent; } + rc = zmq_socket_get_peer_state (router, "Y", 1); if (rc == -1) printf ("zmq_socket_get_peer_state failed: %i\n", errno); assert (rc != -1); if (rc & ZMQ_POLLOUT) { - rc = zmq_send (router, "Y", 1, ZMQ_SNDMORE); + rc = zmq_send (router, "Y", 1, ZMQ_SNDMORE | ZMQ_DONTWAIT); assert (rc == 1); - rc = zmq_send (router, "Hello", 5, 0); + rc = zmq_send (router, "Hello", 5, ZMQ_DONTWAIT); assert (rc == 5); + ++dealer2_sent; + } else { + dealer2_blocked = true; } } - if (events[i].socket == dealer1) { + if (events[i].socket == dealer1 && events[i].events & ZMQ_POLLIN) { rc = zmq_recv (dealer1, buffer, 255, ZMQ_DONTWAIT); assert (rc == 5); + + ++dealer1_received; } // never read from dealer2, so its pipe becomes full eventually } } + printf ("dealer1_sent = %zu, dealer2_sent = %zu, dealer1_received = %zu\n", + dealer1_sent, dealer2_sent, dealer1_received); + assert (dealer2_blocked); zmq_poller_destroy (&poller); rc = zmq_close (router); @@ -217,5 +238,5 @@ int main (void) test_basic (); test_get_peer_state (); - return 0 ; + return 0; }