/* SPDX-License-Identifier: MPL-2.0 */ #include "testutil.hpp" #include "testutil_unity.hpp" #include SETUP_TEARDOWN_TESTCONTEXT void test_sockopt_router_notify () { void *router = test_context_socket (ZMQ_ROUTER); int opt_notify; int opt_notify_read; size_t opt_notify_read_size = sizeof (opt_notify_read); // default value is off when socket is constructed TEST_ASSERT_SUCCESS_ERRNO (zmq_getsockopt ( router, ZMQ_ROUTER_NOTIFY, &opt_notify_read, &opt_notify_read_size)); TEST_ASSERT_EQUAL (0, opt_notify_read); // valid value - Connect opt_notify = ZMQ_NOTIFY_CONNECT; TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt ( router, ZMQ_ROUTER_NOTIFY, &opt_notify, sizeof (opt_notify))); TEST_ASSERT_SUCCESS_ERRNO (zmq_getsockopt ( router, ZMQ_ROUTER_NOTIFY, &opt_notify_read, &opt_notify_read_size)); TEST_ASSERT_EQUAL (opt_notify, opt_notify_read); // valid value - Disconnect opt_notify = ZMQ_NOTIFY_DISCONNECT; TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt ( router, ZMQ_ROUTER_NOTIFY, &opt_notify, sizeof (opt_notify))); TEST_ASSERT_SUCCESS_ERRNO (zmq_getsockopt ( router, ZMQ_ROUTER_NOTIFY, &opt_notify_read, &opt_notify_read_size)); TEST_ASSERT_EQUAL (opt_notify, opt_notify_read); // valid value - Off opt_notify = 0; TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt ( router, ZMQ_ROUTER_NOTIFY, &opt_notify, sizeof (opt_notify))); TEST_ASSERT_SUCCESS_ERRNO (zmq_getsockopt ( router, ZMQ_ROUTER_NOTIFY, &opt_notify_read, &opt_notify_read_size)); TEST_ASSERT_EQUAL (opt_notify, opt_notify_read); // valid value - Both opt_notify = ZMQ_NOTIFY_CONNECT | ZMQ_NOTIFY_DISCONNECT; TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt ( router, ZMQ_ROUTER_NOTIFY, &opt_notify, sizeof (opt_notify))); TEST_ASSERT_SUCCESS_ERRNO (zmq_getsockopt ( router, ZMQ_ROUTER_NOTIFY, &opt_notify_read, &opt_notify_read_size)); TEST_ASSERT_EQUAL (opt_notify, opt_notify_read); // value boundary opt_notify = -1; TEST_ASSERT_FAILURE_ERRNO ( EINVAL, zmq_setsockopt (router, ZMQ_ROUTER_NOTIFY, &opt_notify, sizeof (opt_notify))); opt_notify = (ZMQ_NOTIFY_CONNECT | ZMQ_NOTIFY_DISCONNECT) + 1; TEST_ASSERT_FAILURE_ERRNO ( EINVAL, zmq_setsockopt (router, ZMQ_ROUTER_NOTIFY, &opt_notify, sizeof (opt_notify))); // failures don't update the value TEST_ASSERT_SUCCESS_ERRNO (zmq_getsockopt ( router, ZMQ_ROUTER_NOTIFY, &opt_notify_read, &opt_notify_read_size)); TEST_ASSERT_EQUAL (ZMQ_NOTIFY_CONNECT | ZMQ_NOTIFY_DISCONNECT, opt_notify_read); test_context_socket_close (router); // check a non-router socket type void *dealer = test_context_socket (ZMQ_DEALER); // setsockopt fails for non-router sockets opt_notify = ZMQ_NOTIFY_CONNECT; TEST_ASSERT_FAILURE_ERRNO ( EINVAL, zmq_setsockopt (dealer, ZMQ_ROUTER_NOTIFY, &opt_notify, sizeof (opt_notify))); // getsockopts returns off for any non-router socket TEST_ASSERT_SUCCESS_ERRNO (zmq_getsockopt ( dealer, ZMQ_ROUTER_NOTIFY, &opt_notify_read, &opt_notify_read_size)); TEST_ASSERT_EQUAL (0, opt_notify_read); test_context_socket_close (dealer); } void test_router_notify_helper (int opt_notify_) { void *router = test_context_socket (ZMQ_ROUTER); int opt_more; size_t opt_more_length = sizeof (opt_more); int opt_events; size_t opt_events_length = sizeof (opt_events); char connect_address[MAX_SOCKET_STRING]; // valid values TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt ( router, ZMQ_ROUTER_NOTIFY, &opt_notify_, sizeof (opt_notify_))); bind_loopback_ipv4 (router, connect_address, sizeof connect_address); void *dealer = test_context_socket (ZMQ_DEALER); const char *dealer_routing_id = "X"; TEST_ASSERT_SUCCESS_ERRNO ( zmq_setsockopt (dealer, ZMQ_ROUTING_ID, dealer_routing_id, 1)); // dealer connects TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (dealer, connect_address)); // connection notification msg if (opt_notify_ & ZMQ_NOTIFY_CONNECT) { // routing-id only message of the connect recv_string_expect_success (router, dealer_routing_id, 0); // 1st part: routing-id recv_string_expect_success (router, "", 0); // 2nd part: empty TEST_ASSERT_SUCCESS_ERRNO ( zmq_getsockopt (router, ZMQ_RCVMORE, &opt_more, &opt_more_length)); TEST_ASSERT_EQUAL (0, opt_more); } // test message from the dealer send_string_expect_success (dealer, "Hello", 0); recv_string_expect_success (router, dealer_routing_id, 0); recv_string_expect_success (router, "Hello", 0); TEST_ASSERT_SUCCESS_ERRNO ( zmq_getsockopt (router, ZMQ_RCVMORE, &opt_more, &opt_more_length)); TEST_ASSERT_EQUAL (0, opt_more); // dealer disconnects TEST_ASSERT_SUCCESS_ERRNO (zmq_disconnect (dealer, connect_address)); // need one more process_commands() (???) msleep (SETTLE_TIME); zmq_getsockopt (dealer, ZMQ_EVENTS, &opt_events, &opt_events_length); // connection notification msg if (opt_notify_ & ZMQ_NOTIFY_DISCONNECT) { // routing-id only message of the connect recv_string_expect_success (router, dealer_routing_id, 0); // 1st part: routing-id recv_string_expect_success (router, "", 0); // 2nd part: empty TEST_ASSERT_SUCCESS_ERRNO ( zmq_getsockopt (router, ZMQ_RCVMORE, &opt_more, &opt_more_length)); TEST_ASSERT_EQUAL (0, opt_more); } test_context_socket_close (dealer); test_context_socket_close (router); } void test_router_notify_connect () { test_router_notify_helper (ZMQ_NOTIFY_CONNECT); } void test_router_notify_disconnect () { test_router_notify_helper (ZMQ_NOTIFY_DISCONNECT); } void test_router_notify_both () { test_router_notify_helper (ZMQ_NOTIFY_CONNECT | ZMQ_NOTIFY_DISCONNECT); } void test_handshake_fail () { // setup router socket void *router = test_context_socket (ZMQ_ROUTER); int opt_timeout = 200; int opt_notify = ZMQ_NOTIFY_CONNECT | ZMQ_NOTIFY_DISCONNECT; char connect_address[MAX_SOCKET_STRING]; // valid values TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt ( router, ZMQ_ROUTER_NOTIFY, &opt_notify, sizeof (opt_notify))); TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt ( router, ZMQ_RCVTIMEO, &opt_timeout, sizeof (opt_timeout))); bind_loopback_ipv4 (router, connect_address, sizeof connect_address); // send something on raw tcp void *stream = test_context_socket (ZMQ_STREAM); TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (stream, connect_address)); send_string_expect_success (stream, "not-a-handshake", 0); TEST_ASSERT_SUCCESS_ERRNO (zmq_disconnect (stream, connect_address)); test_context_socket_close (stream); // no notification delivered char buffer[255]; TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_recv (router, buffer, sizeof (buffer), 0)); test_context_socket_close (router); } void test_error_during_multipart () { /* * If the disconnect occurs in the middle of the multipart * message, the socket should not add the notification at the * end of the incomplete message. It must discard the incomplete * message, and delivert the notification as a new message. */ char connect_address[MAX_SOCKET_STRING]; char long_str[128] = {0}; memset (long_str, '*', sizeof (long_str) - 1); // setup router void *router = test_context_socket (ZMQ_ROUTER); int opt_notify = ZMQ_NOTIFY_DISCONNECT; TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt ( router, ZMQ_ROUTER_NOTIFY, &opt_notify, sizeof (opt_notify))); int64_t opt_maxmsgsize = 64; // the handshake fails if this is too small TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt ( router, ZMQ_MAXMSGSIZE, &opt_maxmsgsize, sizeof (opt_maxmsgsize))); bind_loopback_ipv4 (router, connect_address, sizeof connect_address); // setup dealer void *dealer = test_context_socket (ZMQ_DEALER); const char *dealer_routing_id = "X"; TEST_ASSERT_SUCCESS_ERRNO ( zmq_setsockopt (dealer, ZMQ_ROUTING_ID, dealer_routing_id, 1)); TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (dealer, connect_address)); // send multipart message, the 2nd part causes a disconnect. send_string_expect_success (dealer, "Hello2", ZMQ_SNDMORE); send_string_expect_success (dealer, long_str, 0); // disconnect notification recv_string_expect_success (router, dealer_routing_id, 0); recv_string_expect_success (router, "", 0); test_context_socket_close (dealer); test_context_socket_close (router); } int main (void) { setup_test_environment (); UNITY_BEGIN (); RUN_TEST (test_sockopt_router_notify); RUN_TEST (test_router_notify_connect); RUN_TEST (test_router_notify_disconnect); RUN_TEST (test_router_notify_both); RUN_TEST (test_handshake_fail); RUN_TEST (test_error_during_multipart); return UNITY_END (); }