Problem: no permission to relicense zmq_proxy_steerable

Solution: remove implementation. Laurent Alebarde <l.alebarde@free.fr>,
the author, did not respond to requests to allow relicensing to MPL2,
so we have to remove his copyrighted work.
Make the API into an empty stub that fails by returning -1 with errno set to EOPNOTSUPP.
This commit is contained in:
Luca Boccassi 2023-02-02 02:11:33 +00:00
parent eaaea73bf1
commit 13bc1de421
9 changed files with 53 additions and 534 deletions

View File

@ -69,7 +69,6 @@ Jon Dyte
Kamil Shakirov Kamil Shakirov
Ken Steele Ken Steele
Kouhei Sutou Kouhei Sutou
Laurent Alebarde
Leonardo J. Consoni Leonardo J. Consoni
Lionel Flandrin Lionel Flandrin
Lourens Naudé Lourens Naudé

View File

@ -3,7 +3,7 @@ zmq_proxy_steerable(3)
NAME NAME
---- ----
zmq_proxy_steerable - built-in 0MQ proxy with control flow zmq_proxy_steerable - DEPRECATED
SYNOPSIS SYNOPSIS
@ -14,101 +14,9 @@ SYNOPSIS
DESCRIPTION DESCRIPTION
----------- -----------
The _zmq_proxy_steerable()_ function starts the built-in 0MQ proxy in the The _zmq_proxy_steerable()_ function is an empty stub that only returns an
current application thread, as _zmq_proxy()_ do. Please, refer to this function *EOPNOTSUPP* error, as the author did not provide a relicense agreement for
for the general description and usage. We describe here only the additional the Mozilla Public License v2 relicense of libzmq.
control flow provided by the socket passed as the fourth argument "control".
If the control socket is not NULL, the proxy supports control flow. If
'PAUSE' is received on this socket, the proxy suspends its activities. If
'RESUME' is received, it goes on. If 'TERMINATE' is received, it terminates
smoothly. If 'STATISTICS' is received, the proxy will reply on the control socket
sending a multipart message with 8 frames, each with an unsigned integer 64-bit
wide that provide in the following order:
- number of messages received by the frontend socket
- number of bytes received by the frontend socket
- number of messages sent out the frontend socket
- number of bytes sent out the frontend socket
- number of messages received by the backend socket
- number of bytes received by the backend socket
- number of messages sent out the backend socket
- number of bytes sent out the backend socket
At start, the proxy runs normally as if zmq_proxy was used.
If the control socket is NULL, the function behave exactly as if linkzmq:zmq_proxy[3]
had been called.
Refer to linkzmq:zmq_socket[3] for a description of the available socket types.
Refer to linkzmq:zmq_proxy[3] for a description of the zmq_proxy.
EXAMPLE USAGE
-------------
cf zmq_proxy
RETURN VALUE
------------
The _zmq_proxy_steerable()_ function returns 0 if TERMINATE is sent to its
control socket. Otherwise, it returns `-1` and 'errno' set to *ETERM* or
*EINTR* (the 0MQ 'context' associated with either of the specified sockets was
terminated) or *EFAULT* (the provided 'frontend' or 'backend' was invalid).
EXAMPLE
-------
.Creating a shared queue proxy
----
// Create frontend, backend and control sockets
void *frontend = zmq_socket (context, ZMQ_ROUTER);
assert (frontend);
void *backend = zmq_socket (context, ZMQ_DEALER);
assert (backend);
void *control = zmq_socket (context, ZMQ_SUB);
assert (control);
// Bind sockets to TCP ports
assert (zmq_bind (frontend, "tcp://*:5555") == 0);
assert (zmq_bind (backend, "tcp://*:5556") == 0);
assert (zmq_connect (control, "tcp://*:5557") == 0);
// Subscribe to the control socket since we have chosen SUB here
assert (zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0));
// Start the queue proxy, which runs until ETERM or "TERMINATE"
// received on the control socket
zmq_proxy_steerable (frontend, backend, NULL, control);
----
.Set up a controller in another node, process or whatever
----
void *control = zmq_socket (context, ZMQ_PUB);
assert (control);
assert (zmq_bind (control, "tcp://*:5557") == 0);
// pause the proxy
assert (zmq_send (control, "PAUSE", 5, 0) == 0);
// resume the proxy
assert (zmq_send (control, "RESUME", 6, 0) == 0);
// terminate the proxy
assert (zmq_send (control, "TERMINATE", 9, 0) == 0);
// check statistics
assert (zmq_send (control, "STATISTICS", 10, 0) == 0);
zmq_msg_t stats_msg;
while (1) {
assert (zmq_msg_init (&stats_msg) == 0);
assert (zmq_recvmsg (control, &stats_msg, 0) == sizeof (uint64_t));
assert (rc == sizeof (uint64_t));
printf ("Stat: %lu\n", *(unsigned long int *)zmq_msg_data (&stats_msg));
if (!zmq_msg_get (&stats_msg, ZMQ_MORE))
break;
assert (zmq_msg_close (&stats_msg) == 0);
}
assert (zmq_msg_close (&stats_msg) == 0);
---
SEE ALSO SEE ALSO

View File

