/* SPDX-License-Identifier: MPL-2.0 */ #include "../include/zmq.h" #include #include #include #include #include #include #include #include #include "platform.hpp" #if defined ZMQ_HAVE_WINDOWS #include #include #else #include #include #endif /* Asynchronous proxy benchmark using ZMQ_XPUB_NODROP. Topology: XPUB SUB | | +-----> XSUB -> XPUB -----/ | ^^^^^^^^^^^^ XPUB ZMQ proxy All connections use "inproc" transport. The two XPUB sockets start flooding the proxy. The throughput is computed using the bytes received in the SUB socket. */ #define HWM 10000 #ifndef ARRAY_SIZE #define ARRAY_SIZE(x) (sizeof (x) / sizeof (*x)) #endif #define TEST_ASSERT_SUCCESS_ERRNO(expr) \ test_assert_success_message_errno_helper (expr, NULL, #expr) // This macro is used to avoid-variable warning. If used with an expression, // the sizeof is not evaluated to avoid polluting the assembly code. #ifdef NDEBUG #define ASSERT_EXPR_SAFE(x) \ do { \ (void) sizeof (x); \ } while (0) #else #define ASSERT_EXPR_SAFE(x) assert (x) #endif static uint64_t message_count = 0; static size_t message_size = 0; typedef struct { void *context; int thread_idx; const char *frontend_endpoint[4]; const char *backend_endpoint[4]; const char *control_endpoint; } proxy_hwm_cfg_t; int test_assert_success_message_errno_helper (int rc_, const char *msg_, const char *expr_) { if (rc_ == -1) { char buffer[512]; buffer[sizeof (buffer) - 1] = 0; // to ensure defined behavior with VC++ <= 2013 printf ("%s failed%s%s%s, errno = %i (%s)", expr_, msg_ ? " (additional info: " : "", msg_ ? msg_ : "", msg_ ? ")" : "", zmq_errno (), zmq_strerror (zmq_errno ())); exit (1); } return rc_; } static void set_hwm (void *skt) { int hwm = HWM; TEST_ASSERT_SUCCESS_ERRNO ( zmq_setsockopt (skt, ZMQ_SNDHWM, &hwm, sizeof (hwm))); TEST_ASSERT_SUCCESS_ERRNO ( zmq_setsockopt (skt, ZMQ_RCVHWM, &hwm, sizeof (hwm))); } static void publisher_thread_main (void *pvoid) { const proxy_hwm_cfg_t *cfg = (proxy_hwm_cfg_t *) pvoid; const int idx = cfg->thread_idx; int optval; int rc; void *pubsocket = zmq_socket (cfg->context, ZMQ_XPUB); assert (pubsocket); set_hwm (pubsocket); optval = 1; TEST_ASSERT_SUCCESS_ERRNO ( zmq_setsockopt (pubsocket, ZMQ_XPUB_NODROP, &optval, sizeof (optval))); optval = 1; TEST_ASSERT_SUCCESS_ERRNO ( zmq_setsockopt (pubsocket, ZMQ_SNDTIMEO, &optval, sizeof (optval))); TEST_ASSERT_SUCCESS_ERRNO ( zmq_connect (pubsocket, cfg->frontend_endpoint[idx])); // Wait before starting TX operations till 1 subscriber has subscribed // (in this test there's 1 subscriber only) char buffer[32] = {}; rc = TEST_ASSERT_SUCCESS_ERRNO ( zmq_recv (pubsocket, buffer, sizeof (buffer), 0)); if (rc != 1) { printf ("invalid response length: expected 1, received %d", rc); exit (1); } if (buffer[0] != 1) { printf ("invalid response value: expected 1, received %d", (int) buffer[0]); exit (1); } zmq_msg_t msg_orig; rc = zmq_msg_init_size (&msg_orig, message_size); assert (rc == 0); memset (zmq_msg_data (&msg_orig), 'A', zmq_msg_size (&msg_orig)); uint64_t send_count = 0; while (send_count < message_count) { zmq_msg_t msg; zmq_msg_init (&msg); rc = zmq_msg_copy (&msg, &msg_orig); assert (rc == 0); // Send the message to the socket rc = zmq_msg_send (&msg, pubsocket, 0); if (rc != -1) { send_count++; } else { TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg)); } } zmq_close (pubsocket); //printf ("publisher thread ended\n"); } static void subscriber_thread_main (void *pvoid) { const proxy_hwm_cfg_t *cfg = (proxy_hwm_cfg_t *) pvoid; const int idx = cfg->thread_idx; void *subsocket = zmq_socket (cfg->context, ZMQ_SUB); assert (subsocket); set_hwm (subsocket); TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (subsocket, ZMQ_SUBSCRIBE, 0, 0)); TEST_ASSERT_SUCCESS_ERRNO ( zmq_connect (subsocket, cfg->backend_endpoint[idx])); // Receive message_count messages uint64_t rxsuccess = 0; bool success = true; while (success) { zmq_msg_t msg; int rc = zmq_msg_init (&msg); assert (rc == 0); rc = zmq_msg_recv (&msg, subsocket, 0); if (rc != -1) { TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg)); rxsuccess++; } if (rxsuccess == message_count) break; } // Cleanup zmq_close (subsocket); //printf ("subscriber thread ended\n"); } static void proxy_thread_main (void *pvoid) { const proxy_hwm_cfg_t *cfg = (proxy_hwm_cfg_t *) pvoid; int rc; // FRONTEND SUB void *frontend_xsub = zmq_socket ( cfg->context, ZMQ_XSUB); // the frontend is the one exposed to internal threads (INPROC) assert (frontend_xsub); set_hwm (frontend_xsub); // Bind FRONTEND for (unsigned int i = 0; i < ARRAY_SIZE (cfg->frontend_endpoint); i++) { const char *ep = cfg->frontend_endpoint[i]; if (ep != NULL) { assert (strlen (ep) > 5); rc = zmq_bind (frontend_xsub, ep); ASSERT_EXPR_SAFE (rc == 0); } } // BACKEND PUB void *backend_xpub = zmq_socket ( cfg->context, ZMQ_XPUB); // the backend is the one exposed to the external world (TCP) assert (backend_xpub); int optval = 1; rc = zmq_setsockopt (backend_xpub, ZMQ_XPUB_NODROP, &optval, sizeof (optval)); ASSERT_EXPR_SAFE (rc == 0); set_hwm (backend_xpub); // Bind BACKEND for (unsigned int i = 0; i < ARRAY_SIZE (cfg->backend_endpoint); i++) { const char *ep = cfg->backend_endpoint[i]; if (ep != NULL) { assert (strlen (ep) > 5); rc = zmq_bind (backend_xpub, ep); ASSERT_EXPR_SAFE (rc == 0); } } // CONTROL REP void *control_rep = zmq_socket ( cfg->context, ZMQ_REP); // This one is used by the proxy to receive&reply to commands assert (control_rep); // Bind CONTROL rc = zmq_bind (control_rep, cfg->control_endpoint); ASSERT_EXPR_SAFE (rc == 0); // Start proxying! zmq_proxy_steerable (frontend_xsub, backend_xpub, NULL, control_rep); zmq_close (frontend_xsub); zmq_close (backend_xpub); zmq_close (control_rep); //printf ("proxy thread ended\n"); } void terminate_proxy (const proxy_hwm_cfg_t *cfg) { // CONTROL REQ void *control_req = zmq_socket ( cfg->context, ZMQ_REQ); // This one can be used to send command to the proxy assert (control_req); // Connect CONTROL-REQ: a socket to which send commands int rc = zmq_connect (control_req, cfg->control_endpoint); ASSERT_EXPR_SAFE (rc == 0); // Ask the proxy to exit: the subscriber has received all messages rc = zmq_send (control_req, "TERMINATE", 9, 0); ASSERT_EXPR_SAFE (rc == 9); zmq_close (control_req); } // The main thread simply starts some publishers, a proxy, // and a subscriber. Finish when all packets are received. int main (int argc, char *argv[]) { if (argc != 3) { printf ("usage: proxy_thr \n"); return 1; } message_size = atoi (argv[1]); message_count = atoi (argv[2]); printf ("message size: %d [B]\n", (int) message_size); printf ("message count: %d\n", (int) message_count); void *context = zmq_ctx_new (); assert (context); int rv = zmq_ctx_set (context, ZMQ_IO_THREADS, 4); ASSERT_EXPR_SAFE (rv == 0); // START ALL SECONDARY THREADS const char *pub1 = "inproc://perf_pub1"; const char *pub2 = "inproc://perf_pub2"; const char *sub1 = "inproc://perf_backend"; proxy_hwm_cfg_t cfg_global = {}; cfg_global.context = context; cfg_global.frontend_endpoint[0] = pub1; cfg_global.frontend_endpoint[1] = pub2; cfg_global.backend_endpoint[0] = sub1; cfg_global.control_endpoint = "inproc://ctrl"; // Proxy proxy_hwm_cfg_t cfg_proxy = cfg_global; void *proxy = zmq_threadstart (&proxy_thread_main, (void *) &cfg_proxy); assert (proxy != 0); // Subscriber 1 proxy_hwm_cfg_t cfg_sub1 = cfg_global; cfg_sub1.thread_idx = 0; void *subscriber = zmq_threadstart (&subscriber_thread_main, (void *) &cfg_sub1); assert (subscriber != 0); // Start measuring void *watch = zmq_stopwatch_start (); // Publisher 1 proxy_hwm_cfg_t cfg_pub1 = cfg_global; cfg_pub1.thread_idx = 0; void *publisher1 = zmq_threadstart (&publisher_thread_main, (void *) &cfg_pub1); assert (publisher1 != 0); // Publisher 2 proxy_hwm_cfg_t cfg_pub2 = cfg_global; cfg_pub2.thread_idx = 1; void *publisher2 = zmq_threadstart (&publisher_thread_main, (void *) &cfg_pub2); assert (publisher2 != 0); // Wait for all packets to be received zmq_threadclose (subscriber); // Stop measuring unsigned long elapsed = zmq_stopwatch_stop (watch); if (elapsed == 0) elapsed = 1; unsigned long throughput = (unsigned long) ((double) message_count / (double) elapsed * 1000000); double megabits = (double) (throughput * message_size * 8) / 1000000; printf ("mean throughput: %d [msg/s]\n", (int) throughput); printf ("mean throughput: %.3f [Mb/s]\n", (double) megabits); // Wait for the end of publishers... zmq_threadclose (publisher1); zmq_threadclose (publisher2); // ... then close the proxy terminate_proxy (&cfg_proxy); zmq_threadclose (proxy); int rc = zmq_ctx_term (context); ASSERT_EXPR_SAFE (rc == 0); return 0; }