add proxy_chain, a multi proxies chaining in the same thread feature

This commit is contained in:
Laurent Alebarde
2014-01-27 16:48:18 +01:00
parent fcd9b9506a
commit bc7441f517
7 changed files with 477 additions and 56 deletions

View File

@@ -119,14 +119,12 @@ forward(
int
zmq::proxy (
class socket_base_t *frontend_,
class socket_base_t *backend_,
class socket_base_t *capture_,
class socket_base_t *control_,
zmq::proxy_hook_t *hook_)
class socket_base_t **frontend_,
class socket_base_t **backend_,
class socket_base_t *capture_,
class socket_base_t *control_,
zmq::proxy_hook_t **hook_)
{
static zmq::proxy_hook_t dummy_hook = {NULL, NULL, NULL};
msg_t msg;
int rc = msg.init ();
if (rc != 0)
@@ -137,12 +135,41 @@ zmq::proxy (
int more;
size_t moresz;
zmq_pollitem_t items [] = {
{ frontend_, 0, ZMQ_POLLIN, 0 },
{ backend_, 0, ZMQ_POLLIN, 0 },
{ control_, 0, ZMQ_POLLIN, 0 }
};
int qt_poll_items = (control_ ? 3 : 2);
size_t n = 0; // number of pair of sockets: the array ends with NULL
for (;; n++) { // counts the number of pair of sockets
if (!frontend_[n] && !backend_[n])
break;
if (!frontend_[n] || !backend_[n]) {
errno = EFAULT;
return -1;
}
}
if (!n) {
errno = EFAULT;
return -1;
}
// avoid dynamic allocation as we have no guarranty to reach the deallocator => limit the chain length
zmq_assert(n <= ZMQ_PROXY_CHAIN_MAX_LENGTH);
zmq_pollitem_t items [2 * ZMQ_PROXY_CHAIN_MAX_LENGTH + 1]; // +1 for the control socket
static zmq_pollitem_t null_item = { NULL, 0, ZMQ_POLLIN, 0 };
static zmq::proxy_hook_t dummy_hook = {NULL, NULL, NULL};
static zmq::proxy_hook_t* no_hooks[ZMQ_PROXY_CHAIN_MAX_LENGTH];
if (!hook_)
hook_ = no_hooks;
else
for (size_t i = 0; i < n; i++)
if (!hook_[i]) // Check if a hook is used
hook_[i] = &dummy_hook;
for (size_t i = 0; i < n; i++) {
memcpy(&items[2 * i], &null_item, sizeof(null_item));
items[2 * i].socket = frontend_[i];
memcpy(&items[2 * i + 1], &null_item, sizeof(null_item));
items[2 * i + 1].socket = backend_[i];
no_hooks[i] = &dummy_hook;
}
memcpy(&items[2 * n], &null_item, sizeof(null_item));
items[2 * n].socket = control_;
int qt_poll_items = (control_ ? 2 * n + 1 : 2 * n);
// Proxy can be in these three states
enum {
@@ -158,7 +185,7 @@ zmq::proxy (
return -1;
// Process a control command if any
if (control_ && items [2].revents & ZMQ_POLLIN) {
if (control_ && items [2 * n].revents & ZMQ_POLLIN) {
rc = control_->recv (&msg, 0);
if (unlikely (rc < 0))
return -1;
@@ -187,22 +214,23 @@ zmq::proxy (
zmq_assert (false);
}
}
// Check if a hook is used
if (!hook_)
hook_ = &dummy_hook;
// Process a request
if (state == active
&& items [0].revents & ZMQ_POLLIN) {
rc = forward(frontend_, backend_, capture_, msg, hook_->front2back_hook, hook_->data);
if (unlikely (rc < 0))
return -1;
}
// Process a reply
if (state == active
&& items [1].revents & ZMQ_POLLIN) {
rc = forward(backend_, frontend_, capture_, msg, hook_->back2front_hook, hook_->data);
if (unlikely (rc < 0))
return -1;
// process each pair of sockets
for (size_t i = 0; i < n; i++) {
// Process a request
if (state == active
&& items [2 * i].revents & ZMQ_POLLIN) {
rc = forward(frontend_[i], backend_[i], capture_, msg, hook_[i]->front2back_hook, hook_[i]->data);
if (unlikely (rc < 0))
return -1;
}
// Process a reply
if (state == active
&& items [2 * i + 1].revents & ZMQ_POLLIN) {
rc = forward(backend_[i], frontend_[i], capture_, msg, hook_[i]->back2front_hook, hook_[i]->data);
if (unlikely (rc < 0))
return -1;
}
}
}
return 0;