diff --git a/doc/zmq_proxy_chain.txt b/doc/zmq_proxy_chain.txt new file mode 100644 index 00000000..bb2f3382 --- /dev/null +++ b/doc/zmq_proxy_chain.txt @@ -0,0 +1,103 @@ +zmq_proxy_chain(3) +================== + +NAME +---- +zmq_proxy_chain - start built-in 0MQ a proxy chain in the same thread +control flow + + +SYNOPSIS +-------- +*int zmq_proxy_chain (const void '**frontends', const void '**backends', + const void '*capture', const void **hooks_, const void '*control');* + + +DESCRIPTION +----------- +The _zmq_proxy_chain()_ function starts the built-in 0MQ proxy in the +current application thread, as _zmq_proxy()_, _zmq_proxy_steerable()_, or +_zmq_proxy_hook()_ do. Please, refer to these functions for their general +description and usage. We describe here only the additional proxy chaining +capability. + +Note that compared to the other proxy functions, the arguments _frontends_, +_backends_ and _hooks_ receive arrays instead of single values. Say one need +to implement the following architecture: + +*Process client proxy1 proxy2 worker* + | |-----------| |----------| | +*socket* cl f1 b1 f2 b2 wk +*endpoint* |c----e1-----b| |c----e2-----b| |c----e3----b| + +Note: "c" is for connect, "b" for bind. + +With the other proxy functions, one needs typically one thread for each proxy: +---- +thread 1: zmq_proxy(f1, b1); +thread 2: zmq_proxy(f2, b2); +---- + +With _zmq_proxy_chain_, it can be performed with only one thread: +---- +void** f = {f1, f2, NULL); +void** b = {b1, b2, NULL); +single thread: zmq_proxy_chain(f, b, NULL, NULL, NULL); +---- + +Note: the three NULL arguments are for capture, hooks, and control, since +_zmq_proxy_chain_ is built on top of _zmq_proxy_hook_, itself built on top +of _zmq_proxy_steerable_, itself built on top of _zmq_proxy_. Of course, hook and +steering features can be used along with chaining. + +We have limited the number of sockets that can be chained in a single command to 10, what +should be largely sufficient. The reason is to avoid dynamic memory allocation. + +Arguments frontends and backends shall be arrays of sockets of type void*, terminated +by NULL. Both arrays shall terminate by NULL at the same indice, otherwise, an error is +returned. Argument hooks shall be NULL or of the same length than the socket arrays. No +NULL is required at the end of hooks. Any number of elements may be NULL where no hook +is implemented in some proxies. + + +Refer to linkzmq:zmq_socket[3] for a description of the available socket types. +Refer to linkzmq:zmq_proxy[3] for a description of the zmq_proxy. +Refer to linkzmq:zmq_proxy_steerable[3] for a description of the zmq_steerable. +Refer to linkzmq:zmq_proxy_hook[3] for a description of the zmq_hook. + +EXAMPLE USAGE +------------- +_zmq_proxy_chain_ aims at building protocol layers by easing the chaining of some +proxies typically by chaining: +DEALER | ROUTER <---> STREAM <---> DEALER +in the same thread. Any kind of protocol feature can be added via hooks. + +cf also zmq_proxy, zmq_proxy_steerable, zmq_proxy_hook. + +RETURN VALUE +------------ +The _zmq_proxy_chain()_ function returns 0 if TERMINATE is sent to its +control socket. Otherwise, it returns `-1` and 'errno' set to *ETERM* (the +0MQ 'context' associated with either of the specified sockets was terminated). + + +EXAMPLE +------- +cf test_proxy_chain.cpp +An example capable of proxying CURVE will be added soon. + +SEE ALSO +-------- +linkzmq:zmq_proxy[3] +linkzmq:zmq_proxy_steerable[3] +linkzmq:zmq_proxy_hook[3] +linkzmq:zmq_bind[3] +linkzmq:zmq_connect[3] +linkzmq:zmq_socket[3] +linkzmq:zmq[7] + + +AUTHORS +------- +This page was written by the 0MQ community. To make a change please +read the 0MQ Contribution Policy at . diff --git a/include/zmq.h b/include/zmq.h index 3bfc6bd0..961c2836 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -398,9 +398,11 @@ ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout); /* Built-in message proxy (3-way) */ +#define ZMQ_PROXY_CHAIN_MAX_LENGTH 10 ZMQ_EXPORT int zmq_proxy (void *frontend, void *backend, void *capture); ZMQ_EXPORT int zmq_proxy_steerable (void *frontend, void *backend, void *capture, void *control); ZMQ_EXPORT int zmq_proxy_hook (void *frontend, void *backend, void *capture, void *hook, void *control); +ZMQ_EXPORT int zmq_proxy_chain (void **frontends_, void **backends_, void *capture_, void **hooks_, void *control_); typedef int (*zmq_hook_f)(void *frontend, void *backend, void *capture, zmq_msg_t* msg_, size_t n_, void *data_); typedef struct zmq_proxy_hook_t { diff --git a/src/proxy.cpp b/src/proxy.cpp index 09788e57..1bdfe844 100644 --- a/src/proxy.cpp +++ b/src/proxy.cpp @@ -119,14 +119,12 @@ forward( int zmq::proxy ( - class socket_base_t *frontend_, - class socket_base_t *backend_, - class socket_base_t *capture_, - class socket_base_t *control_, - zmq::proxy_hook_t *hook_) + class socket_base_t **frontend_, + class socket_base_t **backend_, + class socket_base_t *capture_, + class socket_base_t *control_, + zmq::proxy_hook_t **hook_) { - static zmq::proxy_hook_t dummy_hook = {NULL, NULL, NULL}; - msg_t msg; int rc = msg.init (); if (rc != 0) @@ -137,12 +135,41 @@ zmq::proxy ( int more; size_t moresz; - zmq_pollitem_t items [] = { - { frontend_, 0, ZMQ_POLLIN, 0 }, - { backend_, 0, ZMQ_POLLIN, 0 }, - { control_, 0, ZMQ_POLLIN, 0 } - }; - int qt_poll_items = (control_ ? 3 : 2); + size_t n = 0; // number of pair of sockets: the array ends with NULL + for (;; n++) { // counts the number of pair of sockets + if (!frontend_[n] && !backend_[n]) + break; + if (!frontend_[n] || !backend_[n]) { + errno = EFAULT; + return -1; + } + } + if (!n) { + errno = EFAULT; + return -1; + } + // avoid dynamic allocation as we have no guarranty to reach the deallocator => limit the chain length + zmq_assert(n <= ZMQ_PROXY_CHAIN_MAX_LENGTH); + zmq_pollitem_t items [2 * ZMQ_PROXY_CHAIN_MAX_LENGTH + 1]; // +1 for the control socket + static zmq_pollitem_t null_item = { NULL, 0, ZMQ_POLLIN, 0 }; + static zmq::proxy_hook_t dummy_hook = {NULL, NULL, NULL}; + static zmq::proxy_hook_t* no_hooks[ZMQ_PROXY_CHAIN_MAX_LENGTH]; + if (!hook_) + hook_ = no_hooks; + else + for (size_t i = 0; i < n; i++) + if (!hook_[i]) // Check if a hook is used + hook_[i] = &dummy_hook; + for (size_t i = 0; i < n; i++) { + memcpy(&items[2 * i], &null_item, sizeof(null_item)); + items[2 * i].socket = frontend_[i]; + memcpy(&items[2 * i + 1], &null_item, sizeof(null_item)); + items[2 * i + 1].socket = backend_[i]; + no_hooks[i] = &dummy_hook; + } + memcpy(&items[2 * n], &null_item, sizeof(null_item)); + items[2 * n].socket = control_; + int qt_poll_items = (control_ ? 2 * n + 1 : 2 * n); // Proxy can be in these three states enum { @@ -158,7 +185,7 @@ zmq::proxy ( return -1; // Process a control command if any - if (control_ && items [2].revents & ZMQ_POLLIN) { + if (control_ && items [2 * n].revents & ZMQ_POLLIN) { rc = control_->recv (&msg, 0); if (unlikely (rc < 0)) return -1; @@ -187,22 +214,23 @@ zmq::proxy ( zmq_assert (false); } } - // Check if a hook is used - if (!hook_) - hook_ = &dummy_hook; - // Process a request - if (state == active - && items [0].revents & ZMQ_POLLIN) { - rc = forward(frontend_, backend_, capture_, msg, hook_->front2back_hook, hook_->data); - if (unlikely (rc < 0)) - return -1; - } - // Process a reply - if (state == active - && items [1].revents & ZMQ_POLLIN) { - rc = forward(backend_, frontend_, capture_, msg, hook_->back2front_hook, hook_->data); - if (unlikely (rc < 0)) - return -1; + + // process each pair of sockets + for (size_t i = 0; i < n; i++) { + // Process a request + if (state == active + && items [2 * i].revents & ZMQ_POLLIN) { + rc = forward(frontend_[i], backend_[i], capture_, msg, hook_[i]->front2back_hook, hook_[i]->data); + if (unlikely (rc < 0)) + return -1; + } + // Process a reply + if (state == active + && items [2 * i + 1].revents & ZMQ_POLLIN) { + rc = forward(backend_[i], frontend_[i], capture_, msg, hook_[i]->back2front_hook, hook_[i]->data); + if (unlikely (rc < 0)) + return -1; + } } } return 0; diff --git a/src/proxy.hpp b/src/proxy.hpp index 0b9eef84..fef76c5d 100644 --- a/src/proxy.hpp +++ b/src/proxy.hpp @@ -32,11 +32,11 @@ namespace zmq }; int proxy ( - class socket_base_t *frontend_, - class socket_base_t *backend_, - class socket_base_t *capture_ = NULL, - class socket_base_t *control_ = NULL, // backward compatibility without this argument - proxy_hook_t *hook_ = NULL // backward compatibility without this argument + class socket_base_t **frontend_, + class socket_base_t **backend_, + class socket_base_t *capture_ = NULL, + class socket_base_t *control_ = NULL, // backward compatibility without this argument + proxy_hook_t **hook_ = NULL // backward compatibility without this argument ); } diff --git a/src/zmq.cpp b/src/zmq.cpp index 4508eee1..400e7fa5 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -1025,48 +1025,55 @@ typedef char check_proxy_hook_t_size int zmq_proxy (void *frontend_, void *backend_, void *capture_) { - if (!frontend_ || !backend_) { - errno = EFAULT; - return -1; - } + zmq::socket_base_t* frontends_[] = {(zmq::socket_base_t*) frontend_, NULL}; + zmq::socket_base_t* backends_[] = {(zmq::socket_base_t*) backend_, NULL}; return zmq::proxy ( - (zmq::socket_base_t*) frontend_, - (zmq::socket_base_t*) backend_, + (zmq::socket_base_t**) frontends_, + (zmq::socket_base_t**) backends_, (zmq::socket_base_t*) capture_); } int zmq_proxy_steerable (void *frontend_, void *backend_, void *capture_, void *control_) { - if (!frontend_ || !backend_) { - errno = EFAULT; - return -1; - } + zmq::socket_base_t* frontends_[] = {(zmq::socket_base_t*) frontend_, NULL}; + zmq::socket_base_t* backends_[] = {(zmq::socket_base_t*) backend_, NULL}; return zmq::proxy ( - (zmq::socket_base_t*) frontend_, - (zmq::socket_base_t*) backend_, + (zmq::socket_base_t**) frontends_, + (zmq::socket_base_t**) backends_, (zmq::socket_base_t*) capture_, (zmq::socket_base_t*) control_); } int zmq_proxy_hook (void *frontend_, void *backend_, void *capture_, void *hook_, void *control_) { - if (!frontend_ || !backend_) { - errno = EFAULT; - return -1; - } + zmq::socket_base_t* frontends_[] = {(zmq::socket_base_t*) frontend_, NULL}; + zmq::socket_base_t* backends_[] = {(zmq::socket_base_t*) backend_, NULL}; + zmq::proxy_hook_t* hooks_[] = {(zmq::proxy_hook_t*) hook_}; return zmq::proxy ( - (zmq::socket_base_t*) frontend_, - (zmq::socket_base_t*) backend_, + (zmq::socket_base_t**) frontends_, + (zmq::socket_base_t**) backends_, (zmq::socket_base_t*) capture_, (zmq::socket_base_t*) control_, - (zmq::proxy_hook_t*) hook_); + (zmq::proxy_hook_t**) hooks_); +} + +int zmq_proxy_chain (void **frontends_, void **backends_, void *capture_, void **hooks_, void *control_) +{ + return zmq::proxy ( + (zmq::socket_base_t**) frontends_, + (zmq::socket_base_t**) backends_, + (zmq::socket_base_t*) capture_, + (zmq::socket_base_t*) control_, + (zmq::proxy_hook_t**) hooks_); } // The deprecated device functionality int zmq_device (int /* type */, void *frontend_, void *backend_) { + zmq::socket_base_t* frontends_[] = {(zmq::socket_base_t*) frontend_, NULL}; + zmq::socket_base_t* backends_[] = {(zmq::socket_base_t*) backend_, NULL}; return zmq::proxy ( - (zmq::socket_base_t*) frontend_, - (zmq::socket_base_t*) backend_); + (zmq::socket_base_t**) frontends_, + (zmq::socket_base_t**) backends_); } diff --git a/tests/Makefile.am b/tests/Makefile.am index 2a4c7808..6eeee234 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -44,6 +44,7 @@ noinst_PROGRAMS = test_system \ test_inproc_connect \ test_issue_566 \ test_proxy \ + test_proxy_chain \ test_abstract_ipc \ test_many_sockets \ test_ipc_wildcard \ @@ -110,6 +111,7 @@ test_conflate_SOURCES = test_conflate.cpp test_inproc_connect_SOURCES = test_inproc_connect.cpp test_issue_566_SOURCES = test_issue_566.cpp test_proxy_SOURCES = test_proxy.cpp +test_proxy_chain_SOURCES = test_proxy_chain.cpp test_abstract_ipc_SOURCES = test_abstract_ipc.cpp test_many_sockets_SOURCES = test_many_sockets.cpp test_ipc_wildcard_SOURCES = test_ipc_wildcard.cpp diff --git a/tests/test_proxy_chain.cpp b/tests/test_proxy_chain.cpp new file mode 100644 index 00000000..4cf1f746 --- /dev/null +++ b/tests/test_proxy_chain.cpp @@ -0,0 +1,279 @@ +/* + Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "testutil.hpp" +#include "../include/zmq_utils.h" + +// Asynchronous client-to-server (DEALER to ROUTER) - pure libzmq +// +// While this example runs in a single process, that is to make +// it easier to start and stop the example. Each task may have its own +// context and conceptually acts as a separate process. To have this +// behaviour, it is necessary to replace the inproc transport of the +// control socket by a tcp transport. + +// This is our client task +// It connects to the server, and then sends a request once per second +// It collects responses as they arrive, and it prints them out. We will +// run several client tasks in parallel, each with a different random ID. + +#define CONTENT_SIZE 13 +#define CONTENT_SIZE_MAX 32 +#define ID_SIZE 10 +#define ID_SIZE_MAX 32 +#define QT_WORKERS 1 +#define QT_CLIENTS 1 +#define is_verbose 0 + +static void +client_task (void *ctx) +{ + void *client = zmq_socket (ctx, ZMQ_DEALER); + assert (client); + + // Control socket receives terminate command from main over inproc + void *control = zmq_socket (ctx, ZMQ_SUB); + assert (control); + int rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0); + assert (rc == 0); + rc = zmq_connect (control, "inproc://control"); + assert (rc == 0); + + char content [CONTENT_SIZE_MAX]; + // Set random identity to make tracing easier + char identity [ID_SIZE]; + sprintf (identity, "%04X-%04X", rand() % 0xFFFF, rand() % 0xFFFF); + rc = zmq_setsockopt (client, ZMQ_IDENTITY, identity, ID_SIZE); // includes '\0' as an helper for printf + assert (rc == 0); + rc = zmq_connect (client, "tcp://127.0.0.1:9999"); + assert (rc == 0); + + zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 }, { control, 0, ZMQ_POLLIN, 0 } }; + int request_nbr = 0; + bool run = true; + while (run) { + // Tick once per 200 ms, pulling in arriving messages + int centitick; + for (centitick = 0; centitick < 20; centitick++) { + zmq_poll (items, 2, 10); + if (items [0].revents & ZMQ_POLLIN) { + int rcvmore; + size_t sz = sizeof (rcvmore); + rc = zmq_recv (client, content, CONTENT_SIZE_MAX, 0); + assert (rc == CONTENT_SIZE); + if (is_verbose) printf("client receive - identity = %s content = %s\n", identity, content); + // Check that message is still the same + assert (memcmp (content, "request #", 9) == 0); + rc = zmq_getsockopt (client, ZMQ_RCVMORE, &rcvmore, &sz); + assert (rc == 0); + assert (!rcvmore); + } + if (items [1].revents & ZMQ_POLLIN) { + rc = zmq_recv (control, content, CONTENT_SIZE_MAX, 0); + if (rc > 0) { + if (is_verbose) { + if (rc == 9 && memcmp(content, "TERMINATE", 9) == 0) + content[9] = '\0'; // required to have a clean output since '\0' is not included in the command + printf("client receive - identity = %s command = %s\n", identity, content); + } + if (memcmp (content, "TERMINATE", 9) == 0) { + run = false; + break; + } + } + } + } + sprintf(content, "request #%03d", ++request_nbr); // CONTENT_SIZE + rc = zmq_send (client, content, CONTENT_SIZE, 0); + assert (rc == CONTENT_SIZE); + } + + rc = zmq_close (client); + assert (rc == 0); + rc = zmq_close (control); + assert (rc == 0); +} + +// This is our server task. +// It uses the multithreaded server model to deal requests out to a pool +// of workers and route replies back to clients. One worker can handle +// one request at a time but one client can talk to multiple workers at +// once. + +static void server_worker (void *ctx); + +void +server_task (void *ctx) +{ + // Frontend socket talks to clients over TCP + void *frontend = zmq_socket (ctx, ZMQ_ROUTER); + assert (frontend); + int rc = zmq_bind (frontend, "tcp://127.0.0.1:9999"); + assert (rc == 0); + + // Intermediate 1 + void *intermediate1 = zmq_socket (ctx, ZMQ_DEALER); + assert (intermediate1); + rc = zmq_connect (intermediate1, "inproc://intermediate"); + assert (rc == 0); + + // Intermediate 2 + void *intermediate2 = zmq_socket (ctx, ZMQ_DEALER); + assert (intermediate2); + rc = zmq_bind (intermediate2, "inproc://intermediate"); + assert (rc == 0); + + // Backend socket talks to workers over inproc + void *backend = zmq_socket (ctx, ZMQ_DEALER); + assert (backend); + rc = zmq_bind (backend, "inproc://backend"); + assert (rc == 0); + + // Control socket receives terminate command from main over inproc + void *control = zmq_socket (ctx, ZMQ_SUB); + assert (control); + rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0); + assert (rc == 0); + rc = zmq_connect (control, "inproc://control"); + assert (rc == 0); + + // Launch pool of worker threads, precise number is not critical + int thread_nbr; + void* threads [5]; + for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++) + threads[thread_nbr] = zmq_threadstart (&server_worker, ctx); + + void* frontends[] = {frontend, intermediate2, NULL}; + void* backends[] = {intermediate1, backend, NULL}; + // Connect backend to frontend via a proxy + zmq_proxy_chain (frontends, backends, NULL, NULL, control); // until TERMINATE is sent on control + + for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++) + zmq_threadclose (threads[thread_nbr]); + + rc = zmq_close (frontend); + assert (rc == 0); + rc = zmq_close (intermediate1); + assert (rc == 0); + rc = zmq_close (intermediate2); + assert (rc == 0); + rc = zmq_close (backend); + assert (rc == 0); + rc = zmq_close (control); + assert (rc == 0); +} + +// Each worker task works on one request at a time and sends a random number +// of replies back, with random delays between replies: +// The comments in the first column, if suppressed, makes it a poller version + +static void +server_worker (void *ctx) +{ + void *worker = zmq_socket (ctx, ZMQ_DEALER); + assert (worker); + int rc = zmq_connect (worker, "inproc://backend"); + assert (rc == 0); + + // Control socket receives terminate command from main over inproc + void *control = zmq_socket (ctx, ZMQ_SUB); + assert (control); + rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0); + assert (rc == 0); + rc = zmq_connect (control, "inproc://control"); + assert (rc == 0); + + char content [CONTENT_SIZE_MAX]; // bigger than what we need to check that + char identity [ID_SIZE_MAX]; // the size received is the size sent + + bool run = true; + while (run) { + rc = zmq_recv (control, content, CONTENT_SIZE_MAX, ZMQ_DONTWAIT); // usually, rc == -1 (no message) + if (rc > 0) { + if (is_verbose) { + if (rc == 9 && memcmp(content, "TERMINATE", 9) == 0) + content[9] = '\0'; // required to have a clean output since '\0' is not included in the command + printf("server_worker receives command = %s\n", content); + } + if (memcmp (content, "TERMINATE", 9) == 0) + run = false; + } + // The DEALER socket gives us the reply envelope and message + // if we don't poll, we have to use ZMQ_DONTWAIT, if we poll, we can block-receive with 0 + rc = zmq_recv (worker, identity, ID_SIZE_MAX, ZMQ_DONTWAIT); + if (rc == ID_SIZE) { + rc = zmq_recv (worker, content, CONTENT_SIZE_MAX, 0); + assert (rc == CONTENT_SIZE); + if (is_verbose) + printf ("server receive - identity = %s content = %s\n", identity, content); + + // Send 0..4 replies back + int reply, replies = 1; //rand() % 5; + for (reply = 0; reply < replies; reply++) { + // Sleep for some fraction of a second + msleep (rand () % 10 + 1); + // Send message from server to client + rc = zmq_send (worker, identity, ID_SIZE, ZMQ_SNDMORE); + assert (rc == ID_SIZE); + rc = zmq_send (worker, content, CONTENT_SIZE, 0); + assert (rc == CONTENT_SIZE); + } + } + } + rc = zmq_close (worker); + assert (rc == 0); + rc = zmq_close (control); + assert (rc == 0); +} + +// The main thread simply starts several clients and a server, and then +// waits for the server to finish. + +int +main (void) +{ + setup_test_environment (); + + void *ctx = zmq_ctx_new (); + assert (ctx); + // Control socket receives terminate command from main over inproc + void *control = zmq_socket (ctx, ZMQ_PUB); + assert (control); + int rc = zmq_bind (control, "inproc://control"); + assert (rc == 0); + + void *threads [QT_CLIENTS + 1]; + for (int i = 0; i < QT_CLIENTS; i++) + threads[i] = zmq_threadstart (&client_task, ctx); + threads[QT_CLIENTS] = zmq_threadstart (&server_task, ctx); + + msleep (1000); // Run for 500 ms the standard proxy + rc = zmq_send (control, "TERMINATE", 9, 0); + assert (rc == 9); + + rc = zmq_close (control); + assert (rc == 0); + + for (int i = 0; i < QT_CLIENTS + 1; i++) + zmq_threadclose (threads[i]); + + rc = zmq_ctx_term (ctx); + assert (rc == 0); + return 0; +}