diff --git a/.gitignore b/.gitignore index 9d0a5563..4a089fac 100644 --- a/.gitignore +++ b/.gitignore @@ -69,6 +69,7 @@ tests/test_inproc_connect tests/test_linger tests/test_security_null tests/test_security_plain +tests/test_proxy tests/test_abstract_ipc tests/test*.log tests/test*.trs diff --git a/AUTHORS b/AUTHORS index 8481813f..88e3ce25 100644 --- a/AUTHORS +++ b/AUTHORS @@ -53,6 +53,7 @@ Joe Thornber Jon Dyte Kamil Shakirov Ken Steele +Laurent Alebarde Marc Rossi Martin Hurton Martin Lucina diff --git a/doc/Makefile.am b/doc/Makefile.am index 3805749c..db826199 100644 --- a/doc/Makefile.am +++ b/doc/Makefile.am @@ -7,7 +7,7 @@ MAN3 = zmq_bind.3 zmq_unbind.3 zmq_connect.3 zmq_disconnect.3 zmq_close.3 \ zmq_msg_get.3 zmq_msg_set.3 zmq_msg_more.3 \ zmq_getsockopt.3 zmq_setsockopt.3 \ zmq_socket.3 zmq_socket_monitor.3 zmq_poll.3 \ - zmq_errno.3 zmq_strerror.3 zmq_version.3 zmq_proxy.3 \ + zmq_errno.3 zmq_strerror.3 zmq_version.3 zmq_proxy.3 zmq_proxy_steerable.3 \ zmq_sendmsg.3 zmq_recvmsg.3 zmq_init.3 zmq_term.3 \ zmq_z85_encode.3 zmq_z85_decode.3 zmq_curve_keypair.3 diff --git a/doc/zmq_proxy_steerable.txt b/doc/zmq_proxy_steerable.txt new file mode 100644 index 00000000..b7e69dd9 --- /dev/null +++ b/doc/zmq_proxy_steerable.txt @@ -0,0 +1,93 @@ +zmq_proxy_steerable(3) +====================== + +NAME +---- +zmq_proxy_steerable - start built-in 0MQ proxy with STOP/RESUME/TERMINATE +control flow + + +SYNOPSIS +-------- +*int zmq_proxy_steerable (const void '*frontend', const void '*backend', + const void '*capture', const void '*control');* + + +DESCRIPTION +----------- +The _zmq_proxy_steerable()_ function starts the built-in 0MQ proxy in the +current application thread, as _zmq_proxy()_ do. Please, refer to this function +for the general description and usage. We describe here only the additional +control flow provided by the socket passed as the fourth argument "control". + +If the control socket is not NULL, the proxy supports control flow. If +'SUSPEND\0' is received on this socket, the proxy suspends its activities. If +'RESUME\0' is received, it goes on. If 'TERMINATE\0' is received, it terminates +smoothly. At start, the proxy runs normally as if zmq_proxy was used. + +If the control socket is NULL, the function behave exactly as if zmq_proxy +had been called. + + +Refer to linkzmq:zmq_socket[3] for a description of the available socket types. +Refer to linkzmq:zmq_proxy[3] for a description of the zmq_proxy. + +EXAMPLE USAGE +------------- +cf zmq_proxy + +RETURN VALUE +------------ +The _zmq_proxy_steerable()_ function returns 0 if TERMINATE is sent to its +control socket. Otherwise, it returns `-1` and 'errno' set to *ETERM* (the +0MQ 'context' associated with either of the specified sockets was terminated). + + +EXAMPLE +------- +.Creating a shared queue proxy +---- +// Create frontend, backend and control sockets +void *frontend = zmq_socket (context, ZMQ_ROUTER); +assert (backend); +void *backend = zmq_socket (context, ZMQ_DEALER); +assert (frontend); +void *control = zmq_socket (context, ZMQ_SUB); +assert (control); +// Bind sockets to TCP ports +assert (zmq_bind (frontend, "tcp://*:5555") == 0); +assert (zmq_bind (backend, "tcp://*:5556") == 0); +assert (zmq_connect (control, "tcp://*:5557") == 0); +// Subscribe to the control socket since we have chosen SUB here +assert (zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0)); +// Start the queue proxy, which runs until ETERM or "TERMINATE" received on +the control socket +zmq_proxy (frontend, backend, NULL, control); +---- +.Set up a controller in another node, process or whatever +---- +void *control = zmq_socket (context, ZMQ_PUB); +assert (control); +assert (zmq_bind (control, "tcp://*:5557") == 0); +// stop the proxy +assert (zmq_send (control, "STOP", 5, 0) == 0); +// resume the proxy +assert (zmq_send (control, "RESUME", 7, 0) == 0); +// terminate the proxy +assert (zmq_send (control, "TERMINATE", 10, 0) == 0); +--- + + +SEE ALSO +-------- +linkzmq:zmq_proxy[3] +linkzmq:zmq_bind[3] +linkzmq:zmq_connect[3] +linkzmq:zmq_socket[3] +linkzmq:zmq[7] + + +AUTHORS +------- +This page was written by the 0MQ community. To make a change please +read the 0MQ Contribution Policy at . diff --git a/include/zmq.h b/include/zmq.h index 01e2c031..5ab51a1b 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -396,6 +396,7 @@ ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout); /* Built-in message proxy (3-way) */ ZMQ_EXPORT int zmq_proxy (void *frontend, void *backend, void *capture); +ZMQ_EXPORT int zmq_proxy_steerable (void *frontend, void *backend, void *capture, void *control); /* Encode a binary key as printable text using ZMQ RFC 32 */ ZMQ_EXPORT char *zmq_z85_encode (char *dest, uint8_t *data, size_t size); diff --git a/src/proxy.cpp b/src/proxy.cpp index 1a172c10..f4fdf071 100644 --- a/src/proxy.cpp +++ b/src/proxy.cpp @@ -57,7 +57,8 @@ int zmq::proxy ( class socket_base_t *frontend_, class socket_base_t *backend_, - class socket_base_t *capture_) + class socket_base_t *capture_, + class socket_base_t *control_) { msg_t msg; int rc = msg.init (); @@ -71,16 +72,59 @@ int zmq::proxy ( size_t moresz; zmq_pollitem_t items [] = { { frontend_, 0, ZMQ_POLLIN, 0 }, - { backend_, 0, ZMQ_POLLIN, 0 } + { backend_, 0, ZMQ_POLLIN, 0 }, + { control_, 0, ZMQ_POLLIN, 0 } }; - while (true) { + int qt_poll_items = (control_ ? 3 : 2); + enum {suspend, resume, terminate} state = resume; + while (state != terminate) { // Wait while there are either requests or replies to process. - rc = zmq_poll (&items [0], 2, -1); + rc = zmq_poll (&items [0], qt_poll_items, -1); if (unlikely (rc < 0)) return -1; + // Process a control command if any + if (control_ && items [2].revents & ZMQ_POLLIN) { + rc = control_->recv (&msg, 0); + if (unlikely (rc < 0)) + return -1; + + moresz = sizeof more; + rc = control_->getsockopt (ZMQ_RCVMORE, &more, &moresz); + if (unlikely (rc < 0) || more) + return -1; + + // Copy message to capture socket if any + if (capture_) { + msg_t ctrl; + rc = ctrl.init (); + if (unlikely (rc < 0)) + return -1; + rc = ctrl.copy (msg); + if (unlikely (rc < 0)) + return -1; + rc = capture_->send (&ctrl, 0); + if (unlikely (rc < 0)) + return -1; + } + + // process control command + int size = msg.size(); + char* message = (char*) malloc(size + 1); + memcpy(message, msg.data(), size); + message[size] = '\0'; + if (size == 8 && !memcmp(message, "SUSPEND", 8)) + state = suspend; + else if (size == 7 && !memcmp(message, "RESUME", 7)) + state = resume; + else if (size == 10 && !memcmp(message, "TERMINATE", 10)) + state = terminate; + else + fprintf(stderr, "Warning : \"%s\" bad command received by proxy\n", message); // prefered compared to "return -1" + free (message); + } // Process a request - if (items [0].revents & ZMQ_POLLIN) { + if (state == resume && items [0].revents & ZMQ_POLLIN) { while (true) { rc = frontend_->recv (&msg, 0); if (unlikely (rc < 0)) @@ -112,7 +156,7 @@ int zmq::proxy ( } } // Process a reply - if (items [1].revents & ZMQ_POLLIN) { + if (state == resume && items [1].revents & ZMQ_POLLIN) { while (true) { rc = backend_->recv (&msg, 0); if (unlikely (rc < 0)) diff --git a/src/proxy.hpp b/src/proxy.hpp index eaf164fd..efbdbe58 100644 --- a/src/proxy.hpp +++ b/src/proxy.hpp @@ -25,7 +25,8 @@ namespace zmq int proxy ( class socket_base_t *frontend_, class socket_base_t *backend_, - class socket_base_t *control_); + class socket_base_t *capture_, + class socket_base_t *control_ = NULL); // backward compatibility without this argument } #endif diff --git a/src/zmq.cpp b/src/zmq.cpp index d9116285..daf4f085 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -1016,7 +1016,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) // The proxy functionality -int zmq_proxy (void *frontend_, void *backend_, void *control_) +int zmq_proxy (void *frontend_, void *backend_, void *capture_) { if (!frontend_ || !backend_) { errno = EFAULT; @@ -1025,6 +1025,19 @@ int zmq_proxy (void *frontend_, void *backend_, void *control_) return zmq::proxy ( (zmq::socket_base_t*) frontend_, (zmq::socket_base_t*) backend_, + (zmq::socket_base_t*) capture_); +} + +int zmq_proxy_steerable (void *frontend_, void *backend_, void *capture_, void *control_) +{ + if (!frontend_ || !backend_) { + errno = EFAULT; + return -1; + } + return zmq::proxy ( + (zmq::socket_base_t*) frontend_, + (zmq::socket_base_t*) backend_, + (zmq::socket_base_t*) capture_, (zmq::socket_base_t*) control_); } diff --git a/tests/Makefile.am b/tests/Makefile.am index b211eb10..54eccea3 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -39,6 +39,7 @@ noinst_PROGRAMS = test_system \ test_conflate \ test_inproc_connect \ test_issue_566 \ + test_proxy \ test_abstract_ipc if !ON_MINGW @@ -85,6 +86,7 @@ test_req_relaxed_SOURCES = test_req_relaxed.cpp test_conflate_SOURCES = test_conflate.cpp test_inproc_connect_SOURCES = test_inproc_connect.cpp test_issue_566_SOURCES = test_issue_566.cpp +test_proxy_SOURCES = test_proxy.cpp test_abstract_ipc_SOURCES = test_abstract_ipc.cpp if !ON_MINGW test_shutdown_stress_SOURCES = test_shutdown_stress.cpp diff --git a/tests/test_proxy.cpp b/tests/test_proxy.cpp new file mode 100644 index 00000000..a695dad3 --- /dev/null +++ b/tests/test_proxy.cpp @@ -0,0 +1,267 @@ +/* + Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "testutil.hpp" +#include "../include/zmq_utils.h" + +// Asynchronous client-to-server (DEALER to ROUTER) - pure libzmq +// +// While this example runs in a single process, that is to make +// it easier to start and stop the example. Each task may have its own +// context and conceptually acts as a separate process. To have this +// behaviour, it is necessary to replace the inproc transport of the +// control socket by a tcp transport. + +// This is our client task +// It connects to the server, and then sends a request once per second +// It collects responses as they arrive, and it prints them out. We will +// run several client tasks in parallel, each with a different random ID. + +#define CONTENT_SIZE 13 +#define CONTENT_SIZE_MAX 32 +#define ID_SIZE 10 +#define ID_SIZE_MAX 32 +#define QT_WORKERS 5 +#define QT_CLIENTS 3 +#define is_verbose 0 + +static void +client_task (void *ctx) +{ +// void *ctx = zmq_ctx_new (); // if we want our own context, we shall use tcp instead of inproc for the control socket +// assert (ctx); + void *client = zmq_socket (ctx, ZMQ_DEALER); + assert (client); + + // Control socket receives terminate command from main over inproc + void *control = zmq_socket (ctx, ZMQ_SUB); + assert (control); + int rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0); + assert (rc == 0); + rc = zmq_connect (control, "inproc://control"); + assert (rc == 0); + + char content [CONTENT_SIZE_MAX]; + // Set random identity to make tracing easier + char identity [ID_SIZE]; + sprintf (identity, "%04X-%04X", rand() % 0xFFFF, rand() % 0xFFFF); + rc = zmq_setsockopt (client, ZMQ_IDENTITY, identity, ID_SIZE); // includes '\0' as an helper for printf + assert (rc == 0); + rc = zmq_connect (client, "tcp://127.0.0.1:9999"); + assert (rc == 0); + + zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 }, { control, 0, ZMQ_POLLIN, 0 } }; + int request_nbr = 0; + bool run = true; + while (run) { + // Tick once per 200 ms, pulling in arriving messages + int centitick; + for (centitick = 0; centitick < 20; centitick++) { + zmq_poll (items, 2, 10); + if (items [0].revents & ZMQ_POLLIN) { + int rcvmore; + size_t sz = sizeof (rcvmore); + rc = zmq_recv (client, content, CONTENT_SIZE_MAX, 0); + assert (rc == CONTENT_SIZE); + if (is_verbose) printf("client receive - identity = %s content = %s\n", identity, content); + // Check that message is still the same + assert (memcmp (content, "request #", 9) == 0); + rc = zmq_getsockopt (client, ZMQ_RCVMORE, &rcvmore, &sz); + assert (rc == 0); + assert (!rcvmore); + } + if (items [1].revents & ZMQ_POLLIN) { + rc = zmq_recv (control, content, CONTENT_SIZE_MAX, 0); + if (is_verbose) printf("client receive - identity = %s command = %s\n", identity, content); + if (memcmp (content, "TERMINATE", 10) == 0) { + run = false; + break; + } + } + } + sprintf(content, "request #%03d", ++request_nbr); // CONTENT_SIZE + rc = zmq_send (client, content, CONTENT_SIZE, 0); + assert (rc == CONTENT_SIZE); + } + + rc = zmq_close (client); + assert (rc == 0); + rc = zmq_close (control); + assert (rc == 0); +// rc = zmq_ctx_term (ctx); +// assert (rc == 0); +} + +// This is our server task. +// It uses the multithreaded server model to deal requests out to a pool +// of workers and route replies back to clients. One worker can handle +// one request at a time but one client can talk to multiple workers at +// once. + +static void server_worker (void *ctx); + +void +server_task (void *ctx) +{ +// void *ctx = zmq_ctx_new (); // if we want our own context, we shall use tcp instead of inproc for the control socket +// assert (ctx); + + // Frontend socket talks to clients over TCP + void *frontend = zmq_socket (ctx, ZMQ_ROUTER); + assert (frontend); + int rc = zmq_bind (frontend, "tcp://127.0.0.1:9999"); + assert (rc == 0); + + // Backend socket talks to workers over inproc + void *backend = zmq_socket (ctx, ZMQ_DEALER); + assert (backend); + rc = zmq_bind (backend, "inproc://backend"); + assert (rc == 0); + + // Control socket receives terminate command from main over inproc + void *control = zmq_socket (ctx, ZMQ_SUB); + assert (control); + rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0); + assert (rc == 0); + rc = zmq_connect (control, "inproc://control"); + assert (rc == 0); + + // Launch pool of worker threads, precise number is not critical + int thread_nbr; + void* threads [5]; + for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++) + threads[thread_nbr] = zmq_threadstart (&server_worker, ctx); + + // Connect backend to frontend via a proxy + zmq_proxy_steerable (frontend, backend, NULL, control); + + for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++) + zmq_threadclose (threads[thread_nbr]); + + rc = zmq_close (frontend); + assert (rc == 0); + rc = zmq_close (backend); + assert (rc == 0); + rc = zmq_close (control); + assert (rc == 0); +// rc = zmq_ctx_term (ctx); +// assert (rc == 0); +} + +// Each worker task works on one request at a time and sends a random number +// of replies back, with random delays between replies: +// The comments in the first column, if suppressed, makes it a poller version + +static void +server_worker (void *ctx) +{ + void *worker = zmq_socket (ctx, ZMQ_DEALER); + assert (worker); + int rc = zmq_connect (worker, "inproc://backend"); + assert (rc == 0); + + // Control socket receives terminate command from main over inproc + void *control = zmq_socket (ctx, ZMQ_SUB); + assert (control); + rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0); + assert (rc == 0); + rc = zmq_connect (control, "inproc://control"); + assert (rc == 0); + + char content [CONTENT_SIZE_MAX]; // bigger than what we need to check that + char identity [ID_SIZE_MAX]; // the size received is the size sent + +// zmq_pollitem_t items [] = { { worker, 0, ZMQ_POLLIN, 0 }, { control, 0, ZMQ_POLLIN, 0 } }; // POLLING + bool run = true; + while (run) { +// zmq_poll (items, 2, 10); // POLLING +// if (items [1].revents & ZMQ_POLLIN) { // POLLING + rc = zmq_recv (control, content, CONTENT_SIZE_MAX, ZMQ_DONTWAIT); // usually, rc == -1 (no message) + if (rc > 0) { + if (is_verbose) printf("server_worker receives command = %s\n", content); + if (memcmp (content, "TERMINATE", 10) == 0) + run = false; + } +// } // POLLING + +// if (items [0].revents & ZMQ_POLLIN) { // POLLING + // The DEALER socket gives us the reply envelope and message + rc = zmq_recv (worker, identity, ID_SIZE_MAX, ZMQ_DONTWAIT); // if we don't poll, we have to use ZMQ_DONTWAIT, if we poll, we can block-receive with 0 + if (rc == ID_SIZE) { + rc = zmq_recv (worker, content, CONTENT_SIZE_MAX, 0); + assert (rc == CONTENT_SIZE); + if (is_verbose) printf("server receive - identity = %s content = %s\n", identity, content); + + // Send 0..4 replies back + int reply, replies = rand() % 5; + for (reply = 0; reply < replies; reply++) { + // Sleep for some fraction of a second + msleep (rand () % 10 + 1); + // Send message from server to client + rc = zmq_send (worker, identity, ID_SIZE, ZMQ_SNDMORE); + assert (rc == ID_SIZE); + rc = zmq_send (worker, content, CONTENT_SIZE, 0); + assert (rc == CONTENT_SIZE); + } + } +// } // POLLING + } + rc = zmq_close (worker); + assert (rc == 0); + rc = zmq_close (control); + assert (rc == 0); +} + +// The main thread simply starts several clients and a server, and then +// waits for the server to finish. + +int main (void) +{ + setup_test_environment (); + + void *ctx = zmq_ctx_new (); + assert (ctx); + // Control socket receives terminate command from main over inproc + void *control = zmq_socket (ctx, ZMQ_PUB); + assert (control); + int rc = zmq_bind (control, "inproc://control"); + assert (rc == 0); + + void* threads [QT_CLIENTS + 1]; + for (int i = 0; i < QT_CLIENTS; i++) + { + threads[i] = zmq_threadstart (&client_task, ctx); + } + threads[QT_CLIENTS] = zmq_threadstart (&server_task, ctx); + msleep (500); // Run for 500 ms then quit + + rc = zmq_send (control, "TERMINATE", 10, 0); + assert (rc == 10); + + // clean everything + rc = zmq_close (control); + assert (rc == 0); + //msleep (1000); // not sure it is usefull + + for (int i = 0; i < QT_CLIENTS + 1; i++) + zmq_threadclose (threads[i]); + rc = zmq_ctx_term (ctx); + assert (rc == 0); + return 0; +} diff --git a/tests/testutil.hpp b/tests/testutil.hpp index 6a9fb1a4..23ef7cbb 100644 --- a/tests/testutil.hpp +++ b/tests/testutil.hpp @@ -259,4 +259,23 @@ void setup_test_environment() #endif } +// provide portable millisecond sleep +#include + +#ifdef ZMQ_HAVE_WINDOWS +#include +#else +#include +#endif + +void msleep(int milliseconds) +{ // http://www.cplusplus.com/forum/unices/60161/ http://en.cppreference.com/w/cpp/thread/sleep_for + #ifdef ZMQ_HAVE_WINDOWS + Sleep(milliseconds); + #else + usleep(static_cast(milliseconds)*1000); + #endif +} + + #endif