From bc25366f7cf7f06a4baccd94b19e24a34fe97ac8 Mon Sep 17 00:00:00 2001 From: Laurent Alebarde Date: Thu, 13 Feb 2014 18:31:00 +0100 Subject: [PATCH 1/3] Revert "add proxy_chain, a multi proxies chaining in the same thread feature" This reverts commit bc7441f5176589ad9a34dc2bde6b91d78a44e3e0. --- doc/zmq_proxy_chain.txt | 103 -------------- include/zmq.h | 2 - src/proxy.cpp | 88 ++++-------- src/proxy.hpp | 10 +- src/zmq.cpp | 49 +++---- tests/Makefile.am | 2 - tests/test_proxy_chain.cpp | 279 ------------------------------------- 7 files changed, 56 insertions(+), 477 deletions(-) delete mode 100644 doc/zmq_proxy_chain.txt delete mode 100644 tests/test_proxy_chain.cpp diff --git a/doc/zmq_proxy_chain.txt b/doc/zmq_proxy_chain.txt deleted file mode 100644 index bb2f3382..00000000 --- a/doc/zmq_proxy_chain.txt +++ /dev/null @@ -1,103 +0,0 @@ -zmq_proxy_chain(3) -================== - -NAME ----- -zmq_proxy_chain - start built-in 0MQ a proxy chain in the same thread -control flow - - -SYNOPSIS --------- -*int zmq_proxy_chain (const void '**frontends', const void '**backends', - const void '*capture', const void **hooks_, const void '*control');* - - -DESCRIPTION ------------ -The _zmq_proxy_chain()_ function starts the built-in 0MQ proxy in the -current application thread, as _zmq_proxy()_, _zmq_proxy_steerable()_, or -_zmq_proxy_hook()_ do. Please, refer to these functions for their general -description and usage. We describe here only the additional proxy chaining -capability. - -Note that compared to the other proxy functions, the arguments _frontends_, -_backends_ and _hooks_ receive arrays instead of single values. Say one need -to implement the following architecture: - -*Process client proxy1 proxy2 worker* - | |-----------| |----------| | -*socket* cl f1 b1 f2 b2 wk -*endpoint* |c----e1-----b| |c----e2-----b| |c----e3----b| - -Note: "c" is for connect, "b" for bind. - -With the other proxy functions, one needs typically one thread for each proxy: ----- -thread 1: zmq_proxy(f1, b1); -thread 2: zmq_proxy(f2, b2); ----- - -With _zmq_proxy_chain_, it can be performed with only one thread: ----- -void** f = {f1, f2, NULL); -void** b = {b1, b2, NULL); -single thread: zmq_proxy_chain(f, b, NULL, NULL, NULL); ----- - -Note: the three NULL arguments are for capture, hooks, and control, since -_zmq_proxy_chain_ is built on top of _zmq_proxy_hook_, itself built on top -of _zmq_proxy_steerable_, itself built on top of _zmq_proxy_. Of course, hook and -steering features can be used along with chaining. - -We have limited the number of sockets that can be chained in a single command to 10, what -should be largely sufficient. The reason is to avoid dynamic memory allocation. - -Arguments frontends and backends shall be arrays of sockets of type void*, terminated -by NULL. Both arrays shall terminate by NULL at the same indice, otherwise, an error is -returned. Argument hooks shall be NULL or of the same length than the socket arrays. No -NULL is required at the end of hooks. Any number of elements may be NULL where no hook -is implemented in some proxies. - - -Refer to linkzmq:zmq_socket[3] for a description of the available socket types. -Refer to linkzmq:zmq_proxy[3] for a description of the zmq_proxy. -Refer to linkzmq:zmq_proxy_steerable[3] for a description of the zmq_steerable. -Refer to linkzmq:zmq_proxy_hook[3] for a description of the zmq_hook. - -EXAMPLE USAGE -------------- -_zmq_proxy_chain_ aims at building protocol layers by easing the chaining of some -proxies typically by chaining: -DEALER | ROUTER <---> STREAM <---> DEALER -in the same thread. Any kind of protocol feature can be added via hooks. - -cf also zmq_proxy, zmq_proxy_steerable, zmq_proxy_hook. - -RETURN VALUE ------------- -The _zmq_proxy_chain()_ function returns 0 if TERMINATE is sent to its -control socket. Otherwise, it returns `-1` and 'errno' set to *ETERM* (the -0MQ 'context' associated with either of the specified sockets was terminated). - - -EXAMPLE -------- -cf test_proxy_chain.cpp -An example capable of proxying CURVE will be added soon. - -SEE ALSO --------- -linkzmq:zmq_proxy[3] -linkzmq:zmq_proxy_steerable[3] -linkzmq:zmq_proxy_hook[3] -linkzmq:zmq_bind[3] -linkzmq:zmq_connect[3] -linkzmq:zmq_socket[3] -linkzmq:zmq[7] - - -AUTHORS -------- -This page was written by the 0MQ community. To make a change please -read the 0MQ Contribution Policy at . diff --git a/include/zmq.h b/include/zmq.h index 961c2836..3bfc6bd0 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -398,11 +398,9 @@ ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout); /* Built-in message proxy (3-way) */ -#define ZMQ_PROXY_CHAIN_MAX_LENGTH 10 ZMQ_EXPORT int zmq_proxy (void *frontend, void *backend, void *capture); ZMQ_EXPORT int zmq_proxy_steerable (void *frontend, void *backend, void *capture, void *control); ZMQ_EXPORT int zmq_proxy_hook (void *frontend, void *backend, void *capture, void *hook, void *control); -ZMQ_EXPORT int zmq_proxy_chain (void **frontends_, void **backends_, void *capture_, void **hooks_, void *control_); typedef int (*zmq_hook_f)(void *frontend, void *backend, void *capture, zmq_msg_t* msg_, size_t n_, void *data_); typedef struct zmq_proxy_hook_t { diff --git a/src/proxy.cpp b/src/proxy.cpp index 1bdfe844..09788e57 100644 --- a/src/proxy.cpp +++ b/src/proxy.cpp @@ -119,12 +119,14 @@ forward( int zmq::proxy ( - class socket_base_t **frontend_, - class socket_base_t **backend_, - class socket_base_t *capture_, - class socket_base_t *control_, - zmq::proxy_hook_t **hook_) + class socket_base_t *frontend_, + class socket_base_t *backend_, + class socket_base_t *capture_, + class socket_base_t *control_, + zmq::proxy_hook_t *hook_) { + static zmq::proxy_hook_t dummy_hook = {NULL, NULL, NULL}; + msg_t msg; int rc = msg.init (); if (rc != 0) @@ -135,41 +137,12 @@ zmq::proxy ( int more; size_t moresz; - size_t n = 0; // number of pair of sockets: the array ends with NULL - for (;; n++) { // counts the number of pair of sockets - if (!frontend_[n] && !backend_[n]) - break; - if (!frontend_[n] || !backend_[n]) { - errno = EFAULT; - return -1; - } - } - if (!n) { - errno = EFAULT; - return -1; - } - // avoid dynamic allocation as we have no guarranty to reach the deallocator => limit the chain length - zmq_assert(n <= ZMQ_PROXY_CHAIN_MAX_LENGTH); - zmq_pollitem_t items [2 * ZMQ_PROXY_CHAIN_MAX_LENGTH + 1]; // +1 for the control socket - static zmq_pollitem_t null_item = { NULL, 0, ZMQ_POLLIN, 0 }; - static zmq::proxy_hook_t dummy_hook = {NULL, NULL, NULL}; - static zmq::proxy_hook_t* no_hooks[ZMQ_PROXY_CHAIN_MAX_LENGTH]; - if (!hook_) - hook_ = no_hooks; - else - for (size_t i = 0; i < n; i++) - if (!hook_[i]) // Check if a hook is used - hook_[i] = &dummy_hook; - for (size_t i = 0; i < n; i++) { - memcpy(&items[2 * i], &null_item, sizeof(null_item)); - items[2 * i].socket = frontend_[i]; - memcpy(&items[2 * i + 1], &null_item, sizeof(null_item)); - items[2 * i + 1].socket = backend_[i]; - no_hooks[i] = &dummy_hook; - } - memcpy(&items[2 * n], &null_item, sizeof(null_item)); - items[2 * n].socket = control_; - int qt_poll_items = (control_ ? 2 * n + 1 : 2 * n); + zmq_pollitem_t items [] = { + { frontend_, 0, ZMQ_POLLIN, 0 }, + { backend_, 0, ZMQ_POLLIN, 0 }, + { control_, 0, ZMQ_POLLIN, 0 } + }; + int qt_poll_items = (control_ ? 3 : 2); // Proxy can be in these three states enum { @@ -185,7 +158,7 @@ zmq::proxy ( return -1; // Process a control command if any - if (control_ && items [2 * n].revents & ZMQ_POLLIN) { + if (control_ && items [2].revents & ZMQ_POLLIN) { rc = control_->recv (&msg, 0); if (unlikely (rc < 0)) return -1; @@ -214,23 +187,22 @@ zmq::proxy ( zmq_assert (false); } } - - // process each pair of sockets - for (size_t i = 0; i < n; i++) { - // Process a request - if (state == active - && items [2 * i].revents & ZMQ_POLLIN) { - rc = forward(frontend_[i], backend_[i], capture_, msg, hook_[i]->front2back_hook, hook_[i]->data); - if (unlikely (rc < 0)) - return -1; - } - // Process a reply - if (state == active - && items [2 * i + 1].revents & ZMQ_POLLIN) { - rc = forward(backend_[i], frontend_[i], capture_, msg, hook_[i]->back2front_hook, hook_[i]->data); - if (unlikely (rc < 0)) - return -1; - } + // Check if a hook is used + if (!hook_) + hook_ = &dummy_hook; + // Process a request + if (state == active + && items [0].revents & ZMQ_POLLIN) { + rc = forward(frontend_, backend_, capture_, msg, hook_->front2back_hook, hook_->data); + if (unlikely (rc < 0)) + return -1; + } + // Process a reply + if (state == active + && items [1].revents & ZMQ_POLLIN) { + rc = forward(backend_, frontend_, capture_, msg, hook_->back2front_hook, hook_->data); + if (unlikely (rc < 0)) + return -1; } } return 0; diff --git a/src/proxy.hpp b/src/proxy.hpp index fef76c5d..0b9eef84 100644 --- a/src/proxy.hpp +++ b/src/proxy.hpp @@ -32,11 +32,11 @@ namespace zmq }; int proxy ( - class socket_base_t **frontend_, - class socket_base_t **backend_, - class socket_base_t *capture_ = NULL, - class socket_base_t *control_ = NULL, // backward compatibility without this argument - proxy_hook_t **hook_ = NULL // backward compatibility without this argument + class socket_base_t *frontend_, + class socket_base_t *backend_, + class socket_base_t *capture_ = NULL, + class socket_base_t *control_ = NULL, // backward compatibility without this argument + proxy_hook_t *hook_ = NULL // backward compatibility without this argument ); } diff --git a/src/zmq.cpp b/src/zmq.cpp index 400e7fa5..4508eee1 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -1025,55 +1025,48 @@ typedef char check_proxy_hook_t_size int zmq_proxy (void *frontend_, void *backend_, void *capture_) { - zmq::socket_base_t* frontends_[] = {(zmq::socket_base_t*) frontend_, NULL}; - zmq::socket_base_t* backends_[] = {(zmq::socket_base_t*) backend_, NULL}; + if (!frontend_ || !backend_) { + errno = EFAULT; + return -1; + } return zmq::proxy ( - (zmq::socket_base_t**) frontends_, - (zmq::socket_base_t**) backends_, + (zmq::socket_base_t*) frontend_, + (zmq::socket_base_t*) backend_, (zmq::socket_base_t*) capture_); } int zmq_proxy_steerable (void *frontend_, void *backend_, void *capture_, void *control_) { - zmq::socket_base_t* frontends_[] = {(zmq::socket_base_t*) frontend_, NULL}; - zmq::socket_base_t* backends_[] = {(zmq::socket_base_t*) backend_, NULL}; + if (!frontend_ || !backend_) { + errno = EFAULT; + return -1; + } return zmq::proxy ( - (zmq::socket_base_t**) frontends_, - (zmq::socket_base_t**) backends_, + (zmq::socket_base_t*) frontend_, + (zmq::socket_base_t*) backend_, (zmq::socket_base_t*) capture_, (zmq::socket_base_t*) control_); } int zmq_proxy_hook (void *frontend_, void *backend_, void *capture_, void *hook_, void *control_) { - zmq::socket_base_t* frontends_[] = {(zmq::socket_base_t*) frontend_, NULL}; - zmq::socket_base_t* backends_[] = {(zmq::socket_base_t*) backend_, NULL}; - zmq::proxy_hook_t* hooks_[] = {(zmq::proxy_hook_t*) hook_}; + if (!frontend_ || !backend_) { + errno = EFAULT; + return -1; + } return zmq::proxy ( - (zmq::socket_base_t**) frontends_, - (zmq::socket_base_t**) backends_, + (zmq::socket_base_t*) frontend_, + (zmq::socket_base_t*) backend_, (zmq::socket_base_t*) capture_, (zmq::socket_base_t*) control_, - (zmq::proxy_hook_t**) hooks_); -} - -int zmq_proxy_chain (void **frontends_, void **backends_, void *capture_, void **hooks_, void *control_) -{ - return zmq::proxy ( - (zmq::socket_base_t**) frontends_, - (zmq::socket_base_t**) backends_, - (zmq::socket_base_t*) capture_, - (zmq::socket_base_t*) control_, - (zmq::proxy_hook_t**) hooks_); + (zmq::proxy_hook_t*) hook_); } // The deprecated device functionality int zmq_device (int /* type */, void *frontend_, void *backend_) { - zmq::socket_base_t* frontends_[] = {(zmq::socket_base_t*) frontend_, NULL}; - zmq::socket_base_t* backends_[] = {(zmq::socket_base_t*) backend_, NULL}; return zmq::proxy ( - (zmq::socket_base_t**) frontends_, - (zmq::socket_base_t**) backends_); + (zmq::socket_base_t*) frontend_, + (zmq::socket_base_t*) backend_); } diff --git a/tests/Makefile.am b/tests/Makefile.am index 6eeee234..2a4c7808 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -44,7 +44,6 @@ noinst_PROGRAMS = test_system \ test_inproc_connect \ test_issue_566 \ test_proxy \ - test_proxy_chain \ test_abstract_ipc \ test_many_sockets \ test_ipc_wildcard \ @@ -111,7 +110,6 @@ test_conflate_SOURCES = test_conflate.cpp test_inproc_connect_SOURCES = test_inproc_connect.cpp test_issue_566_SOURCES = test_issue_566.cpp test_proxy_SOURCES = test_proxy.cpp -test_proxy_chain_SOURCES = test_proxy_chain.cpp test_abstract_ipc_SOURCES = test_abstract_ipc.cpp test_many_sockets_SOURCES = test_many_sockets.cpp test_ipc_wildcard_SOURCES = test_ipc_wildcard.cpp diff --git a/tests/test_proxy_chain.cpp b/tests/test_proxy_chain.cpp deleted file mode 100644 index 4cf1f746..00000000 --- a/tests/test_proxy_chain.cpp +++ /dev/null @@ -1,279 +0,0 @@ -/* - Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file - - This file is part of 0MQ. - - 0MQ is free software; you can redistribute it and/or modify it under - the terms of the GNU Lesser General Public License as published by - the Free Software Foundation; either version 3 of the License, or - (at your option) any later version. - - 0MQ is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Lesser General Public License for more details. - - You should have received a copy of the GNU Lesser General Public License - along with this program. If not, see . -*/ - -#include "testutil.hpp" -#include "../include/zmq_utils.h" - -// Asynchronous client-to-server (DEALER to ROUTER) - pure libzmq -// -// While this example runs in a single process, that is to make -// it easier to start and stop the example. Each task may have its own -// context and conceptually acts as a separate process. To have this -// behaviour, it is necessary to replace the inproc transport of the -// control socket by a tcp transport. - -// This is our client task -// It connects to the server, and then sends a request once per second -// It collects responses as they arrive, and it prints them out. We will -// run several client tasks in parallel, each with a different random ID. - -#define CONTENT_SIZE 13 -#define CONTENT_SIZE_MAX 32 -#define ID_SIZE 10 -#define ID_SIZE_MAX 32 -#define QT_WORKERS 1 -#define QT_CLIENTS 1 -#define is_verbose 0 - -static void -client_task (void *ctx) -{ - void *client = zmq_socket (ctx, ZMQ_DEALER); - assert (client); - - // Control socket receives terminate command from main over inproc - void *control = zmq_socket (ctx, ZMQ_SUB); - assert (control); - int rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0); - assert (rc == 0); - rc = zmq_connect (control, "inproc://control"); - assert (rc == 0); - - char content [CONTENT_SIZE_MAX]; - // Set random identity to make tracing easier - char identity [ID_SIZE]; - sprintf (identity, "%04X-%04X", rand() % 0xFFFF, rand() % 0xFFFF); - rc = zmq_setsockopt (client, ZMQ_IDENTITY, identity, ID_SIZE); // includes '\0' as an helper for printf - assert (rc == 0); - rc = zmq_connect (client, "tcp://127.0.0.1:9999"); - assert (rc == 0); - - zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 }, { control, 0, ZMQ_POLLIN, 0 } }; - int request_nbr = 0; - bool run = true; - while (run) { - // Tick once per 200 ms, pulling in arriving messages - int centitick; - for (centitick = 0; centitick < 20; centitick++) { - zmq_poll (items, 2, 10); - if (items [0].revents & ZMQ_POLLIN) { - int rcvmore; - size_t sz = sizeof (rcvmore); - rc = zmq_recv (client, content, CONTENT_SIZE_MAX, 0); - assert (rc == CONTENT_SIZE); - if (is_verbose) printf("client receive - identity = %s content = %s\n", identity, content); - // Check that message is still the same - assert (memcmp (content, "request #", 9) == 0); - rc = zmq_getsockopt (client, ZMQ_RCVMORE, &rcvmore, &sz); - assert (rc == 0); - assert (!rcvmore); - } - if (items [1].revents & ZMQ_POLLIN) { - rc = zmq_recv (control, content, CONTENT_SIZE_MAX, 0); - if (rc > 0) { - if (is_verbose) { - if (rc == 9 && memcmp(content, "TERMINATE", 9) == 0) - content[9] = '\0'; // required to have a clean output since '\0' is not included in the command - printf("client receive - identity = %s command = %s\n", identity, content); - } - if (memcmp (content, "TERMINATE", 9) == 0) { - run = false; - break; - } - } - } - } - sprintf(content, "request #%03d", ++request_nbr); // CONTENT_SIZE - rc = zmq_send (client, content, CONTENT_SIZE, 0); - assert (rc == CONTENT_SIZE); - } - - rc = zmq_close (client); - assert (rc == 0); - rc = zmq_close (control); - assert (rc == 0); -} - -// This is our server task. -// It uses the multithreaded server model to deal requests out to a pool -// of workers and route replies back to clients. One worker can handle -// one request at a time but one client can talk to multiple workers at -// once. - -static void server_worker (void *ctx); - -void -server_task (void *ctx) -{ - // Frontend socket talks to clients over TCP - void *frontend = zmq_socket (ctx, ZMQ_ROUTER); - assert (frontend); - int rc = zmq_bind (frontend, "tcp://127.0.0.1:9999"); - assert (rc == 0); - - // Intermediate 1 - void *intermediate1 = zmq_socket (ctx, ZMQ_DEALER); - assert (intermediate1); - rc = zmq_connect (intermediate1, "inproc://intermediate"); - assert (rc == 0); - - // Intermediate 2 - void *intermediate2 = zmq_socket (ctx, ZMQ_DEALER); - assert (intermediate2); - rc = zmq_bind (intermediate2, "inproc://intermediate"); - assert (rc == 0); - - // Backend socket talks to workers over inproc - void *backend = zmq_socket (ctx, ZMQ_DEALER); - assert (backend); - rc = zmq_bind (backend, "inproc://backend"); - assert (rc == 0); - - // Control socket receives terminate command from main over inproc - void *control = zmq_socket (ctx, ZMQ_SUB); - assert (control); - rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0); - assert (rc == 0); - rc = zmq_connect (control, "inproc://control"); - assert (rc == 0); - - // Launch pool of worker threads, precise number is not critical - int thread_nbr; - void* threads [5]; - for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++) - threads[thread_nbr] = zmq_threadstart (&server_worker, ctx); - - void* frontends[] = {frontend, intermediate2, NULL}; - void* backends[] = {intermediate1, backend, NULL}; - // Connect backend to frontend via a proxy - zmq_proxy_chain (frontends, backends, NULL, NULL, control); // until TERMINATE is sent on control - - for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++) - zmq_threadclose (threads[thread_nbr]); - - rc = zmq_close (frontend); - assert (rc == 0); - rc = zmq_close (intermediate1); - assert (rc == 0); - rc = zmq_close (intermediate2); - assert (rc == 0); - rc = zmq_close (backend); - assert (rc == 0); - rc = zmq_close (control); - assert (rc == 0); -} - -// Each worker task works on one request at a time and sends a random number -// of replies back, with random delays between replies: -// The comments in the first column, if suppressed, makes it a poller version - -static void -server_worker (void *ctx) -{ - void *worker = zmq_socket (ctx, ZMQ_DEALER); - assert (worker); - int rc = zmq_connect (worker, "inproc://backend"); - assert (rc == 0); - - // Control socket receives terminate command from main over inproc - void *control = zmq_socket (ctx, ZMQ_SUB); - assert (control); - rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0); - assert (rc == 0); - rc = zmq_connect (control, "inproc://control"); - assert (rc == 0); - - char content [CONTENT_SIZE_MAX]; // bigger than what we need to check that - char identity [ID_SIZE_MAX]; // the size received is the size sent - - bool run = true; - while (run) { - rc = zmq_recv (control, content, CONTENT_SIZE_MAX, ZMQ_DONTWAIT); // usually, rc == -1 (no message) - if (rc > 0) { - if (is_verbose) { - if (rc == 9 && memcmp(content, "TERMINATE", 9) == 0) - content[9] = '\0'; // required to have a clean output since '\0' is not included in the command - printf("server_worker receives command = %s\n", content); - } - if (memcmp (content, "TERMINATE", 9) == 0) - run = false; - } - // The DEALER socket gives us the reply envelope and message - // if we don't poll, we have to use ZMQ_DONTWAIT, if we poll, we can block-receive with 0 - rc = zmq_recv (worker, identity, ID_SIZE_MAX, ZMQ_DONTWAIT); - if (rc == ID_SIZE) { - rc = zmq_recv (worker, content, CONTENT_SIZE_MAX, 0); - assert (rc == CONTENT_SIZE); - if (is_verbose) - printf ("server receive - identity = %s content = %s\n", identity, content); - - // Send 0..4 replies back - int reply, replies = 1; //rand() % 5; - for (reply = 0; reply < replies; reply++) { - // Sleep for some fraction of a second - msleep (rand () % 10 + 1); - // Send message from server to client - rc = zmq_send (worker, identity, ID_SIZE, ZMQ_SNDMORE); - assert (rc == ID_SIZE); - rc = zmq_send (worker, content, CONTENT_SIZE, 0); - assert (rc == CONTENT_SIZE); - } - } - } - rc = zmq_close (worker); - assert (rc == 0); - rc = zmq_close (control); - assert (rc == 0); -} - -// The main thread simply starts several clients and a server, and then -// waits for the server to finish. - -int -main (void) -{ - setup_test_environment (); - - void *ctx = zmq_ctx_new (); - assert (ctx); - // Control socket receives terminate command from main over inproc - void *control = zmq_socket (ctx, ZMQ_PUB); - assert (control); - int rc = zmq_bind (control, "inproc://control"); - assert (rc == 0); - - void *threads [QT_CLIENTS + 1]; - for (int i = 0; i < QT_CLIENTS; i++) - threads[i] = zmq_threadstart (&client_task, ctx); - threads[QT_CLIENTS] = zmq_threadstart (&server_task, ctx); - - msleep (1000); // Run for 500 ms the standard proxy - rc = zmq_send (control, "TERMINATE", 9, 0); - assert (rc == 9); - - rc = zmq_close (control); - assert (rc == 0); - - for (int i = 0; i < QT_CLIENTS + 1; i++) - zmq_threadclose (threads[i]); - - rc = zmq_ctx_term (ctx); - assert (rc == 0); - return 0; -} From abf9d8b74ea3be11abd68d4016e5ede3c7e5b834 Mon Sep 17 00:00:00 2001 From: Laurent Alebarde Date: Thu, 13 Feb 2014 18:33:52 +0100 Subject: [PATCH 2/3] Revert "add a proxy hook" This reverts commit 9ae6a91fadb96fd48038fde04cc3d4b61b49a8a1. --- doc/zmq_proxy_hook.txt | 200 ----------------------------------------- include/zmq.h | 8 -- src/proxy.cpp | 34 ++----- src/proxy.hpp | 15 +--- src/zmq.cpp | 21 +---- tests/test_proxy.cpp | 93 +++---------------- 6 files changed, 23 insertions(+), 348 deletions(-) delete mode 100644 doc/zmq_proxy_hook.txt diff --git a/doc/zmq_proxy_hook.txt b/doc/zmq_proxy_hook.txt deleted file mode 100644 index e7dbddb2..00000000 --- a/doc/zmq_proxy_hook.txt +++ /dev/null @@ -1,200 +0,0 @@ -zmq_proxy_hook(3) -================= - -NAME ----- -zmq_proxy_hook - start built-in 0MQ proxy with an hook to modify the messages -between the frontend and the backend - - -SYNOPSIS --------- -*int zmq_proxy_hook (const void '*frontend', const void '*backend', - const void '*capture', const void '*hook', const void '*control');* - - -DESCRIPTION ------------ -The _zmq_proxy_hook()_ function starts the built-in 0MQ proxy in the -current application thread, as _zmq_proxy()_ or _zmq_proxy_steerable()_ do. -Please, refer to these functions for the general description and usage. -We describe here only the additional hook provided by the structure "hook" -passed as a fith argument. - -If the hook structure pointer is not NULL, the proxy supports a hook defined as -a structure 'zmq_proxy_hook_t' containing a data pointer to any data type and -the address of two functions of type 'zmq_hook_f'. The first function, -'front2back_hook' is to manipulate the message received from the frontend, before -it is sent to the backend. The second one, 'back2front_hook' is for the way back. - -Both functions receive as an argument in addition to a pointer to the message, the -pointer to the data passed in the 'zmq_proxy_hook_t' structure. This data makes it -possible to manage stateful behaviours in the proxy. They receive also the frame -number n_ which is 1 for the first frame, n for the nth one, 0 for the last one. This -enable to manage specifically the identity frame when ROUTER | STREAM sockets are -concerned. Moreover, to give the hook full capabilities, the three sockets passed -as parameters to the proxy are also provided to the hook functions, enabling to -consume some frames or to add others: - ----- -typedef int (*zmq_hook_f)(void *frontend, void *backend, void *capture, -zmq_msg_t* msg_, size_t n_, void *data_); -typedef struct zmq_proxy_hook_t { - void *data; - zmq_hook_f front2back_hook; - zmq_hook_f back2front_hook; -} zmq_proxy_hook_t; ----- - -If the hook pointer is NULL, zmq_proxy_hook behaves exactly as if zmq_proxy -or zmq_proxy_steerable had been called. - -Refer to linkzmq:zmq_socket[3] for a description of the available socket types. -Refer to linkzmq:zmq_proxy[3] for a description of the zmq_proxy. -Refer to linkzmq:zmq_proxy_steerable[3] for a description of the zmq_proxy_steerable. - -EXAMPLE USAGE -------------- - -Filter -~~~~~~ - -The most simple use is to simply filter the messages for example against vulgarity. -Messages are simply scanned against a dictionnary and target words are replaced. - -ROUTER | STREAM / ROUTER | STREAM proxy -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -The data field enables to multiplex as desired identities in a ROUTER/ROUTER or in a -STREAM/STREAM proxy or what ever. Such architecture enables also custom load balancers. - -Sticky ROUTER / ROUTER proxy -~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -The data field enables to manage sticky identity pairing in a ROUTER/ROUTER proxy. - -Security mechanism proxying -~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -We expect to be able to proxy CURVE with the use of this feature. - -Tests -~~~~~ - -In an existing application, just change zmq_proxy or zmq_proxy_steerable for -zmq_proxy_hook to test anythink, even "Man in the middle" attacks ws security -mechanisms with a STREAM/STREAM proxy. - - -RETURN VALUE ------------- -The _zmq_proxy_hook()_ function returns the same values than zmq_proxy -or zmq_proxy_steerable in the same conditions of use. - - -EXAMPLE -------- -This simple example aims at uppercasing the traffic between the frontend and the -backend, and lowercasing it on the way back. - -.Setup the hook ----- -struct stats_t { - int qt_upper_case; - int qt_lower_case; -} stats = {NULL, 0, 0}; - -int -upper_case(void*, void*, void*, zmq_msg_t* msg_, size_t n_, void *stats_) -{ - size_t size = zmq_msg_size(msg_); - if (!size || n_ == 1) return 0; // skip identity and 0 frames - char* message = (char*) zmq_msg_data(msg_); - for (size_t i = 0; i < size; i++) - if ('a' <= message[i] && message[i] <= 'z') - message[i] += 'A' - 'a'; - struct stats_t* stats = (struct stats_t*) stats_; - stats->qt_upper_case++; - return 0; -} - -int -lower_case(void*, void*, void*, zmq_msg_t* msg_, size_t n_, void *stats_) -{ - size_t size = zmq_msg_size(msg_); - if (!size || n_ == 1) return 0; // skip identity and 0 frames - char* message = (char*) zmq_msg_data(msg_); - for (size_t i = 0; i < size; i++) - if ('A' <= message[i] && message[i] <= 'Z') - message[i] += 'a' - 'A'; - struct stats_t* stats = (struct stats_t*) stats_; - stats->qt_lower_case++; - return 0; -} - -zmq_proxy_hook_t hook = { - &stats, // data used by the hook functions, passed as void* data_ - upper_case, // hook for messages going from frontend to backend - lower_case // hook for messages going from backend to frontend -}; ----- -.in main: ----- -int -main (void) -{ - setup_test_environment (); - void *context = zmq_ctx_new (); - assert (context); - // Create frontend, backend and control sockets - void *frontend = zmq_socket (context, ZMQ_ROUTER); - assert (backend); - void *backend = zmq_socket (context, ZMQ_DEALER); - assert (frontend); - void *control = zmq_socket (context, ZMQ_PUB); - assert (control); - - // Bind sockets to TCP ports - assert (zmq_bind (frontend, "tcp://*:5555") == 0); - assert (zmq_bind (backend, "tcp://*:5556") == 0); - assert (zmq_connect (control, "tcp://*:5557") == 0); - - // Start the queue proxy, which runs until ETERM or "TERMINATE" - // received on the control socket - zmq_proxy_hook (frontend, backend, NULL, &hook, control); - - printf("frontend to backend hook hits = %d\nbackend to frontend hook hits = %d\n", stats.qt_upper_case, stats.qt_lower_case); - - // close sockets and context - rc = zmq_close (control); - assert (rc == 0); - rc = zmq_close (backend); - assert (rc == 0); - rc = zmq_close (frontend); - assert (rc == 0); - rc = zmq_ctx_term (ctx); - assert (rc == 0); - return 0; -} ----- -.somewhere, the proxy is stopped with: ----- -rc = zmq_send (control, "TERMINATE", 9, 0); // stops the hooked proxy -assert (rc == 9); ----- -.cf test_proxy.cpp for a full implementation of this test, with clients and workers. - -SEE ALSO --------- -linkzmq:zmq_proxy[3] -linkzmq:zmq_proxy_steerable[3] -linkzmq:zmq_bind[3] -linkzmq:zmq_connect[3] -linkzmq:zmq_socket[3] -linkzmq:zmq[7] - - -AUTHORS -------- -This page was written by the 0MQ community. To make a change please -read the 0MQ Contribution Policy at . diff --git a/include/zmq.h b/include/zmq.h index 3bfc6bd0..524d6aa2 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -400,14 +400,6 @@ ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout); ZMQ_EXPORT int zmq_proxy (void *frontend, void *backend, void *capture); ZMQ_EXPORT int zmq_proxy_steerable (void *frontend, void *backend, void *capture, void *control); -ZMQ_EXPORT int zmq_proxy_hook (void *frontend, void *backend, void *capture, void *hook, void *control); - -typedef int (*zmq_hook_f)(void *frontend, void *backend, void *capture, zmq_msg_t* msg_, size_t n_, void *data_); -typedef struct zmq_proxy_hook_t { - void *data; - zmq_hook_f front2back_hook; - zmq_hook_f back2front_hook; -} zmq_proxy_hook_t; /* Encode a binary key as printable text using ZMQ RFC 32 */ ZMQ_EXPORT char *zmq_z85_encode (char *dest, uint8_t *data, size_t size); diff --git a/src/proxy.cpp b/src/proxy.cpp index 09788e57..ab57d509 100644 --- a/src/proxy.cpp +++ b/src/proxy.cpp @@ -53,8 +53,7 @@ // zmq.h must be included *after* poll.h for AIX to build properly #include "../include/zmq.h" -int -capture( +int capture( class zmq::socket_base_t *capture_, zmq::msg_t& msg_, int more_ = 0) @@ -75,18 +74,15 @@ capture( return 0; } -int -forward( +int forward( class zmq::socket_base_t *from_, class zmq::socket_base_t *to_, class zmq::socket_base_t *capture_, - zmq::msg_t& msg_, - zmq::hook_f do_hook_, - void *data_) + zmq::msg_t& msg_) { int more; size_t moresz; - for (size_t n = 1;; n++) { + while (true) { int rc = from_->recv (&msg_, 0); if (unlikely (rc < 0)) return -1; @@ -101,13 +97,6 @@ forward( if (unlikely (rc < 0)) return -1; - // Hook - if (do_hook_) { - rc = (*do_hook_)(from_, to_, capture_, &msg_, more ? n : 0, data_); // first message: n == 1, mth message: n == m, last message: n == 0 - if (unlikely (rc < 0)) - return -1; - } - rc = to_->send (&msg_, more? ZMQ_SNDMORE: 0); if (unlikely (rc < 0)) return -1; @@ -117,16 +106,12 @@ forward( return 0; } -int -zmq::proxy ( +int zmq::proxy ( class socket_base_t *frontend_, class socket_base_t *backend_, class socket_base_t *capture_, - class socket_base_t *control_, - zmq::proxy_hook_t *hook_) + class socket_base_t *control_) { - static zmq::proxy_hook_t dummy_hook = {NULL, NULL, NULL}; - msg_t msg; int rc = msg.init (); if (rc != 0) @@ -187,20 +172,17 @@ zmq::proxy ( zmq_assert (false); } } - // Check if a hook is used - if (!hook_) - hook_ = &dummy_hook; // Process a request if (state == active && items [0].revents & ZMQ_POLLIN) { - rc = forward(frontend_, backend_, capture_, msg, hook_->front2back_hook, hook_->data); + rc = forward(frontend_, backend_, capture_,msg); if (unlikely (rc < 0)) return -1; } // Process a reply if (state == active && items [1].revents & ZMQ_POLLIN) { - rc = forward(backend_, frontend_, capture_, msg, hook_->back2front_hook, hook_->data); + rc = forward(backend_, frontend_, capture_,msg); if (unlikely (rc < 0)) return -1; } diff --git a/src/proxy.hpp b/src/proxy.hpp index 0b9eef84..c055290b 100644 --- a/src/proxy.hpp +++ b/src/proxy.hpp @@ -22,22 +22,11 @@ namespace zmq { - typedef int (*hook_f)(void *frontend, void *backend, void *capture, void* msg_, size_t n_, void *data_); - - struct proxy_hook_t - { - void *data; - hook_f front2back_hook; - hook_f back2front_hook; - }; - int proxy ( class socket_base_t *frontend_, class socket_base_t *backend_, - class socket_base_t *capture_ = NULL, - class socket_base_t *control_ = NULL, // backward compatibility without this argument - proxy_hook_t *hook_ = NULL // backward compatibility without this argument - ); + class socket_base_t *capture_, + class socket_base_t *control_ = NULL); // backward compatibility without this argument } #endif diff --git a/src/zmq.cpp b/src/zmq.cpp index 4508eee1..25da581a 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -1018,11 +1018,6 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) // The proxy functionality -// Compile time check whether proxy_hook_t fits into zmq_proxy_hook_t. -typedef char check_proxy_hook_t_size - [sizeof (zmq::proxy_hook_t) == sizeof (zmq_proxy_hook_t) ? 1 : -1]; - - int zmq_proxy (void *frontend_, void *backend_, void *capture_) { if (!frontend_ || !backend_) { @@ -1048,25 +1043,11 @@ int zmq_proxy_steerable (void *frontend_, void *backend_, void *capture_, void * (zmq::socket_base_t*) control_); } -int zmq_proxy_hook (void *frontend_, void *backend_, void *capture_, void *hook_, void *control_) -{ - if (!frontend_ || !backend_) { - errno = EFAULT; - return -1; - } - return zmq::proxy ( - (zmq::socket_base_t*) frontend_, - (zmq::socket_base_t*) backend_, - (zmq::socket_base_t*) capture_, - (zmq::socket_base_t*) control_, - (zmq::proxy_hook_t*) hook_); -} - // The deprecated device functionality int zmq_device (int /* type */, void *frontend_, void *backend_) { return zmq::proxy ( (zmq::socket_base_t*) frontend_, - (zmq::socket_base_t*) backend_); + (zmq::socket_base_t*) backend_, NULL); } diff --git a/tests/test_proxy.cpp b/tests/test_proxy.cpp index 5731c0c1..83d95741 100644 --- a/tests/test_proxy.cpp +++ b/tests/test_proxy.cpp @@ -41,48 +41,6 @@ #define QT_CLIENTS 3 #define is_verbose 0 -// Our test Hook that uppercase the message from the frontend to the backend and vice versa -struct stats_t { - void *ctx; // not usefull for the kook itself, but convenient to provide the thread with it without building an additional struct for arguments - int qt_upper_case; - int qt_lower_case; -} stats = {NULL, 0, 0}; - -int -upper_case(void*, void*, void*, zmq_msg_t* msg_, size_t n_, void *stats_) -{ - size_t size = zmq_msg_size(msg_); - if (!size || n_ == 1) return 0; // skip identity and 0 frames - char* message = (char*) zmq_msg_data(msg_); - for (size_t i = 0; i < size; i++) - if ('a' <= message[i] && message[i] <= 'z') - message[i] += 'A' - 'a'; - struct stats_t* stats = (struct stats_t*) stats_; - stats->qt_upper_case++; - return 0; -} - -int -lower_case(void*, void*, void*, zmq_msg_t* msg_, size_t n_, void *stats_) -{ - size_t size = zmq_msg_size(msg_); - if (!size || n_ == 1) return 0; // skip identity and 0 frames - char* message = (char*) zmq_msg_data(msg_); - for (size_t i = 0; i < size; i++) - if ('A' <= message[i] && message[i] <= 'Z') - message[i] += 'a' - 'A'; - struct stats_t* stats = (struct stats_t*) stats_; - stats->qt_lower_case++; - return 0; -} - -zmq_proxy_hook_t hook = { - &stats, // data used by the hook functions if needed, NULL otherwise - upper_case, // hook for messages going from frontend to backend - lower_case // hook for messages going from backend to frontend -}; - - static void client_task (void *ctx) { @@ -128,16 +86,10 @@ client_task (void *ctx) } if (items [1].revents & ZMQ_POLLIN) { rc = zmq_recv (control, content, CONTENT_SIZE_MAX, 0); - if (rc > 0) { - if (is_verbose) { - if (rc == 9 && memcmp(content, "TERMINATE", 9) == 0) - content[9] = '\0'; // required to have a clean output since '\0' is not included in the command - printf("client receive - identity = %s command = %s\n", identity, content); - } - if (memcmp (content, "STOP", 4) == 0) { - run = false; - break; - } + if (is_verbose) printf("client receive - identity = %s command = %s\n", identity, content); + if (memcmp (content, "TERMINATE", 9) == 0) { + run = false; + break; } } } @@ -161,11 +113,8 @@ client_task (void *ctx) static void server_worker (void *ctx); void -server_task (void *arg) +server_task (void *ctx) { - zmq_proxy_hook_t* hook = (zmq_proxy_hook_t*) arg; - struct stats_t* stats = (struct stats_t*) hook->data; - void* ctx = stats->ctx; // Frontend socket talks to clients over TCP void *frontend = zmq_socket (ctx, ZMQ_ROUTER); assert (frontend); @@ -193,13 +142,7 @@ server_task (void *arg) threads[thread_nbr] = zmq_threadstart (&server_worker, ctx); // Connect backend to frontend via a proxy - if (is_verbose) - printf("---------- standard proxy ----------\n"); - zmq_proxy_steerable (frontend, backend, NULL, control); // until TERMINATE is sent on control - // Connect backend to frontend via a hooked proxy - if (is_verbose) - printf("---------- hooked proxy ----------\n"); - zmq_proxy_hook (frontend, backend, NULL, hook, control); // until TERMINATE is sent on control + zmq_proxy_steerable (frontend, backend, NULL, control); for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++) zmq_threadclose (threads[thread_nbr]); @@ -239,12 +182,9 @@ server_worker (void *ctx) while (run) { rc = zmq_recv (control, content, CONTENT_SIZE_MAX, ZMQ_DONTWAIT); // usually, rc == -1 (no message) if (rc > 0) { - if (is_verbose) { - if (rc == 9 && memcmp(content, "TERMINATE", 9) == 0) - content[9] = '\0'; // required to have a clean output since '\0' is not included in the command + if (is_verbose) printf("server_worker receives command = %s\n", content); - } - if (memcmp (content, "STOP", 4) == 0) + if (memcmp (content, "TERMINATE", 9) == 0) run = false; } // The DEALER socket gives us the reply envelope and message @@ -278,8 +218,7 @@ server_worker (void *ctx) // The main thread simply starts several clients and a server, and then // waits for the server to finish. -int -main (void) +int main (void) { setup_test_environment (); @@ -294,19 +233,11 @@ main (void) void *threads [QT_CLIENTS + 1]; for (int i = 0; i < QT_CLIENTS; i++) threads[i] = zmq_threadstart (&client_task, ctx); - stats.ctx = ctx; - threads[QT_CLIENTS] = zmq_threadstart (&server_task, &hook); + threads[QT_CLIENTS] = zmq_threadstart (&server_task, ctx); + msleep (500); // Run for 500 ms then quit - msleep (500); // Run for 500 ms the standard proxy - rc = zmq_send (control, "TERMINATE", 9, 0); // stops the standard proxy + rc = zmq_send (control, "TERMINATE", 9, 0); assert (rc == 9); - msleep (200); // Run for 200 ms the standard proxy - rc = zmq_send (control, "TERMINATE", 9, 0); // stops the hooked proxy - assert (rc == 9); - rc = zmq_send (control, "STOP", 5, 0); // stops clients and workers (\0 is sent to ease the printf of the verbose mode) - assert (rc == 5); - - if (is_verbose) printf("frontend to backend hook hits = %d\nbackend to frontend hook hits = %d\n", stats.qt_upper_case, stats.qt_lower_case); rc = zmq_close (control); assert (rc == 0); From 3fb800c10013bf6e756bd52e70d060166ec16f48 Mon Sep 17 00:00:00 2001 From: Laurent Alebarde Date: Thu, 13 Feb 2014 18:52:15 +0100 Subject: [PATCH 3/3] fix revert --- doc/Makefile.am | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/Makefile.am b/doc/Makefile.am index 936f6806..d02fbb17 100644 --- a/doc/Makefile.am +++ b/doc/Makefile.am @@ -9,7 +9,7 @@ MAN3 = zmq_bind.3 zmq_unbind.3 zmq_connect.3 zmq_disconnect.3 zmq_close.3 \ zmq_socket.3 zmq_socket_monitor.3 zmq_poll.3 \ zmq_errno.3 zmq_strerror.3 zmq_version.3 \ zmq_sendmsg.3 zmq_recvmsg.3 zmq_init.3 zmq_term.3 \ - zmq_proxy.3 zmq_proxy_steerable.3 zmq_proxy_chain.3 zmq_proxy_hook.3 \ + zmq_proxy.3 zmq_proxy_steerable.3 \ zmq_z85_encode.3 zmq_z85_decode.3 zmq_curve_keypair.3 MAN7 = zmq.7 zmq_tcp.7 zmq_pgm.7 zmq_epgm.7 zmq_inproc.7 zmq_ipc.7 \