Problem: formatting inconsistent

Solution: applied clang-format
This commit is contained in:
sigiesec
2018-02-01 11:46:09 +01:00
parent 6d8baea714
commit 41f459e1dc
331 changed files with 13208 additions and 13691 deletions

View File

@@ -43,7 +43,8 @@
#include "proxy.hpp"
#include "likely.hpp"
#if defined ZMQ_POLL_BASED_ON_POLL && !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_AIX
#if defined ZMQ_POLL_BASED_ON_POLL && !defined ZMQ_HAVE_WINDOWS \
&& !defined ZMQ_HAVE_AIX
#include <poll.h>
#endif
@@ -59,25 +60,25 @@
// Macros for repetitive code.
// PROXY_CLEANUP() must not be used before these variables are initialized.
#define PROXY_CLEANUP()\
do {\
delete poller_all;\
delete poller_in;\
delete poller_control;\
delete poller_receive_blocked;\
delete poller_send_blocked;\
delete poller_both_blocked;\
delete poller_frontend_only;\
delete poller_backend_only;\
#define PROXY_CLEANUP() \
do { \
delete poller_all; \
delete poller_in; \
delete poller_control; \
delete poller_receive_blocked; \
delete poller_send_blocked; \
delete poller_both_blocked; \
delete poller_frontend_only; \
delete poller_backend_only; \
} while (false)
#define CHECK_RC_EXIT_ON_FAILURE()\
do {\
if (rc < 0) {\
PROXY_CLEANUP();\
return close_and_return (&msg, -1);\
}\
#define CHECK_RC_EXIT_ON_FAILURE() \
do { \
if (rc < 0) { \
PROXY_CLEANUP (); \
return close_and_return (&msg, -1); \
} \
} while (false)
#endif // ZMQ_HAVE_POLLER
@@ -94,13 +95,11 @@ typedef struct
} zmq_socket_stats_t;
// Utility functions
int capture (
class zmq::socket_base_t *capture_,
zmq::msg_t& msg_,
int more_ = 0)
int capture (class zmq::socket_base_t *capture_,
zmq::msg_t &msg_,
int more_ = 0)
{
// Copy message to capture socket if any
if (capture_) {
@@ -118,11 +117,12 @@ int capture (
return 0;
}
int forward (
class zmq::socket_base_t *from_, zmq_socket_stats_t* from_stats,
class zmq::socket_base_t *to_, zmq_socket_stats_t* to_stats,
class zmq::socket_base_t *capture_,
zmq::msg_t& msg_)
int forward (class zmq::socket_base_t *from_,
zmq_socket_stats_t *from_stats,
class zmq::socket_base_t *to_,
zmq_socket_stats_t *to_stats,
class zmq::socket_base_t *capture_,
zmq::msg_t &msg_)
{
int more;
size_t moresz;
@@ -132,7 +132,7 @@ int forward (
if (unlikely (rc < 0))
return -1;
complete_msg_size += msg_.size();
complete_msg_size += msg_.size ();
moresz = sizeof more;
rc = from_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
@@ -162,14 +162,16 @@ int forward (
}
static int loop_and_send_multipart_stat (zmq::socket_base_t *control_,
uint64_t stat, bool first, bool more)
uint64_t stat,
bool first,
bool more)
{
int rc;
zmq::msg_t msg;
// VSM of 8 bytes can't fail to init
msg.init_size (sizeof (uint64_t));
memcpy (msg.data (), (const void *)&stat, sizeof (uint64_t));
memcpy (msg.data (), (const void *) &stat, sizeof (uint64_t));
// if the first message is handed to the pipe successfully then the HWM
// is not full, which means failures are due to interrupts (on Windows pipes
@@ -181,24 +183,31 @@ static int loop_and_send_multipart_stat (zmq::socket_base_t *control_,
return rc;
}
int reply_stats(
class zmq::socket_base_t *control_,
zmq_socket_stats_t* frontend_stats,
zmq_socket_stats_t* backend_stats)
int reply_stats (class zmq::socket_base_t *control_,
zmq_socket_stats_t *frontend_stats,
zmq_socket_stats_t *backend_stats)
{
// first part: frontend stats - the first send might fail due to HWM
if (loop_and_send_multipart_stat (control_, frontend_stats->msg_in, true, true) != 0)
if (loop_and_send_multipart_stat (control_, frontend_stats->msg_in, true,
true)
!= 0)
return -1;
loop_and_send_multipart_stat (control_, frontend_stats->bytes_in, false, true);
loop_and_send_multipart_stat (control_, frontend_stats->msg_out, false, true);
loop_and_send_multipart_stat (control_, frontend_stats->bytes_out, false, true);
loop_and_send_multipart_stat (control_, frontend_stats->bytes_in, false,
true);
loop_and_send_multipart_stat (control_, frontend_stats->msg_out, false,
true);
loop_and_send_multipart_stat (control_, frontend_stats->bytes_out, false,
true);
// second part: backend stats
loop_and_send_multipart_stat (control_, backend_stats->msg_in, false, true);
loop_and_send_multipart_stat (control_, backend_stats->bytes_in, false, true);
loop_and_send_multipart_stat (control_, backend_stats->msg_out, false, true);
loop_and_send_multipart_stat (control_, backend_stats->bytes_out, false, false);
loop_and_send_multipart_stat (control_, backend_stats->bytes_in, false,
true);
loop_and_send_multipart_stat (control_, backend_stats->msg_out, false,
true);
loop_and_send_multipart_stat (control_, backend_stats->bytes_out, false,
false);
return 0;
}
@@ -206,11 +215,10 @@ int reply_stats(
#ifdef ZMQ_HAVE_POLLER
int zmq::proxy (
class socket_base_t *frontend_,
class socket_base_t *backend_,
class socket_base_t *capture_,
class socket_base_t *control_)
int zmq::proxy (class socket_base_t *frontend_,
class socket_base_t *backend_,
class socket_base_t *capture_,
class socket_base_t *control_)
{
msg_t msg;
int rc = msg.init ();
@@ -224,7 +232,8 @@ int zmq::proxy (
size_t moresz = sizeof (more);
// Proxy can be in these three states
enum {
enum
{
active,
paused,
terminated
@@ -236,50 +245,66 @@ int zmq::proxy (
bool backend_in = false;
bool backend_out = false;
bool control_in = false;
zmq::socket_poller_t::event_t events [3];
zmq::socket_poller_t::event_t events[3];
zmq_socket_stats_t frontend_stats;
zmq_socket_stats_t backend_stats;
memset(&frontend_stats, 0, sizeof(frontend_stats));
memset(&backend_stats, 0, sizeof(backend_stats));
memset (&frontend_stats, 0, sizeof (frontend_stats));
memset (&backend_stats, 0, sizeof (backend_stats));
// Don't allocate these pollers from stack because they will take more than 900 kB of stack!
// On Windows this blows up default stack of 1 MB and aborts the program.
// I wanted to use std::shared_ptr here as the best solution but that requires C++11...
zmq::socket_poller_t *poller_all = new (std::nothrow) zmq::socket_poller_t; // Poll for everything.
zmq::socket_poller_t *poller_in = new (std::nothrow) zmq::socket_poller_t; // Poll only 'ZMQ_POLLIN' on all sockets. Initial blocking poll in loop.
zmq::socket_poller_t *poller_control = new (std::nothrow) zmq::socket_poller_t; // Poll only for 'ZMQ_POLLIN' on 'control_', when proxy is paused.
zmq::socket_poller_t *poller_receive_blocked = new (std::nothrow) zmq::socket_poller_t; // All except 'ZMQ_POLLIN' on 'frontend_'.
zmq::socket_poller_t *poller_all =
new (std::nothrow) zmq::socket_poller_t; // Poll for everything.
zmq::socket_poller_t *poller_in = new (std::nothrow) zmq::
socket_poller_t; // Poll only 'ZMQ_POLLIN' on all sockets. Initial blocking poll in loop.
zmq::socket_poller_t *poller_control = new (std::nothrow) zmq::
socket_poller_t; // Poll only for 'ZMQ_POLLIN' on 'control_', when proxy is paused.
zmq::socket_poller_t *poller_receive_blocked = new (std::nothrow)
zmq::socket_poller_t; // All except 'ZMQ_POLLIN' on 'frontend_'.
// If frontend_==backend_ 'poller_send_blocked' and 'poller_receive_blocked' are the same, 'ZMQ_POLLIN' is ignored.
// In that case 'poller_send_blocked' is not used. We need only 'poller_receive_blocked'.
// We also don't need 'poller_both_blocked', 'poller_backend_only' nor 'poller_frontend_only' no need to initialize it.
// We save some RAM and time for initialization.
zmq::socket_poller_t *poller_send_blocked = NULL; // All except 'ZMQ_POLLIN' on 'backend_'.
zmq::socket_poller_t *poller_both_blocked = NULL; // All except 'ZMQ_POLLIN' on both 'frontend_' and 'backend_'.
zmq::socket_poller_t *poller_frontend_only = NULL; // Only 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' on 'frontend_'.
zmq::socket_poller_t *poller_backend_only = NULL; // Only 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' on 'backend_'.
zmq::socket_poller_t *poller_send_blocked =
NULL; // All except 'ZMQ_POLLIN' on 'backend_'.
zmq::socket_poller_t *poller_both_blocked =
NULL; // All except 'ZMQ_POLLIN' on both 'frontend_' and 'backend_'.
zmq::socket_poller_t *poller_frontend_only =
NULL; // Only 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' on 'frontend_'.
zmq::socket_poller_t *poller_backend_only =
NULL; // Only 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' on 'backend_'.
if (frontend_ != backend_) {
poller_send_blocked = new (std::nothrow) zmq::socket_poller_t; // All except 'ZMQ_POLLIN' on 'backend_'.
poller_both_blocked = new (std::nothrow) zmq::socket_poller_t; // All except 'ZMQ_POLLIN' on both 'frontend_' and 'backend_'.
poller_frontend_only = new (std::nothrow) zmq::socket_poller_t; // Only 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' on 'frontend_'.
poller_backend_only = new (std::nothrow) zmq::socket_poller_t; // Only 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' on 'backend_'.
poller_send_blocked = new (std::nothrow)
zmq::socket_poller_t; // All except 'ZMQ_POLLIN' on 'backend_'.
poller_both_blocked = new (std::nothrow) zmq::
socket_poller_t; // All except 'ZMQ_POLLIN' on both 'frontend_' and 'backend_'.
poller_frontend_only = new (std::nothrow) zmq::
socket_poller_t; // Only 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' on 'frontend_'.
poller_backend_only = new (std::nothrow) zmq::
socket_poller_t; // Only 'ZMQ_POLLIN' and 'ZMQ_POLLOUT' on 'backend_'.
frontend_equal_to_backend = false;
} else
frontend_equal_to_backend = true;
if (poller_all == NULL || poller_in == NULL || poller_control == NULL || poller_receive_blocked == NULL
|| ((poller_send_blocked == NULL || poller_both_blocked == NULL) && !frontend_equal_to_backend)) {
if (poller_all == NULL || poller_in == NULL || poller_control == NULL
|| poller_receive_blocked == NULL
|| ((poller_send_blocked == NULL || poller_both_blocked == NULL)
&& !frontend_equal_to_backend)) {
PROXY_CLEANUP ();
return close_and_return (&msg, -1);
}
zmq::socket_poller_t *poller_wait = poller_in; // Poller for blocking wait, initially all 'ZMQ_POLLIN'.
zmq::socket_poller_t *poller_wait =
poller_in; // Poller for blocking wait, initially all 'ZMQ_POLLIN'.
// Register 'frontend_' and 'backend_' with pollers.
rc = poller_all->add (frontend_, NULL, ZMQ_POLLIN | ZMQ_POLLOUT); // Everything.
// Register 'frontend_' and 'backend_' with pollers.
rc = poller_all->add (frontend_, NULL,
ZMQ_POLLIN | ZMQ_POLLOUT); // Everything.
CHECK_RC_EXIT_ON_FAILURE ();
rc = poller_in->add (frontend_, NULL, ZMQ_POLLIN); // All 'ZMQ_POLLIN's.
rc = poller_in->add (frontend_, NULL, ZMQ_POLLIN); // All 'ZMQ_POLLIN's.
CHECK_RC_EXIT_ON_FAILURE ();
if (frontend_equal_to_backend) {
@@ -289,25 +314,38 @@ int zmq::proxy (
rc = poller_receive_blocked->add (frontend_, NULL, ZMQ_POLLOUT);
CHECK_RC_EXIT_ON_FAILURE ();
} else {
rc = poller_all->add (backend_, NULL, ZMQ_POLLIN | ZMQ_POLLOUT); // Everything.
rc = poller_all->add (backend_, NULL,
ZMQ_POLLIN | ZMQ_POLLOUT); // Everything.
CHECK_RC_EXIT_ON_FAILURE ();
rc = poller_in->add (backend_, NULL, ZMQ_POLLIN); // All 'ZMQ_POLLIN's.
rc = poller_in->add (backend_, NULL, ZMQ_POLLIN); // All 'ZMQ_POLLIN's.
CHECK_RC_EXIT_ON_FAILURE ();
rc = poller_both_blocked->add (frontend_, NULL, ZMQ_POLLOUT); // Waiting only for 'ZMQ_POLLOUT'.
rc = poller_both_blocked->add (
frontend_, NULL, ZMQ_POLLOUT); // Waiting only for 'ZMQ_POLLOUT'.
CHECK_RC_EXIT_ON_FAILURE ();
rc = poller_both_blocked->add (backend_, NULL, ZMQ_POLLOUT); // Waiting only for 'ZMQ_POLLOUT'.
rc = poller_both_blocked->add (
backend_, NULL, ZMQ_POLLOUT); // Waiting only for 'ZMQ_POLLOUT'.
CHECK_RC_EXIT_ON_FAILURE ();
rc = poller_send_blocked->add (backend_, NULL, ZMQ_POLLOUT); // All except 'ZMQ_POLLIN' on 'backend_'.
rc = poller_send_blocked->add (
backend_, NULL,
ZMQ_POLLOUT); // All except 'ZMQ_POLLIN' on 'backend_'.
CHECK_RC_EXIT_ON_FAILURE ();
rc = poller_send_blocked->add (frontend_, NULL, ZMQ_POLLIN | ZMQ_POLLOUT); // All except 'ZMQ_POLLIN' on 'backend_'.
rc = poller_send_blocked->add (
frontend_, NULL,
ZMQ_POLLIN | ZMQ_POLLOUT); // All except 'ZMQ_POLLIN' on 'backend_'.
CHECK_RC_EXIT_ON_FAILURE ();
rc = poller_receive_blocked->add (frontend_, NULL, ZMQ_POLLOUT); // All except 'ZMQ_POLLIN' on 'frontend_'.
rc = poller_receive_blocked->add (
frontend_, NULL,
ZMQ_POLLOUT); // All except 'ZMQ_POLLIN' on 'frontend_'.
CHECK_RC_EXIT_ON_FAILURE ();
rc = poller_receive_blocked->add (backend_, NULL, ZMQ_POLLIN | ZMQ_POLLOUT); // All except 'ZMQ_POLLIN' on 'frontend_'.
rc = poller_receive_blocked->add (
backend_, NULL,
ZMQ_POLLIN | ZMQ_POLLOUT); // All except 'ZMQ_POLLIN' on 'frontend_'.
CHECK_RC_EXIT_ON_FAILURE ();
rc = poller_frontend_only->add (frontend_, NULL, ZMQ_POLLIN | ZMQ_POLLOUT);
rc =
poller_frontend_only->add (frontend_, NULL, ZMQ_POLLIN | ZMQ_POLLOUT);
CHECK_RC_EXIT_ON_FAILURE ();
rc = poller_backend_only->add (backend_, NULL, ZMQ_POLLIN | ZMQ_POLLOUT);
rc =
poller_backend_only->add (backend_, NULL, ZMQ_POLLIN | ZMQ_POLLOUT);
CHECK_RC_EXIT_ON_FAILURE ();
}
@@ -317,7 +355,9 @@ int zmq::proxy (
CHECK_RC_EXIT_ON_FAILURE ();
rc = poller_in->add (control_, NULL, ZMQ_POLLIN);
CHECK_RC_EXIT_ON_FAILURE ();
rc = poller_control->add (control_, NULL, ZMQ_POLLIN); // When proxy is paused we wait only for ZMQ_POLLIN on 'control_' socket.
rc = poller_control->add (
control_, NULL,
ZMQ_POLLIN); // When proxy is paused we wait only for ZMQ_POLLIN on 'control_' socket.
CHECK_RC_EXIT_ON_FAILURE ();
rc = poller_receive_blocked->add (control_, NULL, ZMQ_POLLIN);
CHECK_RC_EXIT_ON_FAILURE ();
@@ -339,7 +379,6 @@ int zmq::proxy (
while (state != terminated) {
// Blocking wait initially only for 'ZMQ_POLLIN' - 'poller_wait' points to 'poller_in'.
// If one of receiving end's queue is full ('ZMQ_POLLOUT' not available),
// 'poller_wait' is pointed to 'poller_receive_blocked', 'poller_send_blocked' or 'poller_both_blocked'.
@@ -356,18 +395,17 @@ int zmq::proxy (
// Process events.
for (i = 0; i < rc; i++) {
if (events [i].socket == frontend_) {
frontend_in = (events [i].events & ZMQ_POLLIN) != 0;
frontend_out = (events [i].events & ZMQ_POLLOUT) != 0;
if (events[i].socket == frontend_) {
frontend_in = (events[i].events & ZMQ_POLLIN) != 0;
frontend_out = (events[i].events & ZMQ_POLLOUT) != 0;
} else
// This 'if' needs to be after check for 'frontend_' in order never
// to be reached in case frontend_==backend_, so we ensure backend_in=false in that case.
if (events [i].socket == backend_) {
backend_in = (events [i].events & ZMQ_POLLIN) != 0;
backend_out = (events [i].events & ZMQ_POLLOUT) != 0;
} else
if (events [i].socket == control_)
control_in = (events [i].events & ZMQ_POLLIN) != 0;
// This 'if' needs to be after check for 'frontend_' in order never
// to be reached in case frontend_==backend_, so we ensure backend_in=false in that case.
if (events[i].socket == backend_) {
backend_in = (events[i].events & ZMQ_POLLIN) != 0;
backend_out = (events[i].events & ZMQ_POLLOUT) != 0;
} else if (events[i].socket == control_)
control_in = (events[i].events & ZMQ_POLLIN) != 0;
}
@@ -388,72 +426,76 @@ int zmq::proxy (
if (msg.size () == 5 && memcmp (msg.data (), "PAUSE", 5) == 0) {
state = paused;
poller_wait = poller_control;
} else
if (msg.size () == 6 && memcmp (msg.data (), "RESUME", 6) == 0) {
state = active;
poller_wait = poller_in;
} else {
if (msg.size () == 9 && memcmp (msg.data (), "TERMINATE", 9) == 0)
state = terminated;
else {
} else if (msg.size () == 6
&& memcmp (msg.data (), "RESUME", 6) == 0) {
state = active;
poller_wait = poller_in;
} else {
if (msg.size () == 9
&& memcmp (msg.data (), "TERMINATE", 9) == 0)
state = terminated;
else {
#ifdef ZMQ_BUILD_DRAFT_API
if (msg.size () == 10 && memcmp (msg.data (), "STATISTICS", 10) == 0)
{
rc = reply_stats(control_, &frontend_stats, &backend_stats);
CHECK_RC_EXIT_ON_FAILURE ();
}
else {
if (msg.size () == 10
&& memcmp (msg.data (), "STATISTICS", 10) == 0) {
rc = reply_stats (control_, &frontend_stats,
&backend_stats);
CHECK_RC_EXIT_ON_FAILURE ();
} else {
#endif
// This is an API error, we assert
puts ("E: invalid command sent to proxy");
zmq_assert (false);
// This is an API error, we assert
puts ("E: invalid command sent to proxy");
zmq_assert (false);
#ifdef ZMQ_BUILD_DRAFT_API
}
#endif
}
#endif
}
}
control_in = false;
}
if (state == active) {
// Process a request, 'ZMQ_POLLIN' on 'frontend_' and 'ZMQ_POLLOUT' on 'backend_'.
// In case of frontend_==backend_ there's no 'ZMQ_POLLOUT' event.
if (frontend_in && (backend_out || frontend_equal_to_backend)) {
rc = forward (frontend_, &frontend_stats, backend_, &backend_stats, capture_, msg);
rc = forward (frontend_, &frontend_stats, backend_,
&backend_stats, capture_, msg);
CHECK_RC_EXIT_ON_FAILURE ();
request_processed = true;
frontend_in = backend_out = false;
} else request_processed = false;
} else
request_processed = false;
// Process a reply, 'ZMQ_POLLIN' on 'backend_' and 'ZMQ_POLLOUT' on 'frontend_'.
// If 'frontend_' and 'backend_' are the same this is not needed because previous processing
// If 'frontend_' and 'backend_' are the same this is not needed because previous processing
// covers all of the cases. 'backend_in' is always false if frontend_==backend_ due to
// design in 'for' event processing loop.
if (backend_in && frontend_out) {
rc = forward (backend_, &backend_stats, frontend_, &frontend_stats, capture_, msg);
rc = forward (backend_, &backend_stats, frontend_,
&frontend_stats, capture_, msg);
CHECK_RC_EXIT_ON_FAILURE ();
reply_processed = true;
backend_in = frontend_out = false;
} else reply_processed = false;
} else
reply_processed = false;
if (request_processed || reply_processed) {
// If request/reply is processed that means we had at least one 'ZMQ_POLLOUT' event.
// Enable corresponding 'ZMQ_POLLIN' for blocking wait if any was disabled.
if (poller_wait != poller_in) {
if (request_processed) { // 'frontend_' -> 'backend_'
if (request_processed) { // 'frontend_' -> 'backend_'
if (poller_wait == poller_both_blocked)
poller_wait = poller_send_blocked;
else
if (poller_wait == poller_receive_blocked || poller_wait == poller_frontend_only)
poller_wait = poller_in;
else if (poller_wait == poller_receive_blocked
|| poller_wait == poller_frontend_only)
poller_wait = poller_in;
}
if (reply_processed) { // 'backend_' -> 'frontend_'
if (reply_processed) { // 'backend_' -> 'frontend_'
if (poller_wait == poller_both_blocked)
poller_wait = poller_receive_blocked;
else
if (poller_wait == poller_send_blocked || poller_wait == poller_backend_only)
poller_wait = poller_in;
else if (poller_wait == poller_send_blocked
|| poller_wait == poller_backend_only)
poller_wait = poller_in;
}
}
} else {
@@ -470,9 +512,8 @@ int zmq::proxy (
else {
if (poller_wait == poller_send_blocked)
poller_wait = poller_both_blocked;
else
if (poller_wait == poller_in)
poller_wait = poller_receive_blocked;
else if (poller_wait == poller_in)
poller_wait = poller_receive_blocked;
}
}
if (backend_in) {
@@ -485,13 +526,11 @@ int zmq::proxy (
else {
if (poller_wait == poller_receive_blocked)
poller_wait = poller_both_blocked;
else
if (poller_wait == poller_in)
poller_wait = poller_send_blocked;
else if (poller_wait == poller_in)
poller_wait = poller_send_blocked;
}
}
}
}
}
PROXY_CLEANUP ();
@@ -500,11 +539,10 @@ int zmq::proxy (
#else // ZMQ_HAVE_POLLER
int zmq::proxy (
class socket_base_t *frontend_,
class socket_base_t *backend_,
class socket_base_t *capture_,
class socket_base_t *control_)
int zmq::proxy (class socket_base_t *frontend_,
class socket_base_t *backend_,
class socket_base_t *capture_,
class socket_base_t *control_)
{
msg_t msg;
int rc = msg.init ();
@@ -516,24 +554,21 @@ int zmq::proxy (
int more;
size_t moresz;
zmq_pollitem_t items [] = {
{ frontend_, 0, ZMQ_POLLIN, 0 },
{ backend_, 0, ZMQ_POLLIN, 0 },
{ control_, 0, ZMQ_POLLIN, 0 }
};
zmq_pollitem_t items[] = {{frontend_, 0, ZMQ_POLLIN, 0},
{backend_, 0, ZMQ_POLLIN, 0},
{control_, 0, ZMQ_POLLIN, 0}};
int qt_poll_items = (control_ ? 3 : 2);
zmq_pollitem_t itemsout [] = {
{ frontend_, 0, ZMQ_POLLOUT, 0 },
{ backend_, 0, ZMQ_POLLOUT, 0 }
};
zmq_pollitem_t itemsout[] = {{frontend_, 0, ZMQ_POLLOUT, 0},
{backend_, 0, ZMQ_POLLOUT, 0}};
zmq_socket_stats_t frontend_stats;
memset(&frontend_stats, 0, sizeof(frontend_stats));
memset (&frontend_stats, 0, sizeof (frontend_stats));
zmq_socket_stats_t backend_stats;
memset(&backend_stats, 0, sizeof(backend_stats));
memset (&backend_stats, 0, sizeof (backend_stats));
// Proxy can be in these three states
enum {
enum
{
active,
paused,
terminated
@@ -541,7 +576,7 @@ int zmq::proxy (
while (state != terminated) {
// Wait while there are either requests or replies to process.
rc = zmq_poll (&items [0], qt_poll_items, -1);
rc = zmq_poll (&items[0], qt_poll_items, -1);
if (unlikely (rc < 0))
return close_and_return (&msg, -1);
@@ -549,14 +584,14 @@ int zmq::proxy (
// because pollout shall most of the time return directly.
// POLLOUT is only checked when frontend and backend sockets are not the same.
if (frontend_ != backend_) {
rc = zmq_poll (&itemsout [0], 2, 0);
rc = zmq_poll (&itemsout[0], 2, 0);
if (unlikely (rc < 0)) {
return close_and_return (&msg, -1);
}
}
// Process a control command if any
if (control_ && items [2].revents & ZMQ_POLLIN) {
if (control_ && items[2].revents & ZMQ_POLLIN) {
rc = control_->recv (&msg, 0);
if (unlikely (rc < 0))
return close_and_return (&msg, -1);
@@ -573,44 +608,43 @@ int zmq::proxy (
if (msg.size () == 5 && memcmp (msg.data (), "PAUSE", 5) == 0)
state = paused;
else
if (msg.size () == 6 && memcmp (msg.data (), "RESUME", 6) == 0)
state = active;
else
if (msg.size () == 9 && memcmp (msg.data (), "TERMINATE", 9) == 0)
state = terminated;
else {
else if (msg.size () == 6 && memcmp (msg.data (), "RESUME", 6) == 0)
state = active;
else if (msg.size () == 9
&& memcmp (msg.data (), "TERMINATE", 9) == 0)
state = terminated;
else {
#ifdef ZMQ_BUILD_DRAFT_API
if (msg.size () == 10 && memcmp (msg.data (), "STATISTICS", 10) == 0)
{
rc = reply_stats(control_, &frontend_stats, &backend_stats);
if (unlikely (rc < 0))
return close_and_return (&msg, -1);
}
else {
if (msg.size () == 10
&& memcmp (msg.data (), "STATISTICS", 10) == 0) {
rc =
reply_stats (control_, &frontend_stats, &backend_stats);
if (unlikely (rc < 0))
return close_and_return (&msg, -1);
} else {
#endif
// This is an API error, we assert
puts ("E: invalid command sent to proxy");
zmq_assert (false);
// This is an API error, we assert
puts ("E: invalid command sent to proxy");
zmq_assert (false);
#ifdef ZMQ_BUILD_DRAFT_API
}
}
#endif
}
}
}
// Process a request
if (state == active
&& items [0].revents & ZMQ_POLLIN
&& (frontend_ == backend_ || itemsout [1].revents & ZMQ_POLLOUT)) {
rc = forward (frontend_, &frontend_stats, backend_, &backend_stats, capture_, msg);
if (state == active && items[0].revents & ZMQ_POLLIN
&& (frontend_ == backend_ || itemsout[1].revents & ZMQ_POLLOUT)) {
rc = forward (frontend_, &frontend_stats, backend_, &backend_stats,
capture_, msg);
if (unlikely (rc < 0))
return close_and_return (&msg, -1);
}
// Process a reply
if (state == active
&& frontend_ != backend_
&& items [1].revents & ZMQ_POLLIN
&& itemsout [0].revents & ZMQ_POLLOUT) {
rc = forward (backend_, &backend_stats, frontend_, &frontend_stats, capture_, msg);
if (state == active && frontend_ != backend_
&& items[1].revents & ZMQ_POLLIN
&& itemsout[0].revents & ZMQ_POLLOUT) {
rc = forward (backend_, &backend_stats, frontend_, &frontend_stats,
capture_, msg);
if (unlikely (rc < 0))
return close_and_return (&msg, -1);
}