diff --git a/src/proxy.cpp b/src/proxy.cpp index 227a2970..5bac3ac6 100644 --- a/src/proxy.cpp +++ b/src/proxy.cpp @@ -66,6 +66,8 @@ delete poller_receive_blocked;\ delete poller_send_blocked;\ delete poller_both_blocked;\ + delete poller_frontend_only;\ + delete poller_backend_only\ #define CHECK_RC_EXIT_ON_FAILURE()\ @@ -173,14 +175,18 @@ int zmq::proxy ( // If frontend_==backend_ 'poller_send_blocked' and 'poller_receive_blocked' are the same, 'ZMQ_POLLIN' is ignored. // In that case 'poller_send_blocked' is not used. We need only 'poller_receive_blocked'. - // We also don't need 'poller_both_blocked', no need to initialize it. + // We also don't need 'poller_both_blocked', 'poller_backend_only' nor 'poller_frontend_only' no need to initialize it. // We save some RAM and time for initialization. zmq::socket_poller_t *poller_send_blocked = NULL; // All except 'ZMQ_POLLIN' on 'backend_'. zmq::socket_poller_t *poller_both_blocked = NULL; // All except 'ZMQ_POLLIN' on both 'frontend_' and 'backend_'. + zmq::socket_poller_t *poller_frontend_only = NULL; // Only 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' on 'frontend_'. + zmq::socket_poller_t *poller_backend_only = NULL; // Only 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' on 'backend_'. if (frontend_ != backend_) { poller_send_blocked = new (std::nothrow) zmq::socket_poller_t; // All except 'ZMQ_POLLIN' on 'backend_'. poller_both_blocked = new (std::nothrow) zmq::socket_poller_t; // All except 'ZMQ_POLLIN' on both 'frontend_' and 'backend_'. + poller_frontend_only = new (std::nothrow) zmq::socket_poller_t; // Only 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' on 'frontend_'. + poller_backend_only = new (std::nothrow) zmq::socket_poller_t; // Only 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' on 'backend_'. frontend_equal_to_backend = false; } else frontend_equal_to_backend = true; @@ -222,6 +228,10 @@ int zmq::proxy ( CHECK_RC_EXIT_ON_FAILURE (); rc = poller_receive_blocked->add (backend_, NULL, ZMQ_POLLIN | ZMQ_POLLOUT); // All except 'ZMQ_POLLIN' on 'frontend_'. CHECK_RC_EXIT_ON_FAILURE (); + rc = poller_frontend_only->add (frontend_, NULL, ZMQ_POLLIN | ZMQ_POLLOUT); + CHECK_RC_EXIT_ON_FAILURE (); + rc = poller_backend_only->add (backend_, NULL, ZMQ_POLLIN | ZMQ_POLLOUT); + CHECK_RC_EXIT_ON_FAILURE (); } // Register 'control_' with pollers. @@ -239,6 +249,10 @@ int zmq::proxy ( CHECK_RC_EXIT_ON_FAILURE (); rc = poller_both_blocked->add (control_, NULL, ZMQ_POLLIN); CHECK_RC_EXIT_ON_FAILURE (); + rc = poller_frontend_only->add (control_, NULL, ZMQ_POLLIN); + CHECK_RC_EXIT_ON_FAILURE (); + rc = poller_backend_only->add (control_, NULL, ZMQ_POLLIN); + CHECK_RC_EXIT_ON_FAILURE (); } } @@ -342,34 +356,50 @@ int zmq::proxy ( if (poller_wait == poller_both_blocked) poller_wait = poller_send_blocked; else - if (poller_wait == poller_receive_blocked) + if (poller_wait == poller_receive_blocked || poller_wait == poller_frontend_only) poller_wait = poller_in; } if (reply_processed) { // 'backend_' -> 'frontend_' if (poller_wait == poller_both_blocked) poller_wait = poller_receive_blocked; else - if (poller_wait == poller_send_blocked) + if (poller_wait == poller_send_blocked || poller_wait == poller_backend_only) poller_wait = poller_in; } } } else { - // No requests have been processed, there were no 'ZMQ_POLLOUT' events. - // That means that receiving end queue(s) is/are full. - // Disable receiving 'ZMQ_POLLIN' for sockets for which there's no 'ZMQ_POLLOUT'. + // No requests have been processed, there were no 'ZMQ_POLLIN' with corresponding 'ZMQ_POLLOUT' events. + // That means that out queue(s) is/are full or one out queue is full and second one has no messages to process. + // Disable receiving 'ZMQ_POLLIN' for sockets for which there's no 'ZMQ_POLLOUT', + // or wait only on both 'backend_''s or 'frontend_''s 'ZMQ_POLLIN' and 'ZMQ_POLLOUT'. if (frontend_in) { - if (poller_wait == poller_send_blocked) - poller_wait = poller_both_blocked; - else - poller_wait = poller_receive_blocked; + if (frontend_out) + // If frontend_in and frontend_out are true, obviously backend_in and backend_out are both false. + // In that case we need to wait for both 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' only on 'backend_'. + // We'll never get here in case of frontend_==backend_ because then frontend_out will always be false. + poller_wait = poller_backend_only; + else { + if (poller_wait == poller_send_blocked) + poller_wait = poller_both_blocked; + else + if (poller_wait == poller_in) + poller_wait = poller_receive_blocked; + } } if (backend_in) { // Will never be reached if frontend_==backend_, 'backend_in' will // always be false due to design in 'for' event processing loop. - if (poller_wait == poller_receive_blocked) - poller_wait = poller_both_blocked; - else - poller_wait = poller_send_blocked; + if (backend_out) + // If backend_in and backend_out are true, obviously frontend_in and frontend_out are both false. + // In that case we need to wait for both 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' only on 'frontend_'. + poller_wait = poller_frontend_only; + else { + if (poller_wait == poller_receive_blocked) + poller_wait = poller_both_blocked; + else + if (poller_wait == poller_in) + poller_wait = poller_send_blocked; + } } }