From 3f6148abdf4c548eeb6f13aee38a4190468fdadc Mon Sep 17 00:00:00 2001 From: Martin Hurton Date: Wed, 5 Sep 2012 02:01:19 +0200 Subject: [PATCH] Implement new message encoder/decoder This is supposed to become part of the ZMTP/1.1. The main differences from the ZMTP/1.0 framing protocol are: - flags field comes first, followed by the length field - long messages are signaled using a flag rather then 0xff escape - length field does not include the flags field, 0 is a valid value --- src/Makefile.am | 2 + src/stream_engine.cpp | 40 ++++++++-- src/v1_decoder.cpp | 167 ++++++++++++++++++++++++++++++++++++++++++ src/v1_decoder.hpp | 70 ++++++++++++++++++ src/v1_encoder.cpp | 103 ++++++++++++++++++++++++++ src/v1_encoder.hpp | 60 +++++++++++++++ src/v1_protocol.hpp | 43 +++++++++++ 7 files changed, 478 insertions(+), 7 deletions(-) create mode 100644 src/v1_decoder.cpp create mode 100644 src/v1_decoder.hpp create mode 100644 src/v1_encoder.cpp create mode 100644 src/v1_encoder.hpp create mode 100644 src/v1_protocol.hpp diff --git a/src/Makefile.am b/src/Makefile.am index 5adac699..28dc852e 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -133,6 +133,8 @@ libzmq_la_SOURCES = \ xpub.cpp \ router.cpp \ dealer.cpp \ + v1_decoder.cpp \ + v1_encoder.cpp \ xsub.cpp \ zmq.cpp \ zmq_utils.cpp diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 9a3fb275..4cec369a 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -40,6 +40,8 @@ #include "session_base.hpp" #include "encoder.hpp" #include "decoder.hpp" +#include "v1_encoder.hpp" +#include "v1_decoder.hpp" #include "config.hpp" #include "err.hpp" #include "ip.hpp" @@ -390,19 +392,23 @@ bool zmq::stream_engine_t::handshake () // We have received either a header of identity message // or the whole greeting. - - encoder = new (std::nothrow) encoder_t (out_batch_size); - decoder = new (std::nothrow) decoder_t (in_batch_size, options.maxmsgsize); - - encoder->set_msg_source (session); - decoder->set_msg_sink (session); - zmq_assert (greeting [0] != 0xff || greeting_bytes_read >= 10); + // POsition of the version field in the greeting. + const size_t version_pos = 10; + // Is the peer using the unversioned protocol? // If so, we send and receive rests of identity // messages. if (greeting [0] != 0xff || !(greeting [9] & 0x01)) { + encoder = new (std::nothrow) encoder_t (out_batch_size); + alloc_assert (encoder); + encoder->set_msg_source (session); + + decoder = new (std::nothrow) decoder_t (in_batch_size, options.maxmsgsize); + alloc_assert (decoder); + decoder->set_msg_sink (session); + // We have already sent the message header. // Since there is no way to tell the encoder to // skip the message header, we simply throw that @@ -425,6 +431,26 @@ bool zmq::stream_engine_t::handshake () if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) decoder->set_msg_sink (this); } + else + if (greeting [version_pos] == 0) { + // ZMTP/1.0 framing. + encoder = new (std::nothrow) encoder_t (out_batch_size); + alloc_assert (encoder); + encoder->set_msg_source (session); + + decoder = new (std::nothrow) decoder_t (in_batch_size, options.maxmsgsize); + alloc_assert (decoder); + decoder->set_msg_sink (session); + } + else { + // v1 framing protocol. + encoder = new (std::nothrow) v1_encoder_t (out_batch_size, session); + alloc_assert (encoder); + + decoder = new (std::nothrow) + v1_decoder_t (in_batch_size, options.maxmsgsize, session); + alloc_assert (decoder); + } // Start polling for output if necessary. if (outsize == 0) diff --git a/src/v1_decoder.cpp b/src/v1_decoder.cpp new file mode 100644 index 00000000..8ce87f96 --- /dev/null +++ b/src/v1_decoder.cpp @@ -0,0 +1,167 @@ +/* + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2009 iMatix Corporation + Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include +#include + +#include "platform.hpp" +#ifdef ZMQ_HAVE_WINDOWS +#include "windows.hpp" +#endif + +#include "v1_protocol.hpp" +#include "v1_decoder.hpp" +#include "likely.hpp" +#include "wire.hpp" +#include "err.hpp" + +zmq::v1_decoder_t::v1_decoder_t (size_t bufsize_, + int64_t maxmsgsize_, i_msg_sink *msg_sink_) : + decoder_base_t (bufsize_), + msg_sink (msg_sink_), + msg_flags (0), + maxmsgsize (maxmsgsize_) +{ + int rc = in_progress.init (); + errno_assert (rc == 0); + + // At the beginning, read one byte and go to flags_ready state. + next_step (tmpbuf, 1, &v1_decoder_t::flags_ready); +} + +zmq::v1_decoder_t::~v1_decoder_t () +{ + int rc = in_progress.close (); + errno_assert (rc == 0); +} + +void zmq::v1_decoder_t::set_msg_sink (i_msg_sink *msg_sink_) +{ + msg_sink = msg_sink_; +} + +bool zmq::v1_decoder_t::stalled () const +{ + return next == &v1_decoder_t::message_ready; +} + +bool zmq::v1_decoder_t::flags_ready () +{ + msg_flags = 0; + if (tmpbuf [0] & v1_protocol_t::more_flag) + msg_flags |= msg_t::more; + + // The payload length is either one or eight bytes, + // depending on whether the 'large' bit is set. + if (tmpbuf [0] & v1_protocol_t::large_flag) + next_step (tmpbuf, 8, &v1_decoder_t::eight_byte_size_ready); + else + next_step (tmpbuf, 1, &v1_decoder_t::one_byte_size_ready); + + return true; +} + +bool zmq::v1_decoder_t::one_byte_size_ready () +{ + int rc = 0; + + // Message size must not exceed the maximum allowed size. + if (maxmsgsize >= 0) + if (unlikely (tmpbuf [0] > static_cast (maxmsgsize))) + goto error; + + // in_progress is initialised at this point so in theory we should + // close it before calling zmq_msg_init_size, however, it's a 0-byte + // message and thus we can treat it as uninitialised... + rc = in_progress.init_size (tmpbuf [0]); + if (unlikely (rc)) { + errno_assert (errno == ENOMEM); + int rc = in_progress.init (); + errno_assert (rc == 0); + goto error; + } + + in_progress.set_flags (msg_flags); + next_step (in_progress.data (), in_progress.size (), + &v1_decoder_t::message_ready); + + return true; + +error: + decoding_error (); + return false; +} + +bool zmq::v1_decoder_t::eight_byte_size_ready () +{ + int rc = 0; + + // The payload size is encoded as 64-bit unsigned integer. + // The most significant byte comes first. + const uint64_t msg_size = get_uint64 (tmpbuf); + + // Message size must not exceed the maximum allowed size. + if (maxmsgsize >= 0) + if (unlikely (msg_size > static_cast (maxmsgsize))) + goto error; + + // Message size must fit into size_t data type. + if (unlikely (msg_size != static_cast (msg_size))) + goto error; + + // in_progress is initialised at this point so in theory we should + // close it before calling init_size, however, it's a 0-byte + // message and thus we can treat it as uninitialised. + rc = in_progress.init_size (static_cast (msg_size)); + if (unlikely (rc)) { + errno_assert (errno == ENOMEM); + int rc = in_progress.init (); + errno_assert (rc == 0); + goto error; + } + + in_progress.set_flags (msg_flags); + next_step (in_progress.data (), in_progress.size (), + &v1_decoder_t::message_ready); + + return true; + +error: + decoding_error (); + return false; +} + +bool zmq::v1_decoder_t::message_ready () +{ + // Message is completely read. Push it further and start reading + // new message. (in_progress is a 0-byte message after this point.) + if (unlikely (!msg_sink)) + return false; + int rc = msg_sink->push_msg (&in_progress); + if (unlikely (rc != 0)) { + if (errno != EAGAIN) + decoding_error (); + return false; + } + + next_step (tmpbuf, 1, &v1_decoder_t::flags_ready); + return true; +} diff --git a/src/v1_decoder.hpp b/src/v1_decoder.hpp new file mode 100644 index 00000000..8336367e --- /dev/null +++ b/src/v1_decoder.hpp @@ -0,0 +1,70 @@ +/* + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2012 iMatix Corporation + Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_V1_DECODER_HPP_INCLUDED__ +#define __ZMQ_V1_DECODER_HPP_INCLUDED__ + +#include "err.hpp" +#include "msg.hpp" +#include "decoder.hpp" +#include "i_msg_sink.hpp" +#include "stdint.hpp" + +namespace zmq +{ + + // Decoder for 0MQ v1 framing protocol. Converts data stream into messages. + + class v1_decoder_t : public decoder_base_t + { + public: + + v1_decoder_t (size_t bufsize_, + int64_t maxmsgsize_, i_msg_sink *msg_sink_); + virtual ~v1_decoder_t (); + + // i_decoder interface. + virtual void set_msg_sink (i_msg_sink *msg_sink_); + + virtual bool stalled () const; + + private: + + bool flags_ready (); + bool one_byte_size_ready (); + bool eight_byte_size_ready (); + bool message_ready (); + + i_msg_sink *msg_sink; + unsigned char tmpbuf [8]; + unsigned char msg_flags; + msg_t in_progress; + + const int64_t maxmsgsize; + + v1_decoder_t (const v1_decoder_t&); + void operator = (const v1_decoder_t&); + }; + +} + +#endif + diff --git a/src/v1_encoder.cpp b/src/v1_encoder.cpp new file mode 100644 index 00000000..c0eb5201 --- /dev/null +++ b/src/v1_encoder.cpp @@ -0,0 +1,103 @@ +/* + Copyright (c) 2007-2012 iMatix Corporation + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2011 VMware, Inc. + Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "v1_protocol.hpp" +#include "v1_encoder.hpp" +#include "likely.hpp" +#include "wire.hpp" + +zmq::v1_encoder_t::v1_encoder_t (size_t bufsize_, i_msg_source *msg_source_) : + encoder_base_t (bufsize_), + msg_source (msg_source_) +{ + int rc = in_progress.init (); + errno_assert (rc == 0); + + // Write 0 bytes to the batch and go to message_ready state. + next_step (NULL, 0, &v1_encoder_t::message_ready, true); +} + +zmq::v1_encoder_t::~v1_encoder_t () +{ + int rc = in_progress.close (); + errno_assert (rc == 0); +} + +void zmq::v1_encoder_t::set_msg_source (i_msg_source *msg_source_) +{ + msg_source = msg_source_; +} + +bool zmq::v1_encoder_t::message_ready () +{ + // Release the content of the old message. + int rc = in_progress.close (); + errno_assert (rc == 0); + + // Read new message. If there is none, return false. + // Note that new state is set only if write is successful. That way + // unsuccessful write will cause retry on the next state machine + // invocation. + if (unlikely (!msg_source)) { + rc = in_progress.init (); + errno_assert (rc == 0); + return false; + } + + rc = msg_source->pull_msg (&in_progress); + if (unlikely (rc)) { + errno_assert (errno == EAGAIN); + rc = in_progress.init (); + errno_assert (rc == 0); + return false; + } + + // Encode flags. + unsigned char &protocol_flags = tmpbuf [0]; + protocol_flags = 0; + if (in_progress.flags () & msg_t::more) + protocol_flags |= v1_protocol_t::more_flag; + if (in_progress.size () > 255) + protocol_flags |= v1_protocol_t::large_flag; + + // Encode the message length. For messages less then 256 bytes, + // the length is encoded as 8-bit unsigned integer. For larger + // messages, 64-bit unsigned integer in network byte order is used. + const size_t size = in_progress.size (); + if (unlikely (size > 255)) { + put_uint64 (tmpbuf + 1, size); + next_step (tmpbuf, 9, &v1_encoder_t::size_ready, false); + } + else { + tmpbuf [1] = static_cast (size); + next_step (tmpbuf, 2, &v1_encoder_t::size_ready, false); + } + return true; +} + +bool zmq::v1_encoder_t::size_ready () +{ + // Write message body into the buffer. + next_step (in_progress.data (), in_progress.size (), + &v1_encoder_t::message_ready, !(in_progress.flags () & msg_t::more)); + return true; +} diff --git a/src/v1_encoder.hpp b/src/v1_encoder.hpp new file mode 100644 index 00000000..bd09f6fa --- /dev/null +++ b/src/v1_encoder.hpp @@ -0,0 +1,60 @@ +/* + Copyright (c) 2009-2011 250bpm s.r.o. + Copyright (c) 2007-2012 iMatix Corporation + Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_V1_ENCODER_HPP_INCLUDED__ +#define __ZMQ_V1_ENCODER_HPP_INCLUDED__ + +#include "msg.hpp" +#include "i_msg_source.hpp" +#include "encoder.hpp" + +namespace zmq +{ + + class i_msg_source; + + // Encoder for 0MQ framing protocol. Converts messages into data stream. + + class v1_encoder_t : public encoder_base_t + { + public: + + v1_encoder_t (size_t bufsize_, i_msg_source *msg_source_); + virtual ~v1_encoder_t (); + + virtual void set_msg_source (i_msg_source *msg_source_); + + private: + + bool size_ready (); + bool message_ready (); + + i_msg_source *msg_source; + msg_t in_progress; + unsigned char tmpbuf [9]; + + v1_encoder_t (const v1_encoder_t&); + const v1_encoder_t &operator = (const v1_encoder_t&); + }; +} + +#endif + diff --git a/src/v1_protocol.hpp b/src/v1_protocol.hpp new file mode 100644 index 00000000..6c5b5aeb --- /dev/null +++ b/src/v1_protocol.hpp @@ -0,0 +1,43 @@ +/* + Copyright (c) 2007-2012 iMatix Corporation + Copyright (c) 2007-2012 Other contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_V1_PROTOCOL_HPP_INCLUDED__ +#define __ZMQ_V1_PROTOCOL_HPP_INCLUDED__ + +namespace zmq +{ + + // Definition of constans for v1 transport protocol. + class v1_protocol_t + { + public: + // Message flags. + enum + { + more_flag = 1, + large_flag = 2 + }; + + }; + +} + +#endif +