From 2a41c8d7b40a36062a69536d1ad878a80d4f16a3 Mon Sep 17 00:00:00 2001 From: Martin Hurton Date: Wed, 5 Sep 2012 16:37:20 +0200 Subject: [PATCH] Simplify initial handshaking --- src/stream_engine.cpp | 124 +++++++++++++----------------------------- src/stream_engine.hpp | 19 ++----- 2 files changed, 44 insertions(+), 99 deletions(-) diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 4cec369a..27925097 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -59,7 +59,6 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, cons encoder (NULL), handshaking (true), greeting_bytes_read (0), - greeting_size (0), session (NULL), options (options_), endpoint (endpoint_), @@ -132,13 +131,6 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_, io_object_t::plug (io_thread_); handle = add_fd (s); - // We need to detect whether our peer is using the versioned - // protocol. The detection is done in two steps. First, we read - // first two bytes and check if the long format of length is in use. - // If so, we receive and check the 'flags' field. If the rightmost bit - // is 1, the peer is using versioned protocol. - greeting_size = 2; - // Send the 'length' and 'flags' fields of the identity message. // The 'length' field is encoded in the long format. outpos = greeting_output_buffer; @@ -311,90 +303,52 @@ void zmq::stream_engine_t::activate_in () in_event (); } -int zmq::stream_engine_t::receive_greeting () -{ - zmq_assert (greeting_bytes_read < greeting_size); - - while (greeting_bytes_read < greeting_size) { - const int n = read (greeting + greeting_bytes_read, - greeting_size - greeting_bytes_read); - if (n == -1) - return -1; - if (n == 0) - return 0; - - greeting_bytes_read += n; - - if (greeting_bytes_read < greeting_size) - continue; - - if (greeting_size == 2) { - // We have received the first two bytes from the peer. - // If the first byte is not 0xff, we know that the - // peer is using unversioned protocol. - if (greeting [0] != 0xff) - break; - - // This may still be a long identity message (either - // 254 or 255 bytes long). We need to receive 8 more - // bytes so we can inspect the potential 'flags' field. - greeting_size = 10; - } - else - if (greeting_size == 10) { - // Inspect the rightmost bit of the 10th byte (which coincides - // with the 'flags' field if a regular message was sent). - // Zero indicates this is a header of identity message - // (i.e. the peer is using the unversioned protocol). - if (!(greeting [9] & 0x01)) - break; - - // This is truly a handshake and we can now send the rest of - // the greeting message out. - - if (outsize == 0) - set_pollout (handle); - - zmq_assert (outpos != NULL); - - outpos [outsize++] = 1; // Protocol version - outpos [outsize++] = 1; // Remaining length (1 byte for v1) - outpos [outsize++] = options.type; // Socket type - - // Read the 'version' and 'remaining_length' fields. - greeting_size = 12; - } - else - if (greeting_size == 12) { - // We have received the greeting message up to - // the 'remaining_length' field. Receive the remaining - // bytes of the greeting. - greeting_size += greeting [11]; - } - } - - return 0; -} - bool zmq::stream_engine_t::handshake () { zmq_assert (handshaking); zmq_assert (greeting_bytes_read < greeting_size); - int rc = receive_greeting (); - if (rc == -1) { - error (); - return false; + // Receive the greeting. + while (greeting_bytes_read < greeting_size) { + const int n = read (greeting + greeting_bytes_read, + greeting_size - greeting_bytes_read); + if (n == -1) { + error (); + return false; + } + + if (n == 0) + return false; + + greeting_bytes_read += n; + + // We have received at least one byte from the peer. + // If the first byte is not 0xff, we know that the + // peer is using unversioned protocol. + if (greeting [0] != 0xff) + break; + + if (greeting_bytes_read < 10) + continue; + + // Inspect the right-most bit of the 10th byte (which coincides + // with the 'flags' field if a regular message was sent). + // Zero indicates this is a header of identity message + // (i.e. the peer is using the unversioned protocol). + if (!(greeting [9] & 0x01)) + break; + + // The peer is using versioned protocol. + // Send the rest of the greeting, if necessary. + if (outpos + outsize != greeting_output_buffer + greeting_size) { + if (outsize == 0) + set_pollout (handle); + outpos [outsize++] = 1; // Protocol version + outpos [outsize++] = options.type; // Socket type + } } - if (greeting_bytes_read < greeting_size) - return false; - - // We have received either a header of identity message - // or the whole greeting. - zmq_assert (greeting [0] != 0xff || greeting_bytes_read >= 10); - - // POsition of the version field in the greeting. + // Position of the version field in the greeting. const size_t version_pos = 10; // Is the peer using the unversioned protocol? diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp index cab414a8..1dc56756 100644 --- a/src/stream_engine.hpp +++ b/src/stream_engine.hpp @@ -91,15 +91,9 @@ namespace zmq // Underlying socket. fd_t s; - // Maximum size of a greeting message: - // preamble (10 bytes) + version (1 byte) + remaining_length (1 byte) + - // up to 255 remaining bytes. - const static size_t maximum_greeting_size = 10 + 1 + 1 + 255; - - // Size of v1 greeting message: - // preamble (10 bytes) + version (1 byte) + remaining_length (1 byte) + - // socket_type (1) - const static size_t v1_greeting_size = 10 + 1 + 1 + 1; + // Size of the greeting message: + // Preamble (10 bytes) + version (1 byte) + socket type (1 byte). + const static size_t greeting_size = 12; handle_t handle; @@ -119,18 +113,15 @@ namespace zmq // The receive buffer holding the greeting message // that we are receiving from the peer. - unsigned char greeting [maximum_greeting_size]; + unsigned char greeting [greeting_size]; // The number of bytes of the greeting message that // we have already received. unsigned int greeting_bytes_read; - // The size of the greeting message. - unsigned int greeting_size; - // The send buffer holding the greeting message // that we are sending to the peer. - unsigned char greeting_output_buffer [v1_greeting_size]; + unsigned char greeting_output_buffer [greeting_size]; // The session this engine is attached to. zmq::session_base_t *session;