Merge pull request #712 from lalebarde/master

add proxy control flow and test
This commit is contained in:
Pieter Hintjens 2013-10-21 01:39:09 -07:00
commit 08c91c0f94
11 changed files with 451 additions and 9 deletions

1
.gitignore vendored
View File

@ -69,6 +69,7 @@ tests/test_inproc_connect
tests/test_linger
tests/test_security_null
tests/test_security_plain
tests/test_proxy
tests/test_abstract_ipc
tests/test*.log
tests/test*.trs

View File

@ -53,6 +53,7 @@ Joe Thornber <joe.thornber@gmail.com>
Jon Dyte <jon@totient.co.uk>
Kamil Shakirov <kamils80@gmail.com>
Ken Steele <ken@tilera.com>
Laurent Alebarde <l.alebarde@free.fr>
Marc Rossi <mrossi19@gmail.com>
Martin Hurton <hurtonm@gmail.com>
Martin Lucina <martin@lucina.net>

View File

@ -7,7 +7,7 @@ MAN3 = zmq_bind.3 zmq_unbind.3 zmq_connect.3 zmq_disconnect.3 zmq_close.3 \
zmq_msg_get.3 zmq_msg_set.3 zmq_msg_more.3 \
zmq_getsockopt.3 zmq_setsockopt.3 \
zmq_socket.3 zmq_socket_monitor.3 zmq_poll.3 \
zmq_errno.3 zmq_strerror.3 zmq_version.3 zmq_proxy.3 \
zmq_errno.3 zmq_strerror.3 zmq_version.3 zmq_proxy.3 zmq_proxy_steerable.3 \
zmq_sendmsg.3 zmq_recvmsg.3 zmq_init.3 zmq_term.3 \
zmq_z85_encode.3 zmq_z85_decode.3 zmq_curve_keypair.3

View File

@ -0,0 +1,93 @@
zmq_proxy_steerable(3)
======================
NAME
----
zmq_proxy_steerable - start built-in 0MQ proxy with STOP/RESUME/TERMINATE
control flow
SYNOPSIS
--------
*int zmq_proxy_steerable (const void '*frontend', const void '*backend',
const void '*capture', const void '*control');*
DESCRIPTION
-----------
The _zmq_proxy_steerable()_ function starts the built-in 0MQ proxy in the
current application thread, as _zmq_proxy()_ do. Please, refer to this function
for the general description and usage. We describe here only the additional
control flow provided by the socket passed as the fourth argument "control".
If the control socket is not NULL, the proxy supports control flow. If
'SUSPEND\0' is received on this socket, the proxy suspends its activities. If
'RESUME\0' is received, it goes on. If 'TERMINATE\0' is received, it terminates
smoothly. At start, the proxy runs normally as if zmq_proxy was used.
If the control socket is NULL, the function behave exactly as if zmq_proxy
had been called.
Refer to linkzmq:zmq_socket[3] for a description of the available socket types.
Refer to linkzmq:zmq_proxy[3] for a description of the zmq_proxy.
EXAMPLE USAGE
-------------
cf zmq_proxy
RETURN VALUE
------------
The _zmq_proxy_steerable()_ function returns 0 if TERMINATE is sent to its
control socket. Otherwise, it returns `-1` and 'errno' set to *ETERM* (the
0MQ 'context' associated with either of the specified sockets was terminated).
EXAMPLE
-------
.Creating a shared queue proxy
----
// Create frontend, backend and control sockets
void *frontend = zmq_socket (context, ZMQ_ROUTER);
assert (backend);
void *backend = zmq_socket (context, ZMQ_DEALER);
assert (frontend);
void *control = zmq_socket (context, ZMQ_SUB);
assert (control);
// Bind sockets to TCP ports
assert (zmq_bind (frontend, "tcp://*:5555") == 0);
assert (zmq_bind (backend, "tcp://*:5556") == 0);
assert (zmq_connect (control, "tcp://*:5557") == 0);
// Subscribe to the control socket since we have chosen SUB here
assert (zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0));
// Start the queue proxy, which runs until ETERM or "TERMINATE" received on
the control socket
zmq_proxy (frontend, backend, NULL, control);
----
.Set up a controller in another node, process or whatever
----
void *control = zmq_socket (context, ZMQ_PUB);
assert (control);
assert (zmq_bind (control, "tcp://*:5557") == 0);
// stop the proxy
assert (zmq_send (control, "STOP", 5, 0) == 0);
// resume the proxy
assert (zmq_send (control, "RESUME", 7, 0) == 0);
// terminate the proxy
assert (zmq_send (control, "TERMINATE", 10, 0) == 0);
---
SEE ALSO
--------
linkzmq:zmq_proxy[3]
linkzmq:zmq_bind[3]
linkzmq:zmq_connect[3]
linkzmq:zmq_socket[3]
linkzmq:zmq[7]
AUTHORS
-------
This page was written by the 0MQ community. To make a change please
read the 0MQ Contribution Policy at <http://www.zeromq.org/docs:contributing>.

