diff --git a/src/decoder.hpp b/src/decoder.hpp index a6b524ad..baf67cfa 100644 --- a/src/decoder.hpp +++ b/src/decoder.hpp @@ -29,6 +29,7 @@ #include "err.hpp" #include "msg.hpp" +#include "i_decoder.hpp" #include "stdint.hpp" namespace zmq @@ -47,7 +48,7 @@ namespace zmq // This class implements the state machine that parses the incoming buffer. // Derived class should implement individual state machine actions. - template class decoder_base_t + template class decoder_base_t : public i_decoder { public: diff --git a/src/encoder.hpp b/src/encoder.hpp index 41a30de2..04068ff1 100644 --- a/src/encoder.hpp +++ b/src/encoder.hpp @@ -29,6 +29,7 @@ #include "err.hpp" #include "msg.hpp" +#include "i_encoder.hpp" namespace zmq { @@ -39,7 +40,7 @@ namespace zmq // fills the outgoing buffer. Derived classes should implement individual // state machine actions. - template class encoder_base_t + template class encoder_base_t : public i_encoder { public: diff --git a/src/i_decoder.hpp b/src/i_decoder.hpp new file mode 100644 index 00000000..a0abb8e3 --- /dev/null +++ b/src/i_decoder.hpp @@ -0,0 +1,49 @@ +/* + Copyright (c) 2007-2012 iMatix Corporation + Copyright (c) 2007-2012 Other contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_I_DECODER_HPP_INCLUDED__ +#define __ZMQ_I_DECODER_HPP_INCLUDED__ + +#include + +namespace zmq +{ + + class i_msg_sink; + + // Interface to be implemented by message decoder. + + struct i_decoder + { + virtual ~i_decoder () {} + + virtual void set_msg_sink (i_msg_sink *msg_sink_) = 0; + + virtual void get_buffer (unsigned char **data_, size_t *size_) = 0; + + virtual size_t process_buffer (unsigned char *data_, size_t size_) = 0; + + virtual bool stalled () const = 0; + + }; + +} + +#endif diff --git a/src/i_encoder.hpp b/src/i_encoder.hpp new file mode 100644 index 00000000..734e1e1a --- /dev/null +++ b/src/i_encoder.hpp @@ -0,0 +1,54 @@ +/* + Copyright (c) 2007-2012 iMatix Corporation + Copyright (c) 2007-2012 Other contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_I_ENCODER_HPP_INCLUDED__ +#define __ZMQ_I_ENCODER_HPP_INCLUDED__ + +#include + +namespace zmq +{ + + // Forward declaration + class i_msg_source; + + // Interface to be implemented by message encoder. + + struct i_encoder + { + virtual ~i_encoder () {} + + // Set message producer. + virtual void set_msg_source (i_msg_source *msg_source_) = 0; + + // The function returns a batch of binary data. The data + // are filled to a supplied buffer. If no buffer is supplied (data_ + // is NULL) encoder will provide buffer of its own. + // If offset is not NULL, it is filled by offset of the first message + // in the batch.If there's no beginning of a message in the batch, + // offset is set to -1. + virtual void get_data (unsigned char **data_, size_t *size_, + int *offset_ = NULL) = 0; + + }; + +} + +#endif diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 1a1b5b2d..9a3fb275 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -38,6 +38,8 @@ #include "stream_engine.hpp" #include "io_thread.hpp" #include "session_base.hpp" +#include "encoder.hpp" +#include "decoder.hpp" #include "config.hpp" #include "err.hpp" #include "ip.hpp" @@ -48,11 +50,11 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, cons s (fd_), inpos (NULL), insize (0), - decoder (in_batch_size, options_.maxmsgsize), + decoder (NULL), input_error (false), outpos (NULL), outsize (0), - encoder (out_batch_size), + encoder (NULL), handshaking (true), greeting_bytes_read (0), greeting_size (0), @@ -106,6 +108,11 @@ zmq::stream_engine_t::~stream_engine_t () #endif s = retired_fd; } + + if (encoder != NULL) + delete encoder; + if (decoder != NULL) + delete decoder; } void zmq::stream_engine_t::plug (io_thread_t *io_thread_, @@ -156,8 +163,10 @@ void zmq::stream_engine_t::unplug () io_object_t::unplug (); // Disconnect from session object. - encoder.set_msg_source (NULL); - decoder.set_msg_sink (NULL); + if (encoder) + encoder->set_msg_source (NULL); + if (decoder) + decoder->set_msg_sink (NULL); session = NULL; } @@ -174,6 +183,7 @@ void zmq::stream_engine_t::in_event () if (!handshake ()) return; + zmq_assert (decoder); bool disconnection = false; // If there's no data to process in the buffer... @@ -183,7 +193,7 @@ void zmq::stream_engine_t::in_event () // Note that buffer can be arbitrarily large. However, we assume // the underlying TCP layer has fixed buffer size and thus the // number of bytes read will be always limited. - decoder.get_buffer (&inpos, &insize); + decoder->get_buffer (&inpos, &insize); insize = read (inpos, insize); // Check whether the peer has closed the connection. @@ -194,7 +204,7 @@ void zmq::stream_engine_t::in_event () } // Push the data to the decoder. - size_t processed = decoder.process_buffer (inpos, insize); + size_t processed = decoder->process_buffer (inpos, insize); if (unlikely (processed == (size_t) -1)) { disconnection = true; @@ -220,7 +230,7 @@ void zmq::stream_engine_t::in_event () // until after the session has accepted the message. if (disconnection) { input_error = true; - if (decoder.stalled ()) + if (decoder->stalled ()) reset_pollin (handle); else error (); @@ -233,7 +243,8 @@ void zmq::stream_engine_t::out_event () if (!outsize) { outpos = NULL; - encoder.get_data (&outpos, &outsize); + zmq_assert (encoder); + encoder->get_data (&outpos, &outsize); // If there is no data to send, stop polling for output. if (outsize == 0) { @@ -284,8 +295,9 @@ void zmq::stream_engine_t::activate_in () // There was an input error but the engine could not // be terminated (due to the stalled decoder). // Flush the pending message and terminate the engine now. - decoder.process_buffer (inpos, 0); - zmq_assert (!decoder.stalled ()); + zmq_assert (decoder); + decoder->process_buffer (inpos, 0); + zmq_assert (!decoder->stalled ()); session->flush (); error (); return; @@ -379,8 +391,11 @@ bool zmq::stream_engine_t::handshake () // We have received either a header of identity message // or the whole greeting. - encoder.set_msg_source (session); - decoder.set_msg_sink (session); + encoder = new (std::nothrow) encoder_t (out_batch_size); + decoder = new (std::nothrow) decoder_t (in_batch_size, options.maxmsgsize); + + encoder->set_msg_source (session); + decoder->set_msg_sink (session); zmq_assert (greeting [0] != 0xff || greeting_bytes_read >= 10); @@ -395,7 +410,7 @@ bool zmq::stream_engine_t::handshake () const size_t header_size = options.identity_size + 1 >= 255 ? 10 : 2; unsigned char tmp [10], *bufferp = tmp; size_t buffer_size = header_size; - encoder.get_data (&bufferp, &buffer_size); + encoder->get_data (&bufferp, &buffer_size); zmq_assert (buffer_size == header_size); // Make sure the decoder sees the data we have already received. @@ -408,7 +423,7 @@ bool zmq::stream_engine_t::handshake () // message right after the identity message, we temporarily // divert the message stream from session to ourselves. if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) - decoder.set_msg_sink (this); + decoder->set_msg_sink (this); } // Start polling for output if necessary. @@ -441,7 +456,8 @@ int zmq::stream_engine_t::push_msg (msg_t *msg_) // Once we have injected the subscription message, we can // Divert the message flow back to the session. - decoder.set_msg_sink (session); + zmq_assert (decoder); + decoder->set_msg_sink (session); return rc; } diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp index 2ced286e..cab414a8 100644 --- a/src/stream_engine.hpp +++ b/src/stream_engine.hpp @@ -28,8 +28,8 @@ #include "i_engine.hpp" #include "i_msg_sink.hpp" #include "io_object.hpp" -#include "encoder.hpp" -#include "decoder.hpp" +#include "i_encoder.hpp" +#include "i_decoder.hpp" #include "options.hpp" #include "../include/zmq.h" @@ -105,12 +105,12 @@ namespace zmq unsigned char *inpos; size_t insize; - decoder_t decoder; + i_decoder *decoder; bool input_error; unsigned char *outpos; size_t outsize; - encoder_t encoder; + i_encoder *encoder; // When true, we are still trying to determine whether // the peer is using versioned protocol, and if so, which