Change XSUB -> XPUB multipart message processing.

Now only the first part in a multipart message will be treated as subscribe/unsubscribe.
The rest will be considered user messages regardless of the first byte.
This commit is contained in:
Andrij Abyzov 2019-11-19 13:29:54 +01:00
parent e0d9e21374
commit 108977c838
5 changed files with 106 additions and 28 deletions

View File

@ -41,7 +41,8 @@ zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_),
_verbose_subs (false),
_verbose_unsubs (false),
_more (false),
_more_send (false),
_more_recv (false),
_lossy (true),
_manual (false),
_send_last_pipe (false),
@ -91,31 +92,40 @@ void zmq::xpub_t::xattach_pipe (pipe_t *pipe_,
void zmq::xpub_t::xread_activated (pipe_t *pipe_)
{
// There are some subscriptions waiting. Let's process them.
msg_t sub;
while (pipe_->read (&sub)) {
metadata_t *metadata = sub.metadata ();
unsigned char *msg_data = static_cast<unsigned char *> (sub.data ()),
msg_t msg;
while (pipe_->read (&msg)) {
metadata_t *metadata = msg.metadata ();
unsigned char *msg_data = static_cast<unsigned char *> (msg.data ()),
*data = NULL;
size_t size = 0;
bool subscribe = false;
bool is_subscribe_or_cancel = false;
// Apply the subscription to the trie
if (sub.is_subscribe () || sub.is_cancel ()) {
data = static_cast<unsigned char *> (sub.command_body ());
size = sub.command_body_size ();
subscribe = sub.is_subscribe ();
} else if (sub.size () > 0 && (*msg_data == 0 || *msg_data == 1)) {
data = msg_data + 1;
size = sub.size () - 1;
subscribe = *msg_data == 1;
} else {
if (!_more_recv) {
// Apply the subscription to the trie
if (msg.is_subscribe () || msg.is_cancel ()) {
data = static_cast<unsigned char *> (msg.command_body ());
size = msg.command_body_size ();
subscribe = msg.is_subscribe ();
is_subscribe_or_cancel = true;
} else if (msg.size () > 0 && (*msg_data == 0 || *msg_data == 1)) {
data = msg_data + 1;
size = msg.size () - 1;
subscribe = *msg_data == 1;
is_subscribe_or_cancel = true;
}
}
_more_recv = (msg.flags () & msg_t::more) != 0;
if (!is_subscribe_or_cancel) {
// Process user message coming upstream from xsub socket
_pending_data.push_back (blob_t (msg_data, sub.size ()));
_pending_data.push_back (blob_t (msg_data, msg.size ()));
if (metadata)
metadata->add_ref ();
_pending_metadata.push_back (metadata);
_pending_flags.push_back (sub.flags ());
sub.close ();
_pending_flags.push_back (msg.flags ());
msg.close ();
continue;
}
@ -174,7 +184,7 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
_pending_flags.push_back (0);
}
}
sub.close ();
msg.close ();
}
}
@ -278,7 +288,7 @@ int zmq::xpub_t::xsend (msg_t *msg_)
bool msg_more = (msg_->flags () & msg_t::more) != 0;
// For the first part of multi-part message, find the matching pipes.
if (!_more) {
if (!_more_send) {
if (unlikely (_manual && _last_pipe && _send_last_pipe)) {
_subscriptions.match (static_cast<unsigned char *> (msg_->data ()),
msg_->size (), mark_last_pipe_as_matching,
@ -300,7 +310,7 @@ int zmq::xpub_t::xsend (msg_t *msg_)
// all the pipes as non-matching.
if (!msg_more)
_dist.unmatch ();
_more = msg_more;
_more_send = msg_more;
rc = 0; // Yay, sent successfully
}
} else

View File

@ -91,7 +91,10 @@ class xpub_t : public socket_base_t
bool _verbose_unsubs;
// True if we are in the middle of sending a multi-part message.
bool _more;
bool _more_send;
// True if we are in the middle of receiving a multi-part message.
bool _more_recv;
// Drop messages if HWM reached, otherwise return with EAGAIN
bool _lossy;

View File

@ -37,7 +37,8 @@
zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_),
_has_message (false),
_more (false)
_more_send (false),
_more_recv (false)
{
options.type = ZMQ_XSUB;
@ -99,6 +100,13 @@ int zmq::xsub_t::xsend (msg_t *msg_)
size_t size = msg_->size ();
unsigned char *data = static_cast<unsigned char *> (msg_->data ());
bool send_more = _more_send;
_more_send = (msg_->flags () & msg_t::more) != 0;
if (send_more)
// User message sent upstream to XPUB socket
return _dist.send_to_all (msg_);
if (msg_->is_subscribe () || (size > 0 && *data == 1)) {
// Process subscribe message
// This used to filter out duplicate subscriptions,
@ -152,7 +160,7 @@ int zmq::xsub_t::xrecv (msg_t *msg_)
int rc = msg_->move (_message);
errno_assert (rc == 0);
_has_message = false;
_more = (msg_->flags () & msg_t::more) != 0;
_more_recv = (msg_->flags () & msg_t::more) != 0;
return 0;
}
@ -170,8 +178,8 @@ int zmq::xsub_t::xrecv (msg_t *msg_)
// Check whether the message matches at least one subscription.
// Non-initial parts of the message are passed
if (_more || !options.filter || match (msg_)) {
_more = (msg_->flags () & msg_t::more) != 0;
if (_more_recv || !options.filter || match (msg_)) {
_more_recv = (msg_->flags () & msg_t::more) != 0;
return 0;
}
@ -187,7 +195,7 @@ int zmq::xsub_t::xrecv (msg_t *msg_)
bool zmq::xsub_t::xhas_in ()
{
// There are subsequent parts of the partly-read message available.
if (_more)
if (_more_recv)
return true;
// If there's already a message prepared by a previous call to zmq_poll,

View File

@ -93,9 +93,13 @@ class xsub_t : public socket_base_t
bool _has_message;
msg_t _message;
// If true, part of a multipart message was already sent, but
// there are following parts still waiting.
bool _more_send;
// If true, part of a multipart message was already received, but
// there are following parts still waiting.
bool _more;
bool _more_recv;
xsub_t (const xsub_t &);
const xsub_t &operator= (const xsub_t &);

View File

@ -456,6 +456,58 @@ void test_user_message ()
test_context_socket_close (sub);
}
void test_user_message_multi ()
{
// Create a publisher
void *pub = test_context_socket (ZMQ_XPUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, "inproc://soname"));
// Create a subscriber
void *sub = test_context_socket (ZMQ_XSUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, "inproc://soname"));
// Send some data that is neither sub nor unsub
const uint8_t msg_common[] = {'A', 'B', 'C'};
// Message starts with 0 but should still treated as user
const uint8_t msg_0[] = {0, 'B', 'C'};
// Message starts with 1 but should still treated as user
const uint8_t msg_1[] = {1, 'B', 'C'};
// Test second message starting with 0
send_array_expect_success (sub, msg_common, ZMQ_SNDMORE);
send_array_expect_success (sub, msg_0, 0);
// Receive messages from subscriber
recv_array_expect_success (pub, msg_common, 0);
recv_array_expect_success (pub, msg_0, 0);
// Test second message starting with 1
send_array_expect_success (sub, msg_common, ZMQ_SNDMORE);
send_array_expect_success (sub, msg_1, 0);
// Receive messages from subscriber
recv_array_expect_success (pub, msg_common, 0);
recv_array_expect_success (pub, msg_1, 0);
char buffer[255];
// Test first message starting with 0
send_array_expect_success (sub, msg_0, 0);
// wait
msleep (SETTLE_TIME);
int rc = zmq_recv (pub, buffer, sizeof (buffer), ZMQ_DONTWAIT);
TEST_ASSERT_EQUAL_INT (-1, rc);
// Test first message starting with 1
send_array_expect_success (sub, msg_1, 0);
recv_array_expect_success (pub, msg_1, 0);
// Clean up.
test_context_socket_close (pub);
test_context_socket_close (sub);
}
int main ()
{
setup_test_environment ();
@ -467,6 +519,7 @@ int main ()
RUN_TEST (test_missing_subscriptions);
RUN_TEST (test_unsubscribe_cleanup);
RUN_TEST (test_user_message);
RUN_TEST (test_user_message_multi);
return UNITY_END ();
}