@ -56,7 +56,6 @@
do { \ do { \
delete poller_all; \ delete poller_all; \
delete poller_in; \ delete poller_in; \
delete poller_control; \
delete poller_receive_blocked; \ delete poller_receive_blocked; \
delete poller_send_blocked; \ delete poller_send_blocked; \
delete poller_both_blocked; \ delete poller_both_blocked; \
@ -75,20 +74,6 @@
#endif // ZMQ_HAVE_POLLER #endif // ZMQ_HAVE_POLLER
// Control socket messages
typedef struct
{
uint64_t msg_in;
uint64_t bytes_in;
uint64_t msg_out;
uint64_t bytes_out;
} zmq_socket_stats_t;
// Utility functions
static int static int
capture (class zmq::socket_base_t *capture_, zmq::msg_t *msg_, int more_ = 0) capture (class zmq::socket_base_t *capture_, zmq::msg_t *msg_, int more_ = 0)
{ {
@ -109,9 +94,7 @@ capture (class zmq::socket_base_t *capture_, zmq::msg_t *msg_, int more_ = 0)
} }
static int forward (class zmq::socket_base_t *from_, static int forward (class zmq::socket_base_t *from_,
zmq_socket_stats_t *from_stats_,
class zmq::socket_base_t *to_, class zmq::socket_base_t *to_,
zmq_socket_stats_t *to_stats_,
class zmq::socket_base_t *capture_, class zmq::socket_base_t *capture_,
zmq::msg_t *msg_) zmq::msg_t *msg_)
{ {
@ -119,7 +102,6 @@ static int forward (class zmq::socket_base_t *from_,
for (unsigned int i = 0; i < zmq::proxy_burst_size; i++) { for (unsigned int i = 0; i < zmq::proxy_burst_size; i++) {
int more; int more;
size_t moresz; size_t moresz;
size_t complete_msg_size = 0;
// Forward all the parts of one message // Forward all the parts of one message
while (true) { while (true) {
@ -131,8 +113,6 @@ static int forward (class zmq::socket_base_t *from_,
return -1; return -1;
} }
complete_msg_size += msg_->size ();
moresz = sizeof more; moresz = sizeof more;
rc = from_->getsockopt (ZMQ_RCVMORE, &more, &moresz); rc = from_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
if (unlikely (rc < 0)) if (unlikely (rc < 0))
@ -150,76 +130,15 @@ static int forward (class zmq::socket_base_t *from_,
if (more == 0) if (more == 0)
break; break;
} }
// A multipart message counts as 1 packet:
from_stats_->msg_in++;
from_stats_->bytes_in += complete_msg_size;
to_stats_->msg_out++;
to_stats_->bytes_out += complete_msg_size;
} }
return 0; return 0;
} }
static int loop_and_send_multipart_stat (zmq::socket_base_t *control_,
uint64_t stat_,
bool first_,
bool more_)
{
int rc;
zmq::msg_t msg;
// VSM of 8 bytes can't fail to init
msg.init_size (sizeof (uint64_t));
memcpy (msg.data (), &stat_, sizeof (uint64_t));
// if the first message is handed to the pipe successfully then the HWM
// is not full, which means failures are due to interrupts (on Windows pipes
// are TCP sockets), so keep retrying
do {
rc = control_->send (&msg, more_ ? ZMQ_SNDMORE : 0);
} while (!first_ && rc != 0 && errno == EAGAIN);
return rc;
}
static int reply_stats (zmq::socket_base_t *control_,
const zmq_socket_stats_t *frontend_stats_,
const zmq_socket_stats_t *backend_stats_)
{
// first part: frontend stats - the first send might fail due to HWM
if (loop_and_send_multipart_stat (control_, frontend_stats_->msg_in, true,
true)
!= 0)
return -1;
loop_and_send_multipart_stat (control_, frontend_stats_->bytes_in, false,
true);
loop_and_send_multipart_stat (control_, frontend_stats_->msg_out, false,
true);
loop_and_send_multipart_stat (control_, frontend_stats_->bytes_out, false,
true);
// second part: backend stats
loop_and_send_multipart_stat (control_, backend_stats_->msg_in, false,
true);
loop_and_send_multipart_stat (control_, backend_stats_->bytes_in, false,
true);
loop_and_send_multipart_stat (control_, backend_stats_->msg_out, false,
true);
loop_and_send_multipart_stat (control_, backend_stats_->bytes_out, false,
false);
return 0;
}
#ifdef ZMQ_HAVE_POLLER #ifdef ZMQ_HAVE_POLLER
int zmq::proxy (class socket_base_t *frontend_, int zmq::proxy (class socket_base_t *frontend_,
class socket_base_t *backend_, class socket_base_t *backend_,
class socket_base_t *capture_, class socket_base_t *capture_)
class socket_base_t *control_)
{ {
msg_t msg; msg_t msg;
int rc = msg.init (); int rc = msg.init ();
@ -229,9 +148,6 @@ int zmq::proxy (class socket_base_t *frontend_,
// The algorithm below assumes ratio of requests and replies processed // The algorithm below assumes ratio of requests and replies processed
// under full load to be 1:1. // under full load to be 1:1.
int more;
size_t moresz = sizeof (more);
// Proxy can be in these three states // Proxy can be in these three states
enum enum
{ {
@ -245,12 +161,7 @@ int zmq::proxy (class socket_base_t *frontend_,
bool frontend_out = false; bool frontend_out = false;
bool backend_in = false; bool backend_in = false;
bool backend_out = false; bool backend_out = false;
bool control_in = false;
zmq::socket_poller_t::event_t events[3]; zmq::socket_poller_t::event_t events[3];
zmq_socket_stats_t frontend_stats;
zmq_socket_stats_t backend_stats;
memset (&frontend_stats, 0, sizeof (frontend_stats));
memset (&backend_stats, 0, sizeof (backend_stats));
// Don't allocate these pollers from stack because they will take more than 900 kB of stack! // Don't allocate these pollers from stack because they will take more than 900 kB of stack!
// On Windows this blows up default stack of 1 MB and aborts the program. // On Windows this blows up default stack of 1 MB and aborts the program.
@ -259,8 +170,6 @@ int zmq::proxy (class socket_base_t *frontend_,
new (std::nothrow) zmq::socket_poller_t; // Poll for everything. new (std::nothrow) zmq::socket_poller_t; // Poll for everything.
zmq::socket_poller_t *poller_in = new (std::nothrow) zmq:: zmq::socket_poller_t *poller_in = new (std::nothrow) zmq::
socket_poller_t; // Poll only 'ZMQ_POLLIN' on all sockets. Initial blocking poll in loop. socket_poller_t; // Poll only 'ZMQ_POLLIN' on all sockets. Initial blocking poll in loop.
zmq::socket_poller_t *poller_control = new (std::nothrow) zmq::
socket_poller_t; // Poll only for 'ZMQ_POLLIN' on 'control_', when proxy is paused.
zmq::socket_poller_t *poller_receive_blocked = new (std::nothrow) zmq::socket_poller_t *poller_receive_blocked = new (std::nothrow)
zmq::socket_poller_t; // All except 'ZMQ_POLLIN' on 'frontend_'. zmq::socket_poller_t; // All except 'ZMQ_POLLIN' on 'frontend_'.
@ -290,7 +199,7 @@ int zmq::proxy (class socket_base_t *frontend_,
} else } else
frontend_equal_to_backend = true; frontend_equal_to_backend = true;
if (poller_all == NULL || poller_in == NULL || poller_control == NULL if (poller_all == NULL || poller_in == NULL
|| poller_receive_blocked == NULL || poller_receive_blocked == NULL
|| ((poller_send_blocked == NULL || poller_both_blocked == NULL) || ((poller_send_blocked == NULL || poller_both_blocked == NULL)
&& !frontend_equal_to_backend)) { && !frontend_equal_to_backend)) {
@ -350,30 +259,6 @@ int zmq::proxy (class socket_base_t *frontend_,
CHECK_RC_EXIT_ON_FAILURE (); CHECK_RC_EXIT_ON_FAILURE ();
} }
// Register 'control_' with pollers.
if (control_ != NULL) {
rc = poller_all->add (control_, NULL, ZMQ_POLLIN);
CHECK_RC_EXIT_ON_FAILURE ();
rc = poller_in->add (control_, NULL, ZMQ_POLLIN);
CHECK_RC_EXIT_ON_FAILURE ();
rc = poller_control->add (
control_, NULL,
ZMQ_POLLIN); // When proxy is paused we wait only for ZMQ_POLLIN on 'control_' socket.
CHECK_RC_EXIT_ON_FAILURE ();
rc = poller_receive_blocked->add (control_, NULL, ZMQ_POLLIN);
CHECK_RC_EXIT_ON_FAILURE ();
if (!frontend_equal_to_backend) {
rc = poller_send_blocked->add (control_, NULL, ZMQ_POLLIN);
CHECK_RC_EXIT_ON_FAILURE ();
rc = poller_both_blocked->add (control_, NULL, ZMQ_POLLIN);
CHECK_RC_EXIT_ON_FAILURE ();
rc = poller_frontend_only->add (control_, NULL, ZMQ_POLLIN);
CHECK_RC_EXIT_ON_FAILURE ();
rc = poller_backend_only->add (control_, NULL, ZMQ_POLLIN);
CHECK_RC_EXIT_ON_FAILURE ();
}
}
bool request_processed, reply_processed; bool request_processed, reply_processed;
while (state != terminated) { while (state != terminated) {
@ -402,58 +287,14 @@ int zmq::proxy (class socket_base_t *frontend_,
if (events[i].socket == backend_) { if (events[i].socket == backend_) {
backend_in = (events[i].events & ZMQ_POLLIN) != 0; backend_in = (events[i].events & ZMQ_POLLIN) != 0;
backend_out = (events[i].events & ZMQ_POLLOUT) != 0; backend_out = (events[i].events & ZMQ_POLLOUT) != 0;
} else if (events[i].socket == control_)
control_in = (events[i].events & ZMQ_POLLIN) != 0;
}
// Process a control command if any.
if (control_in) {
rc = control_->recv (&msg, 0);
CHECK_RC_EXIT_ON_FAILURE ();
rc = control_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
if (unlikely (rc < 0) || more) {
PROXY_CLEANUP ();
return close_and_return (&msg, -1);
}
// Copy message to capture socket if any.
rc = capture (capture_, &msg);
CHECK_RC_EXIT_ON_FAILURE ();
if (msg.size () == 5 && memcmp (msg.data (), "PAUSE", 5) == 0) {
state = paused;
poller_wait = poller_control;
} else if (msg.size () == 6
&& memcmp (msg.data (), "RESUME", 6) == 0) {
state = active;
poller_wait = poller_in;
} else {
if (msg.size () == 9
&& memcmp (msg.data (), "TERMINATE", 9) == 0)
state = terminated;
else {
if (msg.size () == 10
&& memcmp (msg.data (), "STATISTICS", 10) == 0) {
rc = reply_stats (control_, &frontend_stats,
&backend_stats);
CHECK_RC_EXIT_ON_FAILURE ();
} else {
// This is an API error, we assert
puts ("E: invalid command sent to proxy");
zmq_assert (false);
}
} }
}
control_in = false;
} }
if (state == active) { if (state == active) {
// Process a request, 'ZMQ_POLLIN' on 'frontend_' and 'ZMQ_POLLOUT' on 'backend_'. // Process a request, 'ZMQ_POLLIN' on 'frontend_' and 'ZMQ_POLLOUT' on 'backend_'.
// In case of frontend_==backend_ there's no 'ZMQ_POLLOUT' event. // In case of frontend_==backend_ there's no 'ZMQ_POLLOUT' event.
if (frontend_in && (backend_out || frontend_equal_to_backend)) { if (frontend_in && (backend_out || frontend_equal_to_backend)) {
rc = forward (frontend_, &frontend_stats, backend_, rc = forward (frontend_, backend_, capture_, &msg);
&backend_stats, capture_, &msg);
CHECK_RC_EXIT_ON_FAILURE (); CHECK_RC_EXIT_ON_FAILURE ();
request_processed = true; request_processed = true;
frontend_in = backend_out = false; frontend_in = backend_out = false;
@ -465,8 +306,7 @@ int zmq::proxy (class socket_base_t *frontend_,
// covers all of the cases. 'backend_in' is always false if frontend_==backend_ due to // covers all of the cases. 'backend_in' is always false if frontend_==backend_ due to
// design in 'for' event processing loop. // design in 'for' event processing loop.
if (backend_in && frontend_out) { if (backend_in && frontend_out) {
rc = forward (backend_, &backend_stats, frontend_, rc = forward (backend_, frontend_, capture_, &msg);
&frontend_stats, capture_, &msg);
CHECK_RC_EXIT_ON_FAILURE (); CHECK_RC_EXIT_ON_FAILURE ();
reply_processed = true; reply_processed = true;
backend_in = frontend_out = false; backend_in = frontend_out = false;
@ -535,8 +375,7 @@ int zmq::proxy (class socket_base_t *frontend_,
int zmq::proxy (class socket_base_t *frontend_, int zmq::proxy (class socket_base_t *frontend_,
class socket_base_t *backend_, class socket_base_t *backend_,
class socket_base_t *capture_, class socket_base_t *capture_)
class socket_base_t *control_)
{ {
msg_t msg; msg_t msg;
int rc = msg.init (); int rc = msg.init ();
@ -546,20 +385,12 @@ int zmq::proxy (class socket_base_t *frontend_,
// The algorithm below assumes ratio of requests and replies processed // The algorithm below assumes ratio of requests and replies processed
// under full load to be 1:1. // under full load to be 1:1.
int more;
size_t moresz;
zmq_pollitem_t items[] = {{frontend_, 0, ZMQ_POLLIN, 0}, zmq_pollitem_t items[] = {{frontend_, 0, ZMQ_POLLIN, 0},
{backend_, 0, ZMQ_POLLIN, 0}, {backend_, 0, ZMQ_POLLIN, 0}};
{control_, 0, ZMQ_POLLIN, 0}}; int qt_poll_items = 2;
int qt_poll_items = (control_ ? 3 : 2);
zmq_pollitem_t itemsout[] = {{frontend_, 0, ZMQ_POLLOUT, 0}, zmq_pollitem_t itemsout[] = {{frontend_, 0, ZMQ_POLLOUT, 0},
{backend_, 0, ZMQ_POLLOUT, 0}}; {backend_, 0, ZMQ_POLLOUT, 0}};
zmq_socket_stats_t frontend_stats;
memset (&frontend_stats, 0, sizeof (frontend_stats));
zmq_socket_stats_t backend_stats;
memset (&backend_stats, 0, sizeof (backend_stats));
// Proxy can be in these three states // Proxy can be in these three states
enum enum
{ {
@ -584,48 +415,9 @@ int zmq::proxy (class socket_base_t *frontend_,
} }
} }
// Process a control command if any
if (control_ && items[2].revents & ZMQ_POLLIN) {
rc = control_->recv (&msg, 0);
if (unlikely (rc < 0))
return close_and_return (&msg, -1);
moresz = sizeof more;
rc = control_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
if (unlikely (rc < 0) || more)
return close_and_return (&msg, -1);
// Copy message to capture socket if any
rc = capture (capture_, &msg);
if (unlikely (rc < 0))
return close_and_return (&msg, -1);
if (msg.size () == 5 && memcmp (msg.data (), "PAUSE", 5) == 0)
state = paused;
else if (msg.size () == 6 && memcmp (msg.data (), "RESUME", 6) == 0)
state = active;
else if (msg.size () == 9
&& memcmp (msg.data (), "TERMINATE", 9) == 0)
state = terminated;
else {
if (msg.size () == 10
&& memcmp (msg.data (), "STATISTICS", 10) == 0) {
rc =
reply_stats (control_, &frontend_stats, &backend_stats);
if (unlikely (rc < 0))
return close_and_return (&msg, -1);
} else {
// This is an API error, we assert
puts ("E: invalid command sent to proxy");
zmq_assert (false);
}
}
}
// Process a request
if (state == active && items[0].revents & ZMQ_POLLIN if (state == active && items[0].revents & ZMQ_POLLIN
&& (frontend_ == backend_ || itemsout[1].revents & ZMQ_POLLOUT)) { && (frontend_ == backend_ || itemsout[1].revents & ZMQ_POLLOUT)) {
rc = forward (frontend_, &frontend_stats, backend_, &backend_stats, rc = forward (frontend_, backend_, capture_, &msg);
capture_, &msg);
if (unlikely (rc < 0)) if (unlikely (rc < 0))
return close_and_return (&msg, -1); return close_and_return (&msg, -1);
} }
@ -633,8 +425,7 @@ int zmq::proxy (class socket_base_t *frontend_,
if (state == active && frontend_ != backend_ if (state == active && frontend_ != backend_
&& items[1].revents & ZMQ_POLLIN && items[1].revents & ZMQ_POLLIN
&& itemsout[0].revents & ZMQ_POLLOUT) { && itemsout[0].revents & ZMQ_POLLOUT) {
rc = forward (backend_, &backend_stats, frontend_, &frontend_stats, rc = forward (backend_, frontend_, capture_, &msg);
capture_, &msg);
if (unlikely (rc < 0)) if (unlikely (rc < 0))
return close_and_return (&msg, -1); return close_and_return (&msg, -1);
} }

View File

@ -34,9 +34,7 @@ namespace zmq
{ {
int proxy (class socket_base_t *frontend_, int proxy (class socket_base_t *frontend_,
class socket_base_t *backend_, class socket_base_t *backend_,
class socket_base_t *capture_, class socket_base_t *capture_);
class socket_base_t *control_ =
NULL); // backward compatibility without this argument
} }
#endif #endif

View File

@ -1770,10 +1770,12 @@ int zmq_proxy_steerable (void *frontend_,
errno = EFAULT; errno = EFAULT;
return -1; return -1;
} }
return zmq::proxy (static_cast<zmq::socket_base_t *> (frontend_), #ifdef ZMQ_HAVE_WINDOWS
static_cast<zmq::socket_base_t *> (backend_), errno = WSAEOPNOTSUPP;
static_cast<zmq::socket_base_t *> (capture_), #else
static_cast<zmq::socket_base_t *> (control_)); errno = EOPNOTSUPP;
#endif
return -1;
} }
// The deprecated device functionality // The deprecated device functionality

View File

@ -33,8 +33,6 @@
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
SETUP_TEARDOWN_TESTCONTEXT
#define CONTENT_SIZE 13 #define CONTENT_SIZE 13
#define CONTENT_SIZE_MAX 32 #define CONTENT_SIZE_MAX 32
#define ROUTING_ID_SIZE 10 #define ROUTING_ID_SIZE 10
@ -48,22 +46,15 @@ struct thread_data
int id; int id;
}; };
typedef struct
{
uint64_t msg_in;
uint64_t bytes_in;
uint64_t msg_out;
uint64_t bytes_out;
} zmq_socket_stats_t;
typedef struct
{
zmq_socket_stats_t frontend;
zmq_socket_stats_t backend;
} zmq_proxy_stats_t;
void *g_clients_pkts_out = NULL; void *g_clients_pkts_out = NULL;
void *g_workers_pkts_out = NULL; void *g_workers_pkts_out = NULL;
void *control_context = NULL;
void setUp ()
{
setup_test_context ();
}
// Asynchronous client-to-server (DEALER to ROUTER) - pure libzmq // Asynchronous client-to-server (DEALER to ROUTER) - pure libzmq
// //
@ -98,7 +89,7 @@ static void client_task (void *db_)
TEST_ASSERT_NOT_NULL (client); TEST_ASSERT_NOT_NULL (client);
// Control socket receives terminate command from main over inproc // Control socket receives terminate command from main over inproc
void *control = zmq_socket (get_test_context (), ZMQ_SUB); void *control = zmq_socket (control_context, ZMQ_SUB);
TEST_ASSERT_NOT_NULL (control); TEST_ASSERT_NOT_NULL (control);
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0)); TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0));
TEST_ASSERT_SUCCESS_ERRNO ( TEST_ASSERT_SUCCESS_ERRNO (
@ -210,13 +201,6 @@ void server_task (void * /*unused_*/)
zmq_setsockopt (backend, ZMQ_LINGER, &linger, sizeof (linger))); zmq_setsockopt (backend, ZMQ_LINGER, &linger, sizeof (linger)));
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (backend, "inproc://backend")); TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (backend, "inproc://backend"));
// Control socket receives terminate command from main over inproc
void *control = zmq_socket (get_test_context (), ZMQ_REP);
TEST_ASSERT_NOT_NULL (control);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (control, ZMQ_LINGER, &linger, sizeof (linger)));
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (control, "inproc://control_proxy"));
// Launch pool of worker threads, precise number is not critical // Launch pool of worker threads, precise number is not critical
int thread_nbr; int thread_nbr;
void *threads[5]; void *threads[5];
@ -242,15 +226,13 @@ void server_task (void * /*unused_*/)
} }
// Connect backend to frontend via a proxy // Connect backend to frontend via a proxy
TEST_ASSERT_SUCCESS_ERRNO ( zmq_proxy (frontend, backend, NULL);
zmq_proxy_steerable (frontend, backend, NULL, control));
for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++) for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++)
zmq_threadclose (threads[thread_nbr]); zmq_threadclose (threads[thread_nbr]);
TEST_ASSERT_SUCCESS_ERRNO (zmq_close (frontend)); TEST_ASSERT_SUCCESS_ERRNO (zmq_close (frontend));
TEST_ASSERT_SUCCESS_ERRNO (zmq_close (backend)); TEST_ASSERT_SUCCESS_ERRNO (zmq_close (backend));
TEST_ASSERT_SUCCESS_ERRNO (zmq_close (control));
for (int i = 0; i < QT_CLIENTS; ++i) { for (int i = 0; i < QT_CLIENTS; ++i) {
TEST_ASSERT_SUCCESS_ERRNO (zmq_close (endpoint_receivers[i])); TEST_ASSERT_SUCCESS_ERRNO (zmq_close (endpoint_receivers[i]));
} }
@ -270,7 +252,7 @@ static void server_worker (void * /*unused_*/)
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (worker, "inproc://backend")); TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (worker, "inproc://backend"));
// Control socket receives terminate command from main over inproc // Control socket receives terminate command from main over inproc
void *control = zmq_socket (get_test_context (), ZMQ_SUB); void *control = zmq_socket (control_context, ZMQ_SUB);
TEST_ASSERT_NOT_NULL (control); TEST_ASSERT_NOT_NULL (control);
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0)); TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0));
TEST_ASSERT_SUCCESS_ERRNO ( TEST_ASSERT_SUCCESS_ERRNO (
@ -332,82 +314,6 @@ static void server_worker (void * /*unused_*/)
TEST_ASSERT_SUCCESS_ERRNO (zmq_close (control)); TEST_ASSERT_SUCCESS_ERRNO (zmq_close (control));
} }
uint64_t recv_stat (void *sock_, bool last_)
{
uint64_t res;
zmq_msg_t stats_msg;
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&stats_msg));
TEST_ASSERT_EQUAL_INT (sizeof (uint64_t),
zmq_recvmsg (sock_, &stats_msg, 0));
memcpy (&res, zmq_msg_data (&stats_msg), zmq_msg_size (&stats_msg));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&stats_msg));
int more;
size_t moresz = sizeof more;
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (sock_, ZMQ_RCVMORE, &more, &moresz));
TEST_ASSERT_TRUE ((last_ && !more) || (!last_ && more));
return res;
}
// Utility function to interrogate the proxy:
void check_proxy_stats (void *control_proxy_)
{
zmq_proxy_stats_t total_stats;
send_string_expect_success (control_proxy_, "STATISTICS", 0);
// first frame of the reply contains FRONTEND stats:
total_stats.frontend.msg_in = recv_stat (control_proxy_, false);
total_stats.frontend.bytes_in = recv_stat (control_proxy_, false);
total_stats.frontend.msg_out = recv_stat (control_proxy_, false);
total_stats.frontend.bytes_out = recv_stat (control_proxy_, false);
// second frame of the reply contains BACKEND stats:
total_stats.backend.msg_in = recv_stat (control_proxy_, false);
total_stats.backend.bytes_in = recv_stat (control_proxy_, false);
total_stats.backend.msg_out = recv_stat (control_proxy_, false);
total_stats.backend.bytes_out = recv_stat (control_proxy_, true);
// check stats
if (is_verbose) {
printf (
"frontend: pkts_in=%lu bytes_in=%lu pkts_out=%lu bytes_out=%lu\n",
static_cast<unsigned long int> (total_stats.frontend.msg_in),
static_cast<unsigned long int> (total_stats.frontend.bytes_in),
static_cast<unsigned long int> (total_stats.frontend.msg_out),
static_cast<unsigned long int> (total_stats.frontend.bytes_out));
printf (
"backend: pkts_in=%lu bytes_in=%lu pkts_out=%lu bytes_out=%lu\n",
static_cast<unsigned long int> (total_stats.backend.msg_in),
static_cast<unsigned long int> (total_stats.backend.bytes_in),
static_cast<unsigned long int> (total_stats.backend.msg_out),
static_cast<unsigned long int> (total_stats.backend.bytes_out));
printf ("clients sent out %d requests\n",
zmq_atomic_counter_value (g_clients_pkts_out));
printf ("workers sent out %d replies\n",
zmq_atomic_counter_value (g_workers_pkts_out));
}
TEST_ASSERT_EQUAL_UINT (
(unsigned) zmq_atomic_counter_value (g_clients_pkts_out),
total_stats.frontend.msg_in);
TEST_ASSERT_EQUAL_UINT (
(unsigned) zmq_atomic_counter_value (g_workers_pkts_out),
total_stats.frontend.msg_out);
TEST_ASSERT_EQUAL_UINT (
(unsigned) zmq_atomic_counter_value (g_workers_pkts_out),
total_stats.backend.msg_in);
TEST_ASSERT_EQUAL_UINT (
(unsigned) zmq_atomic_counter_value (g_clients_pkts_out),
total_stats.backend.msg_out);
}
// The main thread simply starts several clients and a server, and then // The main thread simply starts several clients and a server, and then
// waits for the server to finish. // waits for the server to finish.
@ -415,21 +321,16 @@ void test_proxy ()
{ {
g_clients_pkts_out = zmq_atomic_counter_new (); g_clients_pkts_out = zmq_atomic_counter_new ();
g_workers_pkts_out = zmq_atomic_counter_new (); g_workers_pkts_out = zmq_atomic_counter_new ();
control_context = zmq_ctx_new ();
TEST_ASSERT_NOT_NULL (control_context);
// Control socket receives terminate command from main over inproc // Control socket receives terminate command from main over inproc
void *control = test_context_socket (ZMQ_PUB); void *control = zmq_socket (control_context, ZMQ_PUB);
int linger = 0; int linger = 0;
TEST_ASSERT_SUCCESS_ERRNO ( TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (control, ZMQ_LINGER, &linger, sizeof (linger))); zmq_setsockopt (control, ZMQ_LINGER, &linger, sizeof (linger)));
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (control, "inproc://control")); TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (control, "inproc://control"));
// Control socket receives terminate command from main over inproc
void *control_proxy = test_context_socket (ZMQ_REQ);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (control_proxy, ZMQ_LINGER, &linger, sizeof (linger)));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_bind (control_proxy, "inproc://control_proxy"));
void *threads[QT_CLIENTS + 1]; void *threads[QT_CLIENTS + 1];
struct thread_data databags[QT_CLIENTS + 1]; struct thread_data databags[QT_CLIENTS + 1];
for (int i = 0; i < QT_CLIENTS; i++) { for (int i = 0; i < QT_CLIENTS; i++) {
@ -445,23 +346,19 @@ void test_proxy ()
msleep (500); // Wait for all clients and workers to STOP msleep (500); // Wait for all clients and workers to STOP
if (is_verbose)
printf ("retrieving stats from the proxy\n");
check_proxy_stats (control_proxy);
if (is_verbose) if (is_verbose)
printf ("shutting down all clients and server workers\n"); printf ("shutting down all clients and server workers\n");
send_string_expect_success (control, "TERMINATE", 0); send_string_expect_success (control, "TERMINATE", 0);
if (is_verbose) msleep (500); // Wait for all clients and workers to terminate
printf ("shutting down the proxy\n");
send_string_expect_success (control_proxy, "TERMINATE", 0);
test_context_socket_close (control); teardown_test_context ();
test_context_socket_close (control_proxy);
for (int i = 0; i < QT_CLIENTS + 1; i++) for (int i = 0; i < QT_CLIENTS + 1; i++)
zmq_threadclose (threads[i]); zmq_threadclose (threads[i]);
TEST_ASSERT_SUCCESS_ERRNO (zmq_close (control));
TEST_ASSERT_SUCCESS_ERRNO (zmq_ctx_destroy (control_context));
} }
int main (void) int main (void)

