From 107f2441d5c0dc203b5ad5a45b54cabbee488834 Mon Sep 17 00:00:00 2001 From: laplaceyang Date: Wed, 11 Jan 2017 10:49:54 +0800 Subject: [PATCH] Problem: Thread-safe solution for modify hwm of pipe Solution: where change pipe hwm, send a command (new type pipe_hwm) to peer, so peer pipe can modify hwm thread-safely --- src/command.hpp | 7 +++++++ src/object.cpp | 19 +++++++++++++++++++ src/object.hpp | 2 ++ src/pipe.cpp | 10 ++++++++++ src/pipe.hpp | 4 ++++ src/socket_base.cpp | 1 + 6 files changed, 43 insertions(+) diff --git a/src/command.hpp b/src/command.hpp index dd34a357..3389a6f9 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -64,6 +64,7 @@ namespace zmq hiccup, pipe_term, pipe_term_ack, + pipe_hwm, term_req, term, term_ack, @@ -129,6 +130,12 @@ namespace zmq struct { } pipe_term_ack; + // Sent by one of pipe to another part for modify hwm + struct { + int inhwm; + int outhwm; + } pipe_hwm; + // Sent by I/O object ot the socket to request the shutdown of // the I/O object. struct { diff --git a/src/object.cpp b/src/object.cpp index 6c2a5afe..982bc95b 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -118,6 +118,10 @@ void zmq::object_t::process_command (command_t &cmd_) process_pipe_term_ack (); break; + case command_t::pipe_hwm: + process_pipe_hwm (cmd_.args.pipe_hwm.inhwm, cmd_.args.pipe_hwm.outhwm); + break; + case command_t::term_req: process_term_req (cmd_.args.term_req.object); break; @@ -291,6 +295,16 @@ void zmq::object_t::send_pipe_term_ack (pipe_t *destination_) send_command (cmd); } +void zmq::object_t::send_pipe_hwm (pipe_t *destination_, int inhwm_, int outhwm_) +{ + command_t cmd; + cmd.destination = destination_; + cmd.type = command_t::pipe_hwm; + cmd.args.pipe_hwm.inhwm = inhwm_; + cmd.args.pipe_hwm.outhwm = outhwm_; + send_command (cmd); +} + void zmq::object_t::send_term_req (own_t *destination_, own_t *object_) { @@ -401,6 +415,11 @@ void zmq::object_t::process_pipe_term_ack () zmq_assert (false); } +void zmq::object_t::process_pipe_hwm (int, int) +{ + zmq_assert (false); +} + void zmq::object_t::process_term_req (own_t *) { zmq_assert (false); diff --git a/src/object.hpp b/src/object.hpp index 92627d2a..97755c88 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -102,6 +102,7 @@ namespace zmq void send_hiccup (zmq::pipe_t *destination_, void *pipe_); void send_pipe_term (zmq::pipe_t *destination_); void send_pipe_term_ack (zmq::pipe_t *destination_); + void send_pipe_hwm (zmq::pipe_t *destination_, int inhwm_, int outhwm_); void send_term_req (zmq::own_t *destination_, zmq::own_t *object_); void send_term (zmq::own_t *destination_, int linger_); @@ -122,6 +123,7 @@ namespace zmq virtual void process_hiccup (void *pipe_); virtual void process_pipe_term (); virtual void process_pipe_term_ack (); + virtual void process_pipe_hwm (int inhwm_, int outhwm_); virtual void process_term_req (zmq::own_t *object_); virtual void process_term (int linger_); virtual void process_term_ack (); diff --git a/src/pipe.cpp b/src/pipe.cpp index af81b064..b70601b8 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -377,6 +377,11 @@ void zmq::pipe_t::process_pipe_term_ack () delete this; } +void zmq::pipe_t::process_pipe_hwm (int inhwm_, int outhwm_) +{ + set_hwms(inhwm_, outhwm_); +} + void zmq::pipe_t::set_nodelay () { this->delay = false; @@ -533,3 +538,8 @@ bool zmq::pipe_t::check_hwm () const bool full = hwm > 0 && msgs_written - peers_msgs_read >= uint64_t (hwm); return( !full ); } + +void zmq::pipe_t::send_hwms_to_peer(int inhwm_, int outhwm_) +{ + send_pipe_hwm(peer, inhwm_, outhwm_); +} diff --git a/src/pipe.hpp b/src/pipe.hpp index cae54565..4b04bbe0 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -136,6 +136,9 @@ namespace zmq // Set the boost to high water marks, used by inproc sockets so total hwm are sum of connect and bind sockets watermarks void set_hwms_boost(int inhwmboost_, int outhwmboost_); + // send command to peer for notify the change of hwm + void send_hwms_to_peer(int inhwm_, int outhwm_); + // Returns true if HWM is not reached bool check_hwm () const; private: @@ -149,6 +152,7 @@ namespace zmq void process_hiccup (void *pipe_); void process_pipe_term (); void process_pipe_term_ack (); + void process_pipe_hwm (int inhwm_, int outhwm_); // Handler for delimiter read from the pipe. void process_delimiter (); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 7508f189..bf87241b 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -1414,6 +1414,7 @@ void zmq::socket_base_t::update_pipe_options(int option_) for (pipes_t::size_type i = 0; i != pipes.size(); ++i) { pipes[i]->set_hwms(options.rcvhwm, options.sndhwm); + pipes[i]->send_hwms_to_peer(options.sndhwm, options.rcvhwm); } }