From bc25366f7cf7f06a4baccd94b19e24a34fe97ac8 Mon Sep 17 00:00:00 2001 From: Laurent Alebarde Date: Thu, 13 Feb 2014 18:31:00 +0100 Subject: [PATCH] Revert "add proxy_chain, a multi proxies chaining in the same thread feature" This reverts commit bc7441f5176589ad9a34dc2bde6b91d78a44e3e0. --- doc/zmq_proxy_chain.txt | 103 -------------- include/zmq.h | 2 - src/proxy.cpp | 88 ++++-------- src/proxy.hpp | 10 +- src/zmq.cpp | 49 +++---- tests/Makefile.am | 2 - tests/test_proxy_chain.cpp | 279 ------------------------------------- 7 files changed, 56 insertions(+), 477 deletions(-) delete mode 100644 doc/zmq_proxy_chain.txt delete mode 100644 tests/test_proxy_chain.cpp diff --git a/doc/zmq_proxy_chain.txt b/doc/zmq_proxy_chain.txt deleted file mode 100644 index bb2f3382..00000000 --- a/doc/zmq_proxy_chain.txt +++ /dev/null @@ -1,103 +0,0 @@ -zmq_proxy_chain(3) -================== - -NAME ----- -zmq_proxy_chain - start built-in 0MQ a proxy chain in the same thread -control flow - - -SYNOPSIS --------- -*int zmq_proxy_chain (const void '**frontends', const void '**backends', - const void '*capture', const void **hooks_, const void '*control');* - - -DESCRIPTION ------------ -The _zmq_proxy_chain()_ function starts the built-in 0MQ proxy in the -current application thread, as _zmq_proxy()_, _zmq_proxy_steerable()_, or -_zmq_proxy_hook()_ do. Please, refer to these functions for their general -description and usage. We describe here only the additional proxy chaining -capability. - -Note that compared to the other proxy functions, the arguments _frontends_, -_backends_ and _hooks_ receive arrays instead of single values. Say one need -to implement the following architecture: - -*Process client proxy1 proxy2 worker* - | |-----------| |----------| | -*socket* cl f1 b1 f2 b2 wk -*endpoint* |c----e1-----b| |c----e2-----b| |c----e3----b| - -Note: "c" is for connect, "b" for bind. - -With the other proxy functions, one needs typically one thread for each proxy: ----- -thread 1: zmq_proxy(f1, b1); -thread 2: zmq_proxy(f2, b2); ----- - -With _zmq_proxy_chain_, it can be performed with only one thread: ----- -void** f = {f1, f2, NULL); -void** b = {b1, b2, NULL); -single thread: zmq_proxy_chain(f, b, NULL, NULL, NULL); ----- - -Note: the three NULL arguments are for capture, hooks, and control, since -_zmq_proxy_chain_ is built on top of _zmq_proxy_hook_, itself built on top -of _zmq_proxy_steerable_, itself built on top of _zmq_proxy_. Of course, hook and -steering features can be used along with chaining. - -We have limited the number of sockets that can be chained in a single command to 10, what -should be largely sufficient. The reason is to avoid dynamic memory allocation. - -Arguments frontends and backends shall be arrays of sockets of type void*, terminated -by NULL. Both arrays shall terminate by NULL at the same indice, otherwise, an error is -returned. Argument hooks shall be NULL or of the same length than the socket arrays. No -NULL is required at the end of hooks. Any number of elements may be NULL where no hook -is implemented in some proxies. - - -Refer to linkzmq:zmq_socket[3] for a description of the available socket types. -Refer to linkzmq:zmq_proxy[3] for a description of the zmq_proxy. -Refer to linkzmq:zmq_proxy_steerable[3] for a description of the zmq_steerable. -Refer to linkzmq:zmq_proxy_hook[3] for a description of the zmq_hook. - -EXAMPLE USAGE -------------- -_zmq_proxy_chain_ aims at building protocol layers by easing the chaining of some -proxies typically by chaining: -DEALER | ROUTER <---> STREAM <---> DEALER -in the same thread. Any kind of protocol feature can be added via hooks. - -cf also zmq_proxy, zmq_proxy_steerable, zmq_proxy_hook. - -RETURN VALUE ------------- -The _zmq_proxy_chain()_ function returns 0 if TERMINATE is sent to its -control socket. Otherwise, it returns `-1` and 'errno' set to *ETERM* (the -0MQ 'context' associated with either of the specified sockets was terminated). - - -EXAMPLE -------- -cf test_proxy_chain.cpp -An example capable of proxying CURVE will be added soon. - -SEE ALSO --------- -linkzmq:zmq_proxy[3] -linkzmq:zmq_proxy_steerable[3] -linkzmq:zmq_proxy_hook[3] -linkzmq:zmq_bind[3] -linkzmq:zmq_connect[3] -linkzmq:zmq_socket[3] -linkzmq:zmq[7] - - -AUTHORS -------- -This page was written by the 0MQ community. To make a change please -read the 0MQ Contribution Policy at . diff --git a/include/zmq.h b/include/zmq.h index 961c2836..3bfc6bd0 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -398,11 +398,9 @@ ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout); /* Built-in message proxy (3-way) */ -#define ZMQ_PROXY_CHAIN_MAX_LENGTH 10 ZMQ_EXPORT int zmq_proxy (void *frontend, void *backend, void *capture); ZMQ_EXPORT int zmq_proxy_steerable (void *frontend, void *backend, void *capture, void *control); ZMQ_EXPORT int zmq_proxy_hook (void *frontend, void *backend, void *capture, void *hook, void *control); -ZMQ_EXPORT int zmq_proxy_chain (void **frontends_, void **backends_, void *capture_, void **hooks_, void *control_); typedef int (*zmq_hook_f)(void *frontend, void *backend, void *capture, zmq_msg_t* msg_, size_t n_, void *data_); typedef struct zmq_proxy_hook_t { diff --git a/src/proxy.cpp b/src/proxy.cpp index 1bdfe844..09788e57 100644 --- a/src/proxy.cpp +++ b/src/proxy.cpp @@ -119,12 +119,14 @@ forward( int zmq::proxy ( - class socket_base_t **frontend_, - class socket_base_t **backend_, - class socket_base_t *capture_, - class socket_base_t *control_, - zmq::proxy_hook_t **hook_) + class socket_base_t *frontend_, + class socket_base_t *backend_, + class socket_base_t *capture_, + class socket_base_t *control_, + zmq::proxy_hook_t *hook_) { + static zmq::proxy_hook_t dummy_hook = {NULL, NULL, NULL}; + msg_t msg; int rc = msg.init (); if (rc != 0) @@ -135,41 +137,12 @@ zmq::proxy ( int more; size_t moresz; - size_t n = 0; // number of pair of sockets: the array ends with NULL - for (;; n++) { // counts the number of pair of sockets - if (!frontend_[n] && !backend_[n]) - break; - if (!frontend_[n] || !backend_[n]) { - errno = EFAULT; - return -1; - } - } - if (!n) { - errno = EFAULT; - return -1; - } - // avoid dynamic allocation as we have no guarranty to reach the deallocator => limit the chain length - zmq_assert(n <= ZMQ_PROXY_CHAIN_MAX_LENGTH); - zmq_pollitem_t items [2 * ZMQ_PROXY_CHAIN_MAX_LENGTH + 1]; // +1 for the control socket - static zmq_pollitem_t null_item = { NULL, 0, ZMQ_POLLIN, 0 }; - static zmq::proxy_hook_t dummy_hook = {NULL, NULL, NULL}; - static zmq::proxy_hook_t* no_hooks[ZMQ_PROXY_CHAIN_MAX_LENGTH]; - if (!hook_) - hook_ = no_hooks; - else - for (size_t i = 0; i < n; i++) - if (!hook_[i]) // Check if a hook is used - hook_[i] = &dummy_hook; - for (size_t i = 0; i < n; i++) { - memcpy(&items[2 * i], &null_item, sizeof(null_item)); - items[2 * i].socket = frontend_[i]; - memcpy(&items[2 * i + 1], &null_item, sizeof(null_item)); - items[2 * i + 1].socket = backend_[i]; - no_hooks[i] = &dummy_hook; - } - memcpy(&items[2 * n], &null_item, sizeof(null_item)); - items[2 * n].socket = control_; - int qt_poll_items = (control_ ? 2 * n + 1 : 2 * n); + zmq_pollitem_t items [] = { + { frontend_, 0, ZMQ_POLLIN, 0 }, + { backend_, 0, ZMQ_POLLIN, 0 }, + { control_, 0, ZMQ_POLLIN, 0 } + }; + int qt_poll_items = (control_ ? 3 : 2); // Proxy can be in these three states enum { @@ -185,7 +158,7 @@ zmq::proxy ( return -1; // Process a control command if any - if (control_ && items [2 * n].revents & ZMQ_POLLIN) { + if (control_ && items [2].revents & ZMQ_POLLIN) { rc = control_->recv (&msg, 0); if (unlikely (rc < 0)) return -1; @@ -214,23 +187,22 @@ zmq::proxy ( zmq_assert (false); } } - - // process each pair of sockets - for (size_t i = 0; i < n; i++) { - // Process a request - if (state == active - && items [2 * i].revents & ZMQ_POLLIN) { - rc = forward(frontend_[i], backend_[i], capture_, msg, hook_[i]->front2back_hook, hook_[i]->data); - if (unlikely (rc < 0)) - return -1; - } - // Process a reply - if (state == active - && items [2 * i + 1].revents & ZMQ_POLLIN) { - rc = forward(backend_[i], frontend_[i], capture_, msg, hook_[i]->back2front_hook, hook_[i]->data); - if (unlikely (rc < 0)) - return -1; - } + // Check if a hook is used + if (!hook_) + hook_ = &dummy_hook; + // Process a request + if (state == active + && items [0].revents & ZMQ_POLLIN) { + rc = forward(frontend_, backend_, capture_, msg, hook_->front2back_hook, hook_->data); + if (unlikely (rc < 0)) + return -1; + } + // Process a reply + if (state == active + && items [1].revents & ZMQ_POLLIN) { + rc = forward(backend_, frontend_, capture_, msg, hook_->back2front_hook, hook_->data); + if (unlikely (rc < 0)) + return -1; } } return 0; diff --git a/src/proxy.hpp b/src/proxy.hpp index fef76c5d..0b9eef84 100644 --- a/src/proxy.hpp +++ b/src/proxy.hpp @@ -32,11 +32,11 @@ namespace zmq }; int proxy ( - class socket_base_t **frontend_, - class socket_base_t **backend_, - class socket_base_t *capture_ = NULL, - class socket_base_t *control_ = NULL, // backward compatibility without this argument - proxy_hook_t **hook_ = NULL // backward compatibility without this argument + class socket_base_t *frontend_, + class socket_base_t *backend_, + class socket_base_t *capture_ = NULL, + class socket_base_t *control_ = NULL, // backward compatibility without this argument + proxy_hook_t *hook_ = NULL // backward compatibility without this argument ); } diff --git a/src/zmq.cpp b/src/zmq.cpp index 400e7fa5..4508eee1 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -1025,55 +1025,48 @@ typedef char check_proxy_hook_t_size int zmq_proxy (void *frontend_, void *backend_, void *capture_) { - zmq::socket_base_t* frontends_[] = {(zmq::socket_base_t*) frontend_, NULL}; - zmq::socket_base_t* backends_[] = {(zmq::socket_base_t*) backend_, NULL}; + if (!frontend_ || !backend_) { + errno = EFAULT; + return -1; + } return zmq::proxy ( - (zmq::socket_base_t**) frontends_, - (zmq::socket_base_t**) backends_, + (zmq::socket_base_t*) frontend_, + (zmq::socket_base_t*) backend_, (zmq::socket_base_t*) capture_); } int zmq_proxy_steerable (void *frontend_, void *backend_, void *capture_, void *control_) { - zmq::socket_base_t* frontends_[] = {(zmq::socket_base_t*) frontend_, NULL}; - zmq::socket_base_t* backends_[] = {(zmq::socket_base_t*) backend_, NULL}; + if (!frontend_ || !backend_) { + errno = EFAULT; + return -1; + } return zmq::proxy ( - (zmq::socket_base_t**) frontends_, - (zmq::socket_base_t**) backends_, + (zmq::socket_base_t*) frontend_, + (zmq::socket_base_t*) backend_, (zmq::socket_base_t*) capture_, (zmq::socket_base_t*) control_); } int zmq_proxy_hook (void *frontend_, void *backend_, void *capture_, void *hook_, void *control_) { - zmq::socket_base_t* frontends_[] = {(zmq::socket_base_t*) frontend_, NULL}; - zmq::socket_base_t* backends_[] = {(zmq::socket_base_t*) backend_, NULL}; - zmq::proxy_hook_t* hooks_[] = {(zmq::proxy_hook_t*) hook_}; + if (!frontend_ || !backend_) { + errno = EFAULT; + return -1; + } return zmq::proxy ( - (zmq::socket_base_t**) frontends_, - (zmq::socket_base_t**) backends_, + (zmq::socket_base_t*) frontend_, + (zmq::socket_base_t*) backend_, (zmq::socket_base_t*) capture_, (zmq::socket_base_t*) control_, - (zmq::proxy_hook_t**) hooks_); -} - -int zmq_proxy_chain (void **frontends_, void **backends_, void *capture_, void **hooks_, void *control_) -{ - return zmq::proxy ( - (zmq::socket_base_t**) frontends_, - (zmq::socket_base_t**) backends_, - (zmq::socket_base_t*) capture_, - (zmq::socket_base_t*) control_, - (zmq::proxy_hook_t**) hooks_); + (zmq::proxy_hook_t*) hook_); } // The deprecated device functionality int zmq_device (int /* type */, void *frontend_, void *backend_) { - zmq::socket_base_t* frontends_[] = {(zmq::socket_base_t*) frontend_, NULL}; - zmq::socket_base_t* backends_[] = {(zmq::socket_base_t*) backend_, NULL}; return zmq::proxy ( - (zmq::socket_base_t**) frontends_, - (zmq::socket_base_t**) backends_); + (zmq::socket_base_t*) frontend_, + (zmq::socket_base_t*) backend_); } diff --git a/tests/Makefile.am b/tests/Makefile.am index 6eeee234..2a4c7808 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -44,7 +44,6 @@ noinst_PROGRAMS = test_system \ test_inproc_connect \ test_issue_566 \ test_proxy \ - test_proxy_chain \ test_abstract_ipc \ test_many_sockets \ test_ipc_wildcard \ @@ -111,7 +110,6 @@ test_conflate_SOURCES = test_conflate.cpp test_inproc_connect_SOURCES = test_inproc_connect.cpp test_issue_566_SOURCES = test_issue_566.cpp test_proxy_SOURCES = test_proxy.cpp -test_proxy_chain_SOURCES = test_proxy_chain.cpp test_abstract_ipc_SOURCES = test_abstract_ipc.cpp test_many_sockets_SOURCES = test_many_sockets.cpp test_ipc_wildcard_SOURCES = test_ipc_wildcard.cpp diff --git a/tests/test_proxy_chain.cpp b/tests/test_proxy_chain.cpp deleted file mode 100644 index 4cf1f746..00000000 --- a/tests/test_proxy_chain.cpp +++ /dev/null @@ -1,279 +0,0 @@ -/* - Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#include "testutil.hpp" -#include "../include/zmq_utils.h" - -// Asynchronous client-to-server (DEALER to ROUTER) - pure libzmq -// -// While this example runs in a single process, that is to make -// it easier to start and stop the example. Each task may have its own -// context and conceptually acts as a separate process. To have this -// behaviour, it is necessary to replace the inproc transport of the -// control socket by a tcp transport. - -// This is our client task -// It connects to the server, and then sends a request once per second -// It collects responses as they arrive, and it prints them out. We will -// run several client tasks in parallel, each with a different random ID. - -#define CONTENT_SIZE 13 -#define CONTENT_SIZE_MAX 32 -#define ID_SIZE 10 -#define ID_SIZE_MAX 32 -#define QT_WORKERS 1 -#define QT_CLIENTS 1 -#define is_verbose 0 - -static void -client_task (void *ctx) -{ - void *client = zmq_socket (ctx, ZMQ_DEALER); - assert (client); - - // Control socket receives terminate command from main over inproc - void *control = zmq_socket (ctx, ZMQ_SUB); - assert (control); - int rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0); - assert (rc == 0); - rc = zmq_connect (control, "inproc://control"); - assert (rc == 0); - - char content [CONTENT_SIZE_MAX]; - // Set random identity to make tracing easier - char identity [ID_SIZE]; - sprintf (identity, "%04X-%04X", rand() % 0xFFFF, rand() % 0xFFFF); - rc = zmq_setsockopt (client, ZMQ_IDENTITY, identity, ID_SIZE); // includes '\0' as an helper for printf - assert (rc == 0); - rc = zmq_connect (client, "tcp://127.0.0.1:9999"); - assert (rc == 0); - - zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 }, { control, 0, ZMQ_POLLIN, 0 } }; - int request_nbr = 0; - bool run = true; - while (run) { - // Tick once per 200 ms, pulling in arriving messages - int centitick; - for (centitick = 0; centitick < 20; centitick++) { - zmq_poll (items, 2, 10); - if (items [0].revents & ZMQ_POLLIN) { - int rcvmore; - size_t sz = sizeof (rcvmore); - rc = zmq_recv (client, content, CONTENT_SIZE_MAX, 0); - assert (rc == CONTENT_SIZE); - if (is_verbose) printf("client receive - identity = %s content = %s\n", identity, content); - // Check that message is still the same - assert (memcmp (content, "request #", 9) == 0); - rc = zmq_getsockopt (client, ZMQ_RCVMORE, &rcvmore, &sz); - assert (rc == 0); - assert (!rcvmore); - } - if (items [1].revents & ZMQ_POLLIN) { - rc = zmq_recv (control, content, CONTENT_SIZE_MAX, 0); - if (rc > 0) { - if (is_verbose) { - if (rc == 9 && memcmp(content, "TERMINATE", 9) == 0) - content[9] = '\0'; // required to have a clean output since '\0' is not included in the command - printf("client receive - identity = %s command = %s\n", identity, content); - } - if (memcmp (content, "TERMINATE", 9) == 0) { - run = false; - break; - } - } - } - } - sprintf(content, "request #%03d", ++request_nbr); // CONTENT_SIZE - rc = zmq_send (client, content, CONTENT_SIZE, 0); - assert (rc == CONTENT_SIZE); - } - - rc = zmq_close (client); - assert (rc == 0); - rc = zmq_close (control); - assert (rc == 0); -} - -// This is our server task. -// It uses the multithreaded server model to deal requests out to a pool -// of workers and route replies back to clients. One worker can handle -// one request at a time but one client can talk to multiple workers at -// once. - -static void server_worker (void *ctx); - -void -server_task (void *ctx) -{ - // Frontend socket talks to clients over TCP - void *frontend = zmq_socket (ctx, ZMQ_ROUTER); - assert (frontend); - int rc = zmq_bind (frontend, "tcp://127.0.0.1:9999"); - assert (rc == 0); - - // Intermediate 1 - void *intermediate1 = zmq_socket (ctx, ZMQ_DEALER); - assert (intermediate1); - rc = zmq_connect (intermediate1, "inproc://intermediate"); - assert (rc == 0); - - // Intermediate 2 - void *intermediate2 = zmq_socket (ctx, ZMQ_DEALER); - assert (intermediate2); - rc = zmq_bind (intermediate2, "inproc://intermediate"); - assert (rc == 0); - - // Backend socket talks to workers over inproc - void *backend = zmq_socket (ctx, ZMQ_DEALER); - assert (backend); - rc = zmq_bind (backend, "inproc://backend"); - assert (rc == 0); - - // Control socket receives terminate command from main over inproc - void *control = zmq_socket (ctx, ZMQ_SUB); - assert (control); - rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0); - assert (rc == 0); - rc = zmq_connect (control, "inproc://control"); - assert (rc == 0); - - // Launch pool of worker threads, precise number is not critical - int thread_nbr; - void* threads [5]; - for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++) - threads[thread_nbr] = zmq_threadstart (&server_worker, ctx); - - void* frontends[] = {frontend, intermediate2, NULL}; - void* backends[] = {intermediate1, backend, NULL}; - // Connect backend to frontend via a proxy - zmq_proxy_chain (frontends, backends, NULL, NULL, control); // until TERMINATE is sent on control - - for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++) - zmq_threadclose (threads[thread_nbr]); - - rc = zmq_close (frontend); - assert (rc == 0); - rc = zmq_close (intermediate1); - assert (rc == 0); - rc = zmq_close (intermediate2); - assert (rc == 0); - rc = zmq_close (backend); - assert (rc == 0); - rc = zmq_close (control); - assert (rc == 0); -} - -// Each worker task works on one request at a time and sends a random number -// of replies back, with random delays between replies: -// The comments in the first column, if suppressed, makes it a poller version - -static void -server_worker (void *ctx) -{ - void *worker = zmq_socket (ctx, ZMQ_DEALER); - assert (worker); - int rc = zmq_connect (worker, "inproc://backend"); - assert (rc == 0); - - // Control socket receives terminate command from main over inproc - void *control = zmq_socket (ctx, ZMQ_SUB); - assert (control); - rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0); - assert (rc == 0); - rc = zmq_connect (control, "inproc://control"); - assert (rc == 0); - - char content [CONTENT_SIZE_MAX]; // bigger than what we need to check that - char identity [ID_SIZE_MAX]; // the size received is the size sent - - bool run = true; - while (run) { - rc = zmq_recv (control, content, CONTENT_SIZE_MAX, ZMQ_DONTWAIT); // usually, rc == -1 (no message) - if (rc > 0) { - if (is_verbose) { - if (rc == 9 && memcmp(content, "TERMINATE", 9) == 0) - content[9] = '\0'; // required to have a clean output since '\0' is not included in the command - printf("server_worker receives command = %s\n", content); - } - if (memcmp (content, "TERMINATE", 9) == 0) - run = false; - } - // The DEALER socket gives us the reply envelope and message - // if we don't poll, we have to use ZMQ_DONTWAIT, if we poll, we can block-receive with 0 - rc = zmq_recv (worker, identity, ID_SIZE_MAX, ZMQ_DONTWAIT); - if (rc == ID_SIZE) { - rc = zmq_recv (worker, content, CONTENT_SIZE_MAX, 0); - assert (rc == CONTENT_SIZE); - if (is_verbose) - printf ("server receive - identity = %s content = %s\n", identity, content); - - // Send 0..4 replies back - int reply, replies = 1; //rand() % 5; - for (reply = 0; reply < replies; reply++) { - // Sleep for some fraction of a second - msleep (rand () % 10 + 1); - // Send message from server to client - rc = zmq_send (worker, identity, ID_SIZE, ZMQ_SNDMORE); - assert (rc == ID_SIZE); - rc = zmq_send (worker, content, CONTENT_SIZE, 0); - assert (rc == CONTENT_SIZE); - } - } - } - rc = zmq_close (worker); - assert (rc == 0); - rc = zmq_close (control); - assert (rc == 0); -} - -// The main thread simply starts several clients and a server, and then -// waits for the server to finish. - -int -main (void) -{ - setup_test_environment (); - - void *ctx = zmq_ctx_new (); - assert (ctx); - // Control socket receives terminate command from main over inproc - void *control = zmq_socket (ctx, ZMQ_PUB); - assert (control); - int rc = zmq_bind (control, "inproc://control"); - assert (rc == 0); - - void *threads [QT_CLIENTS + 1]; - for (int i = 0; i < QT_CLIENTS; i++) - threads[i] = zmq_threadstart (&client_task, ctx); - threads[QT_CLIENTS] = zmq_threadstart (&server_task, ctx); - - msleep (1000); // Run for 500 ms the standard proxy - rc = zmq_send (control, "TERMINATE", 9, 0); - assert (rc == 9); - - rc = zmq_close (control); - assert (rc == 0); - - for (int i = 0; i < QT_CLIENTS + 1; i++) - zmq_threadclose (threads[i]); - - rc = zmq_ctx_term (ctx); - assert (rc == 0); - return 0; -}