View File

@ -192,80 +192,6 @@ static void subscriber_thread_main (void *pvoid_)
zmq_close (subsocket); zmq_close (subsocket);
} }
bool recv_stat (void *sock_, bool last_, uint64_t *res_)
{
zmq_msg_t stats_msg;
int rc = zmq_msg_init (&stats_msg);
assert (rc == 0);
rc = zmq_msg_recv (&stats_msg, sock_, 0); //ZMQ_DONTWAIT);
if (rc == -1 && errno == EAGAIN) {
rc = zmq_msg_close (&stats_msg);
assert (rc == 0);
return false; // cannot retrieve the stat
}
assert (rc == sizeof (uint64_t));
memcpy (res_, zmq_msg_data (&stats_msg), zmq_msg_size (&stats_msg));
rc = zmq_msg_close (&stats_msg);
assert (rc == 0);
int more;
size_t moresz = sizeof more;
rc = zmq_getsockopt (sock_, ZMQ_RCVMORE, &more, &moresz);
assert (rc == 0);
assert ((last_ && !more) || (!last_ && more));
return true;
}
// Utility function to interrogate the proxy:
typedef struct
{
uint64_t msg_in;
uint64_t bytes_in;
uint64_t msg_out;
uint64_t bytes_out;
} zmq_socket_stats_t;
typedef struct
{
zmq_socket_stats_t frontend;
zmq_socket_stats_t backend;
} zmq_proxy_stats_t;
bool check_proxy_stats (void *control_proxy_)
{
zmq_proxy_stats_t total_stats;
int rc;
rc = zmq_send (control_proxy_, "STATISTICS", 10, ZMQ_DONTWAIT);
assert (rc == 10 || (rc == -1 && errno == EAGAIN));
if (rc == -1 && errno == EAGAIN) {
return false;
}
// first frame of the reply contains FRONTEND stats:
if (!recv_stat (control_proxy_, false, &total_stats.frontend.msg_in)) {
return false;
}
recv_stat (control_proxy_, false, &total_stats.frontend.bytes_in);
recv_stat (control_proxy_, false, &total_stats.frontend.msg_out);
recv_stat (control_proxy_, false, &total_stats.frontend.bytes_out);
// second frame of the reply contains BACKEND stats:
recv_stat (control_proxy_, false, &total_stats.backend.msg_in);
recv_stat (control_proxy_, false, &total_stats.backend.bytes_in);
recv_stat (control_proxy_, false, &total_stats.backend.msg_out);
recv_stat (control_proxy_, true, &total_stats.backend.bytes_out);
return true;
}
static void proxy_stats_asker_thread_main (void *pvoid_) static void proxy_stats_asker_thread_main (void *pvoid_)
{ {
const proxy_hwm_cfg_t *const cfg = const proxy_hwm_cfg_t *const cfg =
@ -305,16 +231,9 @@ static void proxy_stats_asker_thread_main (void *pvoid_)
// Start! // Start!
while (!zmq_atomic_counter_value (cfg->subscriber_received_all)) { while (!zmq_atomic_counter_value (cfg->subscriber_received_all)) {
check_proxy_stats (control_req);
usleep (1000); // 1ms -> in best case we will get 1000updates/second usleep (1000); // 1ms -> in best case we will get 1000updates/second
} }
// Ask the proxy to exit: the subscriber has received all messages
rc = zmq_send (control_req, "TERMINATE", 9, 0);
assert (rc == 9);
zmq_close (control_req); zmq_close (control_req);
} }
@ -371,7 +290,7 @@ static void proxy_thread_main (void *pvoid_)
// start proxying! // start proxying!
zmq_proxy_steerable (frontend_xsub, backend_xpub, NULL, control_rep); zmq_proxy (frontend_xsub, backend_xpub, NULL);
zmq_close (frontend_xsub); zmq_close (frontend_xsub);
zmq_close (backend_xpub); zmq_close (backend_xpub);
@ -415,11 +334,12 @@ int main (void)
zmq_threadclose (publisher); zmq_threadclose (publisher);
zmq_threadclose (subscriber); zmq_threadclose (subscriber);
zmq_threadclose (asker); zmq_threadclose (asker);
zmq_threadclose (proxy);
int rc = zmq_ctx_term (context); int rc = zmq_ctx_term (context);
assert (rc == 0); assert (rc == 0);
zmq_threadclose (proxy);
zmq_atomic_counter_destroy (&cfg.subscriber_received_all); zmq_atomic_counter_destroy (&cfg.subscriber_received_all);
return 0; return 0;

