mirror of
https://github.com/zeromq/libzmq.git
synced 2024-12-12 10:33:52 +01:00
allow XSUB/XPUB to send/recv messages unrelated to sub/unsub (LIBZMQ-490)
zmq::xpub_t::xread_activated() – change to process messages without 0 or 1 prefix, but without affecting subscriptions zmq::xsub_t::xsend() – change to send rather than discard messages without 0 or 1 prefix, but without affecting subscriptions Update documentation
This commit is contained in:
parent
98a91e852e
commit
d32e392278
@ -224,7 +224,8 @@ ZMQ_XPUB
|
|||||||
Same as ZMQ_PUB except that you can receive subscriptions from the peers
|
Same as ZMQ_PUB except that you can receive subscriptions from the peers
|
||||||
in form of incoming messages. Subscription message is a byte 1 (for
|
in form of incoming messages. Subscription message is a byte 1 (for
|
||||||
subscriptions) or byte 0 (for unsubscriptions) followed by the subscription
|
subscriptions) or byte 0 (for unsubscriptions) followed by the subscription
|
||||||
body.
|
body. Messages without a sub/unsub prefix are also received, but have no
|
||||||
|
effect on subscription status.
|
||||||
|
|
||||||
[horizontal]
|
[horizontal]
|
||||||
.Summary of ZMQ_XPUB characteristics
|
.Summary of ZMQ_XPUB characteristics
|
||||||
@ -240,7 +241,8 @@ ZMQ_XSUB
|
|||||||
^^^^^^^^
|
^^^^^^^^
|
||||||
Same as ZMQ_SUB except that you subscribe by sending subscription messages to
|
Same as ZMQ_SUB except that you subscribe by sending subscription messages to
|
||||||
the socket. Subscription message is a byte 1 (for subscriptions) or byte 0
|
the socket. Subscription message is a byte 1 (for subscriptions) or byte 0
|
||||||
(for unsubscriptions) followed by the subscription body.
|
(for unsubscriptions) followed by the subscription body. Messages without a
|
||||||
|
sub/unsub prefix may also be sent, but have no effect on subscription status.
|
||||||
|
|
||||||
[horizontal]
|
[horizontal]
|
||||||
.Summary of ZMQ_XSUB characteristics
|
.Summary of ZMQ_XSUB characteristics
|
||||||
|
@ -74,6 +74,9 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
|
|||||||
if (options.type == ZMQ_XPUB && (unique || (*data && verbose)))
|
if (options.type == ZMQ_XPUB && (unique || (*data && verbose)))
|
||||||
pending.push_back (blob_t (data, size));
|
pending.push_back (blob_t (data, size));
|
||||||
}
|
}
|
||||||
|
else /*process message unrelated to sub/unsub*/ {
|
||||||
|
pending.push_back (blob_t (data, size));
|
||||||
|
}
|
||||||
|
|
||||||
sub.close ();
|
sub.close ();
|
||||||
}
|
}
|
||||||
|
11
src/xsub.cpp
11
src/xsub.cpp
@ -87,12 +87,6 @@ int zmq::xsub_t::xsend (msg_t *msg_)
|
|||||||
size_t size = msg_->size ();
|
size_t size = msg_->size ();
|
||||||
unsigned char *data = (unsigned char*) msg_->data ();
|
unsigned char *data = (unsigned char*) msg_->data ();
|
||||||
|
|
||||||
// Malformed subscriptions.
|
|
||||||
if (size < 1 || (*data != 0 && *data != 1)) {
|
|
||||||
errno = EINVAL;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Process the subscription.
|
// Process the subscription.
|
||||||
if (*data == 1) {
|
if (*data == 1) {
|
||||||
// this used to filter out duplicate subscriptions,
|
// this used to filter out duplicate subscriptions,
|
||||||
@ -102,10 +96,13 @@ int zmq::xsub_t::xsend (msg_t *msg_)
|
|||||||
subscriptions.add (data + 1, size - 1);
|
subscriptions.add (data + 1, size - 1);
|
||||||
return dist.send_to_all (msg_);
|
return dist.send_to_all (msg_);
|
||||||
}
|
}
|
||||||
else {
|
else if (*data == 0) {
|
||||||
if (subscriptions.rm (data + 1, size - 1))
|
if (subscriptions.rm (data + 1, size - 1))
|
||||||
return dist.send_to_all (msg_);
|
return dist.send_to_all (msg_);
|
||||||
}
|
}
|
||||||
|
else /*upstream message unrelated to sub/unsub*/ {
|
||||||
|
return dist.send_to_all (msg_);
|
||||||
|
}
|
||||||
|
|
||||||
int rc = msg_->close ();
|
int rc = msg_->close ();
|
||||||
errno_assert (rc == 0);
|
errno_assert (rc == 0);
|
||||||
|
Loading…
Reference in New Issue
Block a user