diff --git a/src/pipe.cpp b/src/pipe.cpp index 0e15dceb..31b91998 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -21,11 +21,11 @@ #include "pipe.hpp" -zmq::reader_t::reader_t (object_t *parent_, pipe_t *pipe_, +zmq::reader_t::reader_t (object_t *parent_, uint64_t hwm_, uint64_t lwm_) : object_t (parent_), - pipe (pipe_), - peer (&pipe_->writer), + pipe (NULL), + peer (NULL), hwm (hwm_), lwm (lwm_), endpoint (NULL) @@ -36,6 +36,13 @@ zmq::reader_t::~reader_t () { } +void zmq::reader_t::set_pipe (pipe_t *pipe_) +{ + zmq_assert (!pipe); + pipe = pipe_; + peer = &pipe_->writer; +} + bool zmq::reader_t::check_read () { // Check if there's an item in the pipe. @@ -94,11 +101,11 @@ void zmq::reader_t::process_pipe_term_ack () delete pipe; } -zmq::writer_t::writer_t (object_t *parent_, pipe_t *pipe_, +zmq::writer_t::writer_t (object_t *parent_, uint64_t hwm_, uint64_t lwm_) : object_t (parent_), - pipe (pipe_), - peer (&pipe_->reader), + pipe (NULL), + peer (NULL), hwm (hwm_), lwm (lwm_), endpoint (NULL) @@ -114,6 +121,13 @@ zmq::writer_t::~writer_t () { } +void zmq::writer_t::set_pipe (pipe_t *pipe_) +{ + zmq_assert (!pipe); + pipe = pipe_; + peer = &pipe_->reader; +} + bool zmq::writer_t::check_write (uint64_t size_) { // TODO: Check whether hwm is exceeded. @@ -161,9 +175,11 @@ void zmq::writer_t::process_pipe_term () zmq::pipe_t::pipe_t (object_t *reader_parent_, object_t *writer_parent_, uint64_t hwm_, uint64_t lwm_) : - reader (reader_parent_, this, hwm_, lwm_), - writer (writer_parent_, this, hwm_, lwm_) + reader (reader_parent_, hwm_, lwm_), + writer (writer_parent_, hwm_, lwm_) { + reader.set_pipe (this); + writer.set_pipe (this); reader.register_pipe (this); } diff --git a/src/pipe.hpp b/src/pipe.hpp index ecbce7df..9083ccd6 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -36,10 +36,11 @@ namespace zmq { public: - reader_t (class object_t *parent_, class pipe_t *pipe_, + reader_t (class object_t *parent_, uint64_t hwm_, uint64_t lwm_); ~reader_t (); + void set_pipe (class pipe_t *pipe_); void set_endpoint (i_endpoint *endpoint_); // Returns true if there is at least one message to read in the pipe. @@ -83,10 +84,11 @@ namespace zmq { public: - writer_t (class object_t *parent_, class pipe_t *pipe_, + writer_t (class object_t *parent_, uint64_t hwm_, uint64_t lwm_); ~writer_t (); + void set_pipe (class pipe_t *pipe_); void set_endpoint (i_endpoint *endpoint_); // Checks whether message with specified size can be written to the