diff --git a/src/proxy.cpp b/src/proxy.cpp index 6496214e..227a2970 100644 --- a/src/proxy.cpp +++ b/src/proxy.cpp @@ -52,6 +52,30 @@ #include "socket_base.hpp" #include "err.hpp" +#ifdef ZMQ_HAVE_POLLER + +#include "socket_poller.hpp" + +// Macros for repetitive code. + +// PROXY_CLEANUP() must not be used before these variables are initialized. +#define PROXY_CLEANUP()\ + delete poller_all;\ + delete poller_in;\ + delete poller_control;\ + delete poller_receive_blocked;\ + delete poller_send_blocked;\ + delete poller_both_blocked;\ + + +#define CHECK_RC_EXIT_ON_FAILURE()\ + if (rc < 0) {\ + PROXY_CLEANUP();\ + return close_and_return (&msg, -1);\ + } + +#endif // ZMQ_HAVE_POLLER + int capture ( class zmq::socket_base_t *capture_, zmq::msg_t& msg_, @@ -105,6 +129,258 @@ int forward ( return 0; } +#ifdef ZMQ_HAVE_POLLER + +int zmq::proxy ( + class socket_base_t *frontend_, + class socket_base_t *backend_, + class socket_base_t *capture_, + class socket_base_t *control_) +{ + msg_t msg; + int rc = msg.init (); + if (rc != 0) + return -1; + + // The algorithm below assumes ratio of requests and replies processed + // under full load to be 1:1. + + int more; + size_t moresz = sizeof (more); + + // Proxy can be in these three states + enum { + active, + paused, + terminated + } state = active; + + bool frontend_equal_to_backend; + bool frontend_in = false; + bool frontend_out = false; + bool backend_in = false; + bool backend_out = false; + bool control_in = false; + zmq::socket_poller_t::event_t events [3]; + + // Don't allocate these pollers from stack because they will take more than 900 kB of stack! + // On Windows this blows up default stack of 1 MB and aborts the program. + // I wanted to use std::shared_ptr here as the best solution but that requires C++11... + zmq::socket_poller_t *poller_all = new (std::nothrow) zmq::socket_poller_t; // Poll for everything. + zmq::socket_poller_t *poller_in = new (std::nothrow) zmq::socket_poller_t; // Poll only 'ZMQ_POLLIN' on all sockets. Initial blocking poll in loop. + zmq::socket_poller_t *poller_control = new (std::nothrow) zmq::socket_poller_t; // Poll only for 'ZMQ_POLLIN' on 'control_', when proxy is paused. + zmq::socket_poller_t *poller_receive_blocked = new (std::nothrow) zmq::socket_poller_t; // All except 'ZMQ_POLLIN' on 'frontend_'. + + // If frontend_==backend_ 'poller_send_blocked' and 'poller_receive_blocked' are the same, 'ZMQ_POLLIN' is ignored. + // In that case 'poller_send_blocked' is not used. We need only 'poller_receive_blocked'. + // We also don't need 'poller_both_blocked', no need to initialize it. + // We save some RAM and time for initialization. + zmq::socket_poller_t *poller_send_blocked = NULL; // All except 'ZMQ_POLLIN' on 'backend_'. + zmq::socket_poller_t *poller_both_blocked = NULL; // All except 'ZMQ_POLLIN' on both 'frontend_' and 'backend_'. + + if (frontend_ != backend_) { + poller_send_blocked = new (std::nothrow) zmq::socket_poller_t; // All except 'ZMQ_POLLIN' on 'backend_'. + poller_both_blocked = new (std::nothrow) zmq::socket_poller_t; // All except 'ZMQ_POLLIN' on both 'frontend_' and 'backend_'. + frontend_equal_to_backend = false; + } else + frontend_equal_to_backend = true; + + if (poller_all == NULL || poller_in == NULL || poller_control == NULL || poller_receive_blocked == NULL + || ((poller_send_blocked == NULL || poller_both_blocked == NULL) && !frontend_equal_to_backend)) { + PROXY_CLEANUP (); + return close_and_return (&msg, -1); + } + + zmq::socket_poller_t *poller_wait = poller_in; // Poller for blocking wait, initially all 'ZMQ_POLLIN'. + + // Register 'frontend_' and 'backend_' with pollers. + rc = poller_all->add (frontend_, NULL, ZMQ_POLLIN | ZMQ_POLLOUT); // Everything. + CHECK_RC_EXIT_ON_FAILURE (); + rc = poller_in->add (frontend_, NULL, ZMQ_POLLIN); // All 'ZMQ_POLLIN's. + CHECK_RC_EXIT_ON_FAILURE (); + + if (frontend_equal_to_backend) { + // If frontend_==backend_ 'poller_send_blocked' and 'poller_receive_blocked' are the same, + // so we don't need 'poller_send_blocked'. We need only 'poller_receive_blocked'. + // We also don't need 'poller_both_blocked', no need to initialize it. + rc = poller_receive_blocked->add (frontend_, NULL, ZMQ_POLLOUT); + CHECK_RC_EXIT_ON_FAILURE (); + } else { + rc = poller_all->add (backend_, NULL, ZMQ_POLLIN | ZMQ_POLLOUT); // Everything. + CHECK_RC_EXIT_ON_FAILURE (); + rc = poller_in->add (backend_, NULL, ZMQ_POLLIN); // All 'ZMQ_POLLIN's. + CHECK_RC_EXIT_ON_FAILURE (); + rc = poller_both_blocked->add (frontend_, NULL, ZMQ_POLLOUT); // Waiting only for 'ZMQ_POLLOUT'. + CHECK_RC_EXIT_ON_FAILURE (); + rc = poller_both_blocked->add (backend_, NULL, ZMQ_POLLOUT); // Waiting only for 'ZMQ_POLLOUT'. + CHECK_RC_EXIT_ON_FAILURE (); + rc = poller_send_blocked->add (backend_, NULL, ZMQ_POLLOUT); // All except 'ZMQ_POLLIN' on 'backend_'. + CHECK_RC_EXIT_ON_FAILURE (); + rc = poller_send_blocked->add (frontend_, NULL, ZMQ_POLLIN | ZMQ_POLLOUT); // All except 'ZMQ_POLLIN' on 'backend_'. + CHECK_RC_EXIT_ON_FAILURE (); + rc = poller_receive_blocked->add (frontend_, NULL, ZMQ_POLLOUT); // All except 'ZMQ_POLLIN' on 'frontend_'. + CHECK_RC_EXIT_ON_FAILURE (); + rc = poller_receive_blocked->add (backend_, NULL, ZMQ_POLLIN | ZMQ_POLLOUT); // All except 'ZMQ_POLLIN' on 'frontend_'. + CHECK_RC_EXIT_ON_FAILURE (); + } + + // Register 'control_' with pollers. + if (control_ != NULL) { + rc = poller_all->add (control_, NULL, ZMQ_POLLIN); + CHECK_RC_EXIT_ON_FAILURE (); + rc = poller_in->add (control_, NULL, ZMQ_POLLIN); + CHECK_RC_EXIT_ON_FAILURE (); + rc = poller_control->add (control_, NULL, ZMQ_POLLIN); // When proxy is paused we wait only for ZMQ_POLLIN on 'control_' socket. + CHECK_RC_EXIT_ON_FAILURE (); + rc = poller_receive_blocked->add (control_, NULL, ZMQ_POLLIN); + CHECK_RC_EXIT_ON_FAILURE (); + if (!frontend_equal_to_backend) { + rc = poller_send_blocked->add (control_, NULL, ZMQ_POLLIN); + CHECK_RC_EXIT_ON_FAILURE (); + rc = poller_both_blocked->add (control_, NULL, ZMQ_POLLIN); + CHECK_RC_EXIT_ON_FAILURE (); + } + } + + + int i; + bool request_processed, reply_processed; + + + while (state != terminated) { + + // Blocking wait initially only for 'ZMQ_POLLIN' - 'poller_wait' points to 'poller_in'. + // If one of receiving end's queue is full ('ZMQ_POLLOUT' not available), + // 'poller_wait' is pointed to 'poller_receive_blocked', 'poller_send_blocked' or 'poller_both_blocked'. + rc = poller_wait->wait (events, 3, -1); + if (rc < 0 && errno == ETIMEDOUT) + rc = 0; + CHECK_RC_EXIT_ON_FAILURE (); + + // Some of events waited for by 'poller_wait' have arrived, now poll for everything without blocking. + rc = poller_all->wait (events, 3, 0); + if (rc < 0 && errno == ETIMEDOUT) + rc = 0; + CHECK_RC_EXIT_ON_FAILURE (); + + // Process events. + for (i = 0; i < rc; i++) { + if (events [i].socket == frontend_) { + frontend_in = (events [i].events & ZMQ_POLLIN) != 0; + frontend_out = (events [i].events & ZMQ_POLLOUT) != 0; + } else + // This 'if' needs to be after check for 'frontend_' in order never + // to be reached in case frontend_==backend_, so we ensure backend_in=false in that case. + if (events [i].socket == backend_) { + backend_in = (events [i].events & ZMQ_POLLIN) != 0; + backend_out = (events [i].events & ZMQ_POLLOUT) != 0; + } else + if (events [i].socket == control_) + control_in = (events [i].events & ZMQ_POLLIN) != 0; + } + + + // Process a control command if any. + if (control_in) { + rc = control_->recv (&msg, 0); + CHECK_RC_EXIT_ON_FAILURE (); + rc = control_->getsockopt (ZMQ_RCVMORE, &more, &moresz); + if (unlikely (rc < 0) || more) { + PROXY_CLEANUP (); + return close_and_return (&msg, -1); + } + + // Copy message to capture socket if any. + rc = capture (capture_, msg); + CHECK_RC_EXIT_ON_FAILURE (); + + if (msg.size () == 5 && memcmp (msg.data (), "PAUSE", 5) == 0) { + state = paused; + poller_wait = poller_control; + } else + if (msg.size () == 6 && memcmp (msg.data (), "RESUME", 6) == 0) { + state = active; + poller_wait = poller_in; + } else + if (msg.size () == 9 && memcmp (msg.data (), "TERMINATE", 9) == 0) + state = terminated; + else { + // This is an API error, we assert + puts ("E: invalid command sent to proxy"); + zmq_assert (false); + } + control_in = false; + } + + if (state == active) { + + // Process a request, 'ZMQ_POLLIN' on 'frontend_' and 'ZMQ_POLLOUT' on 'backend_'. + // In case of frontend_==backend_ there's no 'ZMQ_POLLOUT' event. + if (frontend_in && (backend_out || frontend_equal_to_backend)) { + rc = forward (frontend_, backend_, capture_, msg); + CHECK_RC_EXIT_ON_FAILURE (); + request_processed = true; + frontend_in = backend_out = false; + } else request_processed = false; + + // Process a reply, 'ZMQ_POLLIN' on 'backend_' and 'ZMQ_POLLOUT' on 'frontend_'. + // If 'frontend_' and 'backend_' are the same this is not needed because previous processing + // covers all of the cases. 'backend_in' is always false if frontend_==backend_ due to + // design in 'for' event processing loop. + if (backend_in && frontend_out) { + rc = forward (backend_, frontend_, capture_, msg); + CHECK_RC_EXIT_ON_FAILURE (); + reply_processed = true; + backend_in = frontend_out = false; + } else reply_processed = false; + + if (request_processed || reply_processed) { + // If request/reply is processed that means we had at least one 'ZMQ_POLLOUT' event. + // Enable corresponding 'ZMQ_POLLIN' for blocking wait if any was disabled. + if (poller_wait != poller_in) { + if (request_processed) { // 'frontend_' -> 'backend_' + if (poller_wait == poller_both_blocked) + poller_wait = poller_send_blocked; + else + if (poller_wait == poller_receive_blocked) + poller_wait = poller_in; + } + if (reply_processed) { // 'backend_' -> 'frontend_' + if (poller_wait == poller_both_blocked) + poller_wait = poller_receive_blocked; + else + if (poller_wait == poller_send_blocked) + poller_wait = poller_in; + } + } + } else { + // No requests have been processed, there were no 'ZMQ_POLLOUT' events. + // That means that receiving end queue(s) is/are full. + // Disable receiving 'ZMQ_POLLIN' for sockets for which there's no 'ZMQ_POLLOUT'. + if (frontend_in) { + if (poller_wait == poller_send_blocked) + poller_wait = poller_both_blocked; + else + poller_wait = poller_receive_blocked; + } + if (backend_in) { + // Will never be reached if frontend_==backend_, 'backend_in' will + // always be false due to design in 'for' event processing loop. + if (poller_wait == poller_receive_blocked) + poller_wait = poller_both_blocked; + else + poller_wait = poller_send_blocked; + } + } + + } + } + PROXY_CLEANUP (); + return close_and_return (&msg, 0); +} + +#else // ZMQ_HAVE_POLLER + int zmq::proxy ( class socket_base_t *frontend_, class socket_base_t *backend_, @@ -206,3 +482,5 @@ int zmq::proxy ( return close_and_return (&msg, 0); } + +#endif // ZMQ_HAVE_POLLER