mirror of
https://github.com/zeromq/libzmq.git
synced 2025-01-19 00:46:05 +01:00
Merge pull request #608 from ckamm/req-drops-unwanted
REQ sockets drop replies from unasked peers.
This commit is contained in:
commit
93b9f9021c
1
AUTHORS
1
AUTHORS
@ -28,6 +28,7 @@ Chia-liang Kao <clkao@clkao.org>
|
||||
Chris Rempel <csrl@gmx.com>
|
||||
Chris Wong <chris@chriswongstudio.com>
|
||||
Christian Gudrian <christian.gudrian@fluidon.com>
|
||||
Christian Kamm <kamm@incasoftware.de>
|
||||
Chuck Remes <cremes@mac.com>
|
||||
Conrad D. Steenberg <conrad.steenberg@caltech.edu>
|
||||
Dhammika Pathirana <dhammika@gmail.com>
|
||||
|
@ -80,12 +80,12 @@ int zmq::dealer_t::xsetsockopt (int option_, const void *optval_,
|
||||
|
||||
int zmq::dealer_t::xsend (msg_t *msg_)
|
||||
{
|
||||
return lb.send (msg_);
|
||||
return sendpipe (msg_, NULL);
|
||||
}
|
||||
|
||||
int zmq::dealer_t::xrecv (msg_t *msg_)
|
||||
{
|
||||
return fq.recv (msg_);
|
||||
return recvpipe (msg_, NULL);
|
||||
}
|
||||
|
||||
bool zmq::dealer_t::xhas_in ()
|
||||
@ -113,3 +113,13 @@ void zmq::dealer_t::xpipe_terminated (pipe_t *pipe_)
|
||||
fq.pipe_terminated (pipe_);
|
||||
lb.pipe_terminated (pipe_);
|
||||
}
|
||||
|
||||
int zmq::dealer_t::sendpipe (msg_t *msg_, pipe_t **pipe_)
|
||||
{
|
||||
return lb.sendpipe (msg_, pipe_);
|
||||
}
|
||||
|
||||
int zmq::dealer_t::recvpipe (msg_t *msg_, pipe_t **pipe_)
|
||||
{
|
||||
return fq.recvpipe (msg_, pipe_);
|
||||
}
|
||||
|
@ -55,6 +55,10 @@ namespace zmq
|
||||
void xwrite_activated (zmq::pipe_t *pipe_);
|
||||
void xpipe_terminated (zmq::pipe_t *pipe_);
|
||||
|
||||
// Send and recv - knowing which pipe was used.
|
||||
int sendpipe (zmq::msg_t *msg_, zmq::pipe_t **pipe_);
|
||||
int recvpipe (zmq::msg_t *msg_, zmq::pipe_t **pipe_);
|
||||
|
||||
private:
|
||||
|
||||
// Messages are fair-queued from inbound pipes. And load-balanced to
|
||||
|
10
src/lb.cpp
10
src/lb.cpp
@ -69,6 +69,11 @@ void zmq::lb_t::activated (pipe_t *pipe_)
|
||||
}
|
||||
|
||||
int zmq::lb_t::send (msg_t *msg_)
|
||||
{
|
||||
return sendpipe (msg_, NULL);
|
||||
}
|
||||
|
||||
int zmq::lb_t::sendpipe (msg_t *msg_, pipe_t **pipe_)
|
||||
{
|
||||
// Drop the message if required. If we are at the end of the message
|
||||
// switch back to non-dropping mode.
|
||||
@ -86,7 +91,11 @@ int zmq::lb_t::send (msg_t *msg_)
|
||||
|
||||
while (active > 0) {
|
||||
if (pipes [current]->write (msg_))
|
||||
{
|
||||
if (pipe_)
|
||||
*pipe_ = pipes [current];
|
||||
break;
|
||||
}
|
||||
|
||||
zmq_assert (!more);
|
||||
active--;
|
||||
@ -139,4 +148,3 @@ bool zmq::lb_t::has_out ()
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -41,6 +41,13 @@ namespace zmq
|
||||
void pipe_terminated (pipe_t *pipe_);
|
||||
|
||||
int send (msg_t *msg_);
|
||||
|
||||
// Sends a message and stores the pipe that was used in pipe_.
|
||||
// It is possible for this function to return success but keep pipe_
|
||||
// unset if the rest of a multipart message to a terminated pipe is
|
||||
// being dropped. For the first frame, this will never happen.
|
||||
int sendpipe (msg_t *msg_, pipe_t **pipe_);
|
||||
|
||||
bool has_out ();
|
||||
|
||||
private:
|
||||
|
39
src/req.cpp
39
src/req.cpp
@ -27,7 +27,8 @@
|
||||
zmq::req_t::req_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
|
||||
dealer_t (parent_, tid_, sid_),
|
||||
receiving_reply (false),
|
||||
message_begins (true)
|
||||
message_begins (true),
|
||||
reply_pipe (NULL)
|
||||
{
|
||||
options.type = ZMQ_REQ;
|
||||
}
|
||||
@ -51,10 +52,28 @@ int zmq::req_t::xsend (msg_t *msg_)
|
||||
int rc = bottom.init ();
|
||||
errno_assert (rc == 0);
|
||||
bottom.set_flags (msg_t::more);
|
||||
rc = dealer_t::xsend (&bottom);
|
||||
|
||||
reply_pipe = NULL;
|
||||
rc = dealer_t::sendpipe (&bottom, &reply_pipe);
|
||||
if (rc != 0)
|
||||
return -1;
|
||||
assert (reply_pipe);
|
||||
message_begins = false;
|
||||
|
||||
// Eat all currently avaliable messages before the request is fully
|
||||
// sent. This is done to avoid:
|
||||
// REQ sends request to A, A replies, B replies too.
|
||||
// A's reply was first and matches, that is used.
|
||||
// An hour later REQ sends a request to B. B's old reply is used.
|
||||
msg_t drop;
|
||||
while (true) {
|
||||
rc = drop.init ();
|
||||
errno_assert (rc == 0);
|
||||
rc = dealer_t::xrecv (&drop);
|
||||
if (rc != 0)
|
||||
break;
|
||||
drop.close ();
|
||||
}
|
||||
}
|
||||
|
||||
bool more = msg_->flags () & msg_t::more ? true : false;
|
||||
@ -82,7 +101,7 @@ int zmq::req_t::xrecv (msg_t *msg_)
|
||||
|
||||
// First part of the reply should be the original request ID.
|
||||
if (message_begins) {
|
||||
int rc = dealer_t::xrecv (msg_);
|
||||
int rc = recv_reply_pipe (msg_);
|
||||
if (rc != 0)
|
||||
return rc;
|
||||
|
||||
@ -103,7 +122,7 @@ int zmq::req_t::xrecv (msg_t *msg_)
|
||||
message_begins = false;
|
||||
}
|
||||
|
||||
int rc = dealer_t::xrecv (msg_);
|
||||
int rc = recv_reply_pipe (msg_);
|
||||
if (rc != 0)
|
||||
return rc;
|
||||
|
||||
@ -134,6 +153,18 @@ bool zmq::req_t::xhas_out ()
|
||||
return dealer_t::xhas_out ();
|
||||
}
|
||||
|
||||
int zmq::req_t::recv_reply_pipe (msg_t *msg_)
|
||||
{
|
||||
while (true) {
|
||||
pipe_t *pipe = NULL;
|
||||
int rc = dealer_t::recvpipe(msg_, &pipe);
|
||||
if (rc != 0)
|
||||
return rc;
|
||||
if (pipe == reply_pipe)
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
zmq::req_session_t::req_session_t (io_thread_t *io_thread_, bool connect_,
|
||||
socket_base_t *socket_, const options_t &options_,
|
||||
const address_t *addr_) :
|
||||
|
@ -44,6 +44,12 @@ namespace zmq
|
||||
bool xhas_in ();
|
||||
bool xhas_out ();
|
||||
|
||||
protected:
|
||||
|
||||
// Receive only from the pipe the request was sent to, discarding
|
||||
// frames from other pipes.
|
||||
int recv_reply_pipe (zmq::msg_t *msg_);
|
||||
|
||||
private:
|
||||
|
||||
// If true, request was already sent and reply wasn't received yet or
|
||||
@ -54,6 +60,9 @@ namespace zmq
|
||||
// of the message must be empty message part (backtrace stack bottom).
|
||||
bool message_begins;
|
||||
|
||||
// The pipe the request was sent to and where the reply is expected.
|
||||
zmq::pipe_t *reply_pipe;
|
||||
|
||||
req_t (const req_t&);
|
||||
const req_t &operator = (const req_t&);
|
||||
};
|
||||
|
@ -234,8 +234,7 @@ int main (void)
|
||||
// SHALL accept an incoming message only from the last peer that it sent a
|
||||
// request to.
|
||||
// SHALL discard silently any messages received from other peers.
|
||||
// *** Test disabled until libzmq does this properly ***
|
||||
// test_req_only_listens_to_current_peer (ctx);
|
||||
test_req_only_listens_to_current_peer (ctx);
|
||||
}
|
||||
|
||||
int rc = zmq_ctx_term (ctx);
|
||||
|
Loading…
x
Reference in New Issue
Block a user