Merge pull request #4600 from nnog/fix-proxy-steerable

Fix zmq_proxy_steerable
This commit is contained in:
Luca Boccassi 2023-10-12 21:53:48 +01:00 committed by GitHub
commit 6b80df14f4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 118 additions and 44 deletions

View File

@ -19,21 +19,21 @@ The _zmq_proxy_steerable()_ function is a variant of the _zmq_proxy()_ function.
It accepts a fourth _control_ socket. When the _control_ socket is _NULL_ the
two functions operate identically.
When a _control_ socket of type _REP_ is provided to the proxy function the
When a _control_ socket of type _REP_ or _PAIR_ is provided to the proxy function the
application may send commands to the proxy. The following commands are
supported.
_PAUSE_::
The proxy will cease transferring messages between its endpoints.
The proxy will cease transferring messages between its endpoints. _REP_ control socket will reply with an empty message. No response otherwise.
_RESUME_::
The proxy will resume transferring messages between its endpoints.
The proxy will resume transferring messages between its endpoints. _REP_ control socket will reply with an empty message. No response otherwise.
_TERMINATE_::
The proxy function will exit with a return value of 0.
The proxy function will exit with a return value of 0. _REP_ control socket will reply with an empty message. No response otherwise.
_STATISTICS_::
The proxy behavior will remain unchanged and reply with a set of simple summary values of the messages that have been sent through the proxy as described next.
The proxy state will remain unchanged and reply with a set of simple summary values of the messages that have been sent through the proxy as described next. Control socket must support sending.
There are eight statistics values, each of size _uint64_t_ in the multi-part
message reply to the _STATISTICS_ command. These are:
@ -54,6 +54,8 @@ message reply to the _STATISTICS_ command. These are:
- number of bytes sent by the backend socket
Message totals count each part in a multipart message individually.
RETURN VALUE
------------

View File

