diff --git a/tests/test_conflate.cpp b/tests/test_conflate.cpp index b7e3331b..9c3ba5ae 100644 --- a/tests/test_conflate.cpp +++ b/tests/test_conflate.cpp @@ -45,7 +45,6 @@ int main (int argc, char *argv []) assert (rc == 0); int message_count = 20; - for (int j = 0; j < message_count; ++j) { rc = zmq_send(s_out, (void*)&j, sizeof(int), 0); if (rc < 0) { @@ -53,15 +52,13 @@ int main (int argc, char *argv []) return -1; } } - - zmq_sleep (1); + msleep (SETTLE_TIME); int payload_recved = 0; - rc = zmq_recv(s_in, (void*)&payload_recved, sizeof(int), 0); + rc = zmq_recv (s_in, (void*)&payload_recved, sizeof(int), 0); assert (rc > 0); assert (payload_recved == message_count - 1); - rc = zmq_close (s_in); assert (rc == 0); diff --git a/tests/test_connect_delay_tipc.cpp b/tests/test_connect_delay_tipc.cpp index 940edaaa..30a32f44 100644 --- a/tests/test_connect_delay_tipc.cpp +++ b/tests/test_connect_delay_tipc.cpp @@ -200,9 +200,7 @@ int main (void) assert (rc == 0); // Give time to process disconnect - // There's no way to do this except with a sleep - struct timespec t = { 0, 250 * 1000000 }; - nanosleep (&t, NULL); + msleep (SETTLE_TIME); // Send a message, should fail rc = zmq_send (frontend, "Hello", 5, ZMQ_DONTWAIT); diff --git a/tests/test_ctx_destroy.cpp b/tests/test_ctx_destroy.cpp index 7b3c5691..a269b2bd 100644 --- a/tests/test_ctx_destroy.cpp +++ b/tests/test_ctx_destroy.cpp @@ -61,7 +61,7 @@ void test_ctx_shutdown() void *receiver_thread = zmq_threadstart (&receiver, socket); // Wait for thread to start up and block - zmq_sleep (1); + msleep (SETTLE_TIME); // Shutdown context, if we used destroy here we would deadlock. rc = zmq_ctx_shutdown (ctx); diff --git a/tests/test_immediate.cpp b/tests/test_immediate.cpp index f385774b..2ac4aa3e 100644 --- a/tests/test_immediate.cpp +++ b/tests/test_immediate.cpp @@ -193,8 +193,7 @@ int main (void) assert (rc == 0); // Give time to process disconnect - // There's no way to do this except with a sleep - zmq_sleep(1); + msleep (SETTLE_TIME); // Send a message, should fail rc = zmq_send (frontend, "Hello", 5, ZMQ_DONTWAIT); diff --git a/tests/test_inproc_connect.cpp b/tests/test_inproc_connect.cpp index e3090fd2..10d828f4 100644 --- a/tests/test_inproc_connect.cpp +++ b/tests/test_inproc_connect.cpp @@ -142,7 +142,7 @@ void test_connect_before_bind_pub_sub() assert (rc == 0); // Wait for pub-sub connection to happen - zmq_sleep (1); + msleep (SETTLE_TIME); // Queue up some data, this not will be dropped rc = zmq_send_const (connectSocket, "after", 6, 0); diff --git a/tests/test_iov.cpp b/tests/test_iov.cpp index 2eef04ac..718b318f 100644 --- a/tests/test_iov.cpp +++ b/tests/test_iov.cpp @@ -80,7 +80,7 @@ int main (void) rc = zmq_bind (sb, "inproc://a"); assert (rc == 0); - zmq_sleep(1); + msleep (SETTLE_TIME); void *sc = zmq_socket (ctx, ZMQ_PUSH); rc = zmq_connect (sc, "inproc://a"); diff --git a/tests/test_monitor.cpp b/tests/test_monitor.cpp index 36b835a0..84376193 100644 --- a/tests/test_monitor.cpp +++ b/tests/test_monitor.cpp @@ -211,7 +211,7 @@ int main (void) rc = zmq_socket_monitor (req, "inproc://monitor.req", ZMQ_EVENT_ALL); assert (rc == 0); threads [1] = zmq_threadstart(&req_socket_monitor, ctx); - zmq_sleep(1); + msleep (SETTLE_TIME); // Bind REQ and REP rc = zmq_bind (rep, addr.c_str()); @@ -238,8 +238,8 @@ int main (void) rc = zmq_close (rep); assert (rc == 0); - // Allow some time for detecting error states - zmq_sleep(1); + // Allow enough time for detecting error states + msleep (250); // Close the REQ socket rc = zmq_close (req); diff --git a/tests/test_proxy.cpp b/tests/test_proxy.cpp index a695dad3..8523e21d 100644 --- a/tests/test_proxy.cpp +++ b/tests/test_proxy.cpp @@ -37,19 +37,17 @@ #define CONTENT_SIZE_MAX 32 #define ID_SIZE 10 #define ID_SIZE_MAX 32 -#define QT_WORKERS 5 -#define QT_CLIENTS 3 +#define QT_WORKERS 5 +#define QT_CLIENTS 3 #define is_verbose 0 static void client_task (void *ctx) { -// void *ctx = zmq_ctx_new (); // if we want our own context, we shall use tcp instead of inproc for the control socket -// assert (ctx); void *client = zmq_socket (ctx, ZMQ_DEALER); assert (client); - // Control socket receives terminate command from main over inproc + // Control socket receives terminate command from main over inproc void *control = zmq_socket (ctx, ZMQ_SUB); assert (control); int rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0); @@ -58,54 +56,52 @@ client_task (void *ctx) assert (rc == 0); char content [CONTENT_SIZE_MAX]; - // Set random identity to make tracing easier - char identity [ID_SIZE]; - sprintf (identity, "%04X-%04X", rand() % 0xFFFF, rand() % 0xFFFF); + // Set random identity to make tracing easier + char identity [ID_SIZE]; + sprintf (identity, "%04X-%04X", rand() % 0xFFFF, rand() % 0xFFFF); rc = zmq_setsockopt (client, ZMQ_IDENTITY, identity, ID_SIZE); // includes '\0' as an helper for printf assert (rc == 0); rc = zmq_connect (client, "tcp://127.0.0.1:9999"); assert (rc == 0); - zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 }, { control, 0, ZMQ_POLLIN, 0 } }; - int request_nbr = 0; - bool run = true; - while (run) { - // Tick once per 200 ms, pulling in arriving messages - int centitick; - for (centitick = 0; centitick < 20; centitick++) { - zmq_poll (items, 2, 10); - if (items [0].revents & ZMQ_POLLIN) { - int rcvmore; - size_t sz = sizeof (rcvmore); - rc = zmq_recv (client, content, CONTENT_SIZE_MAX, 0); - assert (rc == CONTENT_SIZE); - if (is_verbose) printf("client receive - identity = %s content = %s\n", identity, content); - // Check that message is still the same - assert (memcmp (content, "request #", 9) == 0); - rc = zmq_getsockopt (client, ZMQ_RCVMORE, &rcvmore, &sz); - assert (rc == 0); - assert (!rcvmore); - } - if (items [1].revents & ZMQ_POLLIN) { - rc = zmq_recv (control, content, CONTENT_SIZE_MAX, 0); - if (is_verbose) printf("client receive - identity = %s command = %s\n", identity, content); - if (memcmp (content, "TERMINATE", 10) == 0) { - run = false; - break; - } - } - } - sprintf(content, "request #%03d", ++request_nbr); // CONTENT_SIZE - rc = zmq_send (client, content, CONTENT_SIZE, 0); - assert (rc == CONTENT_SIZE); - } + zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 }, { control, 0, ZMQ_POLLIN, 0 } }; + int request_nbr = 0; + bool run = true; + while (run) { + // Tick once per 200 ms, pulling in arriving messages + int centitick; + for (centitick = 0; centitick < 20; centitick++) { + zmq_poll (items, 2, 10); + if (items [0].revents & ZMQ_POLLIN) { + int rcvmore; + size_t sz = sizeof (rcvmore); + rc = zmq_recv (client, content, CONTENT_SIZE_MAX, 0); + assert (rc == CONTENT_SIZE); + if (is_verbose) printf("client receive - identity = %s content = %s\n", identity, content); + // Check that message is still the same + assert (memcmp (content, "request #", 9) == 0); + rc = zmq_getsockopt (client, ZMQ_RCVMORE, &rcvmore, &sz); + assert (rc == 0); + assert (!rcvmore); + } + if (items [1].revents & ZMQ_POLLIN) { + rc = zmq_recv (control, content, CONTENT_SIZE_MAX, 0); + if (is_verbose) printf("client receive - identity = %s command = %s\n", identity, content); + if (memcmp (content, "TERMINATE", 10) == 0) { + run = false; + break; + } + } + } + sprintf(content, "request #%03d", ++request_nbr); // CONTENT_SIZE + rc = zmq_send (client, content, CONTENT_SIZE, 0); + assert (rc == CONTENT_SIZE); + } rc = zmq_close (client); assert (rc == 0); rc = zmq_close (control); assert (rc == 0); -// rc = zmq_ctx_term (ctx); -// assert (rc == 0); } // This is our server task. @@ -119,22 +115,19 @@ static void server_worker (void *ctx); void server_task (void *ctx) { -// void *ctx = zmq_ctx_new (); // if we want our own context, we shall use tcp instead of inproc for the control socket -// assert (ctx); - // Frontend socket talks to clients over TCP void *frontend = zmq_socket (ctx, ZMQ_ROUTER); assert (frontend); int rc = zmq_bind (frontend, "tcp://127.0.0.1:9999"); assert (rc == 0); - // Backend socket talks to workers over inproc + // Backend socket talks to workers over inproc void *backend = zmq_socket (ctx, ZMQ_DEALER); assert (backend); rc = zmq_bind (backend, "inproc://backend"); assert (rc == 0); - // Control socket receives terminate command from main over inproc + // Control socket receives terminate command from main over inproc void *control = zmq_socket (ctx, ZMQ_SUB); assert (control); rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0); @@ -142,17 +135,17 @@ server_task (void *ctx) rc = zmq_connect (control, "inproc://control"); assert (rc == 0); - // Launch pool of worker threads, precise number is not critical - int thread_nbr; - void* threads [5]; - for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++) - threads[thread_nbr] = zmq_threadstart (&server_worker, ctx); + // Launch pool of worker threads, precise number is not critical + int thread_nbr; + void* threads [5]; + for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++) + threads[thread_nbr] = zmq_threadstart (&server_worker, ctx); - // Connect backend to frontend via a proxy - zmq_proxy_steerable (frontend, backend, NULL, control); + // Connect backend to frontend via a proxy + zmq_proxy_steerable (frontend, backend, NULL, control); - for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++) - zmq_threadclose (threads[thread_nbr]); + for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++) + zmq_threadclose (threads[thread_nbr]); rc = zmq_close (frontend); assert (rc == 0); @@ -160,8 +153,6 @@ server_task (void *ctx) assert (rc == 0); rc = zmq_close (control); assert (rc == 0); -// rc = zmq_ctx_term (ctx); -// assert (rc == 0); } // Each worker task works on one request at a time and sends a random number @@ -176,7 +167,7 @@ server_worker (void *ctx) int rc = zmq_connect (worker, "inproc://backend"); assert (rc == 0); - // Control socket receives terminate command from main over inproc + // Control socket receives terminate command from main over inproc void *control = zmq_socket (ctx, ZMQ_SUB); assert (control); rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0); @@ -184,44 +175,40 @@ server_worker (void *ctx) rc = zmq_connect (control, "inproc://control"); assert (rc == 0); - char content [CONTENT_SIZE_MAX]; // bigger than what we need to check that - char identity [ID_SIZE_MAX]; // the size received is the size sent + char content [CONTENT_SIZE_MAX]; // bigger than what we need to check that + char identity [ID_SIZE_MAX]; // the size received is the size sent -// zmq_pollitem_t items [] = { { worker, 0, ZMQ_POLLIN, 0 }, { control, 0, ZMQ_POLLIN, 0 } }; // POLLING - bool run = true; - while (run) { -// zmq_poll (items, 2, 10); // POLLING -// if (items [1].revents & ZMQ_POLLIN) { // POLLING - rc = zmq_recv (control, content, CONTENT_SIZE_MAX, ZMQ_DONTWAIT); // usually, rc == -1 (no message) - if (rc > 0) { - if (is_verbose) printf("server_worker receives command = %s\n", content); - if (memcmp (content, "TERMINATE", 10) == 0) - run = false; - } -// } // POLLING + bool run = true; + while (run) { + rc = zmq_recv (control, content, CONTENT_SIZE_MAX, ZMQ_DONTWAIT); // usually, rc == -1 (no message) + if (rc > 0) { + if (is_verbose) + printf("server_worker receives command = %s\n", content); + if (memcmp (content, "TERMINATE", 10) == 0) + run = false; + } + // The DEALER socket gives us the reply envelope and message + // if we don't poll, we have to use ZMQ_DONTWAIT, if we poll, we can block-receive with 0 + rc = zmq_recv (worker, identity, ID_SIZE_MAX, ZMQ_DONTWAIT); + if (rc == ID_SIZE) { + rc = zmq_recv (worker, content, CONTENT_SIZE_MAX, 0); + assert (rc == CONTENT_SIZE); + if (is_verbose) + printf ("server receive - identity = %s content = %s\n", identity, content); -// if (items [0].revents & ZMQ_POLLIN) { // POLLING - // The DEALER socket gives us the reply envelope and message - rc = zmq_recv (worker, identity, ID_SIZE_MAX, ZMQ_DONTWAIT); // if we don't poll, we have to use ZMQ_DONTWAIT, if we poll, we can block-receive with 0 - if (rc == ID_SIZE) { - rc = zmq_recv (worker, content, CONTENT_SIZE_MAX, 0); - assert (rc == CONTENT_SIZE); - if (is_verbose) printf("server receive - identity = %s content = %s\n", identity, content); - - // Send 0..4 replies back - int reply, replies = rand() % 5; - for (reply = 0; reply < replies; reply++) { - // Sleep for some fraction of a second - msleep (rand () % 10 + 1); - // Send message from server to client - rc = zmq_send (worker, identity, ID_SIZE, ZMQ_SNDMORE); - assert (rc == ID_SIZE); - rc = zmq_send (worker, content, CONTENT_SIZE, 0); - assert (rc == CONTENT_SIZE); - } - } -// } // POLLING - } + // Send 0..4 replies back + int reply, replies = rand() % 5; + for (reply = 0; reply < replies; reply++) { + // Sleep for some fraction of a second + msleep (rand () % 10 + 1); + // Send message from server to client + rc = zmq_send (worker, identity, ID_SIZE, ZMQ_SNDMORE); + assert (rc == ID_SIZE); + rc = zmq_send (worker, content, CONTENT_SIZE, 0); + assert (rc == CONTENT_SIZE); + } + } + } rc = zmq_close (worker); assert (rc == 0); rc = zmq_close (control); @@ -237,31 +224,28 @@ int main (void) void *ctx = zmq_ctx_new (); assert (ctx); - // Control socket receives terminate command from main over inproc + // Control socket receives terminate command from main over inproc void *control = zmq_socket (ctx, ZMQ_PUB); assert (control); int rc = zmq_bind (control, "inproc://control"); assert (rc == 0); - void* threads [QT_CLIENTS + 1]; - for (int i = 0; i < QT_CLIENTS; i++) - { - threads[i] = zmq_threadstart (&client_task, ctx); - } - threads[QT_CLIENTS] = zmq_threadstart (&server_task, ctx); - msleep (500); // Run for 500 ms then quit + void *threads [QT_CLIENTS + 1]; + for (int i = 0; i < QT_CLIENTS; i++) + threads[i] = zmq_threadstart (&client_task, ctx); + threads[QT_CLIENTS] = zmq_threadstart (&server_task, ctx); + msleep (500); // Run for 500 ms then quit - rc = zmq_send (control, "TERMINATE", 10, 0); - assert (rc == 10); + rc = zmq_send (control, "TERMINATE", 10, 0); + assert (rc == 10); - // clean everything rc = zmq_close (control); assert (rc == 0); - //msleep (1000); // not sure it is usefull - for (int i = 0; i < QT_CLIENTS + 1; i++) - zmq_threadclose (threads[i]); + for (int i = 0; i < QT_CLIENTS + 1; i++) + zmq_threadclose (threads[i]); + rc = zmq_ctx_term (ctx); assert (rc == 0); - return 0; + return 0; } diff --git a/tests/test_req_relaxed.cpp b/tests/test_req_relaxed.cpp index d4ce0468..7970ed41 100644 --- a/tests/test_req_relaxed.cpp +++ b/tests/test_req_relaxed.cpp @@ -54,7 +54,7 @@ int main (void) // We have to give the connects time to finish otherwise the requests // will not properly round-robin. We could alternatively connect the // REQ sockets to the REP sockets. - zmq_sleep(1); + msleep (SETTLE_TIME); // Case 1: Second send() before a reply arrives in a pipe. diff --git a/tests/test_spec_req.cpp b/tests/test_spec_req.cpp index 83619acd..8e3e7218 100644 --- a/tests/test_spec_req.cpp +++ b/tests/test_spec_req.cpp @@ -46,7 +46,7 @@ void test_round_robin_out (void *ctx) // We have to give the connects time to finish otherwise the requests // will not properly round-robin. We could alternatively connect the // REQ sockets to the REP sockets. - zmq_sleep(1); + msleep (SETTLE_TIME); // Send our peer-replies, and expect every REP it used once in order for (size_t peer = 0; peer < services; peer++) { diff --git a/tests/test_sub_forward.cpp b/tests/test_sub_forward.cpp index 53533086..e77ed3b8 100644 --- a/tests/test_sub_forward.cpp +++ b/tests/test_sub_forward.cpp @@ -59,7 +59,7 @@ int main (void) assert (rc >= 0); // Wait a bit till the subscription gets to the publisher - zmq_sleep(1); + msleep (SETTLE_TIME); // Send an empty message rc = zmq_send (pub, NULL, 0, 0); diff --git a/tests/test_sub_forward_tipc.cpp b/tests/test_sub_forward_tipc.cpp index 1dda2f3a..4ead9f71 100644 --- a/tests/test_sub_forward_tipc.cpp +++ b/tests/test_sub_forward_tipc.cpp @@ -67,7 +67,7 @@ int main (void) assert (rc >= 0); // Wait a bit till the subscription gets to the publisher. - zmq_sleep (1); + msleep (SETTLE_TIME); // Send an empty message. rc = zmq_send (pub, NULL, 0, 0); diff --git a/tests/test_term_endpoint.cpp b/tests/test_term_endpoint.cpp index 6d9a5df0..186d3773 100644 --- a/tests/test_term_endpoint.cpp +++ b/tests/test_term_endpoint.cpp @@ -49,7 +49,7 @@ int main (void) assert (rc == 0); // Allow unbind to settle - zmq_sleep(1); + msleep (SETTLE_TIME); // Check that sending would block (there's no outbound connection) rc = zmq_send (push, "ABC", 3, ZMQ_DONTWAIT); @@ -86,7 +86,7 @@ int main (void) assert (rc == 0); // Allow disconnect to settle - zmq_sleep(1); + msleep (SETTLE_TIME); // Check that sending would block (there's no inbound connections). rc = zmq_send (push, "ABC", 3, ZMQ_DONTWAIT); diff --git a/tests/test_term_endpoint_tipc.cpp b/tests/test_term_endpoint_tipc.cpp index c338ade4..57ccf3e8 100644 --- a/tests/test_term_endpoint_tipc.cpp +++ b/tests/test_term_endpoint_tipc.cpp @@ -59,7 +59,7 @@ int main (void) assert (rc == 0); // Let events some time - zmq_sleep (1); + msleep (SETTLE_TIME); // Check that sending would block (there's no outbound connection). rc = zmq_send (push, "ABC", 3, ZMQ_DONTWAIT); @@ -100,8 +100,7 @@ int main (void) rc = zmq_disconnect (push, name); assert (rc == 0); - // Let events some time - zmq_sleep (1); + msleep (SETTLE_TIME); // Check that sending would block (there's no inbound connections). rc = zmq_send (push, "ABC", 3, ZMQ_DONTWAIT); diff --git a/tests/testutil.hpp b/tests/testutil.hpp index 23ef7cbb..31db1e40 100644 --- a/tests/testutil.hpp +++ b/tests/testutil.hpp @@ -24,6 +24,11 @@ #include "../include/zmq_utils.h" #include "platform.hpp" +// This defines the settle time used in tests; raise this if we +// get test failures on slower systems due to binds/connects not +// settled. Tested to work reliably at 1 msec on a fast PC. +#define SETTLE_TIME 10 // In msec + #undef NDEBUG #include #include @@ -259,22 +264,15 @@ void setup_test_environment() #endif } -// provide portable millisecond sleep -#include - +// Provide portable millisecond sleep +// http://www.cplusplus.com/forum/unices/60161/ http://en.cppreference.com/w/cpp/thread/sleep_for +void msleep (int milliseconds) +{ #ifdef ZMQ_HAVE_WINDOWS -#include + Sleep (milliseconds); #else -#include + usleep (static_cast (milliseconds) * 1000); #endif - -void msleep(int milliseconds) -{ // http://www.cplusplus.com/forum/unices/60161/ http://en.cppreference.com/w/cpp/thread/sleep_for - #ifdef ZMQ_HAVE_WINDOWS - Sleep(milliseconds); - #else - usleep(static_cast(milliseconds)*1000); - #endif }