diff --git a/src/router.cpp b/src/router.cpp index fec65ff7..2e005691 100644 --- a/src/router.cpp +++ b/src/router.cpp @@ -198,7 +198,7 @@ int zmq::router_t::xsend (msg_t *msg_) // Find the pipe associated with the routing id stored in the prefix. // If there's no such pipe just silently ignore the message, unless - // router_mandatory is set. ; + // router_mandatory is set. out_pipe_t *out_pipe = lookup_out_pipe ( blob_t (static_cast (msg_->data ()), msg_->size (), zmq::reference_tag_t ())); @@ -487,9 +487,9 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_, bool locally_initiated_) // Try to remove an existing routing id entry to allow the new // connection to take the routing id. - out_pipe_t existing_outpipe = try_erase_out_pipe (routing_id); + out_pipe_t *existing_outpipe = lookup_out_pipe (routing_id); - if (existing_outpipe.pipe) { + if (existing_outpipe) { if (!_handover) // Ignore peers with duplicate ID return false; @@ -502,15 +502,16 @@ bool zmq::router_t::identify_peer (pipe_t *pipe_, bool locally_initiated_) put_uint32 (buf + 1, _next_integral_routing_id++); blob_t new_routing_id (buf, sizeof buf); - existing_outpipe.pipe->set_router_socket_routing_id ( - new_routing_id); + pipe_t *const old_pipe = existing_outpipe->pipe; - add_out_pipe (ZMQ_MOVE (new_routing_id), existing_outpipe.pipe); + erase_out_pipe (old_pipe); + old_pipe->set_router_socket_routing_id (new_routing_id); + add_out_pipe (ZMQ_MOVE (new_routing_id), old_pipe); - if (existing_outpipe.pipe == _current_in) + if (old_pipe == _current_in) _terminate_current_in = true; else - existing_outpipe.pipe->terminate (true); + old_pipe->terminate (true); } } } diff --git a/tests/test_router_handover.cpp b/tests/test_router_handover.cpp index 4457731e..e04bcf6e 100644 --- a/tests/test_router_handover.cpp +++ b/tests/test_router_handover.cpp @@ -43,7 +43,7 @@ void tearDown () teardown_test_context (); } -void test_x () +void test_with_handover () { size_t len = MAX_SOCKET_STRING; char my_endpoint[MAX_SOCKET_STRING]; @@ -85,15 +85,16 @@ void test_x () recv_string_expect_success (router, "Hello", 0); // Send a message to 'X' routing id. This should be delivered - // to the second dealer, instead of the first beccause of the handover. + // to the second dealer, instead of the first because of the handover. send_string_expect_success (router, "X", ZMQ_SNDMORE); send_string_expect_success (router, "Hello", 0); // Ensure that the first dealer doesn't receive the message // but the second one does - int rc = zmq_recv (dealer_one, buffer, 255, ZMQ_NOBLOCK); - TEST_ASSERT_EQUAL_INT (-1, rc); - TEST_ASSERT_EQUAL_INT (EAGAIN, errno); + const int timeout = SETTLE_TIME; + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (dealer_one, ZMQ_RCVTIMEO, &timeout, sizeof timeout)); + TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (dealer_one, buffer, 255, 0)); recv_string_expect_success (dealer_two, "Hello", 0); @@ -102,11 +103,69 @@ void test_x () test_context_socket_close (dealer_two); } -int main (void) +void test_without_handover () +{ + size_t len = MAX_SOCKET_STRING; + char my_endpoint[MAX_SOCKET_STRING]; + void *router = test_context_socket (ZMQ_ROUTER); + + TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (router, "tcp://127.0.0.1:*")); + + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_getsockopt (router, ZMQ_LAST_ENDPOINT, my_endpoint, &len)); + + // Create dealer called "X" and connect it to our router + void *dealer_one = test_context_socket (ZMQ_DEALER); + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (dealer_one, ZMQ_ROUTING_ID, "X", 1)); + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (dealer_one, my_endpoint)); + + // Get message from dealer to know when connection is ready + char buffer[255]; + send_string_expect_success (dealer_one, "Hello", 0); + + recv_string_expect_success (router, "X", 0); + recv_string_expect_success (router, "Hello", 0); + + // Now create a second dealer that uses the same routing id + void *dealer_two = test_context_socket (ZMQ_DEALER); + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (dealer_two, ZMQ_ROUTING_ID, "X", 1)); + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (dealer_two, my_endpoint)); + + // Send message from second dealer + send_string_expect_success (dealer_two, "Hello", 0); + + // This should be ignored by the router + const int timeout = SETTLE_TIME; + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (router, ZMQ_RCVTIMEO, &timeout, sizeof timeout)); + TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (router, buffer, 255, 0)); + + // Send a message to 'X' routing id. This should be delivered + // to the second dealer, instead of the first because of the handover. + send_string_expect_success (router, "X", ZMQ_SNDMORE); + send_string_expect_success (router, "Hello", 0); + + // Ensure that the second dealer doesn't receive the message + // but the first one does + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (dealer_two, ZMQ_RCVTIMEO, &timeout, sizeof timeout)); + TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (dealer_two, buffer, 255, 0)); + + recv_string_expect_success (dealer_one, "Hello", 0); + + test_context_socket_close (router); + test_context_socket_close (dealer_one); + test_context_socket_close (dealer_two); +} + +int main () { setup_test_environment (); UNITY_BEGIN (); - RUN_TEST (test_x); + RUN_TEST (test_with_handover); + RUN_TEST (test_without_handover); return UNITY_END (); }