Problem: can't process ZMTP 3.1 cancel/subscribe commands

Solution: add some msg helpers to parse commands, and check for
subscribe or cancel commands and process them accordingly in the xpub
and xsub classes.
This commit is contained in:
Luca Boccassi
2018-05-28 18:00:11 +01:00
parent 681e53f369
commit d70714e877
10 changed files with 440 additions and 66 deletions

View File

@@ -88,57 +88,89 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
// There are some subscriptions waiting. Let's process them.
msg_t sub;
while (pipe_->read (&sub)) {
// Apply the subscription to the trie
const unsigned char *const data =
static_cast<const unsigned char *> (sub.data ());
const size_t size = sub.size ();
metadata_t *metadata = sub.metadata ();
if (size > 0 && (*data == 0 || *data == 1)) {
if (_manual) {
// Store manual subscription to use on termination
if (*data == 0)
_manual_subscriptions.rm (data + 1, size - 1, pipe_);
else
_manual_subscriptions.add (data + 1, size - 1, pipe_);
unsigned char *msg_data = static_cast<unsigned char *> (sub.data ()),
*data = NULL;
size_t size = 0;
bool subscribe = false;
// Apply the subscription to the trie
if (sub.is_subscribe () || sub.is_cancel ()) {
data = static_cast<unsigned char *> (sub.command_body ());
size = sub.command_body_size ();
subscribe = sub.is_subscribe ();
} else if (sub.size () > 0) {
unsigned char first = *msg_data;
if (first == 0 || first == 1) {
data = msg_data + 1;
size = sub.size () - 1;
subscribe = first == 1;
}
} else {
// Process user message coming upstream from xsub socket
_pending_data.push_back (blob_t (msg_data, sub.size ()));
if (metadata)
metadata->add_ref ();
_pending_metadata.push_back (metadata);
_pending_flags.push_back (sub.flags ());
sub.close ();
continue;
}
if (_manual) {
// Store manual subscription to use on termination
if (!subscribe)
_manual_subscriptions.rm (data, size, pipe_);
else
_manual_subscriptions.add (data, size, pipe_);
_pending_pipes.push_back (pipe_);
// ZMTP 3.1 hack: we need to support sub/cancel commands, but
// we can't give them back to userspace as it would be an API
// breakage since the payload of the message is completely
// different. Manually craft an old-style message instead.
data = data - 1;
size = size + 1;
if (subscribe)
*data = 1;
else
*data = 0;
_pending_data.push_back (blob_t (data, size));
if (metadata)
metadata->add_ref ();
_pending_metadata.push_back (metadata);
_pending_flags.push_back (0);
} else {
bool notify;
if (!subscribe) {
mtrie_t::rm_result rm_result =
_subscriptions.rm (data, size, pipe_);
// TODO reconsider what to do if rm_result == mtrie_t::not_found
notify = rm_result != mtrie_t::values_remain || _verbose_unsubs;
} else {
bool first_added = _subscriptions.add (data, size, pipe_);
notify = first_added || _verbose_subs;
}
// If the request was a new subscription, or the subscription
// was removed, or verbose mode is enabled, store it so that
// it can be passed to the user on next recv call.
if (options.type == ZMQ_XPUB && notify) {
data = data - 1;
size = size + 1;
if (subscribe)
*data = 1;
else
*data = 0;
_pending_pipes.push_back (pipe_);
_pending_data.push_back (blob_t (data, size));
if (metadata)
metadata->add_ref ();
_pending_metadata.push_back (metadata);
_pending_flags.push_back (0);
} else {
bool notify;
if (*data == 0) {
mtrie_t::rm_result rm_result =
_subscriptions.rm (data + 1, size - 1, pipe_);
// TODO reconsider what to do if rm_result == mtrie_t::not_found
notify =
rm_result != mtrie_t::values_remain || _verbose_unsubs;
} else {
bool first_added =
_subscriptions.add (data + 1, size - 1, pipe_);
notify = first_added || _verbose_subs;
}
// If the request was a new subscription, or the subscription
// was removed, or verbose mode is enabled, store it so that
// it can be passed to the user on next recv call.
if (options.type == ZMQ_XPUB && notify) {
_pending_data.push_back (blob_t (data, size));
if (metadata)
metadata->add_ref ();
_pending_metadata.push_back (metadata);
_pending_flags.push_back (0);
}
}
} else {
// Process user message coming upstream from xsub socket
_pending_data.push_back (blob_t (data, size));
if (metadata)
metadata->add_ref ();
_pending_metadata.push_back (metadata);
_pending_flags.push_back (sub.flags ());
}
sub.close ();
}