From c56d797bf915e0774646e651dbbce89b8e566f45 Mon Sep 17 00:00:00 2001 From: Christian Kamm Date: Tue, 2 Jul 2013 20:05:20 +0200 Subject: [PATCH] REQ sockets drop replies from unasked peers. * Add lb_t::sendpipe() that returns the pipe that was used for sending, similar to fq_t::recvpipe(). * Add forwarder functions to dealer_t to access these two. * Add logic to req_t to ignore replies on pipes that are not the one where the request was sent. * Enable test in test_spec_req. --- AUTHORS | 1 + src/dealer.cpp | 14 ++++++++++++-- src/dealer.hpp | 4 ++++ src/lb.cpp | 10 +++++++++- src/lb.hpp | 7 +++++++ src/req.cpp | 39 +++++++++++++++++++++++++++++++++++---- src/req.hpp | 9 +++++++++ tests/test_spec_req.cpp | 3 +-- 8 files changed, 78 insertions(+), 9 deletions(-) diff --git a/AUTHORS b/AUTHORS index 8e001db0..10180f86 100644 --- a/AUTHORS +++ b/AUTHORS @@ -28,6 +28,7 @@ Chia-liang Kao Chris Rempel Chris Wong Christian Gudrian +Christian Kamm Chuck Remes Conrad D. Steenberg Dhammika Pathirana diff --git a/src/dealer.cpp b/src/dealer.cpp index 8b5b8d66..d69ed762 100644 --- a/src/dealer.cpp +++ b/src/dealer.cpp @@ -80,12 +80,12 @@ int zmq::dealer_t::xsetsockopt (int option_, const void *optval_, int zmq::dealer_t::xsend (msg_t *msg_) { - return lb.send (msg_); + return sendpipe (msg_, NULL); } int zmq::dealer_t::xrecv (msg_t *msg_) { - return fq.recv (msg_); + return recvpipe (msg_, NULL); } bool zmq::dealer_t::xhas_in () @@ -113,3 +113,13 @@ void zmq::dealer_t::xpipe_terminated (pipe_t *pipe_) fq.pipe_terminated (pipe_); lb.pipe_terminated (pipe_); } + +int zmq::dealer_t::sendpipe (msg_t *msg_, pipe_t **pipe_) +{ + return lb.sendpipe (msg_, pipe_); +} + +int zmq::dealer_t::recvpipe (msg_t *msg_, pipe_t **pipe_) +{ + return fq.recvpipe (msg_, pipe_); +} diff --git a/src/dealer.hpp b/src/dealer.hpp index 6b91b06a..fd3366f4 100644 --- a/src/dealer.hpp +++ b/src/dealer.hpp @@ -55,6 +55,10 @@ namespace zmq void xwrite_activated (zmq::pipe_t *pipe_); void xpipe_terminated (zmq::pipe_t *pipe_); + // Send and recv - knowing which pipe was used. + int sendpipe (zmq::msg_t *msg_, zmq::pipe_t **pipe_); + int recvpipe (zmq::msg_t *msg_, zmq::pipe_t **pipe_); + private: // Messages are fair-queued from inbound pipes. And load-balanced to diff --git a/src/lb.cpp b/src/lb.cpp index 933e4447..213bdfb4 100644 --- a/src/lb.cpp +++ b/src/lb.cpp @@ -69,6 +69,11 @@ void zmq::lb_t::activated (pipe_t *pipe_) } int zmq::lb_t::send (msg_t *msg_) +{ + return sendpipe (msg_, NULL); +} + +int zmq::lb_t::sendpipe (msg_t *msg_, pipe_t **pipe_) { // Drop the message if required. If we are at the end of the message // switch back to non-dropping mode. @@ -86,7 +91,11 @@ int zmq::lb_t::send (msg_t *msg_) while (active > 0) { if (pipes [current]->write (msg_)) + { + if (pipe_) + *pipe_ = pipes [current]; break; + } zmq_assert (!more); active--; @@ -139,4 +148,3 @@ bool zmq::lb_t::has_out () return false; } - diff --git a/src/lb.hpp b/src/lb.hpp index 2e40a745..6904c892 100644 --- a/src/lb.hpp +++ b/src/lb.hpp @@ -41,6 +41,13 @@ namespace zmq void pipe_terminated (pipe_t *pipe_); int send (msg_t *msg_); + + // Sends a message and stores the pipe that was used in pipe_. + // It is possible for this function to return success but keep pipe_ + // unset if the rest of a multipart message to a terminated pipe is + // being dropped. For the first frame, this will never happen. + int sendpipe (msg_t *msg_, pipe_t **pipe_); + bool has_out (); private: diff --git a/src/req.cpp b/src/req.cpp index fd356a28..0f9dcf6c 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -27,7 +27,8 @@ zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_, int sid_) : dealer_t (parent_, tid_, sid_), receiving_reply (false), - message_begins (true) + message_begins (true), + reply_pipe (NULL) { options.type = ZMQ_REQ; } @@ -51,10 +52,28 @@ int zmq::req_t::xsend (msg_t *msg_) int rc = bottom.init (); errno_assert (rc == 0); bottom.set_flags (msg_t::more); - rc = dealer_t::xsend (&bottom); + + reply_pipe = NULL; + rc = dealer_t::sendpipe (&bottom, &reply_pipe); if (rc != 0) return -1; + assert (reply_pipe); message_begins = false; + + // Eat all currently avaliable messages before the request is fully + // sent. This is done to avoid: + // REQ sends request to A, A replies, B replies too. + // A's reply was first and matches, that is used. + // An hour later REQ sends a request to B. B's old reply is used. + msg_t drop; + while (true) { + rc = drop.init (); + errno_assert (rc == 0); + rc = dealer_t::xrecv (&drop); + if (rc != 0) + break; + drop.close (); + } } bool more = msg_->flags () & msg_t::more ? true : false; @@ -82,7 +101,7 @@ int zmq::req_t::xrecv (msg_t *msg_) // First part of the reply should be the original request ID. if (message_begins) { - int rc = dealer_t::xrecv (msg_); + int rc = recv_reply_pipe (msg_); if (rc != 0) return rc; @@ -103,7 +122,7 @@ int zmq::req_t::xrecv (msg_t *msg_) message_begins = false; } - int rc = dealer_t::xrecv (msg_); + int rc = recv_reply_pipe (msg_); if (rc != 0) return rc; @@ -134,6 +153,18 @@ bool zmq::req_t::xhas_out () return dealer_t::xhas_out (); } +int zmq::req_t::recv_reply_pipe (msg_t *msg_) +{ + while (true) { + pipe_t *pipe = NULL; + int rc = dealer_t::recvpipe(msg_, &pipe); + if (rc != 0) + return rc; + if (pipe == reply_pipe) + return 0; + } +} + zmq::req_session_t::req_session_t (io_thread_t *io_thread_, bool connect_, socket_base_t *socket_, const options_t &options_, const address_t *addr_) : diff --git a/src/req.hpp b/src/req.hpp index fd187e23..1ebf390c 100644 --- a/src/req.hpp +++ b/src/req.hpp @@ -44,6 +44,12 @@ namespace zmq bool xhas_in (); bool xhas_out (); + protected: + + // Receive only from the pipe the request was sent to, discarding + // frames from other pipes. + int recv_reply_pipe (zmq::msg_t *msg_); + private: // If true, request was already sent and reply wasn't received yet or @@ -54,6 +60,9 @@ namespace zmq // of the message must be empty message part (backtrace stack bottom). bool message_begins; + // The pipe the request was sent to and where the reply is expected. + zmq::pipe_t *reply_pipe; + req_t (const req_t&); const req_t &operator = (const req_t&); }; diff --git a/tests/test_spec_req.cpp b/tests/test_spec_req.cpp index 7e682d7a..24538f08 100644 --- a/tests/test_spec_req.cpp +++ b/tests/test_spec_req.cpp @@ -234,8 +234,7 @@ int main (void) // SHALL accept an incoming message only from the last peer that it sent a // request to. // SHALL discard silently any messages received from other peers. - // *** Test disabled until libzmq does this properly *** - // test_req_only_listens_to_current_peer (ctx); + test_req_only_listens_to_current_peer (ctx); } int rc = zmq_ctx_term (ctx);