mirror of
https://github.com/zeromq/libzmq.git
synced 2025-01-06 00:31:13 +01:00
LABEL flag added to the wire format
So far there was no distinction between message parts used by 0MQ and message parts used by user. Now, the message parts used by 0MQ are marked as 'LABEL'. Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
This commit is contained in:
parent
ada5d42472
commit
ab99975ad4
@ -57,6 +57,18 @@ Default value:: N/A
|
|||||||
Applicable socket types:: all
|
Applicable socket types:: all
|
||||||
|
|
||||||
|
|
||||||
|
ZMQ_RCVLABEL: Inquires whether last message received was a label
|
||||||
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
|
The 'ZMQ_RCVLABEL' option shall return a boolean value indicating whether the
|
||||||
|
last message part received was a label. Labels are used internally by 0MQ.
|
||||||
|
|
||||||
|
[horizontal]
|
||||||
|
Option value type:: int
|
||||||
|
Option value unit:: boolean
|
||||||
|
Default value:: N/A
|
||||||
|
Applicable socket types:: all
|
||||||
|
|
||||||
|
|
||||||
ZMQ_SNDHWM: Set high water mark for outbound messages
|
ZMQ_SNDHWM: Set high water mark for outbound messages
|
||||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
The 'ZMQ_SNDHWM' option shall set the high water mark for outbound messages on
|
The 'ZMQ_SNDHWM' option shall set the high water mark for outbound messages on
|
||||||
|
@ -28,6 +28,10 @@ Specifies that the message being sent is a multi-part message, and that further
|
|||||||
message parts are to follow. Refer to the section regarding multi-part messages
|
message parts are to follow. Refer to the section regarding multi-part messages
|
||||||
below for a detailed description.
|
below for a detailed description.
|
||||||
|
|
||||||
|
*ZMQ_SNDLABEL*::
|
||||||
|
Specifies that the message being sent is a label. Labels are used internally
|
||||||
|
by 0MQ.
|
||||||
|
|
||||||
The _zmq_msg_t_ structure passed to _zmq_send()_ is nullified during the call.
|
The _zmq_msg_t_ structure passed to _zmq_send()_ is nullified during the call.
|
||||||
If you want to send the same message to multiple sockets you have to copy it
|
If you want to send the same message to multiple sockets you have to copy it
|
||||||
using (e.g. using _zmq_msg_copy()_).
|
using (e.g. using _zmq_msg_copy()_).
|
||||||
|
@ -183,10 +183,12 @@ ZMQ_EXPORT int zmq_term (void *context);
|
|||||||
#define ZMQ_FILTER 26
|
#define ZMQ_FILTER 26
|
||||||
#define ZMQ_RCVTIMEO 27
|
#define ZMQ_RCVTIMEO 27
|
||||||
#define ZMQ_SNDTIMEO 28
|
#define ZMQ_SNDTIMEO 28
|
||||||
|
#define ZMQ_RCVLABEL 29
|
||||||
|
|
||||||
/* Send/recv options. */
|
/* Send/recv options. */
|
||||||
#define ZMQ_DONTWAIT 1
|
#define ZMQ_DONTWAIT 1
|
||||||
#define ZMQ_SNDMORE 2
|
#define ZMQ_SNDMORE 2
|
||||||
|
#define ZMQ_SNDLABEL 4
|
||||||
|
|
||||||
ZMQ_EXPORT void *zmq_socket (void *context, int type);
|
ZMQ_EXPORT void *zmq_socket (void *context, int type);
|
||||||
ZMQ_EXPORT int zmq_close (void *s);
|
ZMQ_EXPORT int zmq_close (void *s);
|
||||||
|
@ -111,7 +111,7 @@ int zmq::dist_t::send_to_all (msg_t *msg_, int flags_)
|
|||||||
int zmq::dist_t::send_to_matching (msg_t *msg_, int flags_)
|
int zmq::dist_t::send_to_matching (msg_t *msg_, int flags_)
|
||||||
{
|
{
|
||||||
// Is this end of a multipart message?
|
// Is this end of a multipart message?
|
||||||
bool msg_more = msg_->flags () & msg_t::more;
|
bool msg_more = msg_->flags () & (msg_t::more | msg_t::label);
|
||||||
|
|
||||||
// Push the message to matching pipes.
|
// Push the message to matching pipes.
|
||||||
distribute (msg_, flags_);
|
distribute (msg_, flags_);
|
||||||
@ -170,7 +170,7 @@ bool zmq::dist_t::write (pipe_t *pipe_, msg_t *msg_)
|
|||||||
eligible--;
|
eligible--;
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if (!(msg_->flags () & msg_t::more))
|
if (!(msg_->flags () & (msg_t::more | msg_t::label)))
|
||||||
pipe_->flush ();
|
pipe_->flush ();
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -81,14 +81,14 @@ bool zmq::encoder_t::message_ready ()
|
|||||||
tmpbuf [0] = (unsigned char) size;
|
tmpbuf [0] = (unsigned char) size;
|
||||||
tmpbuf [1] = (in_progress.flags () & ~msg_t::shared);
|
tmpbuf [1] = (in_progress.flags () & ~msg_t::shared);
|
||||||
next_step (tmpbuf, 2, &encoder_t::size_ready,
|
next_step (tmpbuf, 2, &encoder_t::size_ready,
|
||||||
!(in_progress.flags () & msg_t::more));
|
!(in_progress.flags () & (msg_t::more | msg_t::label)));
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
tmpbuf [0] = 0xff;
|
tmpbuf [0] = 0xff;
|
||||||
put_uint64 (tmpbuf + 1, size);
|
put_uint64 (tmpbuf + 1, size);
|
||||||
tmpbuf [9] = (in_progress.flags () & ~msg_t::shared);
|
tmpbuf [9] = (in_progress.flags () & ~msg_t::shared);
|
||||||
next_step (tmpbuf, 10, &encoder_t::size_ready,
|
next_step (tmpbuf, 10, &encoder_t::size_ready,
|
||||||
!(in_progress.flags () & msg_t::more));
|
!(in_progress.flags () & (msg_t::more | msg_t::label)));
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -90,7 +90,7 @@ int zmq::fq_t::recvpipe (msg_t *msg_, int flags_, pipe_t **pipe_)
|
|||||||
if (fetched) {
|
if (fetched) {
|
||||||
if (pipe_)
|
if (pipe_)
|
||||||
*pipe_ = pipes [current];
|
*pipe_ = pipes [current];
|
||||||
more = msg_->flags () & msg_t::more;
|
more = msg_->flags () & (msg_t::more | msg_t::label);
|
||||||
if (!more) {
|
if (!more) {
|
||||||
current++;
|
current++;
|
||||||
if (current >= active)
|
if (current >= active)
|
||||||
|
@ -75,7 +75,7 @@ int zmq::lb_t::send (msg_t *msg_, int flags_)
|
|||||||
// switch back to non-dropping mode.
|
// switch back to non-dropping mode.
|
||||||
if (dropping) {
|
if (dropping) {
|
||||||
|
|
||||||
more = msg_->flags () & msg_t::more;
|
more = msg_->flags () & (msg_t::more | msg_t::label);
|
||||||
if (!more)
|
if (!more)
|
||||||
dropping = false;
|
dropping = false;
|
||||||
|
|
||||||
@ -88,7 +88,7 @@ int zmq::lb_t::send (msg_t *msg_, int flags_)
|
|||||||
|
|
||||||
while (active > 0) {
|
while (active > 0) {
|
||||||
if (pipes [current]->write (msg_)) {
|
if (pipes [current]->write (msg_)) {
|
||||||
more = msg_->flags () & msg_t::more;
|
more = msg_->flags () & (msg_t::more | msg_t::label);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -47,8 +47,9 @@ namespace zmq
|
|||||||
// Mesage flags.
|
// Mesage flags.
|
||||||
enum
|
enum
|
||||||
{
|
{
|
||||||
more = 1,
|
label = 1,
|
||||||
shared = 128
|
shared = 64,
|
||||||
|
more = 128
|
||||||
};
|
};
|
||||||
|
|
||||||
bool check ();
|
bool check ();
|
||||||
|
@ -125,7 +125,7 @@ bool zmq::pipe_t::read (msg_t *msg_)
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!(msg_->flags () & msg_t::more))
|
if (!(msg_->flags () & (msg_t::more | msg_t::label)))
|
||||||
msgs_read++;
|
msgs_read++;
|
||||||
|
|
||||||
if (lwm > 0 && msgs_read % lwm == 0)
|
if (lwm > 0 && msgs_read % lwm == 0)
|
||||||
@ -154,8 +154,9 @@ bool zmq::pipe_t::write (msg_t *msg_)
|
|||||||
if (unlikely (!check_write (msg_)))
|
if (unlikely (!check_write (msg_)))
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
outpipe->write (*msg_, msg_->flags () & msg_t::more);
|
bool more = msg_->flags () & (msg_t::more | msg_t::label);
|
||||||
if (!(msg_->flags () & msg_t::more))
|
outpipe->write (*msg_, more);
|
||||||
|
if (!more)
|
||||||
msgs_written++;
|
msgs_written++;
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
@ -167,7 +168,7 @@ void zmq::pipe_t::rollback ()
|
|||||||
msg_t msg;
|
msg_t msg;
|
||||||
if (outpipe) {
|
if (outpipe) {
|
||||||
while (outpipe->unwrite (&msg)) {
|
while (outpipe->unwrite (&msg)) {
|
||||||
zmq_assert (msg.flags () & msg_t::more);
|
zmq_assert (msg.flags () & (msg_t::more | msg_t::label));
|
||||||
int rc = msg.close ();
|
int rc = msg.close ();
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
}
|
}
|
||||||
|
@ -42,7 +42,7 @@ int zmq::rep_t::xsend (msg_t *msg_, int flags_)
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool more = (msg_->flags () & msg_t::more);
|
bool more = msg_->flags () & (msg_t::more | msg_t::label);
|
||||||
|
|
||||||
// Push message to the reply pipe.
|
// Push message to the reply pipe.
|
||||||
int rc = xrep_t::xsend (msg_, flags_);
|
int rc = xrep_t::xsend (msg_, flags_);
|
||||||
@ -77,7 +77,7 @@ int zmq::rep_t::xrecv (msg_t *msg_, int flags_)
|
|||||||
if (rc != 0)
|
if (rc != 0)
|
||||||
return rc;
|
return rc;
|
||||||
|
|
||||||
if (msg_->flags () & msg_t::more) {
|
if (msg_->flags () & (msg_t::more | msg_t::label)) {
|
||||||
|
|
||||||
// Empty message part delimits the traceback stack.
|
// Empty message part delimits the traceback stack.
|
||||||
bool bottom = (msg_->size () == 0);
|
bool bottom = (msg_->size () == 0);
|
||||||
@ -111,7 +111,7 @@ int zmq::rep_t::xrecv (msg_t *msg_, int flags_)
|
|||||||
return rc;
|
return rc;
|
||||||
|
|
||||||
// If whole request is read, flip the FSM to reply-sending state.
|
// If whole request is read, flip the FSM to reply-sending state.
|
||||||
if (!(msg_->flags () & msg_t::more)) {
|
if (!(msg_->flags () & (msg_t::more | msg_t::label))) {
|
||||||
sending_reply = true;
|
sending_reply = true;
|
||||||
request_begins = true;
|
request_begins = true;
|
||||||
}
|
}
|
||||||
|
@ -48,14 +48,14 @@ int zmq::req_t::xsend (msg_t *msg_, int flags_)
|
|||||||
msg_t prefix;
|
msg_t prefix;
|
||||||
int rc = prefix.init ();
|
int rc = prefix.init ();
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
prefix.set_flags (msg_t::more);
|
prefix.set_flags (msg_t::label);
|
||||||
rc = xreq_t::xsend (&prefix, flags_);
|
rc = xreq_t::xsend (&prefix, flags_);
|
||||||
if (rc != 0)
|
if (rc != 0)
|
||||||
return rc;
|
return rc;
|
||||||
message_begins = false;
|
message_begins = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool more = msg_->flags () & msg_t::more;
|
bool more = msg_->flags () & (msg_t::more | msg_t::label);
|
||||||
|
|
||||||
int rc = xreq_t::xsend (msg_, flags_);
|
int rc = xreq_t::xsend (msg_, flags_);
|
||||||
if (rc != 0)
|
if (rc != 0)
|
||||||
@ -83,7 +83,7 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_)
|
|||||||
int rc = xreq_t::xrecv (msg_, flags_);
|
int rc = xreq_t::xrecv (msg_, flags_);
|
||||||
if (rc != 0)
|
if (rc != 0)
|
||||||
return rc;
|
return rc;
|
||||||
zmq_assert (msg_->flags () & msg_t::more);
|
zmq_assert (msg_->flags () & msg_t::label);
|
||||||
zmq_assert (msg_->size () == 0);
|
zmq_assert (msg_->size () == 0);
|
||||||
message_begins = false;
|
message_begins = false;
|
||||||
}
|
}
|
||||||
@ -93,7 +93,7 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_)
|
|||||||
return rc;
|
return rc;
|
||||||
|
|
||||||
// If the reply is fully received, flip the FSM into request-sending state.
|
// If the reply is fully received, flip the FSM into request-sending state.
|
||||||
if (!(msg_->flags () & msg_t::more)) {
|
if (!(msg_->flags () & (msg_t::more | msg_t::label))) {
|
||||||
receiving_reply = false;
|
receiving_reply = false;
|
||||||
message_begins = true;
|
message_begins = true;
|
||||||
}
|
}
|
||||||
|
@ -71,7 +71,7 @@ bool zmq::session_t::read (msg_t *msg_)
|
|||||||
if (!pipe->read (msg_))
|
if (!pipe->read (msg_))
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
incomplete_in = msg_->flags () & msg_t::more;
|
incomplete_in = msg_->flags () & (msg_t::more | msg_t::label);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -119,6 +119,7 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_) :
|
|||||||
destroyed (false),
|
destroyed (false),
|
||||||
last_tsc (0),
|
last_tsc (0),
|
||||||
ticks (0),
|
ticks (0),
|
||||||
|
rcvlabel (false),
|
||||||
rcvmore (false)
|
rcvmore (false)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
@ -263,6 +264,16 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (option_ == ZMQ_RCVLABEL) {
|
||||||
|
if (*optvallen_ < sizeof (int)) {
|
||||||
|
errno = EINVAL;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
*((int*) optval_) = rcvlabel ? 1 : 0;
|
||||||
|
*optvallen_ = sizeof (int);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
if (option_ == ZMQ_RCVMORE) {
|
if (option_ == ZMQ_RCVMORE) {
|
||||||
if (*optvallen_ < sizeof (int)) {
|
if (*optvallen_ < sizeof (int)) {
|
||||||
errno = EINVAL;
|
errno = EINVAL;
|
||||||
@ -479,7 +490,9 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
|
|||||||
if (unlikely (rc != 0))
|
if (unlikely (rc != 0))
|
||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
// At this point we impose the MORE flag on the message.
|
// At this point we impose the LABEL & MORE flags on the message.
|
||||||
|
if (flags_ & ZMQ_SNDLABEL)
|
||||||
|
msg_->set_flags (msg_t::label);
|
||||||
if (flags_ & ZMQ_SNDMORE)
|
if (flags_ & ZMQ_SNDMORE)
|
||||||
msg_->set_flags (msg_t::more);
|
msg_->set_flags (msg_t::more);
|
||||||
|
|
||||||
@ -558,6 +571,9 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
|
|||||||
|
|
||||||
// If we have the message, return immediately.
|
// If we have the message, return immediately.
|
||||||
if (rc == 0) {
|
if (rc == 0) {
|
||||||
|
rcvlabel = msg_->flags () & msg_t::label;
|
||||||
|
if (rcvlabel)
|
||||||
|
msg_->reset_flags (msg_t::label);
|
||||||
rcvmore = msg_->flags () & msg_t::more;
|
rcvmore = msg_->flags () & msg_t::more;
|
||||||
if (rcvmore)
|
if (rcvmore)
|
||||||
msg_->reset_flags (msg_t::more);
|
msg_->reset_flags (msg_t::more);
|
||||||
@ -575,6 +591,9 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
|
|||||||
|
|
||||||
rc = xrecv (msg_, flags_);
|
rc = xrecv (msg_, flags_);
|
||||||
if (rc == 0) {
|
if (rc == 0) {
|
||||||
|
rcvlabel = msg_->flags () & msg_t::label;
|
||||||
|
if (rcvlabel)
|
||||||
|
msg_->reset_flags (msg_t::label);
|
||||||
rcvmore = msg_->flags () & msg_t::more;
|
rcvmore = msg_->flags () & msg_t::more;
|
||||||
if (rcvmore)
|
if (rcvmore)
|
||||||
msg_->reset_flags (msg_t::more);
|
msg_->reset_flags (msg_t::more);
|
||||||
@ -611,6 +630,10 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Extract LABEL & MORE flags from the message.
|
||||||
|
rcvlabel = msg_->flags () & msg_t::label;
|
||||||
|
if (rcvlabel)
|
||||||
|
msg_->reset_flags (msg_t::label);
|
||||||
rcvmore = msg_->flags () & msg_t::more;
|
rcvmore = msg_->flags () & msg_t::more;
|
||||||
if (rcvmore)
|
if (rcvmore)
|
||||||
msg_->reset_flags (msg_t::more);
|
msg_->reset_flags (msg_t::more);
|
||||||
|
@ -189,7 +189,10 @@ namespace zmq
|
|||||||
// Number of messages received since last command processing.
|
// Number of messages received since last command processing.
|
||||||
int ticks;
|
int ticks;
|
||||||
|
|
||||||
// If true there's a half-read message in the socket.
|
// True if the last message received had LABEL flag set.
|
||||||
|
bool rcvlabel;
|
||||||
|
|
||||||
|
// True if the last message received had MORE flag set.
|
||||||
bool rcvmore;
|
bool rcvmore;
|
||||||
|
|
||||||
// Lists of existing sessions. This list is never referenced from
|
// Lists of existing sessions. This list is never referenced from
|
||||||
|
@ -100,7 +100,7 @@ void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_)
|
|||||||
|
|
||||||
int zmq::xpub_t::xsend (msg_t *msg_, int flags_)
|
int zmq::xpub_t::xsend (msg_t *msg_, int flags_)
|
||||||
{
|
{
|
||||||
bool msg_more = msg_->flags () & msg_t::more;
|
bool msg_more = msg_->flags () & (msg_t::more | msg_t::label);
|
||||||
|
|
||||||
// For the first part of multi-part message, find the matching pipes.
|
// For the first part of multi-part message, find the matching pipes.
|
||||||
if (!more)
|
if (!more)
|
||||||
|
11
src/xrep.cpp
11
src/xrep.cpp
@ -122,7 +122,8 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
|
|||||||
|
|
||||||
// If we have malformed message (prefix with no subsequent message)
|
// If we have malformed message (prefix with no subsequent message)
|
||||||
// then just silently ignore it.
|
// then just silently ignore it.
|
||||||
if (msg_->flags () & msg_t::more) {
|
// TODO: The connections should be killed instead.
|
||||||
|
if (msg_->flags () & msg_t::label) {
|
||||||
|
|
||||||
more_out = true;
|
more_out = true;
|
||||||
|
|
||||||
@ -158,7 +159,7 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Check whether this is the last part of the message.
|
// Check whether this is the last part of the message.
|
||||||
more_out = msg_->flags () & msg_t::more;
|
more_out = msg_->flags () & (msg_t::more | msg_t::label);
|
||||||
|
|
||||||
// Push the message into the pipe. If there's no out pipe, just drop it.
|
// Push the message into the pipe. If there's no out pipe, just drop it.
|
||||||
if (current_out) {
|
if (current_out) {
|
||||||
@ -187,7 +188,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
|
|||||||
if (prefetched) {
|
if (prefetched) {
|
||||||
int rc = msg_->move (prefetched_msg);
|
int rc = msg_->move (prefetched_msg);
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
more_in = msg_->flags () & msg_t::more;
|
more_in = msg_->flags () & (msg_t::more | msg_t::label);
|
||||||
prefetched = false;
|
prefetched = false;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
@ -201,7 +202,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
|
|||||||
zmq_assert (inpipes [current_in].active);
|
zmq_assert (inpipes [current_in].active);
|
||||||
bool fetched = inpipes [current_in].pipe->read (msg_);
|
bool fetched = inpipes [current_in].pipe->read (msg_);
|
||||||
zmq_assert (fetched);
|
zmq_assert (fetched);
|
||||||
more_in = msg_->flags () & msg_t::more;
|
more_in = msg_->flags () & (msg_t::more | msg_t::label);
|
||||||
if (!more_in) {
|
if (!more_in) {
|
||||||
current_in++;
|
current_in++;
|
||||||
if (current_in >= inpipes.size ())
|
if (current_in >= inpipes.size ())
|
||||||
@ -223,7 +224,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
|
|||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
memcpy (msg_->data (), inpipes [current_in].identity.data (),
|
memcpy (msg_->data (), inpipes [current_in].identity.data (),
|
||||||
msg_->size ());
|
msg_->size ());
|
||||||
msg_->set_flags (msg_t::more);
|
msg_->set_flags (msg_t::label);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -116,7 +116,7 @@ int zmq::xsub_t::xrecv (msg_t *msg_, int flags_)
|
|||||||
int rc = msg_->move (message);
|
int rc = msg_->move (message);
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
has_message = false;
|
has_message = false;
|
||||||
more = msg_->flags () & msg_t::more;
|
more = msg_->flags () & (msg_t::more | msg_t::label);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -136,13 +136,13 @@ int zmq::xsub_t::xrecv (msg_t *msg_, int flags_)
|
|||||||
// Check whether the message matches at least one subscription.
|
// Check whether the message matches at least one subscription.
|
||||||
// Non-initial parts of the message are passed
|
// Non-initial parts of the message are passed
|
||||||
if (more || !options.filter || match (msg_)) {
|
if (more || !options.filter || match (msg_)) {
|
||||||
more = msg_->flags () & msg_t::more;
|
more = msg_->flags () & (msg_t::more | msg_t::label);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Message doesn't match. Pop any remaining parts of the message
|
// Message doesn't match. Pop any remaining parts of the message
|
||||||
// from the pipe.
|
// from the pipe.
|
||||||
while (msg_->flags () & msg_t::more) {
|
while (msg_->flags () & (msg_t::more | msg_t::label)) {
|
||||||
rc = fq.recv (msg_, ZMQ_DONTWAIT);
|
rc = fq.recv (msg_, ZMQ_DONTWAIT);
|
||||||
zmq_assert (rc == 0);
|
zmq_assert (rc == 0);
|
||||||
}
|
}
|
||||||
@ -182,7 +182,7 @@ bool zmq::xsub_t::xhas_in ()
|
|||||||
|
|
||||||
// Message doesn't match. Pop any remaining parts of the message
|
// Message doesn't match. Pop any remaining parts of the message
|
||||||
// from the pipe.
|
// from the pipe.
|
||||||
while (message.flags () & msg_t::more) {
|
while (message.flags () & (msg_t::more | msg_t::label)) {
|
||||||
rc = fq.recv (&message, ZMQ_DONTWAIT);
|
rc = fq.recv (&message, ZMQ_DONTWAIT);
|
||||||
zmq_assert (rc == 0);
|
zmq_assert (rc == 0);
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user