View File

@ -32,7 +32,10 @@
#include <stdlib.h> #include <stdlib.h>
SETUP_TEARDOWN_TESTCONTEXT void setUp ()
{
setup_test_context ();
}
// This is our server task. // This is our server task.
// It runs a proxy with a single REP socket as both frontend and backend. // It runs a proxy with a single REP socket as both frontend and backend.
@ -51,7 +54,7 @@ void server_task (void * /*unused_*/)
send_string_expect_success (control, my_endpoint, 0); send_string_expect_success (control, my_endpoint, 0);
// Use rep as both frontend and backend // Use rep as both frontend and backend
TEST_ASSERT_SUCCESS_ERRNO (zmq_proxy_steerable (rep, rep, NULL, control)); zmq_proxy (rep, rep, NULL);
TEST_ASSERT_SUCCESS_ERRNO (zmq_close (rep)); TEST_ASSERT_SUCCESS_ERRNO (zmq_close (rep));
TEST_ASSERT_SUCCESS_ERRNO (zmq_close (control)); TEST_ASSERT_SUCCESS_ERRNO (zmq_close (control));
@ -82,10 +85,9 @@ void test_proxy_single_socket ()
send_string_expect_success (req, "msg22", 0); send_string_expect_success (req, "msg22", 0);
recv_string_expect_success (req, "msg22", 0); recv_string_expect_success (req, "msg22", 0);
send_string_expect_success (control, "TERMINATE", 0);
test_context_socket_close (control); test_context_socket_close (control);
test_context_socket_close (req); test_context_socket_close (req);
teardown_test_context ();
free (my_endpoint); free (my_endpoint);
zmq_threadclose (server_thread); zmq_threadclose (server_thread);

