diff --git a/RELICENSE/drolevar.md b/RELICENSE/drolevar.md new file mode 100644 index 00000000..47741866 --- /dev/null +++ b/RELICENSE/drolevar.md @@ -0,0 +1,16 @@ +# Permission to Relicense under MPLv2 or any other OSI approved license chosen by the current ZeroMQ BDFL + +This is a statement by Andrij Abyzov +that grants permission to relicense his copyrights in the libzmq C++ +library (ZeroMQ) under the Mozilla Public License v2 (MPLv2) or any other +Open Source Initiative approved license chosen by the current ZeroMQ +BDFL (Benevolent Dictator for Life). + +A portion of the commits made by the Github handle "drolevar", with +commit author "Andrij Abyzov ", are copyright of Andrij Abyzov. +This document hereby grants the libzmq project team to relicense libzmq, +including all past, present and future contributions of the author listed +above. + +Andrij Abyzov +2019/11/19 diff --git a/src/select.cpp b/src/select.cpp index 7a2fc05b..159f44f2 100644 --- a/src/select.cpp +++ b/src/select.cpp @@ -373,10 +373,9 @@ void zmq::select_t::loop () // http://stackoverflow.com/q/35043420/188530 if (FD_ISSET (fd, &family_entry.fds_set.read) && FD_ISSET (fd, &family_entry.fds_set.write)) - rc = - WSAEventSelect (fd, wsa_events.events[3], - FD_READ | FD_ACCEPT | FD_CLOSE - | FD_WRITE | FD_CONNECT); + rc = WSAEventSelect (fd, wsa_events.events[3], + FD_READ | FD_ACCEPT | FD_CLOSE + | FD_WRITE | FD_CONNECT); else if (FD_ISSET (fd, &family_entry.fds_set.read)) rc = WSAEventSelect (fd, wsa_events.events[0], FD_READ | FD_ACCEPT | FD_CLOSE); diff --git a/src/xpub.cpp b/src/xpub.cpp index 2e099579..56131847 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -41,7 +41,8 @@ zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : socket_base_t (parent_, tid_, sid_), _verbose_subs (false), _verbose_unsubs (false), - _more (false), + _more_send (false), + _more_recv (false), _lossy (true), _manual (false), _send_last_pipe (false), @@ -91,31 +92,40 @@ void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, void zmq::xpub_t::xread_activated (pipe_t *pipe_) { // There are some subscriptions waiting. Let's process them. - msg_t sub; - while (pipe_->read (&sub)) { - metadata_t *metadata = sub.metadata (); - unsigned char *msg_data = static_cast (sub.data ()), + msg_t msg; + while (pipe_->read (&msg)) { + metadata_t *metadata = msg.metadata (); + unsigned char *msg_data = static_cast (msg.data ()), *data = NULL; size_t size = 0; bool subscribe = false; + bool is_subscribe_or_cancel = false; - // Apply the subscription to the trie - if (sub.is_subscribe () || sub.is_cancel ()) { - data = static_cast (sub.command_body ()); - size = sub.command_body_size (); - subscribe = sub.is_subscribe (); - } else if (sub.size () > 0 && (*msg_data == 0 || *msg_data == 1)) { - data = msg_data + 1; - size = sub.size () - 1; - subscribe = *msg_data == 1; - } else { + if (!_more_recv) { + // Apply the subscription to the trie + if (msg.is_subscribe () || msg.is_cancel ()) { + data = static_cast (msg.command_body ()); + size = msg.command_body_size (); + subscribe = msg.is_subscribe (); + is_subscribe_or_cancel = true; + } else if (msg.size () > 0 && (*msg_data == 0 || *msg_data == 1)) { + data = msg_data + 1; + size = msg.size () - 1; + subscribe = *msg_data == 1; + is_subscribe_or_cancel = true; + } + } + + _more_recv = (msg.flags () & msg_t::more) != 0; + + if (!is_subscribe_or_cancel) { // Process user message coming upstream from xsub socket - _pending_data.push_back (blob_t (msg_data, sub.size ())); + _pending_data.push_back (blob_t (msg_data, msg.size ())); if (metadata) metadata->add_ref (); _pending_metadata.push_back (metadata); - _pending_flags.push_back (sub.flags ()); - sub.close (); + _pending_flags.push_back (msg.flags ()); + msg.close (); continue; } @@ -174,7 +184,7 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) _pending_flags.push_back (0); } } - sub.close (); + msg.close (); } } @@ -278,7 +288,7 @@ int zmq::xpub_t::xsend (msg_t *msg_) bool msg_more = (msg_->flags () & msg_t::more) != 0; // For the first part of multi-part message, find the matching pipes. - if (!_more) { + if (!_more_send) { if (unlikely (_manual && _last_pipe && _send_last_pipe)) { _subscriptions.match (static_cast (msg_->data ()), msg_->size (), mark_last_pipe_as_matching, @@ -300,7 +310,7 @@ int zmq::xpub_t::xsend (msg_t *msg_) // all the pipes as non-matching. if (!msg_more) _dist.unmatch (); - _more = msg_more; + _more_send = msg_more; rc = 0; // Yay, sent successfully } } else diff --git a/src/xpub.hpp b/src/xpub.hpp index f8fa960f..c5afcaf5 100644 --- a/src/xpub.hpp +++ b/src/xpub.hpp @@ -91,7 +91,10 @@ class xpub_t : public socket_base_t bool _verbose_unsubs; // True if we are in the middle of sending a multi-part message. - bool _more; + bool _more_send; + + // True if we are in the middle of receiving a multi-part message. + bool _more_recv; // Drop messages if HWM reached, otherwise return with EAGAIN bool _lossy; diff --git a/src/xsub.cpp b/src/xsub.cpp index e3e187c7..ca1be4bc 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -37,7 +37,8 @@ zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : socket_base_t (parent_, tid_, sid_), _has_message (false), - _more (false) + _more_send (false), + _more_recv (false) { options.type = ZMQ_XSUB; @@ -99,6 +100,13 @@ int zmq::xsub_t::xsend (msg_t *msg_) size_t size = msg_->size (); unsigned char *data = static_cast (msg_->data ()); + bool send_more = _more_send; + _more_send = (msg_->flags () & msg_t::more) != 0; + + if (send_more) + // User message sent upstream to XPUB socket + return _dist.send_to_all (msg_); + if (msg_->is_subscribe () || (size > 0 && *data == 1)) { // Process subscribe message // This used to filter out duplicate subscriptions, @@ -152,7 +160,7 @@ int zmq::xsub_t::xrecv (msg_t *msg_) int rc = msg_->move (_message); errno_assert (rc == 0); _has_message = false; - _more = (msg_->flags () & msg_t::more) != 0; + _more_recv = (msg_->flags () & msg_t::more) != 0; return 0; } @@ -170,8 +178,8 @@ int zmq::xsub_t::xrecv (msg_t *msg_) // Check whether the message matches at least one subscription. // Non-initial parts of the message are passed - if (_more || !options.filter || match (msg_)) { - _more = (msg_->flags () & msg_t::more) != 0; + if (_more_recv || !options.filter || match (msg_)) { + _more_recv = (msg_->flags () & msg_t::more) != 0; return 0; } @@ -187,7 +195,7 @@ int zmq::xsub_t::xrecv (msg_t *msg_) bool zmq::xsub_t::xhas_in () { // There are subsequent parts of the partly-read message available. - if (_more) + if (_more_recv) return true; // If there's already a message prepared by a previous call to zmq_poll, diff --git a/src/xsub.hpp b/src/xsub.hpp index 57b0c54d..5cd98280 100755 --- a/src/xsub.hpp +++ b/src/xsub.hpp @@ -93,9 +93,13 @@ class xsub_t : public socket_base_t bool _has_message; msg_t _message; + // If true, part of a multipart message was already sent, but + // there are following parts still waiting. + bool _more_send; + // If true, part of a multipart message was already received, but // there are following parts still waiting. - bool _more; + bool _more_recv; xsub_t (const xsub_t &); const xsub_t &operator= (const xsub_t &); diff --git a/tests/test_xpub_manual.cpp b/tests/test_xpub_manual.cpp index 540df8db..cc04c0a5 100644 --- a/tests/test_xpub_manual.cpp +++ b/tests/test_xpub_manual.cpp @@ -456,6 +456,58 @@ void test_user_message () test_context_socket_close (sub); } +void test_user_message_multi () +{ + // Create a publisher + void *pub = test_context_socket (ZMQ_XPUB); + TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, "inproc://soname")); + + // Create a subscriber + void *sub = test_context_socket (ZMQ_XSUB); + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, "inproc://soname")); + + // Send some data that is neither sub nor unsub + const uint8_t msg_common[] = {'A', 'B', 'C'}; + // Message starts with 0 but should still treated as user + const uint8_t msg_0[] = {0, 'B', 'C'}; + // Message starts with 1 but should still treated as user + const uint8_t msg_1[] = {1, 'B', 'C'}; + + // Test second message starting with 0 + send_array_expect_success (sub, msg_common, ZMQ_SNDMORE); + send_array_expect_success (sub, msg_0, 0); + + // Receive messages from subscriber + recv_array_expect_success (pub, msg_common, 0); + recv_array_expect_success (pub, msg_0, 0); + + // Test second message starting with 1 + send_array_expect_success (sub, msg_common, ZMQ_SNDMORE); + send_array_expect_success (sub, msg_1, 0); + + // Receive messages from subscriber + recv_array_expect_success (pub, msg_common, 0); + recv_array_expect_success (pub, msg_1, 0); + + char buffer[255]; + // Test first message starting with 0 + send_array_expect_success (sub, msg_0, 0); + + // wait + msleep (SETTLE_TIME); + + int rc = zmq_recv (pub, buffer, sizeof (buffer), ZMQ_DONTWAIT); + TEST_ASSERT_EQUAL_INT (-1, rc); + + // Test first message starting with 1 + send_array_expect_success (sub, msg_1, 0); + recv_array_expect_success (pub, msg_1, 0); + + // Clean up. + test_context_socket_close (pub); + test_context_socket_close (sub); +} + int main () { setup_test_environment (); @@ -467,6 +519,7 @@ int main () RUN_TEST (test_missing_subscriptions); RUN_TEST (test_unsubscribe_cleanup); RUN_TEST (test_user_message); + RUN_TEST (test_user_message_multi); return UNITY_END (); }