@ -189,19 +189,24 @@ static int handle_control (class zmq::socket_base_t *control_,
return 0;
}
if (msiz == 5 && memcmp (command, "\x05PAUSE", 6)) {
state = active;
} else if (msiz == 6 && 0 == memcmp (command, "RESUME", 6)) {
if (msiz == 5 && 0 == memcmp (command, "PAUSE", 5)) {
state = paused;
} else if (msiz == 6 && 0 == memcmp (command, "RESUME", 6)) {
state = active;
} else if (msiz == 9 && 0 == memcmp (command, "TERMINATE", 9)) {
state = terminated;
}
// satisfy REP duty and reply no matter what.
cmsg.init_size (0);
rc = control_->send (&cmsg, 0);
if (unlikely (rc < 0)) {
return -1;
int type;
size_t sz = sizeof (type);
zmq_getsockopt (control_, ZMQ_TYPE, &type, &sz);
if (type == ZMQ_REP) {
// satisfy REP duty and reply no matter what.
cmsg.init_size (0);
rc = control_->send (&cmsg, 0);
if (unlikely (rc < 0)) {
return -1;
}
}
return 0;
}

View File

@ -5,6 +5,7 @@
#include <stdlib.h>
#include <string.h>
#include <inttypes.h>
#define CONTENT_SIZE 13
#define CONTENT_SIZE_MAX 32
@ -25,6 +26,9 @@ void *g_clients_pkts_out = NULL;
void *g_workers_pkts_out = NULL;
void *control_context = NULL; // worker control, not proxy control
int g_proxy_control_socktype =
ZMQ_PAIR; //or ZMQ_PAIR, ZMQ_SUB (without statistics)
void setUp ()
{
setup_test_context ();
@ -90,7 +94,7 @@ static void client_task (void *db_)
int request_nbr = 0;
bool run = true;
bool keep_sending = true;
bool enable_send = false;
while (run) {
// Tick once per 200 ms, pulling in arriving messages
int centitick;
@ -126,14 +130,17 @@ static void client_task (void *db_)
break;
}
if (memcmp (content, "STOP", 4) == 0) {
keep_sending = false;
enable_send = false;
break;
}
if (memcmp (content, "START", 5) == 0) {
enable_send = true;
}
}
}
}
if (keep_sending) {
if (enable_send) {
snprintf (content, CONTENT_SIZE_MAX * sizeof (char),
"request #%03d", ++request_nbr); // CONTENT_SIZE
if (is_verbose)
@ -203,9 +210,14 @@ void server_task (void * /*unused_*/)
}
// Proxy control socket
void *proxy_control = zmq_socket (get_test_context (), ZMQ_REP);
void *proxy_control =
zmq_socket (get_test_context (), g_proxy_control_socktype);
TEST_ASSERT_NOT_NULL (proxy_control);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (proxy_control, proxy_control_address));
if (g_proxy_control_socktype == ZMQ_SUB) {
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (proxy_control, ZMQ_SUBSCRIBE, "", 0));
}
// Connect backend to frontend via a steerable proxy
int rc = zmq_proxy_steerable (frontend, backend, NULL, proxy_control);
@ -319,6 +331,55 @@ static void server_worker (void * /*unused_*/)
//
// - 7/bsb: number of bytes sent out the backend socket
uint64_t read_stat_value (void *proxy_control)
{
zmq_msg_t stats_msg;
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&stats_msg));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&stats_msg, proxy_control, 0));
TEST_ASSERT_EQUAL_INT (sizeof (uint64_t), zmq_msg_size (&stats_msg));
uint64_t val = *(uint64_t *) zmq_msg_data (&stats_msg);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&stats_msg));
return val;
}
//return total bytes proxied, so we can test PAUSE/RESUME
uint64_t statistics (void *proxy_control, const char *runctx)
{
if (is_verbose) {
printf ("steer: sending STATISTICS - %s\n", runctx);
}
TEST_ASSERT_SUCCESS_ERRNO (zmq_send (proxy_control, "STATISTICS", 10, 0));
uint64_t total_bytes_proxied = 0;
for (int count = 0; count < 8; ++count) {
uint64_t val = read_stat_value (proxy_control);
if (is_verbose) {
if (count == 0) {
printf ("stats: client pkts out: %d worker pkts out: %d { ",
zmq_atomic_counter_value (g_clients_pkts_out),
zmq_atomic_counter_value (g_workers_pkts_out));
}
printf ("%" PRIu64, val);
if (count == 7) {
printf ("}\n");
}
}
switch (count) {
case 3: //bytes sent on frontend
case 7: //bytes sent on backend
total_bytes_proxied += val;
}
}
int rcvmore;
size_t sz = sizeof (rcvmore);
zmq_getsockopt (proxy_control, ZMQ_RCVMORE, &rcvmore, &sz);
TEST_ASSERT_EQUAL_INT (rcvmore, 0);
return total_bytes_proxied;
}
// The main thread simply starts several clients and a server, and then
// waits for the server to finish.
@ -328,32 +389,18 @@ void steer (void *proxy_control, const char *command, const char *runctx)
printf ("steer: sending %s - %s\n", command, runctx);
}
// Start with proxy paused
TEST_ASSERT_SUCCESS_ERRNO (
zmq_send (proxy_control, command, strlen (command), 0));
zmq_msg_t stats_msg;
int count = -1;
while (1) {
count = count + 1;
if (g_proxy_control_socktype == ZMQ_REP) {
//expect an empty reply from REP for commands that need no response
zmq_msg_t stats_msg;
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&stats_msg));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&stats_msg, proxy_control, 0));
if (is_verbose && zmq_msg_size (&stats_msg)) {
if (count == 0) {
printf ("steer:");
}
printf (" %lu", *(unsigned long int *) zmq_msg_data (&stats_msg));
if (count == 7) {
printf ("\n");
}
}
if (!zmq_msg_get (&stats_msg, ZMQ_MORE))
break;
TEST_ASSERT_EQUAL_INT (zmq_msg_size (&stats_msg), 0);
TEST_ASSERT (!zmq_msg_get (&stats_msg, ZMQ_MORE));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&stats_msg));
}
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&stats_msg));
}
void test_proxy_steerable ()
@ -382,31 +429,51 @@ void test_proxy_steerable ()
msleep (500); // Run for 500 ms then quit
// Proxy control socket
void *proxy_control = zmq_socket (get_test_context (), ZMQ_REQ);
int control_socktype = ZMQ_PAIR;
switch (g_proxy_control_socktype) {
case ZMQ_REP:
control_socktype = ZMQ_REQ;
break;
case ZMQ_SUB:
control_socktype = ZMQ_PUB;
break;
default:
break;
}
void *proxy_control = zmq_socket (get_test_context (), control_socktype);
TEST_ASSERT_NOT_NULL (proxy_control);
linger = 0;
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (proxy_control, ZMQ_LINGER, &linger, sizeof (linger)));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_connect (proxy_control, proxy_control_address));
msleep (500); // Run for 500 ms then quit
steer (proxy_control, "STATISTICS", "started clients");
steer (proxy_control, "PAUSE", "started server");
TEST_ASSERT (
statistics (proxy_control, "should be all 0s before clients start") == 0);
send_string_expect_success (control, "START", 0);
msleep (500); // Run for 500 ms then quit
steer (proxy_control, "RESUME", "started clients");
TEST_ASSERT (statistics (proxy_control, "started clients") > 0);
steer (proxy_control, "PAUSE", "pausing proxying after 500ms");
uint64_t bytes = statistics (proxy_control, "post-pause");
msleep (500); // Run for 500 ms then quit
steer (proxy_control, "STATISTICS", "ran for a while");
TEST_ASSERT (statistics (proxy_control, "post-pause") == bytes);
steer (proxy_control, "RESUME", "resuming proxying after another 500ms");
msleep (500); // Run for 500 ms then quit
TEST_ASSERT (statistics (proxy_control, "ran for a while") > bytes);
if (is_verbose)
printf ("stopping all clients and server workers\n");
send_string_expect_success (control, "STOP", 0);
steer (proxy_control, "STATISTICS", "stopped clients and workers");
statistics (proxy_control, "stopped clients and workers");
msleep (500); // Wait for all clients and workers to STOP
@ -415,7 +482,7 @@ void test_proxy_steerable ()
send_string_expect_success (control, "TERMINATE", 0);
msleep (500);
steer (proxy_control, "STATISTICS", "terminate clients and server workers");
statistics (proxy_control, "terminate clients and server workers");
msleep (500); // Wait for all clients and workers to terminate
steer (proxy_control, "TERMINATE", "terminate proxy");