View File

@ -32,7 +32,10 @@
#include <stdlib.h> #include <stdlib.h>
SETUP_TEARDOWN_TESTCONTEXT void setUp ()
{
setup_test_context ();
}
// This is a test for issue #1382. The server thread creates a SUB-PUSH // This is a test for issue #1382. The server thread creates a SUB-PUSH
// steerable proxy. The main process then sends messages to the SUB // steerable proxy. The main process then sends messages to the SUB
@ -60,8 +63,7 @@ void server_task (void * /*unused_*/)
send_string_expect_success (control, my_endpoint, 0); send_string_expect_success (control, my_endpoint, 0);
// Connect backend to frontend via a proxy // Connect backend to frontend via a proxy
TEST_ASSERT_SUCCESS_ERRNO ( zmq_proxy (frontend, backend, NULL);
zmq_proxy_steerable (frontend, backend, NULL, control));
TEST_ASSERT_SUCCESS_ERRNO (zmq_close (frontend)); TEST_ASSERT_SUCCESS_ERRNO (zmq_close (frontend));
TEST_ASSERT_SUCCESS_ERRNO (zmq_close (backend)); TEST_ASSERT_SUCCESS_ERRNO (zmq_close (backend));
@ -97,10 +99,10 @@ void test_proxy_terminate ()
msleep (50); msleep (50);
send_string_expect_success (publisher, "This is a test", 0); send_string_expect_success (publisher, "This is a test", 0);
send_string_expect_success (control, "TERMINATE", 0);
test_context_socket_close (publisher); test_context_socket_close (publisher);
test_context_socket_close (control); test_context_socket_close (control);
teardown_test_context ();
free (my_endpoint); free (my_endpoint);
zmq_threadclose (thread); zmq_threadclose (thread);