diff --git a/CMakeLists.txt b/CMakeLists.txt index 2ba77858..35c82e22 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -865,6 +865,7 @@ set(cxx-sources v1_encoder.cpp v2_decoder.cpp v2_encoder.cpp + v3_1_encoder.cpp xpub.cpp xsub.cpp zmq.cpp @@ -1007,6 +1008,7 @@ set(cxx-sources v1_encoder.hpp v2_decoder.hpp v2_encoder.hpp + v3_1_encoder.hpp v2_protocol.hpp vmci.hpp vmci_address.hpp diff --git a/Makefile.am b/Makefile.am index 746545cc..7dad758f 100644 --- a/Makefile.am +++ b/Makefile.am @@ -239,6 +239,8 @@ src_libzmq_la_SOURCES = \ src/v1_encoder.hpp \ src/v2_encoder.cpp \ src/v2_encoder.hpp \ + src/v3_1_encoder.cpp \ + src/v3_1_encoder.hpp \ src/v2_protocol.hpp \ src/vmci.cpp \ src/vmci.hpp \ diff --git a/src/msg.cpp b/src/msg.cpp index 612a18b2..bcf88bd1 100644 --- a/src/msg.cpp +++ b/src/msg.cpp @@ -212,6 +212,36 @@ int zmq::msg_t::init_leave () return 0; } +int zmq::msg_t::init_subscribe (const size_t size_, const unsigned char *topic) +{ + int rc = init_size (size_); + if (rc == 0) { + set_flags (zmq::msg_t::subscribe); + + // We explicitly allow a NULL subscription with size zero + if (size_) { + assert (topic); + memcpy (data (), topic, size_); + } + } + return rc; +} + +int zmq::msg_t::init_cancel (const size_t size_, const unsigned char *topic) +{ + int rc = init_size (size_); + if (rc == 0) { + set_flags (zmq::msg_t::cancel); + + // We explicitly allow a NULL subscription with size zero + if (size_) { + assert (topic); + memcpy (data (), topic, size_); + } + } + return rc; +} + int zmq::msg_t::close () { // Check the validity of the message. @@ -487,9 +517,12 @@ size_t zmq::msg_t::command_body_size () const { if (this->is_ping () || this->is_pong ()) return this->size () - ping_cmd_name_size; - if (this->is_subscribe ()) + else if (!(this->flags () & msg_t::command) + && (this->is_subscribe () || this->is_cancel ())) + return this->size (); + else if (this->is_subscribe ()) return this->size () - sub_cmd_name_size; - if (this->is_cancel ()) + else if (this->is_cancel ()) return this->size () - cancel_cmd_name_size; return 0; @@ -498,12 +531,17 @@ size_t zmq::msg_t::command_body_size () const void *zmq::msg_t::command_body () { unsigned char *data = NULL; + if (this->is_ping () || this->is_pong ()) data = static_cast (this->data ()) + ping_cmd_name_size; - if (this->is_subscribe ()) + // With inproc, command flag is not set for sub/cancel + else if (!(this->flags () & msg_t::command) + && (this->is_subscribe () || this->is_cancel ())) + data = static_cast (this->data ()); + else if (this->is_subscribe ()) data = static_cast (this->data ()) + sub_cmd_name_size; - if (this->is_cancel ()) + else if (this->is_cancel ()) data = static_cast (this->data ()) + cancel_cmd_name_size; diff --git a/src/msg.hpp b/src/msg.hpp index c9c8ec56..532cf87d 100644 --- a/src/msg.hpp +++ b/src/msg.hpp @@ -54,6 +54,9 @@ namespace zmq // Note that this structure needs to be explicitly constructed // (init functions) and destructed (close function). +static const char cancel_cmd_name[] = "\6CANCEL"; +static const char sub_cmd_name[] = "\x9SUBSCRIBE"; + class msg_t { public: @@ -109,6 +112,8 @@ class msg_t int init_delimiter (); int init_join (); int init_leave (); + int init_subscribe (const size_t size_, const unsigned char *topic); + int init_cancel (const size_t size_, const unsigned char *topic); int close (); int move (msg_t &src_); int copy (msg_t &src_); diff --git a/src/sub.cpp b/src/sub.cpp index e235360d..7240054a 100644 --- a/src/sub.cpp +++ b/src/sub.cpp @@ -56,15 +56,15 @@ int zmq::sub_t::xsetsockopt (int option_, // Create the subscription message. msg_t msg; - int rc = msg.init_size (optvallen_ + 1); - errno_assert (rc == 0); - unsigned char *data = static_cast (msg.data ()); - *data = (option_ == ZMQ_SUBSCRIBE); - // We explicitly allow a NULL subscription with size zero - if (optvallen_) { - assert (optval_); - memcpy (data + 1, optval_, optvallen_); + int rc; + const unsigned char *data = static_cast (optval_); + if (option_ == ZMQ_SUBSCRIBE) { + rc = msg.init_subscribe (optvallen_, data); + } else { + rc = msg.init_cancel (optvallen_, data); } + errno_assert (rc == 0); + // Pass it further on in the stack. rc = xsub_t::xsend (&msg); return close_and_return (&msg, rc); diff --git a/src/v1_encoder.cpp b/src/v1_encoder.cpp index 8ef22ae7..38e7e982 100644 --- a/src/v1_encoder.cpp +++ b/src/v1_encoder.cpp @@ -55,23 +55,41 @@ void zmq::v1_encoder_t::size_ready () void zmq::v1_encoder_t::message_ready () { + size_t header_size = 2; // flags byte + size byte // Get the message size. size_t size = in_progress ()->size (); // Account for the 'flags' byte. size++; + // Account for the subscribe/cancel byte. + if (in_progress ()->is_subscribe () || in_progress ()->is_cancel ()) + size++; + // For messages less than 255 bytes long, write one byte of message size. // For longer messages write 0xff escape character followed by 8-byte // message size. In both cases 'flags' field follows. if (size < UCHAR_MAX) { _tmpbuf[0] = static_cast (size); _tmpbuf[1] = (in_progress ()->flags () & msg_t::more); - next_step (_tmpbuf, 2, &v1_encoder_t::size_ready, false); } else { _tmpbuf[0] = UCHAR_MAX; put_uint64 (_tmpbuf + 1, size); _tmpbuf[9] = (in_progress ()->flags () & msg_t::more); - next_step (_tmpbuf, 10, &v1_encoder_t::size_ready, false); + header_size = 10; } + + // Encode the subscribe/cancel byte. This is done in the encoder as + // opposed to when the subscribe message is created to allow different + // protocol behaviour on the wire in the v3.1 and legacy encoders. + // It results in the work being done multiple times in case the sub + // is sending the subscription/cancel to multiple pubs, but it cannot + // be avoided. This processing can be moved to xsub once support for + // ZMTP < 3.1 is dropped. + if (in_progress ()->is_subscribe ()) + _tmpbuf[header_size++] = 1; + else if (in_progress ()->is_cancel ()) + _tmpbuf[header_size++] = 0; + + next_step (_tmpbuf, header_size, &v1_encoder_t::size_ready, false); } diff --git a/src/v1_encoder.hpp b/src/v1_encoder.hpp index 5d4d7b45..3f7a57e3 100644 --- a/src/v1_encoder.hpp +++ b/src/v1_encoder.hpp @@ -46,7 +46,7 @@ class v1_encoder_t ZMQ_FINAL : public encoder_base_t void size_ready (); void message_ready (); - unsigned char _tmpbuf[10]; + unsigned char _tmpbuf[11]; ZMQ_NON_COPYABLE_NOR_MOVABLE (v1_encoder_t) }; diff --git a/src/v2_encoder.cpp b/src/v2_encoder.cpp index 88f005a9..f331adee 100644 --- a/src/v2_encoder.cpp +++ b/src/v2_encoder.cpp @@ -50,6 +50,8 @@ zmq::v2_encoder_t::~v2_encoder_t () void zmq::v2_encoder_t::message_ready () { // Encode flags. + size_t size = in_progress ()->size (); + size_t header_size = 2; // flags byte + size byte unsigned char &protocol_flags = _tmp_buf[0]; protocol_flags = 0; if (in_progress ()->flags () & msg_t::more) @@ -58,18 +60,32 @@ void zmq::v2_encoder_t::message_ready () protocol_flags |= v2_protocol_t::large_flag; if (in_progress ()->flags () & msg_t::command) protocol_flags |= v2_protocol_t::command_flag; + if (in_progress ()->is_subscribe () || in_progress ()->is_cancel ()) + ++size; // Encode the message length. For messages less then 256 bytes, // the length is encoded as 8-bit unsigned integer. For larger // messages, 64-bit unsigned integer in network byte order is used. - const size_t size = in_progress ()->size (); if (unlikely (size > UCHAR_MAX)) { put_uint64 (_tmp_buf + 1, size); - next_step (_tmp_buf, 9, &v2_encoder_t::size_ready, false); + header_size = 9; // flags byte + size 8 bytes } else { _tmp_buf[1] = static_cast (size); - next_step (_tmp_buf, 2, &v2_encoder_t::size_ready, false); } + + // Encode the subscribe/cancel byte. This is done in the encoder as + // opposed to when the subscribe message is created to allow different + // protocol behaviour on the wire in the v3.1 and legacy encoders. + // It results in the work being done multiple times in case the sub + // is sending the subscription/cancel to multiple pubs, but it cannot + // be avoided. This processing can be moved to xsub once support for + // ZMTP < 3.1 is dropped. + if (in_progress ()->is_subscribe ()) + _tmp_buf[header_size++] = 1; + else if (in_progress ()->is_cancel ()) + _tmp_buf[header_size++] = 0; + + next_step (_tmp_buf, header_size, &v2_encoder_t::size_ready, false); } void zmq::v2_encoder_t::size_ready () diff --git a/src/v2_encoder.hpp b/src/v2_encoder.hpp index 88f2303b..4f60616f 100644 --- a/src/v2_encoder.hpp +++ b/src/v2_encoder.hpp @@ -46,7 +46,8 @@ class v2_encoder_t ZMQ_FINAL : public encoder_base_t void size_ready (); void message_ready (); - unsigned char _tmp_buf[9]; + // flags byte + size byte (or 8 bytes) + sub/cancel byte + unsigned char _tmp_buf[10]; ZMQ_NON_COPYABLE_NOR_MOVABLE (v2_encoder_t) }; diff --git a/src/v3_1_encoder.cpp b/src/v3_1_encoder.cpp new file mode 100644 index 00000000..f9e11dfa --- /dev/null +++ b/src/v3_1_encoder.cpp @@ -0,0 +1,105 @@ +/* + Copyright (c) 2020 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "precompiled.hpp" +#include "v2_protocol.hpp" +#include "v3_1_encoder.hpp" +#include "msg.hpp" +#include "likely.hpp" +#include "wire.hpp" + +#include + +zmq::v3_1_encoder_t::v3_1_encoder_t (size_t bufsize_) : + encoder_base_t (bufsize_) +{ + // Write 0 bytes to the batch and go to message_ready state. + next_step (NULL, 0, &v3_1_encoder_t::message_ready, true); +} + +zmq::v3_1_encoder_t::~v3_1_encoder_t () +{ +} + +void zmq::v3_1_encoder_t::message_ready () +{ + // Encode flags. + size_t size = in_progress ()->size (); + size_t header_size = 2; // flags byte + size byte + unsigned char &protocol_flags = _tmp_buf[0]; + protocol_flags = 0; + if (in_progress ()->flags () & msg_t::more) + protocol_flags |= v2_protocol_t::more_flag; + if (in_progress ()->size () > UCHAR_MAX) + protocol_flags |= v2_protocol_t::large_flag; + if (in_progress ()->flags () & msg_t::command + || in_progress ()->is_subscribe () || in_progress ()->is_cancel ()) { + protocol_flags |= v2_protocol_t::command_flag; + if (in_progress ()->is_subscribe ()) + size += zmq::msg_t::sub_cmd_name_size; + else if (in_progress ()->is_cancel ()) + size += zmq::msg_t::cancel_cmd_name_size; + } + + // Encode the message length. For messages less then 256 bytes, + // the length is encoded as 8-bit unsigned integer. For larger + // messages, 64-bit unsigned integer in network byte order is used. + if (unlikely (size > UCHAR_MAX)) { + put_uint64 (_tmp_buf + 1, size); + header_size = 9; // flags byte + size 8 bytes + } else { + _tmp_buf[1] = static_cast (size); + } + + // Encode the sub/cancel command string. This is done in the encoder as + // opposed to when the subscribe message is created to allow different + // protocol behaviour on the wire in the v3.1 and legacy encoders. + // It results in the work being done multiple times in case the sub + // is sending the subscription/cancel to multiple pubs, but it cannot + // be avoided. This processing can be moved to xsub once support for + // ZMTP < 3.1 is dropped. + if (in_progress ()->is_subscribe ()) { + memcpy (_tmp_buf + header_size, zmq::sub_cmd_name, + zmq::msg_t::sub_cmd_name_size); + header_size += zmq::msg_t::sub_cmd_name_size; + } else if (in_progress ()->is_cancel ()) { + memcpy (_tmp_buf + header_size, zmq::cancel_cmd_name, + zmq::msg_t::cancel_cmd_name_size); + header_size += zmq::msg_t::cancel_cmd_name_size; + } + + next_step (_tmp_buf, header_size, &v3_1_encoder_t::size_ready, false); +} + +void zmq::v3_1_encoder_t::size_ready () +{ + // Write message body into the buffer. + next_step (in_progress ()->data (), in_progress ()->size (), + &v3_1_encoder_t::message_ready, true); +} diff --git a/src/v3_1_encoder.hpp b/src/v3_1_encoder.hpp new file mode 100644 index 00000000..d5d9c136 --- /dev/null +++ b/src/v3_1_encoder.hpp @@ -0,0 +1,56 @@ +/* + Copyright (c) 2020 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_V3_1_ENCODER_HPP_INCLUDED__ +#define __ZMQ_V3_1_ENCODER_HPP_INCLUDED__ + +#include "encoder.hpp" +#include "msg.hpp" + +namespace zmq +{ +// Encoder for 0MQ framing protocol. Converts messages into data stream. + +class v3_1_encoder_t ZMQ_FINAL : public encoder_base_t +{ + public: + v3_1_encoder_t (size_t bufsize_); + ~v3_1_encoder_t () ZMQ_FINAL; + + private: + void size_ready (); + void message_ready (); + + unsigned char _tmp_buf[9 + zmq::msg_t::sub_cmd_name_size]; + + ZMQ_NON_COPYABLE_NOR_MOVABLE (v3_1_encoder_t) +}; +} + +#endif diff --git a/src/xpub.cpp b/src/xpub.cpp index 36748e13..f817edcd 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -102,6 +102,7 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) size_t size = 0; bool subscribe = false; bool is_subscribe_or_cancel = false; + bool notify = false; const bool first_part = !_more_recv; _more_recv = (msg.flags () & msg_t::more) != 0; @@ -144,25 +145,7 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) _manual_subscriptions.add (data, size, pipe_); _pending_pipes.push_back (pipe_); - - // ZMTP 3.1 hack: we need to support sub/cancel commands, but - // we can't give them back to userspace as it would be an API - // breakage since the payload of the message is completely - // different. Manually craft an old-style message instead. - data = data - 1; - size = size + 1; - if (subscribe) - *data = 1; - else - *data = 0; - - _pending_data.push_back (blob_t (data, size)); - if (metadata) - metadata->add_ref (); - _pending_metadata.push_back (metadata); - _pending_flags.push_back (0); } else { - bool notify; if (!subscribe) { const mtrie_t::rm_result rm_result = _subscriptions.rm (data, size, pipe_); @@ -172,25 +155,36 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) const bool first_added = _subscriptions.add (data, size, pipe_); notify = first_added || _verbose_subs; } - - // If the request was a new subscription, or the subscription - // was removed, or verbose mode is enabled, store it so that - // it can be passed to the user on next recv call. - if (options.type == ZMQ_XPUB && notify) { - data = data - 1; - size = size + 1; - if (subscribe) - *data = 1; - else - *data = 0; - - _pending_data.push_back (blob_t (data, size)); - if (metadata) - metadata->add_ref (); - _pending_metadata.push_back (metadata); - _pending_flags.push_back (0); - } } + + // If the request was a new subscription, or the subscription + // was removed, or verbose mode or manual mode are enabled, store it + // so that it can be passed to the user on next recv call. + if (_manual || (options.type == ZMQ_XPUB && notify)) { + // ZMTP 3.1 hack: we need to support sub/cancel commands, but + // we can't give them back to userspace as it would be an API + // breakage since the payload of the message is completely + // different. Manually craft an old-style message instead. + // Although with other transports it would be possible to simply + // reuse the same buffer and prefix a 0/1 byte to the topic, with + // inproc the subscribe/cancel command string is not present in + // the message, so this optimization is not possible. + // The pushback makes a copy of the data array anyway, so the + // number of buffer copies does not change. + blob_t notification (size + 1); + if (subscribe) + *notification.data () = 1; + else + *notification.data () = 0; + memcpy (notification.data () + 1, data, size); + + _pending_data.push_back (ZMQ_MOVE (notification)); + if (metadata) + metadata->add_ref (); + _pending_metadata.push_back (metadata); + _pending_flags.push_back (0); + } + msg.close (); } } diff --git a/src/xsub.cpp b/src/xsub.cpp index 404b7a89..66c4d16b 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -135,10 +135,7 @@ int zmq::xsub_t::xsend (msg_t *msg_) // however this is alread done on the XPUB side and // doing it here as well breaks ZMQ_XPUB_VERBOSE // when there are forwarding devices involved. - if (msg_->is_subscribe ()) { - data = static_cast (msg_->command_body ()); - size = msg_->command_body_size (); - } else { + if (!msg_->is_subscribe ()) { data = data + 1; size = size - 1; } @@ -148,10 +145,7 @@ int zmq::xsub_t::xsend (msg_t *msg_) } if (msg_->is_cancel () || (size > 0 && *data == 0)) { // Process unsubscribe message - if (msg_->is_cancel ()) { - data = static_cast (msg_->command_body ()); - size = msg_->command_body_size (); - } else { + if (!msg_->is_cancel ()) { data = data + 1; size = size - 1; } @@ -271,16 +265,8 @@ void zmq::xsub_t::send_subscription (unsigned char *data_, // Create the subscription message. msg_t msg; - const int rc = msg.init_size (size_ + 1); + const int rc = msg.init_subscribe (size_, data_); errno_assert (rc == 0); - unsigned char *data = static_cast (msg.data ()); - data[0] = 1; - - // We explicitly allow a NULL subscription with size zero - if (size_) { - assert (data_); - memcpy (data + 1, data_, size_); - } // Send it to the pipe. const bool sent = pipe->write (&msg); diff --git a/src/zmtp_engine.cpp b/src/zmtp_engine.cpp index aeda8f20..d9a186b1 100644 --- a/src/zmtp_engine.cpp +++ b/src/zmtp_engine.cpp @@ -47,6 +47,7 @@ #include "v1_decoder.hpp" #include "v2_encoder.hpp" #include "v2_decoder.hpp" +#include "v3_1_encoder.hpp" #include "null_mechanism.hpp" #include "plain_client.hpp" #include "plain_server.hpp" @@ -115,8 +116,9 @@ void zmq::zmtp_engine_t::plug_internal () in_event (); } -// Position of the revision field in the greeting. +// Position of the revision and minor fields in the greeting. const size_t revision_pos = 10; +const size_t minor_pos = 11; bool zmq::zmtp_engine_t::handshake () { @@ -128,8 +130,8 @@ bool zmq::zmtp_engine_t::handshake () const bool unversioned = rc != 0; if (!(this - ->*select_handshake_fun (unversioned, - _greeting_recv[revision_pos])) ()) + ->*select_handshake_fun (unversioned, _greeting_recv[revision_pos], + _greeting_recv[minor_pos])) ()) return false; // Start polling for output if necessary. @@ -203,7 +205,7 @@ void zmq::zmtp_engine_t::receive_greeting_versioned () || _greeting_recv[revision_pos] == ZMTP_2_0) _outpos[_outsize++] = _options.type; else { - _outpos[_outsize++] = 0; // Minor version number + _outpos[_outsize++] = 1; // Minor version number memset (_outpos + _outsize, 0, 20); zmq_assert (_options.mechanism == ZMQ_NULL @@ -228,9 +230,8 @@ void zmq::zmtp_engine_t::receive_greeting_versioned () } } -zmq::zmtp_engine_t::handshake_fun_t -zmq::zmtp_engine_t::select_handshake_fun (bool unversioned_, - unsigned char revision_) +zmq::zmtp_engine_t::handshake_fun_t zmq::zmtp_engine_t::select_handshake_fun ( + bool unversioned_, unsigned char revision_, unsigned char minor_) { // Is the peer using ZMTP/1.0 with no revision number? if (unversioned_) { @@ -241,8 +242,15 @@ zmq::zmtp_engine_t::select_handshake_fun (bool unversioned_, return &zmtp_engine_t::handshake_v1_0; case ZMTP_2_0: return &zmtp_engine_t::handshake_v2_0; + case ZMTP_3_x: + switch (minor_) { + case 0: + return &zmtp_engine_t::handshake_v3_0; + default: + return &zmtp_engine_t::handshake_v3_1; + } default: - return &zmtp_engine_t::handshake_v3_0; + return &zmtp_engine_t::handshake_v3_1; } } @@ -339,15 +347,8 @@ bool zmq::zmtp_engine_t::handshake_v2_0 () return true; } -bool zmq::zmtp_engine_t::handshake_v3_0 () +bool zmq::zmtp_engine_t::handshake_v3_x () { - _encoder = new (std::nothrow) v2_encoder_t (_options.out_batch_size); - alloc_assert (_encoder); - - _decoder = new (std::nothrow) v2_decoder_t ( - _options.in_batch_size, _options.maxmsgsize, _options.zero_copy); - alloc_assert (_decoder); - if (_options.mechanism == ZMQ_NULL && memcmp (_greeting_recv + 12, "NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) @@ -408,6 +409,30 @@ bool zmq::zmtp_engine_t::handshake_v3_0 () return true; } +bool zmq::zmtp_engine_t::handshake_v3_0 () +{ + _encoder = new (std::nothrow) v2_encoder_t (_options.out_batch_size); + alloc_assert (_encoder); + + _decoder = new (std::nothrow) v2_decoder_t ( + _options.in_batch_size, _options.maxmsgsize, _options.zero_copy); + alloc_assert (_decoder); + + return zmq::zmtp_engine_t::handshake_v3_x (); +} + +bool zmq::zmtp_engine_t::handshake_v3_1 () +{ + _encoder = new (std::nothrow) v3_1_encoder_t (_options.out_batch_size); + alloc_assert (_encoder); + + _decoder = new (std::nothrow) v2_decoder_t ( + _options.in_batch_size, _options.maxmsgsize, _options.zero_copy); + alloc_assert (_decoder); + + return zmq::zmtp_engine_t::handshake_v3_x (); +} + int zmq::zmtp_engine_t::routing_id_msg (msg_t *msg_) { const int rc = msg_->init_size (_options.routing_id_size); diff --git a/src/zmtp_engine.hpp b/src/zmtp_engine.hpp index a05053e1..2d18bcfa 100644 --- a/src/zmtp_engine.hpp +++ b/src/zmtp_engine.hpp @@ -49,7 +49,8 @@ namespace zmq enum { ZMTP_1_0 = 0, - ZMTP_2_0 = 1 + ZMTP_2_0 = 1, + ZMTP_3_x = 3 }; class io_thread_t; @@ -85,12 +86,15 @@ class zmtp_engine_t ZMQ_FINAL : public stream_engine_base_t typedef bool (zmtp_engine_t::*handshake_fun_t) (); static handshake_fun_t select_handshake_fun (bool unversioned, - unsigned char revision); + unsigned char revision, + unsigned char minor); bool handshake_v1_0_unversioned (); bool handshake_v1_0 (); bool handshake_v2_0 (); + bool handshake_v3_x (); bool handshake_v3_0 (); + bool handshake_v3_1 (); int routing_id_msg (msg_t *msg_); int process_routing_id_msg (msg_t *msg_); diff --git a/tests/test_mock_pub_sub.cpp b/tests/test_mock_pub_sub.cpp index 2977a9b2..6795bb7e 100644 --- a/tests/test_mock_pub_sub.cpp +++ b/tests/test_mock_pub_sub.cpp @@ -81,7 +81,7 @@ static void recv_with_retry (raw_socket fd_, char *buffer_, int bytes_) } } -static void mock_handshake (raw_socket fd_) +static void mock_handshake (raw_socket fd_, bool sub_command, bool mock_pub) { const uint8_t zmtp_greeting[33] = {0xff, 0, 0, 0, 0, 0, 0, 0, 0, 0x7f, 3, 0, 'N', 'U', 'L', 'L', 0}; @@ -89,31 +89,44 @@ static void mock_handshake (raw_socket fd_) memset (buffer, 0, sizeof (buffer)); memcpy (buffer, zmtp_greeting, sizeof (zmtp_greeting)); + // Mock ZMTP 3.1 which uses commands + if (sub_command) { + buffer[11] = 1; + } int rc = TEST_ASSERT_SUCCESS_RAW_ERRNO (send (fd_, buffer, 64, 0)); TEST_ASSERT_EQUAL_INT (64, rc); recv_with_retry (fd_, buffer, 64); - const uint8_t zmtp_ready[27] = { - 4, 25, 5, 'R', 'E', 'A', 'D', 'Y', 11, 'S', 'o', 'c', 'k', 'e', - 't', '-', 'T', 'y', 'p', 'e', 0, 0, 0, 3, 'S', 'U', 'B'}; + if (!mock_pub) { + const uint8_t zmtp_ready[27] = { + 4, 25, 5, 'R', 'E', 'A', 'D', 'Y', 11, 'S', 'o', 'c', 'k', 'e', + 't', '-', 'T', 'y', 'p', 'e', 0, 0, 0, 3, 'S', 'U', 'B'}; + rc = TEST_ASSERT_SUCCESS_RAW_ERRNO ( + send (fd_, (const char *) zmtp_ready, 27, 0)); + TEST_ASSERT_EQUAL_INT (27, rc); + } else { + const uint8_t zmtp_ready[28] = { + 4, 26, 5, 'R', 'E', 'A', 'D', 'Y', 11, 'S', 'o', 'c', 'k', 'e', + 't', '-', 'T', 'y', 'p', 'e', 0, 0, 0, 4, 'X', 'P', 'U', 'B'}; + rc = TEST_ASSERT_SUCCESS_RAW_ERRNO ( + send (fd_, (const char *) zmtp_ready, 28, 0)); + TEST_ASSERT_EQUAL_INT (28, rc); + } + // greeting - XPUB has one extra byte memset (buffer, 0, sizeof (buffer)); - memcpy (buffer, zmtp_ready, 27); - rc = TEST_ASSERT_SUCCESS_RAW_ERRNO (send (fd_, buffer, 27, 0)); - TEST_ASSERT_EQUAL_INT (27, rc); - - // greeting - XPUB so one extra byte - recv_with_retry (fd_, buffer, 28); + recv_with_retry (fd_, buffer, mock_pub ? 27 : 28); } static void prep_server_socket (void **server_out_, void **mon_out_, char *endpoint_, - size_t ep_length_) + size_t ep_length_, + int socket_type) { // We'll be using this socket in raw mode - void *server = test_context_socket (ZMQ_XPUB); + void *server = test_context_socket (socket_type); int value = 0; TEST_ASSERT_SUCCESS_ERRNO ( @@ -136,13 +149,14 @@ static void prep_server_socket (void **server_out_, *mon_out_ = server_mon; } -static void test_mock_sub (bool sub_command_) +static void test_mock_pub_sub (bool sub_command_, bool mock_pub_) { int rc; char my_endpoint[MAX_SOCKET_STRING]; void *server, *server_mon; - prep_server_socket (&server, &server_mon, my_endpoint, MAX_SOCKET_STRING); + prep_server_socket (&server, &server_mon, my_endpoint, MAX_SOCKET_STRING, + mock_pub_ ? ZMQ_SUB : ZMQ_XPUB); struct sockaddr_in ip4addr; raw_socket s; @@ -161,37 +175,63 @@ static void test_mock_sub (bool sub_command_) TEST_ASSERT_GREATER_THAN_INT (-1, rc); // Mock a ZMTP 3 client so we can forcibly try sub commands - mock_handshake (s); + mock_handshake (s, sub_command_, mock_pub_); // By now everything should report as connected rc = get_monitor_event (server_mon); TEST_ASSERT_EQUAL_INT (ZMQ_EVENT_ACCEPTED, rc); - if (sub_command_) { - const uint8_t sub[13] = {4, 11, 9, 'S', 'U', 'B', 'S', - 'C', 'R', 'I', 'B', 'E', 'A'}; - rc = - TEST_ASSERT_SUCCESS_RAW_ERRNO (send (s, (const char *) sub, 13, 0)); - TEST_ASSERT_EQUAL_INT (13, rc); - } else { - const uint8_t sub[4] = {0, 2, 1, 'A'}; - rc = TEST_ASSERT_SUCCESS_RAW_ERRNO (send (s, (const char *) sub, 4, 0)); + char buffer[32]; + memset (buffer, 0, sizeof (buffer)); + + if (mock_pub_) { + rc = zmq_setsockopt (server, ZMQ_SUBSCRIBE, "A", 1); + TEST_ASSERT_EQUAL_INT (0, rc); + // SUB binds, let its state machine run + zmq_recv (server, buffer, 16, ZMQ_DONTWAIT); + + if (sub_command_) { + recv_with_retry (s, buffer, 13); + TEST_ASSERT_EQUAL_INT (0, + memcmp (buffer, "\4\xb\x9SUBSCRIBEA", 13)); + } else { + recv_with_retry (s, buffer, 4); + TEST_ASSERT_EQUAL_INT (0, memcmp (buffer, "\0\2\1A", 4)); + } + + memcpy (buffer, "\0\4ALOL", 6); + rc = TEST_ASSERT_SUCCESS_RAW_ERRNO (send (s, buffer, 6, 0)); + TEST_ASSERT_EQUAL_INT (6, rc); + + memset (buffer, 0, sizeof (buffer)); + rc = zmq_recv (server, buffer, 4, 0); TEST_ASSERT_EQUAL_INT (4, rc); + TEST_ASSERT_EQUAL_INT (0, memcmp (buffer, "ALOL", 4)); + } else { + if (sub_command_) { + const uint8_t sub[13] = {4, 11, 9, 'S', 'U', 'B', 'S', + 'C', 'R', 'I', 'B', 'E', 'A'}; + rc = TEST_ASSERT_SUCCESS_RAW_ERRNO ( + send (s, (const char *) sub, 13, 0)); + TEST_ASSERT_EQUAL_INT (13, rc); + } else { + const uint8_t sub[4] = {0, 2, 1, 'A'}; + rc = TEST_ASSERT_SUCCESS_RAW_ERRNO ( + send (s, (const char *) sub, 4, 0)); + TEST_ASSERT_EQUAL_INT (4, rc); + } + rc = zmq_recv (server, buffer, 2, 0); + TEST_ASSERT_EQUAL_INT (2, rc); + TEST_ASSERT_EQUAL_INT (0, memcmp (buffer, "\1A", 2)); + + rc = zmq_send (server, "ALOL", 4, 0); + TEST_ASSERT_EQUAL_INT (4, rc); + + memset (buffer, 0, sizeof (buffer)); + recv_with_retry (s, buffer, 6); + TEST_ASSERT_EQUAL_INT (0, memcmp (buffer, "\0\4ALOL", 6)); } - char buffer[16]; - memset (buffer, 0, sizeof (buffer)); - rc = zmq_recv (server, buffer, 2, 0); - TEST_ASSERT_EQUAL_INT (2, rc); - TEST_ASSERT_EQUAL_INT (0, memcmp (buffer, "\1A", 2)); - - rc = zmq_send (server, "ALOL", 4, 0); - TEST_ASSERT_EQUAL_INT (4, rc); - - memset (buffer, 0, sizeof (buffer)); - recv_with_retry (s, buffer, 6); - TEST_ASSERT_EQUAL_INT (0, memcmp (buffer, "\0\4ALOL", 6)); - close (s); test_context_socket_close (server); @@ -200,12 +240,22 @@ static void test_mock_sub (bool sub_command_) void test_mock_sub_command () { - test_mock_sub (true); + test_mock_pub_sub (true, false); } void test_mock_sub_legacy () { - test_mock_sub (false); + test_mock_pub_sub (false, false); +} + +void test_mock_pub_command () +{ + test_mock_pub_sub (true, true); +} + +void test_mock_pub_legacy () +{ + test_mock_pub_sub (false, true); } int main (void) @@ -216,6 +266,8 @@ int main (void) RUN_TEST (test_mock_sub_command); RUN_TEST (test_mock_sub_legacy); + RUN_TEST (test_mock_pub_command); + RUN_TEST (test_mock_pub_legacy); return UNITY_END (); } diff --git a/tests/test_stream.cpp b/tests/test_stream.cpp index b842b7f4..0ae8bf07 100644 --- a/tests/test_stream.cpp +++ b/tests/test_stream.cpp @@ -40,7 +40,7 @@ typedef uint8_t byte; typedef struct { byte signature[10]; // 0xFF 8*0x00 0x7F - byte version[2]; // 0x03 0x00 for ZMTP/3.0 + byte version[2]; // 0x03 0x01 for ZMTP/3.1 byte mechanism[20]; // "NULL" byte as_server; byte filler[31]; @@ -52,7 +52,7 @@ typedef struct // 8-byte size is set to 1 for backwards compatibility static zmtp_greeting_t greeting = { - {0xFF, 0, 0, 0, 0, 0, 0, 0, 1, 0x7F}, {3, 0}, {'N', 'U', 'L', 'L'}, 0, {0}}; + {0xFF, 0, 0, 0, 0, 0, 0, 0, 1, 0x7F}, {3, 1}, {'N', 'U', 'L', 'L'}, 0, {0}}; static void test_stream_to_dealer () { @@ -135,8 +135,8 @@ static void test_stream_to_dealer () } // First two bytes are major and minor version numbers. - TEST_ASSERT_EQUAL_INT (3, buffer[0]); // ZMTP/3.0 - TEST_ASSERT_EQUAL_INT (0, buffer[1]); + TEST_ASSERT_EQUAL_INT (3, buffer[0]); // ZMTP/3.1 + TEST_ASSERT_EQUAL_INT (1, buffer[1]); // Mechanism is "NULL" TEST_ASSERT_EQUAL_INT8_ARRAY (buffer + 2,