From f8b3cc81081360be9e63ffbc530c2cf30c65c1de Mon Sep 17 00:00:00 2001 From: George Cockshull Date: Thu, 12 Oct 2023 11:16:48 -0400 Subject: [PATCH 1/2] Fix zmq_proxy_steerable PAUSE/RESUME Problem: the new reimplementation of zmq_proxy_steerable had PAUSE/RESUME that didn't follow expected behaviour. Possibly mixed up. Test didn't properly cover the issue. Solution: improve test coverage, fix the proxy command parsing. I had no knowledge of the pre-MPL-2.0 implementation. This fix is based on documented semantics and prior API usage. I contribute this under MPL-2.0. --- src/proxy.cpp | 6 +- tests/test_proxy_steerable.cpp | 129 +++++++++++++++++++++++++-------- 2 files changed, 101 insertions(+), 34 deletions(-) diff --git a/src/proxy.cpp b/src/proxy.cpp index 78d6ba61..2d6845fc 100644 --- a/src/proxy.cpp +++ b/src/proxy.cpp @@ -189,10 +189,10 @@ static int handle_control (class zmq::socket_base_t *control_, return 0; } - if (msiz == 5 && memcmp (command, "\x05PAUSE", 6)) { - state = active; - } else if (msiz == 6 && 0 == memcmp (command, "RESUME", 6)) { + if (msiz == 5 && 0 == memcmp (command, "PAUSE", 5)) { state = paused; + } else if (msiz == 6 && 0 == memcmp (command, "RESUME", 6)) { + state = active; } else if (msiz == 9 && 0 == memcmp (command, "TERMINATE", 9)) { state = terminated; } diff --git a/tests/test_proxy_steerable.cpp b/tests/test_proxy_steerable.cpp index 68de26c1..14092df1 100644 --- a/tests/test_proxy_steerable.cpp +++ b/tests/test_proxy_steerable.cpp @@ -5,6 +5,7 @@ #include #include +#include <inttypes.h> #define CONTENT_SIZE 13 #define CONTENT_SIZE_MAX 32 @@ -25,6 +26,9 @@ void *g_clients_pkts_out = NULL; void *g_workers_pkts_out = NULL; void *control_context = NULL; // worker control, not proxy control +int g_proxy_control_socktype = + ZMQ_PAIR; //or ZMQ_REP, ZMQ_SUB (without statistics) + void setUp () { setup_test_context (); @@ -90,7 +94,7 @@ static void client_task (void *db_) int request_nbr = 0; bool run = true; - bool keep_sending = true; + bool 
enable_send = false; while (run) { // Tick once per 200 ms, pulling in arriving messages int centitick; @@ -126,14 +130,17 @@ static void client_task (void *db_) break; } if (memcmp (content, "STOP", 4) == 0) { - keep_sending = false; + enable_send = false; break; } + if (memcmp (content, "START", 5) == 0) { + enable_send = true; + } } } } - if (keep_sending) { + if (enable_send) { snprintf (content, CONTENT_SIZE_MAX * sizeof (char), "request #%03d", ++request_nbr); // CONTENT_SIZE if (is_verbose) @@ -203,9 +210,14 @@ void server_task (void * /*unused_*/) } // Proxy control socket - void *proxy_control = zmq_socket (get_test_context (), ZMQ_REP); + void *proxy_control = + zmq_socket (get_test_context (), g_proxy_control_socktype); TEST_ASSERT_NOT_NULL (proxy_control); TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (proxy_control, proxy_control_address)); + if (g_proxy_control_socktype == ZMQ_SUB) { + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (proxy_control, ZMQ_SUBSCRIBE, "", 0)); + } // Connect backend to frontend via a steerable proxy int rc = zmq_proxy_steerable (frontend, backend, NULL, proxy_control); @@ -319,6 +331,55 @@ static void server_worker (void * /*unused_*/) // // - 7/bsb: number of bytes sent out the backend socket +uint64_t read_stat_value (void *proxy_control) +{ + zmq_msg_t stats_msg; + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&stats_msg)); + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&stats_msg, proxy_control, 0)); + TEST_ASSERT_EQUAL_INT (sizeof (uint64_t), zmq_msg_size (&stats_msg)); + uint64_t val = *(uint64_t *) zmq_msg_data (&stats_msg); + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&stats_msg)); + return val; +} + +//return total bytes proxied, so we can test PAUSE/RESUME +uint64_t statistics (void *proxy_control, const char *runctx) +{ + if (is_verbose) { + printf ("steer: sending STATISTICS - %s\n", runctx); + } + + TEST_ASSERT_SUCCESS_ERRNO (zmq_send (proxy_control, "STATISTICS", 10, 0)); + + uint64_t total_bytes_proxied = 0; + for (int count = 0; 
count < 8; ++count) { + uint64_t val = read_stat_value (proxy_control); + if (is_verbose) { + if (count == 0) { + printf ("stats: client pkts out: %d worker pkts out: %d { ", + zmq_atomic_counter_value (g_clients_pkts_out), + zmq_atomic_counter_value (g_workers_pkts_out)); + } + printf ("%" PRIu64, val); + if (count == 7) { + printf ("}\n"); + } + } + switch (count) { + case 3: //bytes sent on frontend + case 7: //bytes sent on backend + total_bytes_proxied += val; + } + } + + int rcvmore; + size_t sz = sizeof (rcvmore); + zmq_getsockopt (proxy_control, ZMQ_RCVMORE, &rcvmore, &sz); + TEST_ASSERT_EQUAL_INT (rcvmore, 0); + return total_bytes_proxied; +} + + // The main thread simply starts several clients and a server, and then // waits for the server to finish. @@ -328,32 +389,18 @@ void steer (void *proxy_control, const char *command, const char *runctx) printf ("steer: sending %s - %s\n", command, runctx); } - // Start with proxy paused TEST_ASSERT_SUCCESS_ERRNO ( zmq_send (proxy_control, command, strlen (command), 0)); - zmq_msg_t stats_msg; - int count = -1; - while (1) { - count = count + 1; + if (g_proxy_control_socktype == ZMQ_REP) { + //expect an empty reply from REP for commands that need no response + zmq_msg_t stats_msg; TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&stats_msg)); TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&stats_msg, proxy_control, 0)); - - - if (is_verbose && zmq_msg_size (&stats_msg)) { - if (count == 0) { - printf ("steer:"); - } - printf (" %lu", *(unsigned long int *) zmq_msg_data (&stats_msg)); - if (count == 7) { - printf ("\n"); - } - } - if (!zmq_msg_get (&stats_msg, ZMQ_MORE)) - break; + TEST_ASSERT_EQUAL_INT (zmq_msg_size (&stats_msg), 0); + TEST_ASSERT (!zmq_msg_get (&stats_msg, ZMQ_MORE)); TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&stats_msg)); } - TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&stats_msg)); } void test_proxy_steerable () @@ -382,31 +429,51 @@ void test_proxy_steerable () msleep (500); // Run for 500 ms then quit // 
Proxy control socket - void *proxy_control = zmq_socket (get_test_context (), ZMQ_REQ); + int control_socktype = ZMQ_PAIR; + switch (g_proxy_control_socktype) { + case ZMQ_REP: + control_socktype = ZMQ_REQ; + break; + case ZMQ_SUB: + control_socktype = ZMQ_PUB; + break; + default: + break; + } + void *proxy_control = zmq_socket (get_test_context (), control_socktype); TEST_ASSERT_NOT_NULL (proxy_control); linger = 0; TEST_ASSERT_SUCCESS_ERRNO ( zmq_setsockopt (proxy_control, ZMQ_LINGER, &linger, sizeof (linger))); TEST_ASSERT_SUCCESS_ERRNO ( zmq_connect (proxy_control, proxy_control_address)); - msleep (500); // Run for 500 ms then quit - steer (proxy_control, "STATISTICS", "started clients"); - steer (proxy_control, "PAUSE", "started server"); + TEST_ASSERT ( + statistics (proxy_control, "should be all 0s before clients start") == 0); + + send_string_expect_success (control, "START", 0); msleep (500); // Run for 500 ms then quit - steer (proxy_control, "RESUME", "started clients"); + TEST_ASSERT (statistics (proxy_control, "started clients") > 0); + steer (proxy_control, "PAUSE", "pausing proxying after 500ms"); + uint64_t bytes = statistics (proxy_control, "post-pause"); msleep (500); // Run for 500 ms then quit - steer (proxy_control, "STATISTICS", "ran for a while"); + TEST_ASSERT (statistics (proxy_control, "post-pause") == bytes); + + steer (proxy_control, "RESUME", "resuming proxying after another 500ms"); + + msleep (500); // Run for 500 ms then quit + + TEST_ASSERT (statistics (proxy_control, "ran for a while") > bytes); if (is_verbose) printf ("stopping all clients and server workers\n"); send_string_expect_success (control, "STOP", 0); - steer (proxy_control, "STATISTICS", "stopped clients and workers"); + statistics (proxy_control, "stopped clients and workers"); msleep (500); // Wait for all clients and workers to STOP @@ -415,7 +482,7 @@ void test_proxy_steerable () send_string_expect_success (control, "TERMINATE", 0); msleep (500); - steer 
(proxy_control, "STATISTICS", "terminate clients and server workers"); + statistics (proxy_control, "terminate clients and server workers"); msleep (500); // Wait for all clients and workers to terminate steer (proxy_control, "TERMINATE", "terminate proxy"); From 058ad60b9a5b9dae444279bc5cee6d040b0b08d7 Mon Sep 17 00:00:00 2001 From: George Cockshull Date: Thu, 12 Oct 2023 11:24:57 -0400 Subject: [PATCH 2/2] zmq_proxy_steerable: support non-REP socket types Problem: reimplemented zmq_proxy_steerable control socket type is not backwards compatible - replies are always sent. In the past, zmq_proxy_steerable never sent a reply for commands that weren't STATISTICS, so it only really worked with PAIR and didn't work at all with REP. Now it only supports REP, and PAIR semantics have changed. This breaks compatibility with PAIR in a subtle and slightly annoying way if HWMs are hit without reading the replies. Solution: Add a check to send the empty reply only for REP control sockets. This restores backwards compatibility, and supports REP, PAIR, and SUB (for non-reply commands). I had no knowledge of the pre-MPL-2.0 implementation. This fix is based on docs and prior API usage. I contribute this under MPL-2.0. --- doc/zmq_proxy_steerable.txt | 12 +++++++----- src/proxy.cpp | 15 ++++++++++----- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/doc/zmq_proxy_steerable.txt b/doc/zmq_proxy_steerable.txt index c5c0f661..23081fa9 100644 --- a/doc/zmq_proxy_steerable.txt +++ b/doc/zmq_proxy_steerable.txt @@ -19,21 +19,21 @@ The _zmq_proxy_steerable()_ function is a variant of the _zmq_proxy()_ function. It accepts a fourth _control_ socket. When the _control_ socket is _NULL_ the two functions operate identically. -When a _control_ socket of type _REP_ is provided to the proxy function the +When a _control_ socket of type _REP_ or _PAIR_ is provided to the proxy function the application may send commands to the proxy. The following commands are supported. 
_PAUSE_:: - The proxy will cease transferring messages between its endpoints. + The proxy will cease transferring messages between its endpoints. A _REP_ control socket will reply with an empty message; no response is sent otherwise. _RESUME_:: - The proxy will resume transferring messages between its endpoints. + The proxy will resume transferring messages between its endpoints. A _REP_ control socket will reply with an empty message; no response is sent otherwise. _TERMINATE_:: - The proxy function will exit with a return value of 0. + The proxy function will exit with a return value of 0. A _REP_ control socket will reply with an empty message; no response is sent otherwise. _STATISTICS_:: - The proxy behavior will remain unchanged and reply with a set of simple summary values of the messages that have been sent through the proxy as described next. + The proxy state will remain unchanged and the proxy will reply with a set of simple summary values of the messages that have been sent through the proxy as described next. The control socket must support sending. There are eight statistics values, each of size _uint64_t_ in the multi-part message reply to the _STATISTICS_ command. These are: @@ -54,6 +54,8 @@ message reply to the _STATISTICS_ command. These are: - number of bytes sent by the backend socket +Message totals count each part in a multipart message individually. + RETURN VALUE ------------ diff --git a/src/proxy.cpp b/src/proxy.cpp index 2d6845fc..5ba77b37 100644 --- a/src/proxy.cpp +++ b/src/proxy.cpp @@ -197,11 +197,16 @@ static int handle_control (class zmq::socket_base_t *control_, state = terminated; } - // satisfy REP duty and reply no matter what. - cmsg.init_size (0); - rc = control_->send (&cmsg, 0); - if (unlikely (rc < 0)) { - return -1; + int type; + size_t sz = sizeof (type); + zmq_getsockopt (control_, ZMQ_TYPE, &type, &sz); + if (type == ZMQ_REP) { + // satisfy REP duty and reply no matter what. 
+ cmsg.init_size (0); + rc = control_->send (&cmsg, 0); + if (unlikely (rc < 0)) { + return -1; + } } return 0; }