diff --git a/tests/test_proxy_steerable.cpp b/tests/test_proxy_steerable.cpp index 14092df1..75286f0d 100644 --- a/tests/test_proxy_steerable.cpp +++ b/tests/test_proxy_steerable.cpp @@ -11,9 +11,10 @@ #define CONTENT_SIZE_MAX 32 #define ROUTING_ID_SIZE 10 #define ROUTING_ID_SIZE_MAX 32 -#define QT_WORKERS 5 +#define QT_WORKERS 3 #define QT_CLIENTS 3 #define is_verbose 0 +#define TEST_SLEEP_MS 500 const char *proxy_control_address = "inproc://proxy_control"; @@ -127,15 +128,12 @@ static void client_task (void *db_) routing_id, content); if (memcmp (content, "TERMINATE", 9) == 0) { run = false; - break; - } - if (memcmp (content, "STOP", 4) == 0) { + } else if (memcmp (content, "STOP", 4) == 0) { enable_send = false; - break; - } - if (memcmp (content, "START", 5) == 0) { + } else if (memcmp (content, "START", 5) == 0) { enable_send = true; } + break; } } } @@ -187,7 +185,7 @@ void server_task (void * /*unused_*/) // Launch pool of worker threads, precise number is not critical int thread_nbr; - void *threads[5]; + void *threads[QT_WORKERS]; for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++) threads[thread_nbr] = zmq_threadstart (&server_worker, NULL); @@ -261,26 +259,34 @@ static void server_worker (void * /*unused_*/) char routing_id[ROUTING_ID_SIZE_MAX] = {}; // the size received is the size sent - bool run = true; + zmq_pollitem_t items[] = {{control, 0, ZMQ_POLLIN, 0}, + {worker, 0, ZMQ_POLLIN, 0}}; bool keep_sending = true; - while (run) { - int rc = zmq_recv (control, content, CONTENT_SIZE_MAX, - ZMQ_DONTWAIT); // usually, rc == -1 (no message) - if (rc > 0) { - content[rc] = 0; // NULL-terminate the command string - if (is_verbose) - printf ("server_worker receives command = %s\n", content); - if (memcmp (content, "TERMINATE", 9) == 0) - run = false; - if (memcmp (content, "STOP", 4) == 0) - keep_sending = false; + while (true) { + zmq_poll (items, 2, 100); + if (items[0].revents & ZMQ_POLLIN) { + //Commands over the worker control socket + int rc = zmq_recv (control, content, CONTENT_SIZE_MAX, 0); + if (rc > 0) { + content[rc] = 0; // NULL-terminate the command string + if (is_verbose) + printf ("server_worker receives command = %s\n", content); + if (memcmp (content, "TERMINATE", 9) == 0) + break; + if (memcmp (content, "STOP", 4) == 0) + keep_sending = false; + } } - // The DEALER socket gives us the reply envelope and message - // if we don't poll, we have to use ZMQ_DONTWAIT, if we poll, we can block-receive with 0 - rc = zmq_recv (worker, routing_id, ROUTING_ID_SIZE_MAX, ZMQ_DONTWAIT); - if (rc == ROUTING_ID_SIZE) { + if (items[1].revents & ZMQ_POLLIN) { + // The DEALER socket gives us the reply envelope and message + int rc = zmq_recv (worker, routing_id, ROUTING_ID_SIZE_MAX, 0); + if (rc != ROUTING_ID_SIZE) { + continue; + } + routing_id[rc] = 0; //null terminate rc = zmq_recv (worker, content, CONTENT_SIZE_MAX, 0); TEST_ASSERT_EQUAL_INT (CONTENT_SIZE, rc); + content[rc] = 0; //null terminate if (is_verbose) printf ("server receive - routing_id = %s content = %s\n", routing_id, content); @@ -360,7 +366,7 @@ uint64_t statistics (void *proxy_control, const char *runctx) zmq_atomic_counter_value (g_clients_pkts_out), zmq_atomic_counter_value (g_workers_pkts_out)); } - printf ("%" PRIu64, val); + printf ("%" PRIu64 " ", val); if (count == 7) { printf ("}\n"); } @@ -426,7 +432,7 @@ void test_proxy_steerable () threads[i] = zmq_threadstart (&client_task, &databags[i]); } threads[QT_CLIENTS] = zmq_threadstart (&server_task, NULL); - msleep (500); // Run for 500 ms then quit + msleep (TEST_SLEEP_MS); // setup time // Proxy control socket int control_socktype = ZMQ_PAIR; @@ -453,19 +459,20 @@ void test_proxy_steerable () send_string_expect_success (control, "START", 0); - msleep (500); // Run for 500 ms then quit + msleep (TEST_SLEEP_MS); // Run for some time TEST_ASSERT (statistics (proxy_control, "started clients") > 0); steer (proxy_control, "PAUSE", "pausing proxying after 500ms"); uint64_t bytes = statistics (proxy_control, "post-pause"); - msleep (500); // Run for 500 ms then quit + msleep (TEST_SLEEP_MS); // Paused for some time + //check no more bytes have been proxied while paused TEST_ASSERT (statistics (proxy_control, "post-pause") == bytes); steer (proxy_control, "RESUME", "resuming proxying after another 500ms"); - msleep (500); // Run for 500 ms then quit + msleep (TEST_SLEEP_MS); // Resumed for a while TEST_ASSERT (statistics (proxy_control, "ran for a while") > bytes); @@ -475,16 +482,16 @@ void test_proxy_steerable () statistics (proxy_control, "stopped clients and workers"); - msleep (500); // Wait for all clients and workers to STOP + msleep (TEST_SLEEP_MS); // Wait for all clients and workers to STOP if (is_verbose) printf ("shutting down all clients and server workers\n"); send_string_expect_success (control, "TERMINATE", 0); - msleep (500); + msleep (TEST_SLEEP_MS); statistics (proxy_control, "terminate clients and server workers"); - msleep (500); // Wait for all clients and workers to terminate + msleep (TEST_SLEEP_MS); // Wait for all clients and workers to terminate steer (proxy_control, "TERMINATE", "terminate proxy"); for (int i = 0; i < QT_CLIENTS + 1; i++)