From b8695a47b5a53649df669c88e7a7a8e6e2535d51 Mon Sep 17 00:00:00 2001 From: Luca Boccassi Date: Wed, 6 Sep 2017 01:28:28 +0100 Subject: [PATCH] Problem: proxy_steerable STATISTICS returns conflated buffers Solution: split each stat into its own frame, to make it simpler and easier to use it, especially from high level bindings --- src/proxy.cpp | 50 +++++++++++++++++++++++---------------- tests/test_proxy.cpp | 56 ++++++++++++++++++++++++-------------------- 2 files changed, 60 insertions(+), 46 deletions(-) diff --git a/src/proxy.cpp b/src/proxy.cpp index f933fa58..88483306 100644 --- a/src/proxy.cpp +++ b/src/proxy.cpp @@ -161,34 +161,44 @@ int forward ( return 0; } +static int loop_and_send_multipart_stat (zmq::socket_base_t *control_, + uint64_t stat, bool first, bool more) +{ + int rc; + zmq::msg_t msg; + + // VSM of 8 bytes can't fail to init + msg.init_size (sizeof (uint64_t)); + memcpy (msg.data (), (const void *)&stat, sizeof (uint64_t)); + + // if the first message is handled to the pipe successfully then the HWM + // is not full, which means failures are due to interrupts (on Windows pipes + // are TCP sockets), so keep retrying + do { + rc = control_->send (&msg, more ? ZMQ_SNDMORE : 0); + } while (!first && rc != 0 && errno == EAGAIN); + + return rc; +} + int reply_stats( class zmq::socket_base_t *control_, zmq_socket_stats_t* frontend_stats, zmq_socket_stats_t* backend_stats) { - // first part: frontend stats + // first part: frontend stats - the first send might fail due to HWM + if (loop_and_send_multipart_stat (control_, frontend_stats->msg_in, true, true) != 0) + return -1; - zmq::msg_t stats_msg1, stats_msg2; - int rc = stats_msg1.init_size (sizeof(zmq_socket_stats_t)); - if (unlikely (rc < 0)) - return close_and_return (&stats_msg1, -1); - - memcpy (stats_msg1.data(), (const void*) frontend_stats, sizeof(zmq_socket_stats_t)); - - rc = control_->send (&stats_msg1, ZMQ_SNDMORE); - if (unlikely (rc < 0)) - return close_and_return (&stats_msg1, -1); + loop_and_send_multipart_stat (control_, frontend_stats->bytes_in, false, true); + loop_and_send_multipart_stat (control_, frontend_stats->msg_out, false, true); + loop_and_send_multipart_stat (control_, frontend_stats->bytes_out, false, true); // second part: backend stats - - rc = stats_msg2.init_size (sizeof(zmq_socket_stats_t)); - if (unlikely (rc < 0)) - return close_and_return (&stats_msg2, -1); - memcpy (stats_msg2.data(), (const void*) backend_stats, sizeof(zmq_socket_stats_t)); - - rc = control_->send (&stats_msg2, 0); - if (unlikely (rc < 0)) - return close_and_return (&stats_msg2, -1); + loop_and_send_multipart_stat (control_, backend_stats->msg_in, true, true); + loop_and_send_multipart_stat (control_, backend_stats->bytes_in, false, true); + loop_and_send_multipart_stat (control_, backend_stats->msg_out, false, true); + loop_and_send_multipart_stat (control_, backend_stats->bytes_out, false, false); return 0; } diff --git a/tests/test_proxy.cpp b/tests/test_proxy.cpp index e5eb5e6e..805d356f 100644 --- a/tests/test_proxy.cpp +++ b/tests/test_proxy.cpp @@ -335,6 +335,28 @@ server_worker (void *ctx) assert (rc == 0); } +uint64_t recv_stat (void *sock, bool last) +{ + uint64_t res; + zmq_msg_t stats_msg; + + int rc = zmq_msg_init (&stats_msg); + assert (rc == 0); + rc = zmq_recvmsg (sock, &stats_msg, 0); + assert (rc == sizeof(uint64_t)); + memcpy(&res, zmq_msg_data(&stats_msg), zmq_msg_size(&stats_msg)); + rc = zmq_msg_close (&stats_msg); + assert (rc == 0); + + int more; + size_t moresz = sizeof more; + rc = zmq_getsockopt (sock, ZMQ_RCVMORE, &more, &moresz); + assert (rc == 0); + assert ((last && !more) || (!last && more)); + + return res; +} + // Utility function to interrogate the proxy: void check_proxy_stats(void *control_proxy) @@ -346,31 +368,16 @@ void check_proxy_stats(void *control_proxy) assert (rc == 10); // first frame of the reply contains FRONTEND stats: - - zmq_msg_t stats_msg; - rc = zmq_msg_init (&stats_msg); - assert (rc == 0); - rc = zmq_recvmsg (control_proxy, &stats_msg, 0); - assert (rc == sizeof(zmq_socket_stats_t)); - - memcpy(&total_stats.frontend, zmq_msg_data(&stats_msg), zmq_msg_size(&stats_msg)); - + total_stats.frontend.msg_in = recv_stat (control_proxy, false); + total_stats.frontend.bytes_in = recv_stat (control_proxy, false); + total_stats.frontend.msg_out = recv_stat (control_proxy, false); + total_stats.frontend.bytes_out = recv_stat (control_proxy, false); // second frame of the reply contains BACKEND stats: - - int more; - size_t moresz = sizeof more; - rc = zmq_getsockopt (control_proxy, ZMQ_RCVMORE, &more, &moresz); - assert (rc == 0 && more == 1); - - rc = zmq_recvmsg (control_proxy, &stats_msg, 0); - assert (rc == sizeof(zmq_socket_stats_t)); - - memcpy(&total_stats.backend, zmq_msg_data(&stats_msg), zmq_msg_size(&stats_msg)); - - rc = zmq_getsockopt (control_proxy, ZMQ_RCVMORE, &more, &moresz); - assert (rc == 0 && more == 0); - + total_stats.backend.msg_in = recv_stat (control_proxy, false); + total_stats.backend.bytes_in = recv_stat (control_proxy, false); + total_stats.backend.msg_out = recv_stat (control_proxy, false); + total_stats.backend.bytes_out = recv_stat (control_proxy, true); // check stats @@ -394,9 +401,6 @@ void check_proxy_stats(void *control_proxy) assert( total_stats.frontend.msg_out == (unsigned)zmq_atomic_counter_value(g_workers_pkts_out) ); assert( total_stats.backend.msg_in == (unsigned)zmq_atomic_counter_value(g_workers_pkts_out) ); assert( total_stats.backend.msg_out == (unsigned)zmq_atomic_counter_value(g_clients_pkts_out) ); - - rc = zmq_msg_close (&stats_msg); - assert (rc == 0); }