View File

@ -396,6 +396,7 @@ ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout);
/* Built-in message proxy (3-way) */
ZMQ_EXPORT int zmq_proxy (void *frontend, void *backend, void *capture);
ZMQ_EXPORT int zmq_proxy_steerable (void *frontend, void *backend, void *capture, void *control);
/* Encode a binary key as printable text using ZMQ RFC 32 */
ZMQ_EXPORT char *zmq_z85_encode (char *dest, uint8_t *data, size_t size);

View File

@ -57,7 +57,8 @@
int zmq::proxy (
class socket_base_t *frontend_,
class socket_base_t *backend_,
class socket_base_t *capture_)
class socket_base_t *capture_,
class socket_base_t *control_)
{
msg_t msg;
int rc = msg.init ();
@ -71,16 +72,59 @@ int zmq::proxy (
size_t moresz;
zmq_pollitem_t items [] = {
{ frontend_, 0, ZMQ_POLLIN, 0 },
{ backend_, 0, ZMQ_POLLIN, 0 }
{ backend_, 0, ZMQ_POLLIN, 0 },
{ control_, 0, ZMQ_POLLIN, 0 }
};
while (true) {
int qt_poll_items = (control_ ? 3 : 2);
enum {suspend, resume, terminate} state = resume;
while (state != terminate) {
// Wait while there are either requests or replies to process.
rc = zmq_poll (&items [0], 2, -1);
rc = zmq_poll (&items [0], qt_poll_items, -1);
if (unlikely (rc < 0))
return -1;
// Process a control command if any
if (control_ && items [2].revents & ZMQ_POLLIN) {
rc = control_->recv (&msg, 0);
if (unlikely (rc < 0))
return -1;
moresz = sizeof more;
rc = control_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
if (unlikely (rc < 0) || more)
return -1;
// Copy message to capture socket if any
if (capture_) {
msg_t ctrl;
rc = ctrl.init ();
if (unlikely (rc < 0))
return -1;
rc = ctrl.copy (msg);
if (unlikely (rc < 0))
return -1;
rc = capture_->send (&ctrl, 0);
if (unlikely (rc < 0))
return -1;
}
// process control command
int size = msg.size();
char* message = (char*) malloc(size + 1);
memcpy(message, msg.data(), size);
message[size] = '\0';
if (size == 8 && !memcmp(message, "SUSPEND", 8))
state = suspend;
else if (size == 7 && !memcmp(message, "RESUME", 7))
state = resume;
else if (size == 10 && !memcmp(message, "TERMINATE", 10))
state = terminate;
else
fprintf(stderr, "Warning : \"%s\" bad command received by proxy\n", message); // prefered compared to "return -1"
free (message);
}
// Process a request
if (items [0].revents & ZMQ_POLLIN) {
if (state == resume && items [0].revents & ZMQ_POLLIN) {
while (true) {
rc = frontend_->recv (&msg, 0);
if (unlikely (rc < 0))
@ -112,7 +156,7 @@ int zmq::proxy (
}
}
// Process a reply
if (items [1].revents & ZMQ_POLLIN) {
if (state == resume && items [1].revents & ZMQ_POLLIN) {
while (true) {
rc = backend_->recv (&msg, 0);
if (unlikely (rc < 0))

View File

@ -25,7 +25,8 @@ namespace zmq
int proxy (
class socket_base_t *frontend_,
class socket_base_t *backend_,
class socket_base_t *control_);
class socket_base_t *capture_,
class socket_base_t *control_ = NULL); // backward compatibility without this argument
}
#endif

View File

@ -1016,7 +1016,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
// The proxy functionality
int zmq_proxy (void *frontend_, void *backend_, void *control_)
int zmq_proxy (void *frontend_, void *backend_, void *capture_)
{
if (!frontend_ || !backend_) {
errno = EFAULT;
@ -1025,6 +1025,19 @@ int zmq_proxy (void *frontend_, void *backend_, void *control_)
return zmq::proxy (
(zmq::socket_base_t*) frontend_,
(zmq::socket_base_t*) backend_,
(zmq::socket_base_t*) capture_);
}
int zmq_proxy_steerable (void *frontend_, void *backend_, void *capture_, void *control_)
{
if (!frontend_ || !backend_) {
errno = EFAULT;
return -1;
}
return zmq::proxy (
(zmq::socket_base_t*) frontend_,
(zmq::socket_base_t*) backend_,
(zmq::socket_base_t*) capture_,
(zmq::socket_base_t*) control_);
}

View File

@ -39,6 +39,7 @@ noinst_PROGRAMS = test_system \
test_conflate \
test_inproc_connect \
test_issue_566 \
test_proxy \
test_abstract_ipc
if !ON_MINGW
@ -85,6 +86,7 @@ test_req_relaxed_SOURCES = test_req_relaxed.cpp
test_conflate_SOURCES = test_conflate.cpp
test_inproc_connect_SOURCES = test_inproc_connect.cpp
test_issue_566_SOURCES = test_issue_566.cpp
test_proxy_SOURCES = test_proxy.cpp
test_abstract_ipc_SOURCES = test_abstract_ipc.cpp
if !ON_MINGW
test_shutdown_stress_SOURCES = test_shutdown_stress.cpp

267
tests/test_proxy.cpp Normal file
View File

@ -0,0 +1,267 @@
/*
Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "testutil.hpp"
#include "../include/zmq_utils.h"
// Asynchronous client-to-server (DEALER to ROUTER) - pure libzmq
//
// While this example runs in a single process, that is to make
// it easier to start and stop the example. Each task may have its own
// context and conceptually acts as a separate process. To have this
// behaviour, it is necessary to replace the inproc transport of the
// control socket by a tcp transport.
// This is our client task
// It connects to the server, and then sends a request once per second
// It collects responses as they arrive, and it prints them out. We will
// run several client tasks in parallel, each with a different random ID.
#define CONTENT_SIZE 13
#define CONTENT_SIZE_MAX 32
#define ID_SIZE 10
#define ID_SIZE_MAX 32
#define QT_WORKERS 5
#define QT_CLIENTS 3
#define is_verbose 0
static void
client_task (void *ctx)
{
// void *ctx = zmq_ctx_new (); // if we want our own context, we shall use tcp instead of inproc for the control socket
// assert (ctx);
void *client = zmq_socket (ctx, ZMQ_DEALER);
assert (client);
// Control socket receives terminate command from main over inproc
void *control = zmq_socket (ctx, ZMQ_SUB);
assert (control);
int rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0);
assert (rc == 0);
rc = zmq_connect (control, "inproc://control");
assert (rc == 0);
char content [CONTENT_SIZE_MAX];
// Set random identity to make tracing easier
char identity [ID_SIZE];
sprintf (identity, "%04X-%04X", rand() % 0xFFFF, rand() % 0xFFFF);
rc = zmq_setsockopt (client, ZMQ_IDENTITY, identity, ID_SIZE); // includes '\0' as an helper for printf
assert (rc == 0);
rc = zmq_connect (client, "tcp://127.0.0.1:9999");
assert (rc == 0);
zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 }, { control, 0, ZMQ_POLLIN, 0 } };
int request_nbr = 0;
bool run = true;
while (run) {
// Tick once per 200 ms, pulling in arriving messages
int centitick;
for (centitick = 0; centitick < 20; centitick++) {
zmq_poll (items, 2, 10);
if (items [0].revents & ZMQ_POLLIN) {
int rcvmore;
size_t sz = sizeof (rcvmore);
rc = zmq_recv (client, content, CONTENT_SIZE_MAX, 0);
assert (rc == CONTENT_SIZE);
if (is_verbose) printf("client receive - identity = %s content = %s\n", identity, content);
// Check that message is still the same
assert (memcmp (content, "request #", 9) == 0);
rc = zmq_getsockopt (client, ZMQ_RCVMORE, &rcvmore, &sz);
assert (rc == 0);
assert (!rcvmore);
}
if (items [1].revents & ZMQ_POLLIN) {
rc = zmq_recv (control, content, CONTENT_SIZE_MAX, 0);
if (is_verbose) printf("client receive - identity = %s command = %s\n", identity, content);
if (memcmp (content, "TERMINATE", 10) == 0) {
run = false;
break;
}
}
}
sprintf(content, "request #%03d", ++request_nbr); // CONTENT_SIZE
rc = zmq_send (client, content, CONTENT_SIZE, 0);
assert (rc == CONTENT_SIZE);
}
rc = zmq_close (client);
assert (rc == 0);
rc = zmq_close (control);
assert (rc == 0);
// rc = zmq_ctx_term (ctx);
// assert (rc == 0);
}
// This is our server task.
// It uses the multithreaded server model to deal requests out to a pool
// of workers and route replies back to clients. One worker can handle
// one request at a time but one client can talk to multiple workers at
// once.
static void server_worker (void *ctx);
void
server_task (void *ctx)
{
// void *ctx = zmq_ctx_new (); // if we want our own context, we shall use tcp instead of inproc for the control socket
// assert (ctx);
// Frontend socket talks to clients over TCP
void *frontend = zmq_socket (ctx, ZMQ_ROUTER);
assert (frontend);
int rc = zmq_bind (frontend, "tcp://127.0.0.1:9999");
assert (rc == 0);
// Backend socket talks to workers over inproc
void *backend = zmq_socket (ctx, ZMQ_DEALER);
assert (backend);
rc = zmq_bind (backend, "inproc://backend");
assert (rc == 0);
// Control socket receives terminate command from main over inproc
void *control = zmq_socket (ctx, ZMQ_SUB);
assert (control);
rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0);
assert (rc == 0);
rc = zmq_connect (control, "inproc://control");
assert (rc == 0);
// Launch pool of worker threads, precise number is not critical
int thread_nbr;
void* threads [5];
for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++)
threads[thread_nbr] = zmq_threadstart (&server_worker, ctx);
// Connect backend to frontend via a proxy
zmq_proxy_steerable (frontend, backend, NULL, control);
for (thread_nbr = 0; thread_nbr < QT_WORKERS; thread_nbr++)
zmq_threadclose (threads[thread_nbr]);
rc = zmq_close (frontend);
assert (rc == 0);
rc = zmq_close (backend);
assert (rc == 0);
rc = zmq_close (control);
assert (rc == 0);
// rc = zmq_ctx_term (ctx);
// assert (rc == 0);
}
// Each worker task works on one request at a time and sends a random number
// of replies back, with random delays between replies:
// The comments in the first column, if suppressed, makes it a poller version
static void
server_worker (void *ctx)
{
void *worker = zmq_socket (ctx, ZMQ_DEALER);
assert (worker);
int rc = zmq_connect (worker, "inproc://backend");
assert (rc == 0);
// Control socket receives terminate command from main over inproc
void *control = zmq_socket (ctx, ZMQ_SUB);
assert (control);
rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0);
assert (rc == 0);
rc = zmq_connect (control, "inproc://control");
assert (rc == 0);
char content [CONTENT_SIZE_MAX]; // bigger than what we need to check that
char identity [ID_SIZE_MAX]; // the size received is the size sent
// zmq_pollitem_t items [] = { { worker, 0, ZMQ_POLLIN, 0 }, { control, 0, ZMQ_POLLIN, 0 } }; // POLLING
bool run = true;
while (run) {
// zmq_poll (items, 2, 10); // POLLING
// if (items [1].revents & ZMQ_POLLIN) { // POLLING
rc = zmq_recv (control, content, CONTENT_SIZE_MAX, ZMQ_DONTWAIT); // usually, rc == -1 (no message)
if (rc > 0) {
if (is_verbose) printf("server_worker receives command = %s\n", content);
if (memcmp (content, "TERMINATE", 10) == 0)
run = false;
}
// } // POLLING
// if (items [0].revents & ZMQ_POLLIN) { // POLLING
// The DEALER socket gives us the reply envelope and message
rc = zmq_recv (worker, identity, ID_SIZE_MAX, ZMQ_DONTWAIT); // if we don't poll, we have to use ZMQ_DONTWAIT, if we poll, we can block-receive with 0
if (rc == ID_SIZE) {
rc = zmq_recv (worker, content, CONTENT_SIZE_MAX, 0);
assert (rc == CONTENT_SIZE);
if (is_verbose) printf("server receive - identity = %s content = %s\n", identity, content);
// Send 0..4 replies back
int reply, replies = rand() % 5;
for (reply = 0; reply < replies; reply++) {
// Sleep for some fraction of a second
msleep (rand () % 10 + 1);
// Send message from server to client
rc = zmq_send (worker, identity, ID_SIZE, ZMQ_SNDMORE);
assert (rc == ID_SIZE);
rc = zmq_send (worker, content, CONTENT_SIZE, 0);
assert (rc == CONTENT_SIZE);
}
}
// } // POLLING
}
rc = zmq_close (worker);
assert (rc == 0);
rc = zmq_close (control);
assert (rc == 0);
}
// The main thread simply starts several clients and a server, and then
// waits for the server to finish.
int main (void)
{
setup_test_environment ();
void *ctx = zmq_ctx_new ();
assert (ctx);
// Control socket receives terminate command from main over inproc
void *control = zmq_socket (ctx, ZMQ_PUB);
assert (control);
int rc = zmq_bind (control, "inproc://control");
assert (rc == 0);
void* threads [QT_CLIENTS + 1];
for (int i = 0; i < QT_CLIENTS; i++)
{
threads[i] = zmq_threadstart (&client_task, ctx);
}
threads[QT_CLIENTS] = zmq_threadstart (&server_task, ctx);
msleep (500); // Run for 500 ms then quit
rc = zmq_send (control, "TERMINATE", 10, 0);
assert (rc == 10);
// clean everything
rc = zmq_close (control);
assert (rc == 0);
//msleep (1000); // not sure it is usefull
for (int i = 0; i < QT_CLIENTS + 1; i++)
zmq_threadclose (threads[i]);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0;
}

View File

@ -259,4 +259,23 @@ void setup_test_environment()
#endif
}
// provide portable millisecond sleep
#include <time.h>
#ifdef ZMQ_HAVE_WINDOWS
#include <windows.h>
#else
#include <unistd.h>
#endif
void msleep(int milliseconds)
{ // http://www.cplusplus.com/forum/unices/60161/ http://en.cppreference.com/w/cpp/thread/sleep_for
#ifdef ZMQ_HAVE_WINDOWS
Sleep(milliseconds);
#else
usleep(static_cast<useconds_t>(milliseconds)*1000);
#endif
}
#endif