From daa7a8021f6802d12d8fcfd771dda3101d192f15 Mon Sep 17 00:00:00 2001 From: danielkr Date: Sat, 17 Aug 2013 23:08:07 +0300 Subject: [PATCH] Plug in dbuffer to serve the ZMQ_CONFLATE option ZMQ_CONFLATE option is passed to pipepair() which creates a usual ypipe_t or ypipe_conflate_t and plugs it into pipe_t under a common abstract base. --- src/Makefile.am | 5 +- src/pipe.cpp | 51 +++++++++++++---- src/pipe.hpp | 15 +++-- src/session_base.cpp | 17 +++++- src/socket_base.cpp | 27 +++++++-- src/ypipe.hpp | 3 +- src/ypipe_base.hpp | 44 ++++++++++++++ src/ypipe_conflate.hpp | 127 +++++++++++++++++++++++++++++++++++++++++ 8 files changed, 263 insertions(+), 26 deletions(-) create mode 100644 src/ypipe_base.hpp create mode 100644 src/ypipe_conflate.hpp diff --git a/src/Makefile.am b/src/Makefile.am index 62a8ba08..57bd08a4 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -88,6 +88,7 @@ libzmq_la_SOURCES = \ dealer.hpp \ xsub.hpp \ ypipe.hpp \ + ypipe_flat.hpp \ yqueue.hpp \ z85_codec.hpp \ address.cpp \ @@ -163,7 +164,9 @@ libzmq_la_SOURCES = \ raw_decoder.hpp \ raw_decoder.cpp \ raw_encoder.hpp \ - raw_encoder.cpp + raw_encoder.cpp \ + ypipe_conflate.hpp \ + dbuffer.hpp if ON_MINGW libzmq_la_LDFLAGS = -no-undefined -avoid-version -version-info @LTVER@ @LIBZMQ_EXTRA_LDFLAGS@ diff --git a/src/pipe.cpp b/src/pipe.cpp index e4b841ad..6bc7addd 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -23,22 +23,37 @@ #include "pipe.hpp" #include "err.hpp" +#include "ypipe.hpp" +#include "ypipe_conflate.hpp" + int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2], - int hwms_ [2], bool delays_ [2]) + int hwms_ [2], bool delays_ [2], bool conflate_ [2]) { // Creates two pipe objects. These objects are connected by two ypipes, // each to pass messages in one direction. - pipe_t::upipe_t *upipe1 = new (std::nothrow) pipe_t::upipe_t (); + typedef ypipe_t upipe_normal_t; + typedef ypipe_conflate_t upipe_conflate_t; + + pipe_t::upipe_t *upipe1; + if(conflate_ [0]) + upipe1 = new (std::nothrow) upipe_conflate_t (); + else + upipe1 = new (std::nothrow) upipe_normal_t (); alloc_assert (upipe1); - pipe_t::upipe_t *upipe2 = new (std::nothrow) pipe_t::upipe_t (); + + pipe_t::upipe_t *upipe2; + if(conflate_ [1]) + upipe2 = new (std::nothrow) upipe_conflate_t (); + else + upipe2 = new (std::nothrow) upipe_normal_t (); alloc_assert (upipe2); pipes_ [0] = new (std::nothrow) pipe_t (parents_ [0], upipe1, upipe2, - hwms_ [1], hwms_ [0], delays_ [0]); + hwms_ [1], hwms_ [0], delays_ [0], conflate_ [0]); alloc_assert (pipes_ [0]); pipes_ [1] = new (std::nothrow) pipe_t (parents_ [1], upipe2, upipe1, - hwms_ [0], hwms_ [1], delays_ [1]); + hwms_ [0], hwms_ [1], delays_ [1], conflate_ [1]); alloc_assert (pipes_ [1]); pipes_ [0]->set_peer (pipes_ [1]); @@ -48,7 +63,7 @@ int zmq::pipepair (class object_t *parents_ [2], class pipe_t* pipes_ [2], } zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, - int inhwm_, int outhwm_, bool delay_) : + int inhwm_, int outhwm_, bool delay_, bool conflate_) : object_t (parent_), inpipe (inpipe_), outpipe (outpipe_), @@ -62,7 +77,8 @@ zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, peer (NULL), sink (NULL), state (active), - delay (delay_) + delay (delay_), + conflate (conflate_) { } @@ -303,11 +319,16 @@ void zmq::pipe_t::process_pipe_term_ack () // First, delete all the unread messages in the pipe. We have to do it by // hand because msg_t doesn't have automatic destructor. Then deallocate // the ypipe itself. - msg_t msg; - while (inpipe->read (&msg)) { - int rc = msg.close (); - errno_assert (rc == 0); + + if (!conflate) + { + msg_t msg; + while (inpipe->read (&msg)) { + int rc = msg.close (); + errno_assert (rc == 0); + } } + delete inpipe; // Deallocate the pipe object @@ -439,7 +460,13 @@ void zmq::pipe_t::hiccup () inpipe = NULL; // Create new inpipe. - inpipe = new (std::nothrow) pipe_t::upipe_t (); + if (conflate) + inpipe = new (std::nothrow) + ypipe_t (); + else + inpipe = new (std::nothrow) + ypipe_conflate_t (); + alloc_assert (inpipe); in_active = true; diff --git a/src/pipe.hpp b/src/pipe.hpp index d329b582..5405110c 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -21,7 +21,7 @@ #define __ZMQ_PIPE_HPP_INCLUDED__ #include "msg.hpp" -#include "ypipe.hpp" +#include "ypipe_base.hpp" #include "config.hpp" #include "object.hpp" #include "stdint.hpp" @@ -40,8 +40,10 @@ namespace zmq // Delay specifies how the pipe behaves when the peer terminates. If true // pipe receives all the pending messages before terminating, otherwise it // terminates straight away. + // If conflate is true, only the most recently arrived message could be + // read (older messages are discarded) int pipepair (zmq::object_t *parents_ [2], zmq::pipe_t* pipes_ [2], - int hwms_ [2], bool delays_ [2]); + int hwms_ [2], bool delays_ [2], bool conflate_ [2]); struct i_pipe_events { @@ -65,7 +67,8 @@ namespace zmq { // This allows pipepair to create pipe objects. friend int pipepair (zmq::object_t *parents_ [2], - zmq::pipe_t* pipes_ [2], int hwms_ [2], bool delays_ [2]); + zmq::pipe_t* pipes_ [2], int hwms_ [2], bool delays_ [2], + bool conflate_ [2]); public: @@ -110,7 +113,7 @@ namespace zmq private: // Type of the underlying lock-free pipe. - typedef ypipe_t upipe_t; + typedef ypipe_base_t upipe_t; // Command handlers. void process_activate_read (); @@ -125,7 +128,7 @@ namespace zmq // Constructor is private. Pipe can only be created using // pipepair function. pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, - int inhwm_, int outhwm_, bool delay_); + int inhwm_, int outhwm_, bool delay_, bool conflate_); // Pipepair uses this function to let us know about // the peer pipe object. @@ -196,6 +199,8 @@ namespace zmq // Computes appropriate low watermark from the given high watermark. static int compute_lwm (int hwm_); + bool conflate; + // Disable copying. pipe_t (const pipe_t&); const pipe_t &operator = (const pipe_t&); diff --git a/src/session_base.cpp b/src/session_base.cpp index 302fec8a..025bafd1 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -300,7 +300,8 @@ int zmq::session_base_t::zap_connect () pipe_t *new_pipes [2] = {NULL, NULL}; int hwms [2] = {0, 0}; bool delays [2] = {false, false}; - int rc = pipepair (parents, new_pipes, hwms, delays); + bool conflates [2] = {false, false}; + int rc = pipepair (parents, new_pipes, hwms, delays, conflates); errno_assert (rc == 0); // Attach local end of the pipe to this socket object. @@ -331,9 +332,19 @@ void zmq::session_base_t::process_attach (i_engine *engine_) if (!pipe && !is_terminating ()) { object_t *parents [2] = {this, socket}; pipe_t *pipes [2] = {NULL, NULL}; - int hwms [2] = {options.rcvhwm, options.sndhwm}; + + bool conflate = options.conflate && + (options.type == ZMQ_DEALER || + options.type == ZMQ_PULL || + options.type == ZMQ_PUSH || + options.type == ZMQ_PUB || + options.type == ZMQ_SUB); + + int hwms [2] = {conflate? -1 : options.rcvhwm, + conflate? -1 : options.sndhwm}; bool delays [2] = {options.delay_on_close, options.delay_on_disconnect}; - int rc = pipepair (parents, pipes, hwms, delays); + bool conflates [2] = {conflate, conflate}; + int rc = pipepair (parents, pipes, hwms, delays, conflates); errno_assert (rc == 0); // Plug the local end of the pipe. diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 8034d143..fea6c85a 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -450,9 +450,18 @@ int zmq::socket_base_t::connect (const char *addr_) // Create a bi-directional pipe to connect the peers. object_t *parents [2] = {this, peer.socket}; pipe_t *new_pipes [2] = {NULL, NULL}; - int hwms [2] = {sndhwm, rcvhwm}; + + bool conflate = options.conflate && + (options.type == ZMQ_DEALER || + options.type == ZMQ_PULL || + options.type == ZMQ_PUSH || + options.type == ZMQ_PUB || + options.type == ZMQ_SUB); + + int hwms [2] = {conflate? -1 : sndhwm, conflate? -1 : rcvhwm}; bool delays [2] = {options.delay_on_disconnect, options.delay_on_close}; - int rc = pipepair (parents, new_pipes, hwms, delays); + bool conflates [2] = {conflate, conflate}; + int rc = pipepair (parents, new_pipes, hwms, delays, conflates); errno_assert (rc == 0); // Attach local end of the pipe to this socket object. @@ -553,9 +562,19 @@ int zmq::socket_base_t::connect (const char *addr_) // Create a bi-directional pipe. object_t *parents [2] = {this, session}; pipe_t *new_pipes [2] = {NULL, NULL}; - int hwms [2] = {options.sndhwm, options.rcvhwm}; + + bool conflate = options.conflate && + (options.type == ZMQ_DEALER || + options.type == ZMQ_PULL || + options.type == ZMQ_PUSH || + options.type == ZMQ_PUB || + options.type == ZMQ_SUB); + + int hwms [2] = {conflate? -1 : options.sndhwm, + conflate? -1 : options.rcvhwm}; bool delays [2] = {options.delay_on_disconnect, options.delay_on_close}; - rc = pipepair (parents, new_pipes, hwms, delays); + bool conflates [2] = {conflate, conflate}; + rc = pipepair (parents, new_pipes, hwms, delays, conflates); errno_assert (rc == 0); // Attach local end of the pipe to the socket object. diff --git a/src/ypipe.hpp b/src/ypipe.hpp index 182df4e7..86e5d01c 100644 --- a/src/ypipe.hpp +++ b/src/ypipe.hpp @@ -23,6 +23,7 @@ #include "atomic_ptr.hpp" #include "yqueue.hpp" #include "platform.hpp" +#include "ypipe_base.hpp" namespace zmq { @@ -34,7 +35,7 @@ namespace zmq // N is granularity of the pipe, i.e. how many items are needed to // perform next memory allocation. - template class ypipe_t + template class ypipe_t : public ypipe_base_t { public: diff --git a/src/ypipe_base.hpp b/src/ypipe_base.hpp new file mode 100644 index 00000000..b7e7081b --- /dev/null +++ b/src/ypipe_base.hpp @@ -0,0 +1,44 @@ + +/* + Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_YPIPE_BASE_HPP_INCLUDED__ +#define __ZMQ_YPIPE_BASE_HPP_INCLUDED__ + + +namespace zmq +{ + // ypipe_base abstracts ypipe and ypipe_conflate specific + // classes, one is selected according to a the conflate + // socket option + + template class ypipe_base_t + { + public: + virtual ~ypipe_base_t () {} + virtual void write (const T &value_, bool incomplete_) = 0; + virtual bool unwrite (T *value_) = 0; + virtual bool flush () = 0; + virtual bool check_read () = 0; + virtual bool read (T *value_) = 0; + virtual bool probe (bool (*fn)(T &)) = 0; + }; +} + +#endif diff --git a/src/ypipe_conflate.hpp b/src/ypipe_conflate.hpp new file mode 100644 index 00000000..6dc20ef9 --- /dev/null +++ b/src/ypipe_conflate.hpp @@ -0,0 +1,127 @@ +/* + Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_YPIPE_CONFLATE_HPP_INCLUDED__ +#define __ZMQ_YPIPE_CONFLATE_HPP_INCLUDED__ + +#include "platform.hpp" +#include "dbuffer.hpp" +#include "ypipe_base.hpp" + +namespace zmq +{ + + // Adapter for dbuffer, to plug it in instead of a queue for the sake + // of implementing the conflate socket option, which, if set, makes + // the receiving side to discard all incoming messages but the last one. + // + // reader_awake flag is needed here to mimic ypipe delicate behaviour + // around the reader being asleep (see 'c' pointer being NULL in ypipe.hpp) + + template class ypipe_conflate_t : public ypipe_base_t + { + public: + + // Initialises the pipe. + inline ypipe_conflate_t () + : reader_awake(false) + { + } + + // The destructor doesn't have to be virtual. It is mad virtual + // just to keep ICC and code checking tools from complaining. + inline virtual ~ypipe_conflate_t () + { + } + + // Following function (write) deliberately copies uninitialised data + // when used with zmq_msg. Initialising the VSM body for + // non-VSM messages won't be good for performance. + +#ifdef ZMQ_HAVE_OPENVMS +#pragma message save +#pragma message disable(UNINIT) +#endif + inline void write (const T &value_, bool incomplete_) + { + (void)incomplete_; + + dbuffer.write (value_); + } + +#ifdef ZMQ_HAVE_OPENVMS +#pragma message restore +#endif + + // There are no incomplete items for conflate ypipe + inline bool unwrite (T *value_) + { + return false; + } + + // Flush is no-op for conflate ypipe. Reader asleep behaviour + // is as of the usual ypipe. + // Returns false if the reader thread is sleeping. In that case, + // caller is obliged to wake the reader up before using the pipe again. + inline bool flush () + { + return reader_awake; + } + + // Check whether item is available for reading. + inline bool check_read () + { + bool res = dbuffer.check_read (); + if (!res) + reader_awake = false; + + return res; + } + + // Reads an item from the pipe. Returns false if there is no value. + // available. + inline bool read (T *value_) + { + if (!check_read ()) + return false; + + return dbuffer.read (value_); + } + + // Applies the function fn to the first elemenent in the pipe + // and returns the value returned by the fn. + // The pipe mustn't be empty or the function crashes. + inline bool probe (bool (*fn)(T &)) + { + return dbuffer.probe (fn); + } + + protected: + + dbuffer_t dbuffer; + bool reader_awake; + + // Disable copying of ypipe object. + ypipe_conflate_t (const ypipe_conflate_t&); + const ypipe_conflate_t &operator = (const ypipe_conflate_t&); + }; + +} + +#endif