diff --git a/src/proxy.cpp b/src/proxy.cpp index 78d6ba61..2d6845fc 100644 --- a/src/proxy.cpp +++ b/src/proxy.cpp @@ -189,10 +189,10 @@ static int handle_control (class zmq::socket_base_t *control_, return 0; } - if (msiz == 5 && memcmp (command, "\x05PAUSE", 6)) { - state = active; - } else if (msiz == 6 && 0 == memcmp (command, "RESUME", 6)) { + if (msiz == 5 && 0 == memcmp (command, "PAUSE", 5)) { state = paused; + } else if (msiz == 6 && 0 == memcmp (command, "RESUME", 6)) { + state = active; } else if (msiz == 9 && 0 == memcmp (command, "TERMINATE", 9)) { state = terminated; } diff --git a/tests/test_proxy_steerable.cpp b/tests/test_proxy_steerable.cpp index 68de26c1..14092df1 100644 --- a/tests/test_proxy_steerable.cpp +++ b/tests/test_proxy_steerable.cpp @@ -5,6 +5,7 @@ #include #include +#include #define CONTENT_SIZE 13 #define CONTENT_SIZE_MAX 32 @@ -25,6 +26,9 @@ void *g_clients_pkts_out = NULL; void *g_workers_pkts_out = NULL; void *control_context = NULL; // worker control, not proxy control +int g_proxy_control_socktype = + ZMQ_PAIR; //or ZMQ_PAIR, ZMQ_SUB (without statistics) + void setUp () { setup_test_context (); @@ -90,7 +94,7 @@ static void client_task (void *db_) int request_nbr = 0; bool run = true; - bool keep_sending = true; + bool enable_send = false; while (run) { // Tick once per 200 ms, pulling in arriving messages int centitick; @@ -126,14 +130,17 @@ static void client_task (void *db_) break; } if (memcmp (content, "STOP", 4) == 0) { - keep_sending = false; + enable_send = false; break; } + if (memcmp (content, "START", 5) == 0) { + enable_send = true; + } } } } - if (keep_sending) { + if (enable_send) { snprintf (content, CONTENT_SIZE_MAX * sizeof (char), "request #%03d", ++request_nbr); // CONTENT_SIZE if (is_verbose) @@ -203,9 +210,14 @@ void server_task (void * /*unused_*/) } // Proxy control socket - void *proxy_control = zmq_socket (get_test_context (), ZMQ_REP); + void *proxy_control = + zmq_socket (get_test_context (), g_proxy_control_socktype); TEST_ASSERT_NOT_NULL (proxy_control); TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (proxy_control, proxy_control_address)); + if (g_proxy_control_socktype == ZMQ_SUB) { + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (proxy_control, ZMQ_SUBSCRIBE, "", 0)); + } // Connect backend to frontend via a steerable proxy int rc = zmq_proxy_steerable (frontend, backend, NULL, proxy_control); @@ -319,6 +331,55 @@ static void server_worker (void * /*unused_*/) // // - 7/bsb: number of bytes sent out the backend socket +uint64_t read_stat_value (void *proxy_control) +{ + zmq_msg_t stats_msg; + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&stats_msg)); + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&stats_msg, proxy_control, 0)); + TEST_ASSERT_EQUAL_INT (sizeof (uint64_t), zmq_msg_size (&stats_msg)); + uint64_t val = *(uint64_t *) zmq_msg_data (&stats_msg); + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&stats_msg)); + return val; +} + +//return total bytes proxied, so we can test PAUSE/RESUME +uint64_t statistics (void *proxy_control, const char *runctx) +{ + if (is_verbose) { + printf ("steer: sending STATISTICS - %s\n", runctx); + } + + TEST_ASSERT_SUCCESS_ERRNO (zmq_send (proxy_control, "STATISTICS", 10, 0)); + + uint64_t total_bytes_proxied = 0; + for (int count = 0; count < 8; ++count) { + uint64_t val = read_stat_value (proxy_control); + if (is_verbose) { + if (count == 0) { + printf ("stats: client pkts out: %d worker pkts out: %d { ", + zmq_atomic_counter_value (g_clients_pkts_out), + zmq_atomic_counter_value (g_workers_pkts_out)); + } + printf ("%" PRIu64, val); + if (count == 7) { + printf ("}\n"); + } + } + switch (count) { + case 3: //bytes sent on frontend + case 7: //bytes sent on backend + total_bytes_proxied += val; + } + } + + int rcvmore; + size_t sz = sizeof (rcvmore); + zmq_getsockopt (proxy_control, ZMQ_RCVMORE, &rcvmore, &sz); + TEST_ASSERT_EQUAL_INT (rcvmore, 0); + return total_bytes_proxied; +} + + // The main thread simply starts several clients and a server, and then // waits for the server to finish. @@ -328,32 +389,18 @@ void steer (void *proxy_control, const char *command, const char *runctx) printf ("steer: sending %s - %s\n", command, runctx); } - // Start with proxy paused TEST_ASSERT_SUCCESS_ERRNO ( zmq_send (proxy_control, command, strlen (command), 0)); - zmq_msg_t stats_msg; - int count = -1; - while (1) { - count = count + 1; + if (g_proxy_control_socktype == ZMQ_REP) { + //expect an empty reply from REP for commands that need no response + zmq_msg_t stats_msg; TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&stats_msg)); TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&stats_msg, proxy_control, 0)); - - - if (is_verbose && zmq_msg_size (&stats_msg)) { - if (count == 0) { - printf ("steer:"); - } - printf (" %lu", *(unsigned long int *) zmq_msg_data (&stats_msg)); - if (count == 7) { - printf ("\n"); - } - } - if (!zmq_msg_get (&stats_msg, ZMQ_MORE)) - break; + TEST_ASSERT_EQUAL_INT (zmq_msg_size (&stats_msg), 0); + TEST_ASSERT (!zmq_msg_get (&stats_msg, ZMQ_MORE)); TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&stats_msg)); } - TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&stats_msg)); } void test_proxy_steerable () @@ -382,31 +429,51 @@ void test_proxy_steerable () msleep (500); // Run for 500 ms then quit // Proxy control socket - void *proxy_control = zmq_socket (get_test_context (), ZMQ_REQ); + int control_socktype = ZMQ_PAIR; + switch (g_proxy_control_socktype) { + case ZMQ_REP: + control_socktype = ZMQ_REQ; + break; + case ZMQ_SUB: + control_socktype = ZMQ_PUB; + break; + default: + break; + } + void *proxy_control = zmq_socket (get_test_context (), control_socktype); TEST_ASSERT_NOT_NULL (proxy_control); linger = 0; TEST_ASSERT_SUCCESS_ERRNO ( zmq_setsockopt (proxy_control, ZMQ_LINGER, &linger, sizeof (linger))); TEST_ASSERT_SUCCESS_ERRNO ( zmq_connect (proxy_control, proxy_control_address)); - msleep (500); // Run for 500 ms then quit - steer (proxy_control, "STATISTICS", "started clients"); - steer (proxy_control, "PAUSE", "started server"); + TEST_ASSERT ( + statistics (proxy_control, "should be all 0s before clients start") == 0); + + send_string_expect_success (control, "START", 0); msleep (500); // Run for 500 ms then quit - steer (proxy_control, "RESUME", "started clients"); + TEST_ASSERT (statistics (proxy_control, "started clients") > 0); + steer (proxy_control, "PAUSE", "pausing proxying after 500ms"); + uint64_t bytes = statistics (proxy_control, "post-pause"); msleep (500); // Run for 500 ms then quit - steer (proxy_control, "STATISTICS", "ran for a while"); + TEST_ASSERT (statistics (proxy_control, "post-pause") == bytes); + + steer (proxy_control, "RESUME", "resuming proxying after another 500ms"); + + msleep (500); // Run for 500 ms then quit + + TEST_ASSERT (statistics (proxy_control, "ran for a while") > bytes); if (is_verbose) printf ("stopping all clients and server workers\n"); send_string_expect_success (control, "STOP", 0); - steer (proxy_control, "STATISTICS", "stopped clients and workers"); + statistics (proxy_control, "stopped clients and workers"); msleep (500); // Wait for all clients and workers to STOP @@ -415,7 +482,7 @@ void test_proxy_steerable () send_string_expect_success (control, "TERMINATE", 0); msleep (500); - steer (proxy_control, "STATISTICS", "terminate clients and server workers"); + statistics (proxy_control, "terminate clients and server workers"); msleep (500); // Wait for all clients and workers to terminate steer (proxy_control, "TERMINATE", "terminate proxy");