From 13bc1de42149ad8dfca7847ffa56b331dcd6a379 Mon Sep 17 00:00:00 2001 From: Luca Boccassi Date: Thu, 2 Feb 2023 02:11:33 +0000 Subject: [PATCH] Problem: no permission to relicense zmq_proxy_steerable Solution: remove implementation. Laurent Alebarde , the author, did not respond to requests to allow relicensing to MPL2, so we have to remove his copyrighted work. Make the API into an empty stub that returns -EOPNOTSUPP. --- AUTHORS | 1 - doc/zmq_proxy_steerable.txt | 100 +------------ src/proxy.cpp | 227 ++--------------------------- src/proxy.hpp | 4 +- src/zmq.cpp | 10 +- tests/test_proxy.cpp | 139 +++--------------- tests/test_proxy_hwm.cpp | 86 +---------- tests/test_proxy_single_socket.cpp | 10 +- tests/test_proxy_terminate.cpp | 10 +- 9 files changed, 53 insertions(+), 534 deletions(-) diff --git a/AUTHORS b/AUTHORS index 42b865fa..b1fbae03 100644 --- a/AUTHORS +++ b/AUTHORS @@ -69,7 +69,6 @@ Jon Dyte Kamil Shakirov Ken Steele Kouhei Sutou -Laurent Alebarde Leonardo J. Consoni Lionel Flandrin Lourens Naudé diff --git a/doc/zmq_proxy_steerable.txt b/doc/zmq_proxy_steerable.txt index 91bfc443..4002c934 100644 --- a/doc/zmq_proxy_steerable.txt +++ b/doc/zmq_proxy_steerable.txt @@ -3,7 +3,7 @@ zmq_proxy_steerable(3) NAME ---- -zmq_proxy_steerable - built-in 0MQ proxy with control flow +zmq_proxy_steerable - DEPRECATED SYNOPSIS @@ -14,101 +14,9 @@ SYNOPSIS DESCRIPTION ----------- -The _zmq_proxy_steerable()_ function starts the built-in 0MQ proxy in the -current application thread, as _zmq_proxy()_ do. Please, refer to this function -for the general description and usage. We describe here only the additional -control flow provided by the socket passed as the fourth argument "control". - -If the control socket is not NULL, the proxy supports control flow. If -'PAUSE' is received on this socket, the proxy suspends its activities. If -'RESUME' is received, it goes on. If 'TERMINATE' is received, it terminates -smoothly. If 'STATISTICS' is received, the proxy will reply on the control socket -sending a multipart message with 8 frames, each with an unsigned integer 64-bit -wide that provide in the following order: - - number of messages received by the frontend socket - - number of bytes received by the frontend socket - - number of messages sent out the frontend socket - - number of bytes sent out the frontend socket - - number of messages received by the backend socket - - number of bytes received by the backend socket - - number of messages sent out the backend socket - - number of bytes sent out the backend socket - -At start, the proxy runs normally as if zmq_proxy was used. - -If the control socket is NULL, the function behave exactly as if linkzmq:zmq_proxy[3] -had been called. - - -Refer to linkzmq:zmq_socket[3] for a description of the available socket types. -Refer to linkzmq:zmq_proxy[3] for a description of the zmq_proxy. - -EXAMPLE USAGE -------------- -cf zmq_proxy - -RETURN VALUE ------------- -The _zmq_proxy_steerable()_ function returns 0 if TERMINATE is sent to its -control socket. Otherwise, it returns `-1` and 'errno' set to *ETERM* or -*EINTR* (the 0MQ 'context' associated with either of the specified sockets was -terminated) or *EFAULT* (the provided 'frontend' or 'backend' was invalid). - - -EXAMPLE -------- -.Creating a shared queue proxy ----- -// Create frontend, backend and control sockets -void *frontend = zmq_socket (context, ZMQ_ROUTER); -assert (frontend); -void *backend = zmq_socket (context, ZMQ_DEALER); -assert (backend); -void *control = zmq_socket (context, ZMQ_SUB); -assert (control); - -// Bind sockets to TCP ports -assert (zmq_bind (frontend, "tcp://*:5555") == 0); -assert (zmq_bind (backend, "tcp://*:5556") == 0); -assert (zmq_connect (control, "tcp://*:5557") == 0); - -// Subscribe to the control socket since we have chosen SUB here -assert (zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0)); - -// Start the queue proxy, which runs until ETERM or "TERMINATE" -// received on the control socket -zmq_proxy_steerable (frontend, backend, NULL, control); ----- -.Set up a controller in another node, process or whatever ----- -void *control = zmq_socket (context, ZMQ_PUB); -assert (control); -assert (zmq_bind (control, "tcp://*:5557") == 0); - -// pause the proxy -assert (zmq_send (control, "PAUSE", 5, 0) == 0); - -// resume the proxy -assert (zmq_send (control, "RESUME", 6, 0) == 0); - -// terminate the proxy -assert (zmq_send (control, "TERMINATE", 9, 0) == 0); - -// check statistics -assert (zmq_send (control, "STATISTICS", 10, 0) == 0); -zmq_msg_t stats_msg; - -while (1) { - assert (zmq_msg_init (&stats_msg) == 0); - assert (zmq_recvmsg (control, &stats_msg, 0) == sizeof (uint64_t)); - assert (rc == sizeof (uint64_t)); - printf ("Stat: %lu\n", *(unsigned long int *)zmq_msg_data (&stats_msg)); - if (!zmq_msg_get (&stats_msg, ZMQ_MORE)) - break; - assert (zmq_msg_close (&stats_msg) == 0); -} -assert (zmq_msg_close (&stats_msg) == 0); ---- +The _zmq_proxy_steerable()_ function is an empty stub that only returns an +*EOPNOTSUPP* error, as the author did not provide a relicense agreement for +the Mozilla Public License v2 relicense of libzmq. SEE ALSO diff --git a/src/proxy.cpp b/src/proxy.cpp index 878fe2bf..a0096a62 100644 --- a/src/proxy.cpp +++ b/src/proxy.cpp @@ -56,7 +56,6 @@ do { \ delete poller_all; \ delete poller_in; \ - delete poller_control; \ delete poller_receive_blocked; \ delete poller_send_blocked; \ delete poller_both_blocked; \ @@ -75,20 +74,6 @@ #endif // ZMQ_HAVE_POLLER - -// Control socket messages - -typedef struct -{ - uint64_t msg_in; - uint64_t bytes_in; - uint64_t msg_out; - uint64_t bytes_out; -} zmq_socket_stats_t; - - -// Utility functions - static int capture (class zmq::socket_base_t *capture_, zmq::msg_t *msg_, int more_ = 0) { @@ -109,9 +94,7 @@ capture (class zmq::socket_base_t *capture_, zmq::msg_t *msg_, int more_ = 0) } static int forward (class zmq::socket_base_t *from_, - zmq_socket_stats_t *from_stats_, class zmq::socket_base_t *to_, - zmq_socket_stats_t *to_stats_, class zmq::socket_base_t *capture_, zmq::msg_t *msg_) { @@ -119,7 +102,6 @@ static int forward (class zmq::socket_base_t *from_, for (unsigned int i = 0; i < zmq::proxy_burst_size; i++) { int more; size_t moresz; - size_t complete_msg_size = 0; // Forward all the parts of one message while (true) { @@ -131,8 +113,6 @@ static int forward (class zmq::socket_base_t *from_, return -1; } - complete_msg_size += msg_->size (); - moresz = sizeof more; rc = from_->getsockopt (ZMQ_RCVMORE, &more, &moresz); if (unlikely (rc < 0)) @@ -150,76 +130,15 @@ static int forward (class zmq::socket_base_t *from_, if (more == 0) break; } - - // A multipart message counts as 1 packet: - from_stats_->msg_in++; - from_stats_->bytes_in += complete_msg_size; - to_stats_->msg_out++; - to_stats_->bytes_out += complete_msg_size; } return 0; } -static int loop_and_send_multipart_stat (zmq::socket_base_t *control_, - uint64_t stat_, - bool first_, - bool more_) -{ - int rc; - zmq::msg_t msg; - - // VSM of 8 bytes can't fail to init - msg.init_size (sizeof (uint64_t)); - memcpy (msg.data (), &stat_, sizeof (uint64_t)); - - // if the first message is handed to the pipe successfully then the HWM - // is not full, which means failures are due to interrupts (on Windows pipes - // are TCP sockets), so keep retrying - do { - rc = control_->send (&msg, more_ ? ZMQ_SNDMORE : 0); - } while (!first_ && rc != 0 && errno == EAGAIN); - - return rc; -} - -static int reply_stats (zmq::socket_base_t *control_, - const zmq_socket_stats_t *frontend_stats_, - const zmq_socket_stats_t *backend_stats_) -{ - // first part: frontend stats - the first send might fail due to HWM - if (loop_and_send_multipart_stat (control_, frontend_stats_->msg_in, true, - true) - != 0) - return -1; - - loop_and_send_multipart_stat (control_, frontend_stats_->bytes_in, false, - true); - loop_and_send_multipart_stat (control_, frontend_stats_->msg_out, false, - true); - loop_and_send_multipart_stat (control_, frontend_stats_->bytes_out, false, - true); - - // second part: backend stats - loop_and_send_multipart_stat (control_, backend_stats_->msg_in, false, - true); - loop_and_send_multipart_stat (control_, backend_stats_->bytes_in, false, - true); - loop_and_send_multipart_stat (control_, backend_stats_->msg_out, false, - true); - loop_and_send_multipart_stat (control_, backend_stats_->bytes_out, false, - false); - - return 0; -} - - #ifdef ZMQ_HAVE_POLLER - int zmq::proxy (class socket_base_t *frontend_, class socket_base_t *backend_, - class socket_base_t *capture_, - class socket_base_t *control_) + class socket_base_t *capture_) { msg_t msg; int rc = msg.init (); @@ -229,9 +148,6 @@ int zmq::proxy (class socket_base_t *frontend_, // The algorithm below assumes ratio of requests and replies processed // under full load to be 1:1. - int more; - size_t moresz = sizeof (more); - // Proxy can be in these three states enum { @@ -245,12 +161,7 @@ int zmq::proxy (class socket_base_t *frontend_, bool frontend_out = false; bool backend_in = false; bool backend_out = false; - bool control_in = false; zmq::socket_poller_t::event_t events[3]; - zmq_socket_stats_t frontend_stats; - zmq_socket_stats_t backend_stats; - memset (&frontend_stats, 0, sizeof (frontend_stats)); - memset (&backend_stats, 0, sizeof (backend_stats)); // Don't allocate these pollers from stack because they will take more than 900 kB of stack! // On Windows this blows up default stack of 1 MB and aborts the program. @@ -259,8 +170,6 @@ int zmq::proxy (class socket_base_t *frontend_, new (std::nothrow) zmq::socket_poller_t; // Poll for everything. zmq::socket_poller_t *poller_in = new (std::nothrow) zmq:: socket_poller_t; // Poll only 'ZMQ_POLLIN' on all sockets. Initial blocking poll in loop. - zmq::socket_poller_t *poller_control = new (std::nothrow) zmq:: - socket_poller_t; // Poll only for 'ZMQ_POLLIN' on 'control_', when proxy is paused. zmq::socket_poller_t *poller_receive_blocked = new (std::nothrow) zmq::socket_poller_t; // All except 'ZMQ_POLLIN' on 'frontend_'. @@ -290,7 +199,7 @@ int zmq::proxy (class socket_base_t *frontend_, } else frontend_equal_to_backend = true; - if (poller_all == NULL || poller_in == NULL || poller_control == NULL + if (poller_all == NULL || poller_in == NULL || poller_receive_blocked == NULL || ((poller_send_blocked == NULL || poller_both_blocked == NULL) && !frontend_equal_to_backend)) { @@ -350,30 +259,6 @@ int zmq::proxy (class socket_base_t *frontend_, CHECK_RC_EXIT_ON_FAILURE (); } - // Register 'control_' with pollers. - if (control_ != NULL) { - rc = poller_all->add (control_, NULL, ZMQ_POLLIN); - CHECK_RC_EXIT_ON_FAILURE (); - rc = poller_in->add (control_, NULL, ZMQ_POLLIN); - CHECK_RC_EXIT_ON_FAILURE (); - rc = poller_control->add ( - control_, NULL, - ZMQ_POLLIN); // When proxy is paused we wait only for ZMQ_POLLIN on 'control_' socket. - CHECK_RC_EXIT_ON_FAILURE (); - rc = poller_receive_blocked->add (control_, NULL, ZMQ_POLLIN); - CHECK_RC_EXIT_ON_FAILURE (); - if (!frontend_equal_to_backend) { - rc = poller_send_blocked->add (control_, NULL, ZMQ_POLLIN); - CHECK_RC_EXIT_ON_FAILURE (); - rc = poller_both_blocked->add (control_, NULL, ZMQ_POLLIN); - CHECK_RC_EXIT_ON_FAILURE (); - rc = poller_frontend_only->add (control_, NULL, ZMQ_POLLIN); - CHECK_RC_EXIT_ON_FAILURE (); - rc = poller_backend_only->add (control_, NULL, ZMQ_POLLIN); - CHECK_RC_EXIT_ON_FAILURE (); - } - } - bool request_processed, reply_processed; while (state != terminated) { @@ -402,58 +287,14 @@ int zmq::proxy (class socket_base_t *frontend_, if (events[i].socket == backend_) { backend_in = (events[i].events & ZMQ_POLLIN) != 0; backend_out = (events[i].events & ZMQ_POLLOUT) != 0; - } else if (events[i].socket == control_) - control_in = (events[i].events & ZMQ_POLLIN) != 0; - } - - - // Process a control command if any. - if (control_in) { - rc = control_->recv (&msg, 0); - CHECK_RC_EXIT_ON_FAILURE (); - rc = control_->getsockopt (ZMQ_RCVMORE, &more, &moresz); - if (unlikely (rc < 0) || more) { - PROXY_CLEANUP (); - return close_and_return (&msg, -1); - } - - // Copy message to capture socket if any. - rc = capture (capture_, &msg); - CHECK_RC_EXIT_ON_FAILURE (); - - if (msg.size () == 5 && memcmp (msg.data (), "PAUSE", 5) == 0) { - state = paused; - poller_wait = poller_control; - } else if (msg.size () == 6 - && memcmp (msg.data (), "RESUME", 6) == 0) { - state = active; - poller_wait = poller_in; - } else { - if (msg.size () == 9 - && memcmp (msg.data (), "TERMINATE", 9) == 0) - state = terminated; - else { - if (msg.size () == 10 - && memcmp (msg.data (), "STATISTICS", 10) == 0) { - rc = reply_stats (control_, &frontend_stats, - &backend_stats); - CHECK_RC_EXIT_ON_FAILURE (); - } else { - // This is an API error, we assert - puts ("E: invalid command sent to proxy"); - zmq_assert (false); - } } - } - control_in = false; } if (state == active) { // Process a request, 'ZMQ_POLLIN' on 'frontend_' and 'ZMQ_POLLOUT' on 'backend_'. // In case of frontend_==backend_ there's no 'ZMQ_POLLOUT' event. if (frontend_in && (backend_out || frontend_equal_to_backend)) { - rc = forward (frontend_, &frontend_stats, backend_, - &backend_stats, capture_, &msg); + rc = forward (frontend_, backend_, capture_, &msg); CHECK_RC_EXIT_ON_FAILURE (); request_processed = true; frontend_in = backend_out = false; @@ -465,8 +306,7 @@ int zmq::proxy (class socket_base_t *frontend_, // covers all of the cases. 'backend_in' is always false if frontend_==backend_ due to // design in 'for' event processing loop. if (backend_in && frontend_out) { - rc = forward (backend_, &backend_stats, frontend_, - &frontend_stats, capture_, &msg); + rc = forward (backend_, frontend_, capture_, &msg); CHECK_RC_EXIT_ON_FAILURE (); reply_processed = true; backend_in = frontend_out = false; @@ -535,8 +375,7 @@ int zmq::proxy (class socket_base_t *frontend_, int zmq::proxy (class socket_base_t *frontend_, class socket_base_t *backend_, - class socket_base_t *capture_, - class socket_base_t *control_) + class socket_base_t *capture_) { msg_t msg; int rc = msg.init (); @@ -546,20 +385,12 @@ int zmq::proxy (class socket_base_t *frontend_, // The algorithm below assumes ratio of requests and replies processed // under full load to be 1:1. - int more; - size_t moresz; zmq_pollitem_t items[] = {{frontend_, 0, ZMQ_POLLIN, 0}, - {backend_, 0, ZMQ_POLLIN, 0}, - {control_, 0, ZMQ_POLLIN, 0}}; - int qt_poll_items = (control_ ? 3 : 2); + {backend_, 0, ZMQ_POLLIN, 0}}; + int qt_poll_items = 2; zmq_pollitem_t itemsout[] = {{frontend_, 0, ZMQ_POLLOUT, 0}, {backend_, 0, ZMQ_POLLOUT, 0}}; - zmq_socket_stats_t frontend_stats; - memset (&frontend_stats, 0, sizeof (frontend_stats)); - zmq_socket_stats_t backend_stats; - memset (&backend_stats, 0, sizeof (backend_stats)); - // Proxy can be in these three states enum { @@ -584,48 +415,9 @@ int zmq::proxy (class socket_base_t *frontend_, } } - // Process a control command if any - if (control_ && items[2].revents & ZMQ_POLLIN) { - rc = control_->recv (&msg, 0); - if (unlikely (rc < 0)) - return close_and_return (&msg, -1); - - moresz = sizeof more; - rc = control_->getsockopt (ZMQ_RCVMORE, &more, &moresz); - if (unlikely (rc < 0) || more) - return close_and_return (&msg, -1); - - // Copy message to capture socket if any - rc = capture (capture_, &msg); - if (unlikely (rc < 0)) - return close_and_return (&msg, -1); - - if (msg.size () == 5 && memcmp (msg.data (), "PAUSE", 5) == 0) - state = paused; - else if (msg.size () == 6 && memcmp (msg.data (), "RESUME", 6) == 0) - state = active; - else if (msg.size () == 9 - && memcmp (msg.data (), "TERMINATE", 9) == 0) - state = terminated; - else { - if (msg.size () == 10 - && memcmp (msg.data (), "STATISTICS", 10) == 0) { - rc = - reply_stats (control_, &frontend_stats, &backend_stats); - if (unlikely (rc < 0)) - return close_and_return (&msg, -1); - } else { - // This is an API error, we assert - puts ("E: invalid command sent to proxy"); - zmq_assert (false); - } - } - } - // Process a request if (state == active && items[0].revents & ZMQ_POLLIN && (frontend_ == backend_ || itemsout[1].revents & ZMQ_POLLOUT)) { - rc = forward (frontend_, &frontend_stats, backend_, &backend_stats, - capture_, &msg); + rc = forward (frontend_, backend_, capture_, &msg); if (unlikely (rc < 0)) return close_and_return (&msg, -1); } @@ -633,8 +425,7 @@ int zmq::proxy (class socket_base_t *frontend_, if (state == active && frontend_ != backend_ && items[1].revents & ZMQ_POLLIN && itemsout[0].revents & ZMQ_POLLOUT) { - rc = forward (backend_, &backend_stats, frontend_, &frontend_stats, - capture_, &msg); + rc = forward (backend_, frontend_, capture_, &msg); if (unlikely (rc < 0)) return close_and_return (&msg, -1); } diff --git a/src/proxy.hpp b/src/proxy.hpp index f078c6ae..c621cb45 100644 --- a/src/proxy.hpp +++ b/src/proxy.hpp @@ -34,9 +34,7 @@ namespace zmq { int proxy (class socket_base_t *frontend_, class socket_base_t *backend_, - class socket_base_t *capture_, - class socket_base_t *control_ = - NULL); // backward compatibility without this argument + class socket_base_t *capture_); } #endif diff --git a/src/zmq.cpp b/src/zmq.cpp index 1733cafc..a9641a31 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -1770,10 +1770,12 @@ int zmq_proxy_steerable (void *frontend_, errno = EFAULT; return -1; } - return zmq::proxy (static_cast (frontend_), - static_cast (backend_), - static_cast (capture_), - static_cast (control_)); +#ifdef ZMQ_HAVE_WINDOWS + errno = WSAEOPNOTSUPP; +#else + errno = EOPNOTSUPP; +#endif + return -1; } // The deprecated device functionality diff --git a/tests/test_proxy.cpp b/tests/test_proxy.cpp index 16722e3c..93873428 100644 --- a/tests/test_proxy.cpp +++ b/tests/test_proxy.cpp @@ -33,8 +33,6 @@ #include #include -SETUP_TEARDOWN_TESTCONTEXT - #define CONTENT_SIZE 13 #define CONTENT_SIZE_MAX 32 #define ROUTING_ID_SIZE 10 @@ -48,22 +46,15 @@ struct thread_data int id; }; -typedef struct -{ - uint64_t msg_in; - uint64_t bytes_in; - uint64_t msg_out; - uint64_t bytes_out; -} zmq_socket_stats_t; - -typedef struct -{ - zmq_socket_stats_t frontend; - zmq_socket_stats_t backend; -} zmq_proxy_stats_t; - void *g_clients_pkts_out = NULL; void *g_workers_pkts_out = NULL; +void *control_context = NULL; + +void setUp () +{ + setup_test_context (); +} + // Asynchronous client-to-server (DEALER to ROUTER) - pure libzmq // @@ -98,7 +89,7 @@ static void client_task (void *db_) TEST_ASSERT_NOT_NULL (client); // Control socket receives terminate command from main over inproc - void *control = zmq_socket (get_test_context (), ZMQ_SUB); + void *control = zmq_socket (control_context, ZMQ_SUB); TEST_ASSERT_NOT_NULL (control); TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0)); TEST_ASSERT_SUCCESS_ERRNO ( @@ -210,13 +201,6 @@ void server_task (void * /*unused_*/) zmq_setsockopt (backend, ZMQ_LINGER, &linger, sizeof (linger))); TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (backend, "inproc://backend")); - // Control socket receives terminate command from main over inproc - void *control = zmq_socket (get_test_context (), ZMQ_REP); - TEST_ASSERT_NOT_NULL (control); - TEST_ASSERT_SUCCESS_ERRNO ( - zmq_setsockopt (control, ZMQ_LINGER, &linger, sizeof (linger))); - TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (control, "inproc://control_proxy")); - // Launch pool of worker threads, precise number is not critical int thread_nbr; void *threads[5]; @@ -242,15 +226,13 @@ void server_task (void * /*unused_*/) } // Connect backend to frontend via a proxy - TEST_ASSERT_SUCCESS_ERRNO ( - zmq_proxy_steerable (frontend, backend, NULL, control)); + zmq_proxy (frontend, backend, NULL); for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++) zmq_threadclose (threads[thread_nbr]); TEST_ASSERT_SUCCESS_ERRNO (zmq_close (frontend)); TEST_ASSERT_SUCCESS_ERRNO (zmq_close (backend)); - TEST_ASSERT_SUCCESS_ERRNO (zmq_close (control)); for (int i = 0; i < QT_CLIENTS; ++i) { TEST_ASSERT_SUCCESS_ERRNO (zmq_close (endpoint_receivers[i])); } @@ -270,7 +252,7 @@ static void server_worker (void * /*unused_*/) TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (worker, "inproc://backend")); // Control socket receives terminate command from main over inproc - void *control = zmq_socket (get_test_context (), ZMQ_SUB); + void *control = zmq_socket (control_context, ZMQ_SUB); TEST_ASSERT_NOT_NULL (control); TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0)); TEST_ASSERT_SUCCESS_ERRNO ( @@ -332,82 +314,6 @@ static void server_worker (void * /*unused_*/) TEST_ASSERT_SUCCESS_ERRNO (zmq_close (control)); } -uint64_t recv_stat (void *sock_, bool last_) -{ - uint64_t res; - zmq_msg_t stats_msg; - - TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&stats_msg)); - TEST_ASSERT_EQUAL_INT (sizeof (uint64_t), - zmq_recvmsg (sock_, &stats_msg, 0)); - memcpy (&res, zmq_msg_data (&stats_msg), zmq_msg_size (&stats_msg)); - TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&stats_msg)); - - int more; - size_t moresz = sizeof more; - TEST_ASSERT_SUCCESS_ERRNO ( - zmq_getsockopt (sock_, ZMQ_RCVMORE, &more, &moresz)); - TEST_ASSERT_TRUE ((last_ && !more) || (!last_ && more)); - - return res; -} - -// Utility function to interrogate the proxy: - -void check_proxy_stats (void *control_proxy_) -{ - zmq_proxy_stats_t total_stats; - - send_string_expect_success (control_proxy_, "STATISTICS", 0); - - // first frame of the reply contains FRONTEND stats: - total_stats.frontend.msg_in = recv_stat (control_proxy_, false); - total_stats.frontend.bytes_in = recv_stat (control_proxy_, false); - total_stats.frontend.msg_out = recv_stat (control_proxy_, false); - total_stats.frontend.bytes_out = recv_stat (control_proxy_, false); - - // second frame of the reply contains BACKEND stats: - total_stats.backend.msg_in = recv_stat (control_proxy_, false); - total_stats.backend.bytes_in = recv_stat (control_proxy_, false); - total_stats.backend.msg_out = recv_stat (control_proxy_, false); - total_stats.backend.bytes_out = recv_stat (control_proxy_, true); - - // check stats - - if (is_verbose) { - printf ( - "frontend: pkts_in=%lu bytes_in=%lu pkts_out=%lu bytes_out=%lu\n", - static_cast (total_stats.frontend.msg_in), - static_cast (total_stats.frontend.bytes_in), - static_cast (total_stats.frontend.msg_out), - static_cast (total_stats.frontend.bytes_out)); - printf ( - "backend: pkts_in=%lu bytes_in=%lu pkts_out=%lu bytes_out=%lu\n", - static_cast (total_stats.backend.msg_in), - static_cast (total_stats.backend.bytes_in), - static_cast (total_stats.backend.msg_out), - static_cast (total_stats.backend.bytes_out)); - - printf ("clients sent out %d requests\n", - zmq_atomic_counter_value (g_clients_pkts_out)); - printf ("workers sent out %d replies\n", - zmq_atomic_counter_value (g_workers_pkts_out)); - } - TEST_ASSERT_EQUAL_UINT ( - (unsigned) zmq_atomic_counter_value (g_clients_pkts_out), - total_stats.frontend.msg_in); - TEST_ASSERT_EQUAL_UINT ( - (unsigned) zmq_atomic_counter_value (g_workers_pkts_out), - total_stats.frontend.msg_out); - TEST_ASSERT_EQUAL_UINT ( - (unsigned) zmq_atomic_counter_value (g_workers_pkts_out), - total_stats.backend.msg_in); - TEST_ASSERT_EQUAL_UINT ( - (unsigned) zmq_atomic_counter_value (g_clients_pkts_out), - total_stats.backend.msg_out); -} - - // The main thread simply starts several clients and a server, and then // waits for the server to finish. @@ -415,21 +321,16 @@ void test_proxy () { g_clients_pkts_out = zmq_atomic_counter_new (); g_workers_pkts_out = zmq_atomic_counter_new (); + control_context = zmq_ctx_new (); + TEST_ASSERT_NOT_NULL (control_context); // Control socket receives terminate command from main over inproc - void *control = test_context_socket (ZMQ_PUB); + void *control = zmq_socket (control_context, ZMQ_PUB); int linger = 0; TEST_ASSERT_SUCCESS_ERRNO ( zmq_setsockopt (control, ZMQ_LINGER, &linger, sizeof (linger))); TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (control, "inproc://control")); - // Control socket receives terminate command from main over inproc - void *control_proxy = test_context_socket (ZMQ_REQ); - TEST_ASSERT_SUCCESS_ERRNO ( - zmq_setsockopt (control_proxy, ZMQ_LINGER, &linger, sizeof (linger))); - TEST_ASSERT_SUCCESS_ERRNO ( - zmq_bind (control_proxy, "inproc://control_proxy")); - void *threads[QT_CLIENTS + 1]; struct thread_data databags[QT_CLIENTS + 1]; for (int i = 0; i < QT_CLIENTS; i++) { @@ -445,23 +346,19 @@ void test_proxy () msleep (500); // Wait for all clients and workers to STOP - if (is_verbose) - printf ("retrieving stats from the proxy\n"); - check_proxy_stats (control_proxy); - if (is_verbose) printf ("shutting down all clients and server workers\n"); send_string_expect_success (control, "TERMINATE", 0); - if (is_verbose) - printf ("shutting down the proxy\n"); - send_string_expect_success (control_proxy, "TERMINATE", 0); + msleep (500); // Wait for all clients and workers to terminate - test_context_socket_close (control); - test_context_socket_close (control_proxy); + teardown_test_context (); for (int i = 0; i < QT_CLIENTS + 1; i++) zmq_threadclose (threads[i]); + + TEST_ASSERT_SUCCESS_ERRNO (zmq_close (control)); + TEST_ASSERT_SUCCESS_ERRNO (zmq_ctx_destroy (control_context)); } int main (void) diff --git a/tests/test_proxy_hwm.cpp b/tests/test_proxy_hwm.cpp index 463d8813..613fd344 100644 --- a/tests/test_proxy_hwm.cpp +++ b/tests/test_proxy_hwm.cpp @@ -192,80 +192,6 @@ static void subscriber_thread_main (void *pvoid_) zmq_close (subsocket); } -bool recv_stat (void *sock_, bool last_, uint64_t *res_) -{ - zmq_msg_t stats_msg; - - int rc = zmq_msg_init (&stats_msg); - assert (rc == 0); - - rc = zmq_msg_recv (&stats_msg, sock_, 0); //ZMQ_DONTWAIT); - if (rc == -1 && errno == EAGAIN) { - rc = zmq_msg_close (&stats_msg); - assert (rc == 0); - return false; // cannot retrieve the stat - } - - assert (rc == sizeof (uint64_t)); - memcpy (res_, zmq_msg_data (&stats_msg), zmq_msg_size (&stats_msg)); - - rc = zmq_msg_close (&stats_msg); - assert (rc == 0); - - int more; - size_t moresz = sizeof more; - rc = zmq_getsockopt (sock_, ZMQ_RCVMORE, &more, &moresz); - assert (rc == 0); - assert ((last_ && !more) || (!last_ && more)); - - return true; -} - -// Utility function to interrogate the proxy: - -typedef struct -{ - uint64_t msg_in; - uint64_t bytes_in; - uint64_t msg_out; - uint64_t bytes_out; -} zmq_socket_stats_t; - -typedef struct -{ - zmq_socket_stats_t frontend; - zmq_socket_stats_t backend; -} zmq_proxy_stats_t; - -bool check_proxy_stats (void *control_proxy_) -{ - zmq_proxy_stats_t total_stats; - int rc; - - rc = zmq_send (control_proxy_, "STATISTICS", 10, ZMQ_DONTWAIT); - assert (rc == 10 || (rc == -1 && errno == EAGAIN)); - if (rc == -1 && errno == EAGAIN) { - return false; - } - - // first frame of the reply contains FRONTEND stats: - if (!recv_stat (control_proxy_, false, &total_stats.frontend.msg_in)) { - return false; - } - - recv_stat (control_proxy_, false, &total_stats.frontend.bytes_in); - recv_stat (control_proxy_, false, &total_stats.frontend.msg_out); - recv_stat (control_proxy_, false, &total_stats.frontend.bytes_out); - - // second frame of the reply contains BACKEND stats: - recv_stat (control_proxy_, false, &total_stats.backend.msg_in); - recv_stat (control_proxy_, false, &total_stats.backend.bytes_in); - recv_stat (control_proxy_, false, &total_stats.backend.msg_out); - recv_stat (control_proxy_, true, &total_stats.backend.bytes_out); - - return true; -} - static void proxy_stats_asker_thread_main (void *pvoid_) { const proxy_hwm_cfg_t *const cfg = @@ -305,16 +231,9 @@ static void proxy_stats_asker_thread_main (void *pvoid_) // Start! while (!zmq_atomic_counter_value (cfg->subscriber_received_all)) { - check_proxy_stats (control_req); usleep (1000); // 1ms -> in best case we will get 1000updates/second } - - // Ask the proxy to exit: the subscriber has received all messages - - rc = zmq_send (control_req, "TERMINATE", 9, 0); - assert (rc == 9); - zmq_close (control_req); } @@ -371,7 +290,7 @@ static void proxy_thread_main (void *pvoid_) // start proxying! - zmq_proxy_steerable (frontend_xsub, backend_xpub, NULL, control_rep); + zmq_proxy (frontend_xsub, backend_xpub, NULL); zmq_close (frontend_xsub); zmq_close (backend_xpub); @@ -415,11 +334,12 @@ int main (void) zmq_threadclose (publisher); zmq_threadclose (subscriber); zmq_threadclose (asker); - zmq_threadclose (proxy); int rc = zmq_ctx_term (context); assert (rc == 0); + zmq_threadclose (proxy); + zmq_atomic_counter_destroy (&cfg.subscriber_received_all); return 0; diff --git a/tests/test_proxy_single_socket.cpp b/tests/test_proxy_single_socket.cpp index 8545267e..98d61b54 100644 --- a/tests/test_proxy_single_socket.cpp +++ b/tests/test_proxy_single_socket.cpp @@ -32,7 +32,10 @@ #include -SETUP_TEARDOWN_TESTCONTEXT +void setUp () +{ + setup_test_context (); +} // This is our server task. // It runs a proxy with a single REP socket as both frontend and backend. @@ -51,7 +54,7 @@ void server_task (void * /*unused_*/) send_string_expect_success (control, my_endpoint, 0); // Use rep as both frontend and backend - TEST_ASSERT_SUCCESS_ERRNO (zmq_proxy_steerable (rep, rep, NULL, control)); + zmq_proxy (rep, rep, NULL); TEST_ASSERT_SUCCESS_ERRNO (zmq_close (rep)); TEST_ASSERT_SUCCESS_ERRNO (zmq_close (control)); @@ -82,10 +85,9 @@ void test_proxy_single_socket () send_string_expect_success (req, "msg22", 0); recv_string_expect_success (req, "msg22", 0); - send_string_expect_success (control, "TERMINATE", 0); - test_context_socket_close (control); test_context_socket_close (req); + teardown_test_context (); free (my_endpoint); zmq_threadclose (server_thread); diff --git a/tests/test_proxy_terminate.cpp b/tests/test_proxy_terminate.cpp index 69daaded..ae62797e 100644 --- a/tests/test_proxy_terminate.cpp +++ b/tests/test_proxy_terminate.cpp @@ -32,7 +32,10 @@ #include -SETUP_TEARDOWN_TESTCONTEXT +void setUp () +{ + setup_test_context (); +} // This is a test for issue #1382. The server thread creates a SUB-PUSH // steerable proxy. The main process then sends messages to the SUB @@ -60,8 +63,7 @@ void server_task (void * /*unused_*/) send_string_expect_success (control, my_endpoint, 0); // Connect backend to frontend via a proxy - TEST_ASSERT_SUCCESS_ERRNO ( - zmq_proxy_steerable (frontend, backend, NULL, control)); + zmq_proxy (frontend, backend, NULL); TEST_ASSERT_SUCCESS_ERRNO (zmq_close (frontend)); TEST_ASSERT_SUCCESS_ERRNO (zmq_close (backend)); @@ -97,10 +99,10 @@ void test_proxy_terminate () msleep (50); send_string_expect_success (publisher, "This is a test", 0); - send_string_expect_success (control, "TERMINATE", 0); test_context_socket_close (publisher); test_context_socket_close (control); + teardown_test_context (); free (my_endpoint); zmq_threadclose (thread);