Implement new message encoder/decoder

This is supposed to become part of the ZMTP/1.1.

The main differences from the ZMTP/1.0 framing protocol are:
- flags field comes first, followed by the length field
- long messages are signaled using a flag rather then 0xff escape
- length field does not include the flags field, 0 is a valid value
This commit is contained in:
Martin Hurton
2012-09-05 02:01:19 +02:00
parent 8672f5829e
commit 3f6148abdf
7 changed files with 478 additions and 7 deletions

View File

@@ -40,6 +40,8 @@
#include "session_base.hpp"
#include "encoder.hpp"
#include "decoder.hpp"
#include "v1_encoder.hpp"
#include "v1_decoder.hpp"
#include "config.hpp"
#include "err.hpp"
#include "ip.hpp"
@@ -390,19 +392,23 @@ bool zmq::stream_engine_t::handshake ()
// We have received either a header of identity message
// or the whole greeting.
encoder = new (std::nothrow) encoder_t (out_batch_size);
decoder = new (std::nothrow) decoder_t (in_batch_size, options.maxmsgsize);
encoder->set_msg_source (session);
decoder->set_msg_sink (session);
zmq_assert (greeting [0] != 0xff || greeting_bytes_read >= 10);
// POsition of the version field in the greeting.
const size_t version_pos = 10;
// Is the peer using the unversioned protocol?
// If so, we send and receive rests of identity
// messages.
if (greeting [0] != 0xff || !(greeting [9] & 0x01)) {
encoder = new (std::nothrow) encoder_t (out_batch_size);
alloc_assert (encoder);
encoder->set_msg_source (session);
decoder = new (std::nothrow) decoder_t (in_batch_size, options.maxmsgsize);
alloc_assert (decoder);
decoder->set_msg_sink (session);
// We have already sent the message header.
// Since there is no way to tell the encoder to
// skip the message header, we simply throw that
@@ -425,6 +431,26 @@ bool zmq::stream_engine_t::handshake ()
if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB)
decoder->set_msg_sink (this);
}
else
if (greeting [version_pos] == 0) {
// ZMTP/1.0 framing.
encoder = new (std::nothrow) encoder_t (out_batch_size);
alloc_assert (encoder);
encoder->set_msg_source (session);
decoder = new (std::nothrow) decoder_t (in_batch_size, options.maxmsgsize);
alloc_assert (decoder);
decoder->set_msg_sink (session);
}
else {
// v1 framing protocol.
encoder = new (std::nothrow) v1_encoder_t (out_batch_size, session);
alloc_assert (encoder);
decoder = new (std::nothrow)
v1_decoder_t (in_batch_size, options.maxmsgsize, session);
alloc_assert (decoder);
}
// Start polling for output if necessary.
if (outsize == 0)