From ecc63d0d3b0e1a62c90b58b1ccdb5ac16cb2400a Mon Sep 17 00:00:00 2001 From: Cornelius Date: Tue, 20 Jun 2023 16:17:26 +0200 Subject: [PATCH] Problem: long flag isn't set for subscriptions if topic has between 246 and 255 characters (#4564) * Problem: long flag isn't set for subscriptions if topic has between 246 and 255 characters Solution: fix V3.1 encoder to calculate long flag after evaluating the subscribe and cancel commands --- Makefile.am | 5 +++ src/v3_1_encoder.cpp | 6 ++- tests/CMakeLists.txt | 1 + tests/test_xpub_topic.cpp | 85 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 95 insertions(+), 2 deletions(-) create mode 100644 tests/test_xpub_topic.cpp diff --git a/Makefile.am b/Makefile.am index aa40273d..77fe616b 100755 --- a/Makefile.am +++ b/Makefile.am @@ -493,6 +493,7 @@ test_apps = \ tests/test_capabilities \ tests/test_xpub_nodrop \ tests/test_xpub_manual \ + tests/test_xpub_topic \ tests/test_xpub_welcome_msg \ tests/test_xpub_verbose \ tests/test_atomics \ @@ -766,6 +767,10 @@ tests_test_xpub_manual_SOURCES = tests/test_xpub_manual.cpp tests_test_xpub_manual_LDADD = ${TESTUTIL_LIBS} src/libzmq.la tests_test_xpub_manual_CPPFLAGS = ${TESTUTIL_CPPFLAGS} +tests_test_xpub_topic_SOURCES = tests/test_xpub_topic.cpp +tests_test_xpub_topic_LDADD = ${TESTUTIL_LIBS} src/libzmq.la +tests_test_xpub_topic_CPPFLAGS = ${TESTUTIL_CPPFLAGS} + tests_test_xpub_welcome_msg_SOURCES = tests/test_xpub_welcome_msg.cpp tests_test_xpub_welcome_msg_LDADD = ${TESTUTIL_LIBS} src/libzmq.la tests_test_xpub_welcome_msg_CPPFLAGS = ${TESTUTIL_CPPFLAGS} diff --git a/src/v3_1_encoder.cpp b/src/v3_1_encoder.cpp index fcf8cc63..f6b04c35 100644 --- a/src/v3_1_encoder.cpp +++ b/src/v3_1_encoder.cpp @@ -29,8 +29,6 @@ void zmq::v3_1_encoder_t::message_ready () protocol_flags = 0; if (in_progress ()->flags () & msg_t::more) protocol_flags |= v2_protocol_t::more_flag; - if (in_progress ()->size () > UCHAR_MAX) - protocol_flags |= v2_protocol_t::large_flag; if (in_progress ()->flags () & msg_t::command || in_progress ()->is_subscribe () || in_progress ()->is_cancel ()) { protocol_flags |= v2_protocol_t::command_flag; @@ -39,6 +37,10 @@ void zmq::v3_1_encoder_t::message_ready () else if (in_progress ()->is_cancel ()) size += zmq::msg_t::cancel_cmd_name_size; } + // Calculate large_flag after command_flag. Subscribe or cancel commands + // increase the message size. + if (size > UCHAR_MAX) + protocol_flags |= v2_protocol_t::large_flag; // Encode the message length. For messages less then 256 bytes, // the length is encoded as 8-bit unsigned integer. For larger diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 66c44b12..6b7f6cfa 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -64,6 +64,7 @@ set(tests test_srcfd test_stream_timeout test_xpub_manual + test_xpub_topic test_xpub_welcome_msg test_xpub_verbose test_base85 diff --git a/tests/test_xpub_topic.cpp b/tests/test_xpub_topic.cpp new file mode 100644 index 00000000..8609cfe2 --- /dev/null +++ b/tests/test_xpub_topic.cpp @@ -0,0 +1,85 @@ +/* SPDX-License-Identifier: MPL-2.0 */ + +#include "testutil.hpp" +#include "testutil_unity.hpp" + +SETUP_TEARDOWN_TESTCONTEXT + +const char bind_address[] = "tcp://127.0.0.1:*"; +char connect_address[MAX_SOCKET_STRING]; + +// 245 chars + 10 chars for subscribe command = 255 chars +const char short_topic[] = + "ABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOP" + "ABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOP" + "ABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOP" + "ABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDE"; + +// 246 chars + 10 chars for subscribe command = 256 chars +const char long_topic[] = + "ABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOP" + "ABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOP" + "ABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOP" + "ABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEF"; + + +template +void test_subscribe_cancel (void *xpub, void *sub, const char (&topic)[SIZE]) +{ + // Ignore '\0' terminating the topic string. + const size_t topic_len = SIZE - 1; + + // Subscribe for topic + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (sub, ZMQ_SUBSCRIBE, topic, topic_len)); + + // Allow receiving more than the expected number of bytes + char buffer[topic_len + 5]; + + // Receive subscription + int rc = + TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (xpub, buffer, sizeof (buffer), 0)); + TEST_ASSERT_EQUAL_INT (topic_len + 1, rc); + TEST_ASSERT_EQUAL_UINT8 (1, buffer[0]); + TEST_ASSERT_EQUAL_UINT8_ARRAY (topic, buffer + 1, topic_len); + + // Unsubscribe from topic + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (sub, ZMQ_UNSUBSCRIBE, topic, topic_len)); + + // Receive unsubscription + rc = + TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (xpub, buffer, sizeof (buffer), 0)); + TEST_ASSERT_EQUAL_INT (topic_len + 1, rc); + TEST_ASSERT_EQUAL_UINT8 (0, buffer[0]); + TEST_ASSERT_EQUAL_UINT8_ARRAY (topic, buffer + 1, topic_len); +} + +void test_xpub_subscribe_long_topic () +{ + void *xpub = test_context_socket (ZMQ_XPUB); + TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (xpub, bind_address)); + size_t len = MAX_SOCKET_STRING; + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_getsockopt (xpub, ZMQ_LAST_ENDPOINT, connect_address, &len)); + + void *sub = test_context_socket (ZMQ_SUB); + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, connect_address)); + + test_subscribe_cancel (xpub, sub, short_topic); + test_subscribe_cancel (xpub, sub, long_topic); + + // Clean up. + test_context_socket_close (xpub); + test_context_socket_close (sub); +} + +int main () +{ + setup_test_environment (); + + UNITY_BEGIN (); + RUN_TEST (test_xpub_subscribe_long_topic); + + return UNITY_END (); +}