mirror of
https://github.com/zeromq/libzmq.git
synced 2025-02-23 15:14:40 +01:00
queue device fixed
This commit is contained in:
parent
4a6bac1dea
commit
84e0c7991a
@ -17,6 +17,8 @@
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <stddef.h>
|
||||
|
||||
#include "../include/zmq.h"
|
||||
|
||||
#include "queue.hpp"
|
||||
@ -26,15 +28,12 @@
|
||||
int zmq::queue (class socket_base_t *insocket_,
|
||||
class socket_base_t *outsocket_)
|
||||
{
|
||||
zmq_msg_t request_msg;
|
||||
int rc = zmq_msg_init (&request_msg);
|
||||
errno_assert (rc == 0);
|
||||
bool has_request = false;
|
||||
zmq_msg_t msg;
|
||||
int rc = zmq_msg_init (&msg);
|
||||
zmq_assert (rc == 0);
|
||||
|
||||
zmq_msg_t response_msg;
|
||||
rc = zmq_msg_init (&response_msg);
|
||||
errno_assert (rc == 0);
|
||||
bool has_response = false;
|
||||
int64_t more;
|
||||
size_t moresz;
|
||||
|
||||
zmq_pollitem_t items [2];
|
||||
items [0].socket = insocket_;
|
||||
@ -47,53 +46,54 @@ int zmq::queue (class socket_base_t *insocket_,
|
||||
items [1].revents = 0;
|
||||
|
||||
while (true) {
|
||||
|
||||
// Wait while there are either requests or replies to process.
|
||||
rc = zmq_poll (&items [0], 2, -1);
|
||||
errno_assert (rc > 0);
|
||||
|
||||
// The algorithm below asumes ratio of request and replies processed
|
||||
// under full load to be 1:1. While processing requests replies first
|
||||
// is tempting it is suspectible to DoS attacks (overloading the system
|
||||
// with unsolicited replies).
|
||||
// under full load to be 1:1. Although processing requests replies
|
||||
// first is tempting it is suspectible to DoS attacks (overloading
|
||||
// the system with unsolicited replies).
|
||||
|
||||
// Receive a new request.
|
||||
// Process a request.
|
||||
if (items [0].revents & ZMQ_POLLIN) {
|
||||
zmq_assert (!has_request);
|
||||
rc = insocket_->recv (&request_msg, ZMQ_NOBLOCK);
|
||||
while (true) {
|
||||
|
||||
rc = insocket_->recv (&msg, 0);
|
||||
errno_assert (rc == 0);
|
||||
items [0].events &= ~ZMQ_POLLIN;
|
||||
items [1].events |= ZMQ_POLLOUT;
|
||||
has_request = true;
|
||||
|
||||
moresz = sizeof (more);
|
||||
rc = insocket_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
|
||||
errno_assert (rc == 0);
|
||||
|
||||
rc = outsocket_->send (&msg, more ? ZMQ_SNDMORE : 0);
|
||||
errno_assert (rc == 0);
|
||||
|
||||
if (!more)
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Send the request further.
|
||||
if (items [1].revents & ZMQ_POLLOUT) {
|
||||
zmq_assert (has_request);
|
||||
rc = outsocket_->send (&request_msg, ZMQ_NOBLOCK);
|
||||
errno_assert (rc == 0);
|
||||
items [0].events |= ZMQ_POLLIN;
|
||||
items [1].events &= ~ZMQ_POLLOUT;
|
||||
has_request = false;
|
||||
}
|
||||
|
||||
// Get a new reply.
|
||||
// Process a reply.
|
||||
if (items [1].revents & ZMQ_POLLIN) {
|
||||
zmq_assert (!has_response);
|
||||
rc = outsocket_->recv (&response_msg, ZMQ_NOBLOCK);
|
||||
while (true) {
|
||||
|
||||
rc = outsocket_->recv (&msg, 0);
|
||||
errno_assert (rc == 0);
|
||||
items [0].events |= ZMQ_POLLOUT;
|
||||
items [1].events &= ~ZMQ_POLLIN;
|
||||
has_response = true;
|
||||
|
||||
moresz = sizeof (more);
|
||||
rc = outsocket_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
|
||||
errno_assert (rc == 0);
|
||||
|
||||
rc = insocket_->send (&msg, more ? ZMQ_SNDMORE : 0);
|
||||
errno_assert (rc == 0);
|
||||
|
||||
if (!more)
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Send the reply further.
|
||||
if (items [0].revents & ZMQ_POLLOUT) {\
|
||||
zmq_assert (has_response);
|
||||
rc = insocket_->send (&response_msg, ZMQ_NOBLOCK);
|
||||
errno_assert (rc == 0);
|
||||
items [0].events &= ~ZMQ_POLLOUT;
|
||||
items [1].events |= ZMQ_POLLIN;
|
||||
has_response = false;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
27
src/xreq.cpp
27
src/xreq.cpp
@ -23,7 +23,8 @@
|
||||
#include "err.hpp"
|
||||
|
||||
zmq::xreq_t::xreq_t (class app_thread_t *parent_) :
|
||||
socket_base_t (parent_)
|
||||
socket_base_t (parent_),
|
||||
dropping (false)
|
||||
{
|
||||
options.requires_in = true;
|
||||
options.requires_out = true;
|
||||
@ -77,7 +78,25 @@ int zmq::xreq_t::xsetsockopt (int option_, const void *optval_,
|
||||
|
||||
int zmq::xreq_t::xsend (zmq_msg_t *msg_, int flags_)
|
||||
{
|
||||
return lb.send (msg_, flags_);
|
||||
while (true) {
|
||||
|
||||
// If we are ignoring the current message, just drop it and return.
|
||||
if (dropping) {
|
||||
if (!(msg_->flags & ZMQ_MSG_MORE))
|
||||
dropping = false;
|
||||
int rc = zmq_msg_close (msg_);
|
||||
zmq_assert (rc == 0);
|
||||
rc = zmq_msg_init (msg_);
|
||||
zmq_assert (rc == 0);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int rc = lb.send (msg_, flags_);
|
||||
if (rc != 0 && errno == EAGAIN)
|
||||
dropping = true;
|
||||
else
|
||||
return rc;
|
||||
}
|
||||
}
|
||||
|
||||
int zmq::xreq_t::xrecv (zmq_msg_t *msg_, int flags_)
|
||||
@ -92,6 +111,8 @@ bool zmq::xreq_t::xhas_in ()
|
||||
|
||||
bool zmq::xreq_t::xhas_out ()
|
||||
{
|
||||
return lb.has_out ();
|
||||
// Socket is always ready for writing. When the queue is full, message
|
||||
// will be silently dropped.
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -55,6 +55,9 @@ namespace zmq
|
||||
fq_t fq;
|
||||
lb_t lb;
|
||||
|
||||
// If true, curently sent message is being dropped.
|
||||
bool dropping;
|
||||
|
||||
xreq_t (const xreq_t&);
|
||||
void operator = (const xreq_t&);
|
||||
};
|
||||
|
Loading…
x
Reference in New Issue
Block a user