diff --git a/src/proxy.cpp b/src/proxy.cpp index 5ef11ab9..ab57d509 100644 --- a/src/proxy.cpp +++ b/src/proxy.cpp @@ -53,6 +53,58 @@ // zmq.h must be included *after* poll.h for AIX to build properly #include "../include/zmq.h" +int capture( + class zmq::socket_base_t *capture_, + zmq::msg_t& msg_, + int more_ = 0) +{ + // Copy message to capture socket if any + if (capture_) { + zmq::msg_t ctrl; + int rc = ctrl.init (); + if (unlikely (rc < 0)) + return -1; + rc = ctrl.copy (msg_); + if (unlikely (rc < 0)) + return -1; + rc = capture_->send (&ctrl, more_? ZMQ_SNDMORE: 0); + if (unlikely (rc < 0)) + return -1; + } + return 0; +} + +int forward( + class zmq::socket_base_t *from_, + class zmq::socket_base_t *to_, + class zmq::socket_base_t *capture_, + zmq::msg_t& msg_) +{ + int more; + size_t moresz; + while (true) { + int rc = from_->recv (&msg_, 0); + if (unlikely (rc < 0)) + return -1; + + moresz = sizeof more; + rc = from_->getsockopt (ZMQ_RCVMORE, &more, &moresz); + if (unlikely (rc < 0)) + return -1; + + // Copy message to capture socket if any + rc = capture(capture_, msg_, more); + if (unlikely (rc < 0)) + return -1; + + rc = to_->send (&msg_, more? ZMQ_SNDMORE: 0); + if (unlikely (rc < 0)) + return -1; + if (more == 0) + break; + } + return 0; +} int zmq::proxy ( class socket_base_t *frontend_, @@ -102,18 +154,10 @@ int zmq::proxy ( return -1; // Copy message to capture socket if any - if (capture_) { - msg_t ctrl; - rc = ctrl.init (); - if (unlikely (rc < 0)) - return -1; - rc = ctrl.copy (msg); - if (unlikely (rc < 0)) - return -1; - rc = capture_->send (&ctrl, 0); - if (unlikely (rc < 0)) - return -1; - } + rc = capture(capture_, msg); + if (unlikely (rc < 0)) + return -1; + if (msg.size () == 5 && memcmp (msg.data (), "PAUSE", 5) == 0) state = paused; else @@ -131,68 +175,16 @@ int zmq::proxy ( // Process a request if (state == active && items [0].revents & ZMQ_POLLIN) { - while (true) { - rc = frontend_->recv (&msg, 0); - if (unlikely (rc < 0)) - return -1; - - moresz = sizeof more; - rc = frontend_->getsockopt (ZMQ_RCVMORE, &more, &moresz); - if (unlikely (rc < 0)) - return -1; - - // Copy message to capture socket if any - if (capture_) { - msg_t ctrl; - rc = ctrl.init (); - if (unlikely (rc < 0)) - return -1; - rc = ctrl.copy (msg); - if (unlikely (rc < 0)) - return -1; - rc = capture_->send (&ctrl, more? ZMQ_SNDMORE: 0); - if (unlikely (rc < 0)) - return -1; - } - rc = backend_->send (&msg, more? ZMQ_SNDMORE: 0); - if (unlikely (rc < 0)) - return -1; - if (more == 0) - break; - } + rc = forward(frontend_, backend_, capture_,msg); + if (unlikely (rc < 0)) + return -1; } // Process a reply if (state == active && items [1].revents & ZMQ_POLLIN) { - while (true) { - rc = backend_->recv (&msg, 0); + rc = forward(backend_, frontend_, capture_,msg); if (unlikely (rc < 0)) return -1; - - moresz = sizeof more; - rc = backend_->getsockopt (ZMQ_RCVMORE, &more, &moresz); - if (unlikely (rc < 0)) - return -1; - - // Copy message to capture socket if any - if (capture_) { - msg_t ctrl; - rc = ctrl.init (); - if (unlikely (rc < 0)) - return -1; - rc = ctrl.copy (msg); - if (unlikely (rc < 0)) - return -1; - rc = capture_->send (&ctrl, more? ZMQ_SNDMORE: 0); - if (unlikely (rc < 0)) - return -1; - } - rc = frontend_->send (&msg, more? ZMQ_SNDMORE: 0); - if (unlikely (rc < 0)) - return -1; - if (more == 0) - break; - } } } return 0;