mirror of
https://github.com/zeromq/libzmq.git
synced 2025-10-30 21:50:49 +01:00
XREQ: Correct behaviour on hitting ZMQ_HWM
This reverts part of commit 84e0c7991a to get
correct ZMQ_HWM semantics with XREQ sockets:
When sending a message to an XREQ socket, the underlying pipe is selected in
a round-robin fashion. If an underlying pipe is full it is skipped. If there
are no underlying pipes, or all underlying pipes are full then zmq_send()
shall block or return EAGAIN, depending on whether or not the call is blocking.
Messages are never dropped.
This commit is contained in:
27
src/xreq.cpp
27
src/xreq.cpp
@@ -23,8 +23,7 @@
|
|||||||
#include "err.hpp"
|
#include "err.hpp"
|
||||||
|
|
||||||
zmq::xreq_t::xreq_t (class app_thread_t *parent_) :
|
zmq::xreq_t::xreq_t (class app_thread_t *parent_) :
|
||||||
socket_base_t (parent_),
|
socket_base_t (parent_)
|
||||||
dropping (false)
|
|
||||||
{
|
{
|
||||||
options.requires_in = true;
|
options.requires_in = true;
|
||||||
options.requires_out = true;
|
options.requires_out = true;
|
||||||
@@ -78,25 +77,7 @@ int zmq::xreq_t::xsetsockopt (int option_, const void *optval_,
|
|||||||
|
|
||||||
int zmq::xreq_t::xsend (zmq_msg_t *msg_, int flags_)
|
int zmq::xreq_t::xsend (zmq_msg_t *msg_, int flags_)
|
||||||
{
|
{
|
||||||
while (true) {
|
return lb.send (msg_, flags_);
|
||||||
|
|
||||||
// If we are ignoring the current message, just drop it and return.
|
|
||||||
if (dropping) {
|
|
||||||
if (!(msg_->flags & ZMQ_MSG_MORE))
|
|
||||||
dropping = false;
|
|
||||||
int rc = zmq_msg_close (msg_);
|
|
||||||
zmq_assert (rc == 0);
|
|
||||||
rc = zmq_msg_init (msg_);
|
|
||||||
zmq_assert (rc == 0);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int rc = lb.send (msg_, flags_);
|
|
||||||
if (rc != 0 && errno == EAGAIN)
|
|
||||||
dropping = true;
|
|
||||||
else
|
|
||||||
return rc;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int zmq::xreq_t::xrecv (zmq_msg_t *msg_, int flags_)
|
int zmq::xreq_t::xrecv (zmq_msg_t *msg_, int flags_)
|
||||||
@@ -111,8 +92,6 @@ bool zmq::xreq_t::xhas_in ()
|
|||||||
|
|
||||||
bool zmq::xreq_t::xhas_out ()
|
bool zmq::xreq_t::xhas_out ()
|
||||||
{
|
{
|
||||||
// Socket is always ready for writing. When the queue is full, message
|
return lb.has_out ();
|
||||||
// will be silently dropped.
|
|
||||||
return true;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -55,9 +55,6 @@ namespace zmq
|
|||||||
fq_t fq;
|
fq_t fq;
|
||||||
lb_t lb;
|
lb_t lb;
|
||||||
|
|
||||||
// If true, curently sent message is being dropped.
|
|
||||||
bool dropping;
|
|
||||||
|
|
||||||
xreq_t (const xreq_t&);
|
xreq_t (const xreq_t&);
|
||||||
void operator = (const xreq_t&);
|
void operator = (const xreq_t&);
|
||||||
};
|
};
|
||||||
|
|||||||
Reference in New Issue
Block a user