From 253e9dd27b3d7c0d736d4fa3802e87d00bd915c9 Mon Sep 17 00:00:00 2001 From: Luca Boccassi Date: Mon, 28 May 2018 19:59:44 +0100 Subject: [PATCH] Problem: libzmq does not send ZMTP 3.1 sub/cancel commands Solution: if all peers of a socket are >= 3.1 use sub/cancel commands instead of the old 0/1 messages. For backward compatibility, move the handling of 0/1 or sub/cancel command strings to the encoders, so that the right thing can be done depending on the protocol version. Do not set the command flag until the encoder, so that we can handle the inproc case (which skips the encoder). --- CMakeLists.txt | 2 + Makefile.am | 2 + src/msg.cpp | 46 +++++++++++-- src/msg.hpp | 5 ++ src/sub.cpp | 16 ++--- src/v1_encoder.cpp | 22 +++++- src/v1_encoder.hpp | 2 +- src/v2_encoder.cpp | 22 +++++- src/v2_encoder.hpp | 3 +- src/v3_1_encoder.cpp | 105 +++++++++++++++++++++++++++++ src/v3_1_encoder.hpp | 56 ++++++++++++++++ src/xpub.cpp | 66 +++++++++--------- src/xsub.cpp | 20 +----- src/zmtp_engine.cpp | 55 ++++++++++----- src/zmtp_engine.hpp | 8 ++- tests/test_mock_pub_sub.cpp | 130 +++++++++++++++++++++++++----------- 16 files changed, 432 insertions(+), 128 deletions(-) create mode 100644 src/v3_1_encoder.cpp create mode 100644 src/v3_1_encoder.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 2ba77858..35c82e22 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -865,6 +865,7 @@ set(cxx-sources v1_encoder.cpp v2_decoder.cpp v2_encoder.cpp + v3_1_encoder.cpp xpub.cpp xsub.cpp zmq.cpp @@ -1007,6 +1008,7 @@ set(cxx-sources v1_encoder.hpp v2_decoder.hpp v2_encoder.hpp + v3_1_encoder.hpp v2_protocol.hpp vmci.hpp vmci_address.hpp diff --git a/Makefile.am b/Makefile.am index 746545cc..7dad758f 100644 --- a/Makefile.am +++ b/Makefile.am @@ -239,6 +239,8 @@ src_libzmq_la_SOURCES = \ src/v1_encoder.hpp \ src/v2_encoder.cpp \ src/v2_encoder.hpp \ + src/v3_1_encoder.cpp \ + src/v3_1_encoder.hpp \ src/v2_protocol.hpp \ src/vmci.cpp \ src/vmci.hpp \ diff --git a/src/msg.cpp b/src/msg.cpp index 612a18b2..bcf88bd1 100644 --- a/src/msg.cpp +++ b/src/msg.cpp @@ -212,6 +212,36 @@ int zmq::msg_t::init_leave () return 0; } +int zmq::msg_t::init_subscribe (const size_t size_, const unsigned char *topic) +{ + int rc = init_size (size_); + if (rc == 0) { + set_flags (zmq::msg_t::subscribe); + + // We explicitly allow a NULL subscription with size zero + if (size_) { + assert (topic); + memcpy (data (), topic, size_); + } + } + return rc; +} + +int zmq::msg_t::init_cancel (const size_t size_, const unsigned char *topic) +{ + int rc = init_size (size_); + if (rc == 0) { + set_flags (zmq::msg_t::cancel); + + // We explicitly allow a NULL subscription with size zero + if (size_) { + assert (topic); + memcpy (data (), topic, size_); + } + } + return rc; +} + int zmq::msg_t::close () { // Check the validity of the message. @@ -487,9 +517,12 @@ size_t zmq::msg_t::command_body_size () const { if (this->is_ping () || this->is_pong ()) return this->size () - ping_cmd_name_size; - if (this->is_subscribe ()) + else if (!(this->flags () & msg_t::command) + && (this->is_subscribe () || this->is_cancel ())) + return this->size (); + else if (this->is_subscribe ()) return this->size () - sub_cmd_name_size; - if (this->is_cancel ()) + else if (this->is_cancel ()) return this->size () - cancel_cmd_name_size; return 0; @@ -498,12 +531,17 @@ size_t zmq::msg_t::command_body_size () const void *zmq::msg_t::command_body () { unsigned char *data = NULL; + if (this->is_ping () || this->is_pong ()) data = static_cast (this->data ()) + ping_cmd_name_size; - if (this->is_subscribe ()) + // With inproc, command flag is not set for sub/cancel + else if (!(this->flags () & msg_t::command) + && (this->is_subscribe () || this->is_cancel ())) + data = static_cast (this->data ()); + else if (this->is_subscribe ()) data = static_cast (this->data ()) + sub_cmd_name_size; - if (this->is_cancel ()) + else if (this->is_cancel ()) data = static_cast (this->data ()) + cancel_cmd_name_size; diff --git a/src/msg.hpp b/src/msg.hpp index c9c8ec56..532cf87d 100644 --- a/src/msg.hpp +++ b/src/msg.hpp @@ -54,6 +54,9 @@ namespace zmq // Note that this structure needs to be explicitly constructed // (init functions) and destructed (close function). +static const char cancel_cmd_name[] = "\6CANCEL"; +static const char sub_cmd_name[] = "\x9SUBSCRIBE"; + class msg_t { public: @@ -109,6 +112,8 @@ class msg_t int init_delimiter (); int init_join (); int init_leave (); + int init_subscribe (const size_t size_, const unsigned char *topic); + int init_cancel (const size_t size_, const unsigned char *topic); int close (); int move (msg_t &src_); int copy (msg_t &src_); diff --git a/src/sub.cpp b/src/sub.cpp index e235360d..7240054a 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -56,15 +56,15 @@ int zmq::sub_t::xsetsockopt (int option_, // Create the subscription message. msg_t msg; - int rc = msg.init_size (optvallen_ + 1); - errno_assert (rc == 0); - unsigned char *data = static_cast (msg.data ()); - *data = (option_ == ZMQ_SUBSCRIBE); - // We explicitly allow a NULL subscription with size zero - if (optvallen_) { - assert (optval_); - memcpy (data + 1, optval_, optvallen_); + int rc; + const unsigned char *data = static_cast (optval_); + if (option_ == ZMQ_SUBSCRIBE) { + rc = msg.init_subscribe (optvallen_, data); + } else { + rc = msg.init_cancel (optvallen_, data); } + errno_assert (rc == 0); + // Pass it further on in the stack. rc = xsub_t::xsend (&msg); return close_and_return (&msg, rc); diff --git a/src/v1_encoder.cpp b/src/v1_encoder.cpp index 8ef22ae7..38e7e982 100644 --- a/src/v1_encoder.cpp +++ b/src/v1_encoder.cpp @@ -55,23 +55,41 @@ void zmq::v1_encoder_t::size_ready () void zmq::v1_encoder_t::message_ready () { + size_t header_size = 2; // flags byte + size byte // Get the message size. size_t size = in_progress ()->size (); // Account for the 'flags' byte. size++; + // Account for the subscribe/cancel byte. + if (in_progress ()->is_subscribe () || in_progress ()->is_cancel ()) + size++; + // For messages less than 255 bytes long, write one byte of message size. // For longer messages write 0xff escape character followed by 8-byte // message size. In both cases 'flags' field follows. if (size < UCHAR_MAX) { _tmpbuf[0] = static_cast (size); _tmpbuf[1] = (in_progress ()->flags () & msg_t::more); - next_step (_tmpbuf, 2, &v1_encoder_t::size_ready, false); } else { _tmpbuf[0] = UCHAR_MAX; put_uint64 (_tmpbuf + 1, size); _tmpbuf[9] = (in_progress ()->flags () & msg_t::more); - next_step (_tmpbuf, 10, &v1_encoder_t::size_ready, false); + header_size = 10; } + + // Encode the subscribe/cancel byte. This is done in the encoder as + // opposed to when the subscribe message is created to allow different + // protocol behaviour on the wire in the v3.1 and legacy encoders. + // It results in the work being done multiple times in case the sub + // is sending the subscription/cancel to multiple pubs, but it cannot + // be avoided. This processing can be moved to xsub once support for + // ZMTP < 3.1 is dropped. + if (in_progress ()->is_subscribe ()) + _tmpbuf[header_size++] = 1; + else if (in_progress ()->is_cancel ()) + _tmpbuf[header_size++] = 0; + + next_step (_tmpbuf, header_size, &v1_encoder_t::size_ready, false); } diff --git a/src/v1_encoder.hpp b/src/v1_encoder.hpp index 5d4d7b45..3f7a57e3 100644 --- a/src/v1_encoder.hpp +++ b/src/v1_encoder.hpp @@ -46,7 +46,7 @@ class v1_encoder_t ZMQ_FINAL : public encoder_base_t void size_ready (); void message_ready (); - unsigned char _tmpbuf[10]; + unsigned char _tmpbuf[11]; ZMQ_NON_COPYABLE_NOR_MOVABLE (v1_encoder_t) }; diff --git a/src/v2_encoder.cpp b/src/v2_encoder.cpp index 88f005a9..f331adee 100644 --- a/src/v2_encoder.cpp +++ b/src/v2_encoder.cpp @@ -50,6 +50,8 @@ zmq::v2_encoder_t::~v2_encoder_t () void zmq::v2_encoder_t::message_ready () { // Encode flags. + size_t size = in_progress ()->size (); + size_t header_size = 2; // flags byte + size byte unsigned char &protocol_flags = _tmp_buf[0]; protocol_flags = 0; if (in_progress ()->flags () & msg_t::more) @@ -58,18 +60,32 @@ void zmq::v2_encoder_t::message_ready () protocol_flags |= v2_protocol_t::large_flag; if (in_progress ()->flags () & msg_t::command) protocol_flags |= v2_protocol_t::command_flag; + if (in_progress ()->is_subscribe () || in_progress ()->is_cancel ()) + ++size; // Encode the message length. For messages less then 256 bytes, // the length is encoded as 8-bit unsigned integer. For larger // messages, 64-bit unsigned integer in network byte order is used. - const size_t size = in_progress ()->size (); if (unlikely (size > UCHAR_MAX)) { put_uint64 (_tmp_buf + 1, size); - next_step (_tmp_buf, 9, &v2_encoder_t::size_ready, false); + header_size = 9; // flags byte + size 8 bytes } else { _tmp_buf[1] = static_cast (size); - next_step (_tmp_buf, 2, &v2_encoder_t::size_ready, false); } + + // Encode the subscribe/cancel byte. This is done in the encoder as + // opposed to when the subscribe message is created to allow different + // protocol behaviour on the wire in the v3.1 and legacy encoders. + // It results in the work being done multiple times in case the sub + // is sending the subscription/cancel to multiple pubs, but it cannot + // be avoided. This processing can be moved to xsub once support for + // ZMTP < 3.1 is dropped. + if (in_progress ()->is_subscribe ()) + _tmp_buf[header_size++] = 1; + else if (in_progress ()->is_cancel ()) + _tmp_buf[header_size++] = 0; + + next_step (_tmp_buf, header_size, &v2_encoder_t::size_ready, false); } void zmq::v2_encoder_t::size_ready () diff --git a/src/v2_encoder.hpp b/src/v2_encoder.hpp index 88f2303b..4f60616f 100644 --- a/src/v2_encoder.hpp +++ b/src/v2_encoder.hpp @@ -46,7 +46,8 @@ class v2_encoder_t ZMQ_FINAL : public encoder_base_t void size_ready (); void message_ready (); - unsigned char _tmp_buf[9]; + // flags byte + size byte (or 8 bytes) + sub/cancel byte + unsigned char _tmp_buf[10]; ZMQ_NON_COPYABLE_NOR_MOVABLE (v2_encoder_t) }; diff --git a/src/v3_1_encoder.cpp b/src/v3_1_encoder.cpp new file mode 100644 index 00000000..f9e11dfa --- /dev/null +++ b/src/v3_1_encoder.cpp @@ -0,0 +1,105 @@ +/* + Copyright (c) 2020 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "precompiled.hpp" +#include "v2_protocol.hpp" +#include "v3_1_encoder.hpp" +#include "msg.hpp" +#include "likely.hpp" +#include "wire.hpp" + +#include + +zmq::v3_1_encoder_t::v3_1_encoder_t (size_t bufsize_) : + encoder_base_t (bufsize_) +{ + // Write 0 bytes to the batch and go to message_ready state. + next_step (NULL, 0, &v3_1_encoder_t::message_ready, true); +} + +zmq::v3_1_encoder_t::~v3_1_encoder_t () +{ +} + +void zmq::v3_1_encoder_t::message_ready () +{ + // Encode flags. + size_t size = in_progress ()->size (); + size_t header_size = 2; // flags byte + size byte + unsigned char &protocol_flags = _tmp_buf[0]; + protocol_flags = 0; + if (in_progress ()->flags () & msg_t::more) + protocol_flags |= v2_protocol_t::more_flag; + if (in_progress ()->size () > UCHAR_MAX) + protocol_flags |= v2_protocol_t::large_flag; + if (in_progress ()->flags () & msg_t::command + || in_progress ()->is_subscribe () || in_progress ()->is_cancel ()) { + protocol_flags |= v2_protocol_t::command_flag; + if (in_progress ()->is_subscribe ()) + size += zmq::msg_t::sub_cmd_name_size; + else if (in_progress ()->is_cancel ()) + size += zmq::msg_t::cancel_cmd_name_size; + } + + // Encode the message length. For messages less then 256 bytes, + // the length is encoded as 8-bit unsigned integer. For larger + // messages, 64-bit unsigned integer in network byte order is used. + if (unlikely (size > UCHAR_MAX)) { + put_uint64 (_tmp_buf + 1, size); + header_size = 9; // flags byte + size 8 bytes + } else { + _tmp_buf[1] = static_cast (size); + } + + // Encode the sub/cancel command string. This is done in the encoder as + // opposed to when the subscribe message is created to allow different + // protocol behaviour on the wire in the v3.1 and legacy encoders. + // It results in the work being done multiple times in case the sub + // is sending the subscription/cancel to multiple pubs, but it cannot + // be avoided. This processing can be moved to xsub once support for + // ZMTP < 3.1 is dropped. + if (in_progress ()->is_subscribe ()) { + memcpy (_tmp_buf + header_size, zmq::sub_cmd_name, + zmq::msg_t::sub_cmd_name_size); + header_size += zmq::msg_t::sub_cmd_name_size; + } else if (in_progress ()->is_cancel ()) { + memcpy (_tmp_buf + header_size, zmq::cancel_cmd_name, + zmq::msg_t::cancel_cmd_name_size); + header_size += zmq::msg_t::cancel_cmd_name_size; + } + + next_step (_tmp_buf, header_size, &v3_1_encoder_t::size_ready, false); +} + +void zmq::v3_1_encoder_t::size_ready () +{ + // Write message body into the buffer. + next_step (in_progress ()->data (), in_progress ()->size (), + &v3_1_encoder_t::message_ready, true); +} diff --git a/src/v3_1_encoder.hpp b/src/v3_1_encoder.hpp new file mode 100644 index 00000000..d5d9c136 --- /dev/null +++ b/src/v3_1_encoder.hpp @@ -0,0 +1,56 @@ +/* + Copyright (c) 2020 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_V3_1_ENCODER_HPP_INCLUDED__ +#define __ZMQ_V3_1_ENCODER_HPP_INCLUDED__ + +#include "encoder.hpp" +#include "msg.hpp" + +namespace zmq +{ +// Encoder for 0MQ framing protocol. Converts messages into data stream. + +class v3_1_encoder_t ZMQ_FINAL : public encoder_base_t +{ + public: + v3_1_encoder_t (size_t bufsize_); + ~v3_1_encoder_t () ZMQ_FINAL; + + private: + void size_ready (); + void message_ready (); + + unsigned char _tmp_buf[9 + zmq::msg_t::sub_cmd_name_size]; + + ZMQ_NON_COPYABLE_NOR_MOVABLE (v3_1_encoder_t) +}; +} + +#endif diff --git a/src/xpub.cpp b/src/xpub.cpp index 36748e13..f817edcd 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -102,6 +102,7 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) size_t size = 0; bool subscribe = false; bool is_subscribe_or_cancel = false; + bool notify = false; const bool first_part = !_more_recv; _more_recv = (msg.flags () & msg_t::more) != 0; @@ -144,25 +145,7 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) _manual_subscriptions.add (data, size, pipe_); _pending_pipes.push_back (pipe_); - - // ZMTP 3.1 hack: we need to support sub/cancel commands, but - // we can't give them back to userspace as it would be an API - // breakage since the payload of the message is completely - // different. Manually craft an old-style message instead. - data = data - 1; - size = size + 1; - if (subscribe) - *data = 1; - else - *data = 0; - - _pending_data.push_back (blob_t (data, size)); - if (metadata) - metadata->add_ref (); - _pending_metadata.push_back (metadata); - _pending_flags.push_back (0); } else { - bool notify; if (!subscribe) { const mtrie_t::rm_result rm_result = _subscriptions.rm (data, size, pipe_); @@ -172,25 +155,36 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) const bool first_added = _subscriptions.add (data, size, pipe_); notify = first_added || _verbose_subs; } - - // If the request was a new subscription, or the subscription - // was removed, or verbose mode is enabled, store it so that - // it can be passed to the user on next recv call. - if (options.type == ZMQ_XPUB && notify) { - data = data - 1; - size = size + 1; - if (subscribe) - *data = 1; - else - *data = 0; - - _pending_data.push_back (blob_t (data, size)); - if (metadata) - metadata->add_ref (); - _pending_metadata.push_back (metadata); - _pending_flags.push_back (0); - } } + + // If the request was a new subscription, or the subscription + // was removed, or verbose mode or manual mode are enabled, store it + // so that it can be passed to the user on next recv call. + if (_manual || (options.type == ZMQ_XPUB && notify)) { + // ZMTP 3.1 hack: we need to support sub/cancel commands, but + // we can't give them back to userspace as it would be an API + // breakage since the payload of the message is completely + // different. Manually craft an old-style message instead. + // Although with other transports it would be possible to simply + // reuse the same buffer and prefix a 0/1 byte to the topic, with + // inproc the subscribe/cancel command string is not present in + // the message, so this optimization is not possible. + // The pushback makes a copy of the data array anyway, so the + // number of buffer copies does not change. + blob_t notification (size + 1); + if (subscribe) + *notification.data () = 1; + else + *notification.data () = 0; + memcpy (notification.data () + 1, data, size); + + _pending_data.push_back (ZMQ_MOVE (notification)); + if (metadata) + metadata->add_ref (); + _pending_metadata.push_back (metadata); + _pending_flags.push_back (0); + } + msg.close (); } } diff --git a/src/xsub.cpp b/src/xsub.cpp index 404b7a89..66c4d16b 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -135,10 +135,7 @@ int zmq::xsub_t::xsend (msg_t *msg_) // however this is alread done on the XPUB side and // doing it here as well breaks ZMQ_XPUB_VERBOSE // when there are forwarding devices involved. - if (msg_->is_subscribe ()) { - data = static_cast (msg_->command_body ()); - size = msg_->command_body_size (); - } else { + if (!msg_->is_subscribe ()) { data = data + 1; size = size - 1; } @@ -148,10 +145,7 @@ int zmq::xsub_t::xsend (msg_t *msg_) } if (msg_->is_cancel () || (size > 0 && *data == 0)) { // Process unsubscribe message - if (msg_->is_cancel ()) { - data = static_cast (msg_->command_body ()); - size = msg_->command_body_size (); - } else { + if (!msg_->is_cancel ()) { data = data + 1; size = size - 1; } @@ -271,16 +265,8 @@ void zmq::xsub_t::send_subscription (unsigned char *data_, // Create the subscription message. msg_t msg; - const int rc = msg.init_size (size_ + 1); + const int rc = msg.init_subscribe (size_, data_); errno_assert (rc == 0); - unsigned char *data = static_cast (msg.data ()); - data[0] = 1; - - // We explicitly allow a NULL subscription with size zero - if (size_) { - assert (data_); - memcpy (data + 1, data_, size_); - } // Send it to the pipe. const bool sent = pipe->write (&msg); diff --git a/src/zmtp_engine.cpp b/src/zmtp_engine.cpp index aeda8f20..83ed3678 100644 --- a/src/zmtp_engine.cpp +++ b/src/zmtp_engine.cpp @@ -47,6 +47,7 @@ #include "v1_decoder.hpp" #include "v2_encoder.hpp" #include "v2_decoder.hpp" +#include "v3_1_encoder.hpp" #include "null_mechanism.hpp" #include "plain_client.hpp" #include "plain_server.hpp" @@ -115,8 +116,9 @@ void zmq::zmtp_engine_t::plug_internal () in_event (); } -// Position of the revision field in the greeting. +// Position of the revision and minor fields in the greeting. const size_t revision_pos = 10; +const size_t minor_pos = 11; bool zmq::zmtp_engine_t::handshake () { @@ -128,8 +130,8 @@ bool zmq::zmtp_engine_t::handshake () const bool unversioned = rc != 0; if (!(this - ->*select_handshake_fun (unversioned, - _greeting_recv[revision_pos])) ()) + ->*select_handshake_fun (unversioned, _greeting_recv[revision_pos], + _greeting_recv[minor_pos])) ()) return false; // Start polling for output if necessary. @@ -228,9 +230,8 @@ void zmq::zmtp_engine_t::receive_greeting_versioned () } } -zmq::zmtp_engine_t::handshake_fun_t -zmq::zmtp_engine_t::select_handshake_fun (bool unversioned_, - unsigned char revision_) +zmq::zmtp_engine_t::handshake_fun_t zmq::zmtp_engine_t::select_handshake_fun ( + bool unversioned_, unsigned char revision_, unsigned char minor_) { // Is the peer using ZMTP/1.0 with no revision number? if (unversioned_) { @@ -241,8 +242,15 @@ zmq::zmtp_engine_t::select_handshake_fun (bool unversioned_, return &zmtp_engine_t::handshake_v1_0; case ZMTP_2_0: return &zmtp_engine_t::handshake_v2_0; + case ZMTP_3_x: + switch (minor_) { + case 0: + return &zmtp_engine_t::handshake_v3_0; + default: + return &zmtp_engine_t::handshake_v3_1; + } default: - return &zmtp_engine_t::handshake_v3_0; + return &zmtp_engine_t::handshake_v3_1; } } @@ -339,15 +347,8 @@ bool zmq::zmtp_engine_t::handshake_v2_0 () return true; } -bool zmq::zmtp_engine_t::handshake_v3_0 () +bool zmq::zmtp_engine_t::handshake_v3_x () { - _encoder = new (std::nothrow) v2_encoder_t (_options.out_batch_size); - alloc_assert (_encoder); - - _decoder = new (std::nothrow) v2_decoder_t ( - _options.in_batch_size, _options.maxmsgsize, _options.zero_copy); - alloc_assert (_decoder); - if (_options.mechanism == ZMQ_NULL && memcmp (_greeting_recv + 12, "NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) @@ -408,6 +409,30 @@ bool zmq::zmtp_engine_t::handshake_v3_0 () return true; } +bool zmq::zmtp_engine_t::handshake_v3_0 () +{ + _encoder = new (std::nothrow) v2_encoder_t (_options.out_batch_size); + alloc_assert (_encoder); + + _decoder = new (std::nothrow) v2_decoder_t ( + _options.in_batch_size, _options.maxmsgsize, _options.zero_copy); + alloc_assert (_decoder); + + return zmq::zmtp_engine_t::handshake_v3_x (); +} + +bool zmq::zmtp_engine_t::handshake_v3_1 () +{ + _encoder = new (std::nothrow) v3_1_encoder_t (_options.out_batch_size); + alloc_assert (_encoder); + + _decoder = new (std::nothrow) v2_decoder_t ( + _options.in_batch_size, _options.maxmsgsize, _options.zero_copy); + alloc_assert (_decoder); + + return zmq::zmtp_engine_t::handshake_v3_x (); +} + int zmq::zmtp_engine_t::routing_id_msg (msg_t *msg_) { const int rc = msg_->init_size (_options.routing_id_size); diff --git a/src/zmtp_engine.hpp b/src/zmtp_engine.hpp index a05053e1..2d18bcfa 100644 --- a/src/zmtp_engine.hpp +++ b/src/zmtp_engine.hpp @@ -49,7 +49,8 @@ namespace zmq enum { ZMTP_1_0 = 0, - ZMTP_2_0 = 1 + ZMTP_2_0 = 1, + ZMTP_3_x = 3 }; class io_thread_t; @@ -85,12 +86,15 @@ class zmtp_engine_t ZMQ_FINAL : public stream_engine_base_t typedef bool (zmtp_engine_t::*handshake_fun_t) (); static handshake_fun_t select_handshake_fun (bool unversioned, - unsigned char revision); + unsigned char revision, + unsigned char minor); bool handshake_v1_0_unversioned (); bool handshake_v1_0 (); bool handshake_v2_0 (); + bool handshake_v3_x (); bool handshake_v3_0 (); + bool handshake_v3_1 (); int routing_id_msg (msg_t *msg_); int process_routing_id_msg (msg_t *msg_); diff --git a/tests/test_mock_pub_sub.cpp b/tests/test_mock_pub_sub.cpp index 2977a9b2..6795bb7e 100644 --- a/tests/test_mock_pub_sub.cpp +++ b/tests/test_mock_pub_sub.cpp @@ -81,7 +81,7 @@ static void recv_with_retry (raw_socket fd_, char *buffer_, int bytes_) } } -static void mock_handshake (raw_socket fd_) +static void mock_handshake (raw_socket fd_, bool sub_command, bool mock_pub) { const uint8_t zmtp_greeting[33] = {0xff, 0, 0, 0, 0, 0, 0, 0, 0, 0x7f, 3, 0, 'N', 'U', 'L', 'L', 0}; @@ -89,31 +89,44 @@ static void mock_handshake (raw_socket fd_) memset (buffer, 0, sizeof (buffer)); memcpy (buffer, zmtp_greeting, sizeof (zmtp_greeting)); + // Mock ZMTP 3.1 which uses commands + if (sub_command) { + buffer[11] = 1; + } int rc = TEST_ASSERT_SUCCESS_RAW_ERRNO (send (fd_, buffer, 64, 0)); TEST_ASSERT_EQUAL_INT (64, rc); recv_with_retry (fd_, buffer, 64); - const uint8_t zmtp_ready[27] = { - 4, 25, 5, 'R', 'E', 'A', 'D', 'Y', 11, 'S', 'o', 'c', 'k', 'e', - 't', '-', 'T', 'y', 'p', 'e', 0, 0, 0, 3, 'S', 'U', 'B'}; + if (!mock_pub) { + const uint8_t zmtp_ready[27] = { + 4, 25, 5, 'R', 'E', 'A', 'D', 'Y', 11, 'S', 'o', 'c', 'k', 'e', + 't', '-', 'T', 'y', 'p', 'e', 0, 0, 0, 3, 'S', 'U', 'B'}; + rc = TEST_ASSERT_SUCCESS_RAW_ERRNO ( + send (fd_, (const char *) zmtp_ready, 27, 0)); + TEST_ASSERT_EQUAL_INT (27, rc); + } else { + const uint8_t zmtp_ready[28] = { + 4, 26, 5, 'R', 'E', 'A', 'D', 'Y', 11, 'S', 'o', 'c', 'k', 'e', + 't', '-', 'T', 'y', 'p', 'e', 0, 0, 0, 4, 'X', 'P', 'U', 'B'}; + rc = TEST_ASSERT_SUCCESS_RAW_ERRNO ( + send (fd_, (const char *) zmtp_ready, 28, 0)); + TEST_ASSERT_EQUAL_INT (28, rc); + } + // greeting - XPUB has one extra byte memset (buffer, 0, sizeof (buffer)); - memcpy (buffer, zmtp_ready, 27); - rc = TEST_ASSERT_SUCCESS_RAW_ERRNO (send (fd_, buffer, 27, 0)); - TEST_ASSERT_EQUAL_INT (27, rc); - - // greeting - XPUB so one extra byte - recv_with_retry (fd_, buffer, 28); + recv_with_retry (fd_, buffer, mock_pub ? 27 : 28); } static void prep_server_socket (void **server_out_, void **mon_out_, char *endpoint_, - size_t ep_length_) + size_t ep_length_, + int socket_type) { // We'll be using this socket in raw mode - void *server = test_context_socket (ZMQ_XPUB); + void *server = test_context_socket (socket_type); int value = 0; TEST_ASSERT_SUCCESS_ERRNO ( @@ -136,13 +149,14 @@ static void prep_server_socket (void **server_out_, *mon_out_ = server_mon; } -static void test_mock_sub (bool sub_command_) +static void test_mock_pub_sub (bool sub_command_, bool mock_pub_) { int rc; char my_endpoint[MAX_SOCKET_STRING]; void *server, *server_mon; - prep_server_socket (&server, &server_mon, my_endpoint, MAX_SOCKET_STRING); + prep_server_socket (&server, &server_mon, my_endpoint, MAX_SOCKET_STRING, + mock_pub_ ? ZMQ_SUB : ZMQ_XPUB); struct sockaddr_in ip4addr; raw_socket s; @@ -161,37 +175,63 @@ static void test_mock_sub (bool sub_command_) TEST_ASSERT_GREATER_THAN_INT (-1, rc); // Mock a ZMTP 3 client so we can forcibly try sub commands - mock_handshake (s); + mock_handshake (s, sub_command_, mock_pub_); // By now everything should report as connected rc = get_monitor_event (server_mon); TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_ACCEPTED, rc); - if (sub_command_) { - const uint8_t sub[13] = {4, 11, 9, 'S', 'U', 'B', 'S', - 'C', 'R', 'I', 'B', 'E', 'A'}; - rc = - TEST_ASSERT_SUCCESS_RAW_ERRNO (send (s, (const char *) sub, 13, 0)); - TEST_ASSERT_EQUAL_INT (13, rc); - } else { - const uint8_t sub[4] = {0, 2, 1, 'A'}; - rc = TEST_ASSERT_SUCCESS_RAW_ERRNO (send (s, (const char *) sub, 4, 0)); + char buffer[32]; + memset (buffer, 0, sizeof (buffer)); + + if (mock_pub_) { + rc = zmq_setsockopt (server, ZMQ_SUBSCRIBE, "A", 1); + TEST_ASSERT_EQUAL_INT (0, rc); + // SUB binds, let its state machine run + zmq_recv (server, buffer, 16, ZMQ_DONTWAIT); + + if (sub_command_) { + recv_with_retry (s, buffer, 13); + TEST_ASSERT_EQUAL_INT (0, + memcmp (buffer, "\4\xb\x9SUBSCRIBEA", 13)); + } else { + recv_with_retry (s, buffer, 4); + TEST_ASSERT_EQUAL_INT (0, memcmp (buffer, "\0\2\1A", 4)); + } + + memcpy (buffer, "\0\4ALOL", 6); + rc = TEST_ASSERT_SUCCESS_RAW_ERRNO (send (s, buffer, 6, 0)); + TEST_ASSERT_EQUAL_INT (6, rc); + + memset (buffer, 0, sizeof (buffer)); + rc = zmq_recv (server, buffer, 4, 0); TEST_ASSERT_EQUAL_INT (4, rc); + TEST_ASSERT_EQUAL_INT (0, memcmp (buffer, "ALOL", 4)); + } else { + if (sub_command_) { + const uint8_t sub[13] = {4, 11, 9, 'S', 'U', 'B', 'S', + 'C', 'R', 'I', 'B', 'E', 'A'}; + rc = TEST_ASSERT_SUCCESS_RAW_ERRNO ( + send (s, (const char *) sub, 13, 0)); + TEST_ASSERT_EQUAL_INT (13, rc); + } else { + const uint8_t sub[4] = {0, 2, 1, 'A'}; + rc = TEST_ASSERT_SUCCESS_RAW_ERRNO ( + send (s, (const char *) sub, 4, 0)); + TEST_ASSERT_EQUAL_INT (4, rc); + } + rc = zmq_recv (server, buffer, 2, 0); + TEST_ASSERT_EQUAL_INT (2, rc); + TEST_ASSERT_EQUAL_INT (0, memcmp (buffer, "\1A", 2)); + + rc = zmq_send (server, "ALOL", 4, 0); + TEST_ASSERT_EQUAL_INT (4, rc); + + memset (buffer, 0, sizeof (buffer)); + recv_with_retry (s, buffer, 6); + TEST_ASSERT_EQUAL_INT (0, memcmp (buffer, "\0\4ALOL", 6)); } - char buffer[16]; - memset (buffer, 0, sizeof (buffer)); - rc = zmq_recv (server, buffer, 2, 0); - TEST_ASSERT_EQUAL_INT (2, rc); - TEST_ASSERT_EQUAL_INT (0, memcmp (buffer, "\1A", 2)); - - rc = zmq_send (server, "ALOL", 4, 0); - TEST_ASSERT_EQUAL_INT (4, rc); - - memset (buffer, 0, sizeof (buffer)); - recv_with_retry (s, buffer, 6); - TEST_ASSERT_EQUAL_INT (0, memcmp (buffer, "\0\4ALOL", 6)); - close (s); test_context_socket_close (server); @@ -200,12 +240,22 @@ static void test_mock_sub (bool sub_command_) void test_mock_sub_command () { - test_mock_sub (true); + test_mock_pub_sub (true, false); } void test_mock_sub_legacy () { - test_mock_sub (false); + test_mock_pub_sub (false, false); +} + +void test_mock_pub_command () +{ + test_mock_pub_sub (true, true); +} + +void test_mock_pub_legacy () +{ + test_mock_pub_sub (false, true); } int main (void) @@ -216,6 +266,8 @@ int main (void) RUN_TEST (test_mock_sub_command); RUN_TEST (test_mock_sub_legacy); + RUN_TEST (test_mock_pub_command); + RUN_TEST (test_mock_pub_legacy); return UNITY_END (); }