diff --git a/src/proxy.cpp b/src/proxy.cpp index f4fdf071..a28030a3 100644 --- a/src/proxy.cpp +++ b/src/proxy.cpp @@ -76,8 +76,15 @@ int zmq::proxy ( { control_, 0, ZMQ_POLLIN, 0 } }; int qt_poll_items = (control_ ? 3 : 2); - enum {suspend, resume, terminate} state = resume; - while (state != terminate) { + + // Proxy can be in these three states + enum { + active, + paused, + terminated + } state = active; + + while (state != terminated) { // Wait while there are either requests or replies to process. rc = zmq_poll (&items [0], qt_poll_items, -1); if (unlikely (rc < 0)) @@ -85,46 +92,45 @@ int zmq::proxy ( // Process a control command if any if (control_ && items [2].revents & ZMQ_POLLIN) { - rc = control_->recv (&msg, 0); - if (unlikely (rc < 0)) - return -1; + rc = control_->recv (&msg, 0); + if (unlikely (rc < 0)) + return -1; - moresz = sizeof more; - rc = control_->getsockopt (ZMQ_RCVMORE, &more, &moresz); - if (unlikely (rc < 0) || more) - return -1; + moresz = sizeof more; + rc = control_->getsockopt (ZMQ_RCVMORE, &more, &moresz); + if (unlikely (rc < 0) || more) + return -1; - // Copy message to capture socket if any - if (capture_) { - msg_t ctrl; - rc = ctrl.init (); - if (unlikely (rc < 0)) - return -1; - rc = ctrl.copy (msg); - if (unlikely (rc < 0)) - return -1; - rc = capture_->send (&ctrl, 0); - if (unlikely (rc < 0)) - return -1; - } - - // process control command - int size = msg.size(); - char* message = (char*) malloc(size + 1); - memcpy(message, msg.data(), size); - message[size] = '\0'; - if (size == 8 && !memcmp(message, "SUSPEND", 8)) - state = suspend; - else if (size == 7 && !memcmp(message, "RESUME", 7)) - state = resume; - else if (size == 10 && !memcmp(message, "TERMINATE", 10)) - state = terminate; - else - fprintf(stderr, "Warning : \"%s\" bad command received by proxy\n", message); // prefered compared to "return -1" - free (message); + // Copy message to capture socket if any + if (capture_) { + msg_t ctrl; + rc = ctrl.init (); + if (unlikely (rc < 0)) + return -1; + rc = ctrl.copy (msg); + if (unlikely (rc < 0)) + return -1; + rc = capture_->send (&ctrl, 0); + if (unlikely (rc < 0)) + return -1; + } + if (msg.size () == 6 && memcmp (msg.data (), "PAUSE", 6) == 0) + state = paused; + else + if (msg.size () == 7 && memcmp (msg.data (), "RESUME", 7) == 0) + state = active; + else + if (msg.size () == 10 && memcmp (msg.data (), "TERMINATE", 10) == 0) + state = terminated; + else { + // This is an API error, we should assert + puts ("E: invalid command sent to proxy"); + assert (false); + } } // Process a request - if (state == resume && items [0].revents & ZMQ_POLLIN) { + if (state == active + && items [0].revents & ZMQ_POLLIN) { while (true) { rc = frontend_->recv (&msg, 0); if (unlikely (rc < 0)) @@ -156,38 +162,38 @@ int zmq::proxy ( } } // Process a reply - if (state == resume && items [1].revents & ZMQ_POLLIN) { + if (state == active + && items [1].revents & ZMQ_POLLIN) { while (true) { rc = backend_->recv (&msg, 0); - if (unlikely (rc < 0)) - return -1; + if (unlikely (rc < 0)) + return -1; - moresz = sizeof more; - rc = backend_->getsockopt (ZMQ_RCVMORE, &more, &moresz); - if (unlikely (rc < 0)) - return -1; + moresz = sizeof more; + rc = backend_->getsockopt (ZMQ_RCVMORE, &more, &moresz); + if (unlikely (rc < 0)) + return -1; - // Copy message to capture socket if any - if (capture_) { - msg_t ctrl; - rc = ctrl.init (); - if (unlikely (rc < 0)) - return -1; - rc = ctrl.copy (msg); - if (unlikely (rc < 0)) - return -1; - rc = capture_->send (&ctrl, more? ZMQ_SNDMORE: 0); - if (unlikely (rc < 0)) - return -1; - } - rc = frontend_->send (&msg, more? ZMQ_SNDMORE: 0); + // Copy message to capture socket if any + if (capture_) { + msg_t ctrl; + rc = ctrl.init (); if (unlikely (rc < 0)) return -1; - if (more == 0) - break; + rc = ctrl.copy (msg); + if (unlikely (rc < 0)) + return -1; + rc = capture_->send (&ctrl, more? ZMQ_SNDMORE: 0); + if (unlikely (rc < 0)) + return -1; + } + rc = frontend_->send (&msg, more? ZMQ_SNDMORE: 0); + if (unlikely (rc < 0)) + return -1; + if (more == 0) + break; } } - } return 0; }