From cd4d8bb15a18831e1f4d45f2c5bee11d0c30b436 Mon Sep 17 00:00:00 2001 From: Martin Hurton Date: Sun, 28 Apr 2013 10:15:25 +0200 Subject: [PATCH] Implement ZMTP/3.0 NULL mechanism --- src/stream_engine.cpp | 203 ++++++++++++++++++++++++++++++++++++++++-- src/stream_engine.hpp | 33 +++++-- 2 files changed, 224 insertions(+), 12 deletions(-) diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 961b1e7d..b894694a 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -57,6 +57,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, cons outsize (0), encoder (NULL), handshaking (true), + greeting_size (v2_greeting_size), greeting_bytes_read (0), session (NULL), options (options_), @@ -68,6 +69,8 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, cons io_error (false), congested (false), subscription_required (false), + output_paused (false), + ready_command_received (false), socket (NULL) { int rc = tx_msg.init (); @@ -417,7 +420,7 @@ bool zmq::stream_engine_t::handshake () if (greeting_recv [0] != 0xff) break; - if (greeting_bytes_read < 10) + if (greeting_bytes_read < signature_size) continue; // Inspect the right-most bit of the 10th byte (which coincides @@ -428,12 +431,32 @@ bool zmq::stream_engine_t::handshake () break; // The peer is using versioned protocol. - // Send the rest of the greeting, if necessary. - if (outpos + outsize != greeting_send + greeting_size) { + // Send the major version number. + if (outpos + outsize == greeting_send + signature_size) { if (outsize == 0) set_pollout (handle); - outpos [outsize++] = ZMTP_2_1; // Protocol revision - outpos [outsize++] = options.type; // Socket type + outpos [outsize++] = 3; // Major version number + } + + if (greeting_bytes_read > signature_size) { + if (outpos + outsize == greeting_send + signature_size + 1) { + if (outsize == 0) + set_pollout (handle); + + // Use ZMTP/2.0 to talk to older peers. + if (greeting_recv [10] == ZMTP_1_0 + || greeting_recv [10] == ZMTP_2_0) + outpos [outsize++] = options.type; + else { + outpos [outsize++] = 0; // Minor version number + memset (outpos + outsize, 0, 20); + memcpy (outpos + outsize, "NULL", 4); + outsize += 20; + memset (outpos + outsize, 0, 32); + outsize += 32; + greeting_size = v3_greeting_size; + } + } } } @@ -478,6 +501,15 @@ bool zmq::stream_engine_t::handshake () in_batch_size, options.maxmsgsize); alloc_assert (decoder); } + else + if (greeting_recv [revision_pos] == ZMTP_2_0) { + encoder = new (std::nothrow) v2_encoder_t (out_batch_size); + alloc_assert (encoder); + + decoder = new (std::nothrow) v2_decoder_t ( + in_batch_size, options.maxmsgsize); + alloc_assert (decoder); + } else { encoder = new (std::nothrow) v2_encoder_t (out_batch_size); alloc_assert (encoder); @@ -485,6 +517,15 @@ bool zmq::stream_engine_t::handshake () decoder = new (std::nothrow) v2_decoder_t ( in_batch_size, options.maxmsgsize); alloc_assert (decoder); + + if (memcmp (greeting_recv + 12, "NULL\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0", 20) == 0) { + read_msg = &stream_engine_t::send_ready_command; + write_msg = &stream_engine_t::receive_ready_command; + } + else { + error (); + return false; + } } // Start polling for output if necessary. @@ -530,6 +571,135 @@ int zmq::stream_engine_t::write_identity (msg_t *msg_) return 0; } +int zmq::stream_engine_t::send_ready_command (msg_t *msg_) +{ + unsigned char * const command_buffer = (unsigned char *) malloc (512); + alloc_assert (command_buffer); + + unsigned char *ptr = command_buffer; + + // Add mechanism string + memcpy (ptr, "READY ", 8); + ptr += 8; + + // Add socket type property + const char *socket_type = socket_type_string (options.type); + ptr += add_property (ptr, "Socket-Type", socket_type, strlen (socket_type)); + + // Add identity property + if (options.type == ZMQ_REQ + || options.type == ZMQ_DEALER + || options.type == ZMQ_ROUTER) { + ptr += add_property (ptr, "Identity", + options.identity, options.identity_size); + } + + const size_t command_size = ptr - command_buffer; + const int rc = msg_->init_size (command_size); + errno_assert (rc == 0); + memcpy (msg_->data (), command_buffer, command_size); + free (command_buffer); + + if (ready_command_received) + read_msg = &stream_engine_t::pull_msg_from_session; + else + read_msg = &stream_engine_t::wait; + + return 0; +} + +int zmq::stream_engine_t::receive_ready_command (msg_t *msg_) +{ + const unsigned char * const command_buffer = + static_cast (msg_->data ()); + const size_t command_size = msg_->size (); + + const unsigned char *ptr = command_buffer; + size_t bytes_left = command_size; + + if (bytes_left < 8 || memcmp(ptr, "READY ", 8)) { + errno = EPROTO; + return -1; + } + + ptr += 8; + bytes_left -= 8; + + // Parse the property list + while (bytes_left > 1) { + const size_t name_length = static_cast (*ptr); + ptr += 1; + bytes_left -= 1; + + if (bytes_left < name_length) + break; + const std::string name = std::string((const char *) ptr, name_length); + ptr += name_length; + bytes_left -= name_length; + + if (bytes_left < 4) + break; + const size_t value_length = static_cast (get_uint32 (ptr)); + ptr += 4; + bytes_left -= 4; + + if (bytes_left < value_length) + break; + const unsigned char * const value = ptr; + ptr += value_length; + bytes_left -= value_length; + + if (name == "Socket-Type") { + // Implement socket type checking + } + else + if (name == "Identity") { + if (options.recv_identity) { + msg_t identity; + int rc = identity.init_size (value_length); + errno_assert (rc == 0); + memcpy (identity.data (), value, value_length); + identity.set_flags (msg_t::identity); + rc = session->push_msg (&identity); + errno_assert (rc == 0); + } + } + } + + if (bytes_left > 0) { + errno = EPROTO; + return -1; + } + + int rc = msg_->close (); + errno_assert (rc == 0); + rc = msg_->init (); + errno_assert (rc == 0); + + write_msg = &stream_engine_t::push_msg_to_session; + + ready_command_received = true; + if (output_paused) { + activate_out (); + output_paused = false; + } + + return 0; +} + +int zmq::stream_engine_t::wait (msg_t *msg_) +{ + if (ready_command_received) { + read_msg = &stream_engine_t::pull_msg_from_session; + return pull_msg_from_session (msg_); + } + else { + output_paused = true; + errno = EAGAIN; + return -1; + } +} + int zmq::stream_engine_t::pull_msg_from_session (msg_t *msg_) { return session->pull_msg (msg_); @@ -557,6 +727,29 @@ int zmq::stream_engine_t::write_subscription_msg (msg_t *msg_) return push_msg_to_session (msg_); } +size_t zmq::stream_engine_t::add_property (unsigned char *ptr, + const char *name, const void *value, size_t value_len) +{ + const size_t name_len = strlen (name); + zmq_assert (name_len <= 255); + *ptr++ = static_cast (name_len); + memcpy (ptr, name, name_len); + ptr += name_len; + zmq_assert (value_len <= (2^31) - 1); + put_uint32 (ptr, static_cast (value_len)); + ptr += 4; + memcpy (ptr, value, value_len); + + return 1 + name_len + 4 + value_len; +} + +const char *zmq::stream_engine_t::socket_type_string (int socket_type) { + const char *names [] = {"PAIR", "PUB", "SUB", "REQ", "REP", "DEALER", + "ROUTER", "PULL", "PUSH", "XPUB", "XSUB"}; + zmq_assert (socket_type >= 0 && socket_type <= 10); + return names [socket_type]; +} + void zmq::stream_engine_t::error () { zmq_assert (session); diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp index 64e2a4a2..cb8c75e1 100644 --- a/src/stream_engine.hpp +++ b/src/stream_engine.hpp @@ -37,8 +37,7 @@ namespace zmq enum { ZMTP_1_0 = 0, - ZMTP_2_0 = 1, - ZMTP_2_1 = 2 + ZMTP_2_0 = 1 }; class io_thread_t; @@ -94,11 +93,21 @@ namespace zmq int read_identity (msg_t *msg_); int write_identity (msg_t *msg_); + int send_ready_command (msg_t *msg); + int receive_ready_command (msg_t *msg); + int pull_msg_from_session (msg_t *msg_); int push_msg_to_session (msg_t *msg); + int wait (msg_t *msg_); + int write_subscription_msg (msg_t *msg_); + size_t add_property (unsigned char *ptr, + const char *name, const void *value, size_t value_len); + + const char *socket_type_string (int socket_type); + // Underlying socket. fd_t s; @@ -119,13 +128,20 @@ namespace zmq // version. When false, normal message flow has started. bool handshaking; - // Size of the greeting message: - // Preamble (10 bytes) + version (1 byte) + socket type (1 byte). - static const size_t greeting_size = 12; + static const size_t signature_size = 10; + + // Size of ZMTP/1.0 and ZMTP/2.0 greeting message + static const size_t v2_greeting_size = 12; + + // Size of ZMTP/3.0 greeting message + static const size_t v3_greeting_size = 64; + + // Expected greeting size. + size_t greeting_size; // Greeting received from, and sent to peer - unsigned char greeting_recv [greeting_size]; - unsigned char greeting_send [greeting_size]; + unsigned char greeting_recv [v3_greeting_size]; + unsigned char greeting_send [v3_greeting_size]; // Size of greeting received so far unsigned int greeting_bytes_read; @@ -156,6 +172,9 @@ namespace zmq // Needed to support old peers. bool subscription_required; + bool output_paused; + bool ready_command_received; + // Socket zmq::socket_base_t *socket;