diff --git a/doc/zmq_proxy_steerable.txt b/doc/zmq_proxy_steerable.txt index c5c0f661..23081fa9 100644 --- a/doc/zmq_proxy_steerable.txt +++ b/doc/zmq_proxy_steerable.txt @@ -19,21 +19,21 @@ The _zmq_proxy_steerable()_ function is a variant of the _zmq_proxy()_ function. It accepts a fourth _control_ socket. When the _control_ socket is _NULL_ the two functions operate identically. -When a _control_ socket of type _REP_ is provided to the proxy function the +When a _control_ socket of type _REP_ or _PAIR_ is provided to the proxy function the application may send commands to the proxy. The following commands are supported. _PAUSE_:: - The proxy will cease transferring messages between its endpoints. + The proxy will cease transferring messages between its endpoints. _REP_ control socket will reply with an empty message. No response otherwise. _RESUME_:: - The proxy will resume transferring messages between its endpoints. + The proxy will resume transferring messages between its endpoints. _REP_ control socket will reply with an empty message. No response otherwise. _TERMINATE_:: - The proxy function will exit with a return value of 0. + The proxy function will exit with a return value of 0. _REP_ control socket will reply with an empty message. No response otherwise. _STATISTICS_:: - The proxy behavior will remain unchanged and reply with a set of simple summary values of the messages that have been sent through the proxy as described next. + The proxy state will remain unchanged and reply with a set of simple summary values of the messages that have been sent through the proxy as described next. Control socket must support sending. There are eight statistics values, each of size _uint64_t_ in the multi-part message reply to the _STATISTICS_ command. These are: @@ -54,6 +54,8 @@ message reply to the _STATISTICS_ command. These are: - number of bytes sent by the backend socket +Message totals count each part in a multipart message individually. + RETURN VALUE ------------ diff --git a/src/proxy.cpp b/src/proxy.cpp index 78d6ba61..5ba77b37 100644 --- a/src/proxy.cpp +++ b/src/proxy.cpp @@ -189,19 +189,24 @@ static int handle_control (class zmq::socket_base_t *control_, return 0; } - if (msiz == 5 && memcmp (command, "\x05PAUSE", 6)) { - state = active; - } else if (msiz == 6 && 0 == memcmp (command, "RESUME", 6)) { + if (msiz == 5 && 0 == memcmp (command, "PAUSE", 5)) { state = paused; + } else if (msiz == 6 && 0 == memcmp (command, "RESUME", 6)) { + state = active; } else if (msiz == 9 && 0 == memcmp (command, "TERMINATE", 9)) { state = terminated; } - // satisfy REP duty and reply no matter what. - cmsg.init_size (0); - rc = control_->send (&cmsg, 0); - if (unlikely (rc < 0)) { - return -1; + int type; + size_t sz = sizeof (type); + zmq_getsockopt (control_, ZMQ_TYPE, &type, &sz); + if (type == ZMQ_REP) { + // satisfy REP duty and reply no matter what. + cmsg.init_size (0); + rc = control_->send (&cmsg, 0); + if (unlikely (rc < 0)) { + return -1; + } } return 0; } diff --git a/tests/test_proxy_steerable.cpp b/tests/test_proxy_steerable.cpp index 68de26c1..14092df1 100644 --- a/tests/test_proxy_steerable.cpp +++ b/tests/test_proxy_steerable.cpp @@ -5,6 +5,7 @@ #include #include +#include #define CONTENT_SIZE 13 #define CONTENT_SIZE_MAX 32 @@ -25,6 +26,9 @@ void *g_clients_pkts_out = NULL; void *g_workers_pkts_out = NULL; void *control_context = NULL; // worker control, not proxy control +int g_proxy_control_socktype = + ZMQ_PAIR; //or ZMQ_PAIR, ZMQ_SUB (without statistics) + void setUp () { setup_test_context (); @@ -90,7 +94,7 @@ static void client_task (void *db_) int request_nbr = 0; bool run = true; - bool keep_sending = true; + bool enable_send = false; while (run) { // Tick once per 200 ms, pulling in arriving messages int centitick; @@ -126,14 +130,17 @@ static void client_task (void *db_) break; } if (memcmp (content, "STOP", 4) == 0) { - keep_sending = false; + enable_send = false; break; } + if (memcmp (content, "START", 5) == 0) { + enable_send = true; + } } } } - if (keep_sending) { + if (enable_send) { snprintf (content, CONTENT_SIZE_MAX * sizeof (char), "request #%03d", ++request_nbr); // CONTENT_SIZE if (is_verbose) @@ -203,9 +210,14 @@ void server_task (void * /*unused_*/) } // Proxy control socket - void *proxy_control = zmq_socket (get_test_context (), ZMQ_REP); + void *proxy_control = + zmq_socket (get_test_context (), g_proxy_control_socktype); TEST_ASSERT_NOT_NULL (proxy_control); TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (proxy_control, proxy_control_address)); + if (g_proxy_control_socktype == ZMQ_SUB) { + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (proxy_control, ZMQ_SUBSCRIBE, "", 0)); + } // Connect backend to frontend via a steerable proxy int rc = zmq_proxy_steerable (frontend, backend, NULL, proxy_control); @@ -319,6 +331,55 @@ static void server_worker (void * /*unused_*/) // // - 7/bsb: number of bytes sent out the backend socket +uint64_t read_stat_value (void *proxy_control) +{ + zmq_msg_t stats_msg; + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&stats_msg)); + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&stats_msg, proxy_control, 0)); + TEST_ASSERT_EQUAL_INT (sizeof (uint64_t), zmq_msg_size (&stats_msg)); + uint64_t val = *(uint64_t *) zmq_msg_data (&stats_msg); + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&stats_msg)); + return val; +} + +//return total bytes proxied, so we can test PAUSE/RESUME +uint64_t statistics (void *proxy_control, const char *runctx) +{ + if (is_verbose) { + printf ("steer: sending STATISTICS - %s\n", runctx); + } + + TEST_ASSERT_SUCCESS_ERRNO (zmq_send (proxy_control, "STATISTICS", 10, 0)); + + uint64_t total_bytes_proxied = 0; + for (int count = 0; count < 8; ++count) { + uint64_t val = read_stat_value (proxy_control); + if (is_verbose) { + if (count == 0) { + printf ("stats: client pkts out: %d worker pkts out: %d { ", + zmq_atomic_counter_value (g_clients_pkts_out), + zmq_atomic_counter_value (g_workers_pkts_out)); + } + printf ("%" PRIu64, val); + if (count == 7) { + printf ("}\n"); + } + } + switch (count) { + case 3: //bytes sent on frontend + case 7: //bytes sent on backend + total_bytes_proxied += val; + } + } + + int rcvmore; + size_t sz = sizeof (rcvmore); + zmq_getsockopt (proxy_control, ZMQ_RCVMORE, &rcvmore, &sz); + TEST_ASSERT_EQUAL_INT (rcvmore, 0); + return total_bytes_proxied; +} + + // The main thread simply starts several clients and a server, and then // waits for the server to finish. @@ -328,32 +389,18 @@ void steer (void *proxy_control, const char *command, const char *runctx) printf ("steer: sending %s - %s\n", command, runctx); } - // Start with proxy paused TEST_ASSERT_SUCCESS_ERRNO ( zmq_send (proxy_control, command, strlen (command), 0)); - zmq_msg_t stats_msg; - int count = -1; - while (1) { - count = count + 1; + if (g_proxy_control_socktype == ZMQ_REP) { + //expect an empty reply from REP for commands that need no response + zmq_msg_t stats_msg; TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&stats_msg)); TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&stats_msg, proxy_control, 0)); - - - if (is_verbose && zmq_msg_size (&stats_msg)) { - if (count == 0) { - printf ("steer:"); - } - printf (" %lu", *(unsigned long int *) zmq_msg_data (&stats_msg)); - if (count == 7) { - printf ("\n"); - } - } - if (!zmq_msg_get (&stats_msg, ZMQ_MORE)) - break; + TEST_ASSERT_EQUAL_INT (zmq_msg_size (&stats_msg), 0); + TEST_ASSERT (!zmq_msg_get (&stats_msg, ZMQ_MORE)); TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&stats_msg)); } - TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&stats_msg)); } void test_proxy_steerable () @@ -382,31 +429,51 @@ void test_proxy_steerable () msleep (500); // Run for 500 ms then quit // Proxy control socket - void *proxy_control = zmq_socket (get_test_context (), ZMQ_REQ); + int control_socktype = ZMQ_PAIR; + switch (g_proxy_control_socktype) { + case ZMQ_REP: + control_socktype = ZMQ_REQ; + break; + case ZMQ_SUB: + control_socktype = ZMQ_PUB; + break; + default: + break; + } + void *proxy_control = zmq_socket (get_test_context (), control_socktype); TEST_ASSERT_NOT_NULL (proxy_control); linger = 0; TEST_ASSERT_SUCCESS_ERRNO ( zmq_setsockopt (proxy_control, ZMQ_LINGER, &linger, sizeof (linger))); TEST_ASSERT_SUCCESS_ERRNO ( zmq_connect (proxy_control, proxy_control_address)); - msleep (500); // Run for 500 ms then quit - steer (proxy_control, "STATISTICS", "started clients"); - steer (proxy_control, "PAUSE", "started server"); + TEST_ASSERT ( + statistics (proxy_control, "should be all 0s before clients start") == 0); + + send_string_expect_success (control, "START", 0); msleep (500); // Run for 500 ms then quit - steer (proxy_control, "RESUME", "started clients"); + TEST_ASSERT (statistics (proxy_control, "started clients") > 0); + steer (proxy_control, "PAUSE", "pausing proxying after 500ms"); + uint64_t bytes = statistics (proxy_control, "post-pause"); msleep (500); // Run for 500 ms then quit - steer (proxy_control, "STATISTICS", "ran for a while"); + TEST_ASSERT (statistics (proxy_control, "post-pause") == bytes); + + steer (proxy_control, "RESUME", "resuming proxying after another 500ms"); + + msleep (500); // Run for 500 ms then quit + + TEST_ASSERT (statistics (proxy_control, "ran for a while") > bytes); if (is_verbose) printf ("stopping all clients and server workers\n"); send_string_expect_success (control, "STOP", 0); - steer (proxy_control, "STATISTICS", "stopped clients and workers"); + statistics (proxy_control, "stopped clients and workers"); msleep (500); // Wait for all clients and workers to STOP @@ -415,7 +482,7 @@ void test_proxy_steerable () send_string_expect_success (control, "TERMINATE", 0); msleep (500); - steer (proxy_control, "STATISTICS", "terminate clients and server workers"); + statistics (proxy_control, "terminate clients and server workers"); msleep (500); // Wait for all clients and workers to terminate steer (proxy_control, "TERMINATE", "terminate proxy");