mirror of
				https://github.com/zeromq/libzmq.git
				synced 2025-11-04 04:10:00 +01:00 
			
		
		
		
	Problem: proxy_steerable STATISTICS returns conflated buffers
Solution: split each stat into its own frame, to make it simpler and easier to use it, especially from high level bindings
This commit is contained in:
		@@ -161,34 +161,44 @@ int forward (
 | 
				
			|||||||
    return 0;
 | 
					    return 0;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					static int loop_and_send_multipart_stat (zmq::socket_base_t *control_,
 | 
				
			||||||
 | 
					        uint64_t stat, bool first, bool more)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					    int rc;
 | 
				
			||||||
 | 
					    zmq::msg_t msg;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    //  VSM of 8 bytes can't fail to init
 | 
				
			||||||
 | 
					    msg.init_size (sizeof (uint64_t));
 | 
				
			||||||
 | 
					    memcpy (msg.data (), (const void *)&stat, sizeof (uint64_t));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    //  if the first message is handled to the pipe successfully then the HWM
 | 
				
			||||||
 | 
					    //  is not full, which means failures are due to interrupts (on Windows pipes
 | 
				
			||||||
 | 
					    //  are TCP sockets), so keep retrying
 | 
				
			||||||
 | 
					    do {
 | 
				
			||||||
 | 
					        rc = control_->send (&msg, more ? ZMQ_SNDMORE : 0);
 | 
				
			||||||
 | 
					    } while (!first && rc != 0 && errno == EAGAIN);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    return rc;
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
int reply_stats(
 | 
					int reply_stats(
 | 
				
			||||||
        class zmq::socket_base_t *control_,
 | 
					        class zmq::socket_base_t *control_,
 | 
				
			||||||
        zmq_socket_stats_t* frontend_stats,
 | 
					        zmq_socket_stats_t* frontend_stats,
 | 
				
			||||||
        zmq_socket_stats_t* backend_stats)
 | 
					        zmq_socket_stats_t* backend_stats)
 | 
				
			||||||
{
 | 
					{
 | 
				
			||||||
    // first part: frontend stats
 | 
					    // first part: frontend stats - the first send might fail due to HWM
 | 
				
			||||||
 | 
					    if (loop_and_send_multipart_stat (control_, frontend_stats->msg_in, true, true) != 0)
 | 
				
			||||||
 | 
					        return -1;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    zmq::msg_t stats_msg1, stats_msg2;
 | 
					    loop_and_send_multipart_stat (control_, frontend_stats->bytes_in, false, true);
 | 
				
			||||||
    int rc = stats_msg1.init_size (sizeof(zmq_socket_stats_t));
 | 
					    loop_and_send_multipart_stat (control_, frontend_stats->msg_out, false, true);
 | 
				
			||||||
    if (unlikely (rc < 0))
 | 
					    loop_and_send_multipart_stat (control_, frontend_stats->bytes_out, false, true);
 | 
				
			||||||
        return close_and_return (&stats_msg1, -1);
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    memcpy (stats_msg1.data(), (const void*) frontend_stats, sizeof(zmq_socket_stats_t));
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    rc = control_->send (&stats_msg1, ZMQ_SNDMORE);
 | 
					 | 
				
			||||||
    if (unlikely (rc < 0))
 | 
					 | 
				
			||||||
        return close_and_return (&stats_msg1, -1);
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    // second part: backend stats
 | 
					    // second part: backend stats
 | 
				
			||||||
 | 
					    loop_and_send_multipart_stat (control_, backend_stats->msg_in, true, true);
 | 
				
			||||||
    rc = stats_msg2.init_size (sizeof(zmq_socket_stats_t));
 | 
					    loop_and_send_multipart_stat (control_, backend_stats->bytes_in, false, true);
 | 
				
			||||||
    if (unlikely (rc < 0))
 | 
					    loop_and_send_multipart_stat (control_, backend_stats->msg_out, false, true);
 | 
				
			||||||
        return close_and_return (&stats_msg2, -1);
 | 
					    loop_and_send_multipart_stat (control_, backend_stats->bytes_out, false, false);
 | 
				
			||||||
    memcpy (stats_msg2.data(), (const void*) backend_stats, sizeof(zmq_socket_stats_t));
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    rc = control_->send (&stats_msg2, 0);
 | 
					 | 
				
			||||||
    if (unlikely (rc < 0))
 | 
					 | 
				
			||||||
        return close_and_return (&stats_msg2, -1);
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    return 0;
 | 
					    return 0;
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 
 | 
				
			|||||||
@@ -335,6 +335,28 @@ server_worker (void *ctx)
 | 
				
			|||||||
    assert (rc == 0);
 | 
					    assert (rc == 0);
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					uint64_t recv_stat (void *sock, bool last)
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					    uint64_t res;
 | 
				
			||||||
 | 
					    zmq_msg_t stats_msg;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    int rc = zmq_msg_init (&stats_msg);
 | 
				
			||||||
 | 
					    assert (rc == 0);
 | 
				
			||||||
 | 
					    rc = zmq_recvmsg (sock, &stats_msg, 0);
 | 
				
			||||||
 | 
					    assert (rc == sizeof(uint64_t));
 | 
				
			||||||
 | 
					    memcpy(&res, zmq_msg_data(&stats_msg), zmq_msg_size(&stats_msg));
 | 
				
			||||||
 | 
					    rc = zmq_msg_close (&stats_msg);
 | 
				
			||||||
 | 
					    assert (rc == 0);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    int more;
 | 
				
			||||||
 | 
					    size_t moresz = sizeof more;
 | 
				
			||||||
 | 
					    rc = zmq_getsockopt (sock, ZMQ_RCVMORE, &more, &moresz);
 | 
				
			||||||
 | 
					    assert (rc == 0);
 | 
				
			||||||
 | 
					    assert ((last && !more) || (!last && more));
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					    return res;
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Utility function to interrogate the proxy:
 | 
					// Utility function to interrogate the proxy:
 | 
				
			||||||
 | 
					
 | 
				
			||||||
void check_proxy_stats(void *control_proxy)
 | 
					void check_proxy_stats(void *control_proxy)
 | 
				
			||||||
@@ -346,31 +368,16 @@ void check_proxy_stats(void *control_proxy)
 | 
				
			|||||||
    assert (rc == 10);
 | 
					    assert (rc == 10);
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    // first frame of the reply contains FRONTEND stats:
 | 
					    // first frame of the reply contains FRONTEND stats:
 | 
				
			||||||
 | 
					    total_stats.frontend.msg_in = recv_stat (control_proxy, false);
 | 
				
			||||||
    zmq_msg_t stats_msg;
 | 
					    total_stats.frontend.bytes_in = recv_stat (control_proxy, false);
 | 
				
			||||||
    rc = zmq_msg_init (&stats_msg);
 | 
					    total_stats.frontend.msg_out = recv_stat (control_proxy, false);
 | 
				
			||||||
    assert (rc == 0);
 | 
					    total_stats.frontend.bytes_out = recv_stat (control_proxy, false);
 | 
				
			||||||
    rc = zmq_recvmsg (control_proxy, &stats_msg, 0);
 | 
					 | 
				
			||||||
    assert (rc == sizeof(zmq_socket_stats_t));
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    memcpy(&total_stats.frontend, zmq_msg_data(&stats_msg), zmq_msg_size(&stats_msg));
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    // second frame of the reply contains BACKEND stats:
 | 
					    // second frame of the reply contains BACKEND stats:
 | 
				
			||||||
 | 
					    total_stats.backend.msg_in = recv_stat (control_proxy, false);
 | 
				
			||||||
    int more;
 | 
					    total_stats.backend.bytes_in = recv_stat (control_proxy, false);
 | 
				
			||||||
    size_t moresz = sizeof more;
 | 
					    total_stats.backend.msg_out = recv_stat (control_proxy, false);
 | 
				
			||||||
    rc = zmq_getsockopt (control_proxy, ZMQ_RCVMORE, &more, &moresz);
 | 
					    total_stats.backend.bytes_out = recv_stat (control_proxy, true);
 | 
				
			||||||
    assert (rc == 0 && more == 1);
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    rc = zmq_recvmsg (control_proxy, &stats_msg, 0);
 | 
					 | 
				
			||||||
    assert (rc == sizeof(zmq_socket_stats_t));
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    memcpy(&total_stats.backend, zmq_msg_data(&stats_msg), zmq_msg_size(&stats_msg));
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
    rc = zmq_getsockopt (control_proxy, ZMQ_RCVMORE, &more, &moresz);
 | 
					 | 
				
			||||||
    assert (rc == 0 && more == 0);
 | 
					 | 
				
			||||||
 | 
					 | 
				
			||||||
 | 
					
 | 
				
			||||||
    // check stats
 | 
					    // check stats
 | 
				
			||||||
 | 
					
 | 
				
			||||||
@@ -394,9 +401,6 @@ void check_proxy_stats(void *control_proxy)
 | 
				
			|||||||
    assert( total_stats.frontend.msg_out == (unsigned)zmq_atomic_counter_value(g_workers_pkts_out) );
 | 
					    assert( total_stats.frontend.msg_out == (unsigned)zmq_atomic_counter_value(g_workers_pkts_out) );
 | 
				
			||||||
    assert( total_stats.backend.msg_in == (unsigned)zmq_atomic_counter_value(g_workers_pkts_out) );
 | 
					    assert( total_stats.backend.msg_in == (unsigned)zmq_atomic_counter_value(g_workers_pkts_out) );
 | 
				
			||||||
    assert( total_stats.backend.msg_out == (unsigned)zmq_atomic_counter_value(g_clients_pkts_out) );
 | 
					    assert( total_stats.backend.msg_out == (unsigned)zmq_atomic_counter_value(g_clients_pkts_out) );
 | 
				
			||||||
 | 
					 | 
				
			||||||
    rc = zmq_msg_close (&stats_msg);
 | 
					 | 
				
			||||||
    assert (rc == 0);
 | 
					 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 
 | 
				
			|||||||
		Reference in New Issue
	
	Block a user