diff --git a/src/command.hpp b/src/command.hpp index a924b4e8..a72d3cab 100644 --- a/src/command.hpp +++ b/src/command.hpp @@ -39,8 +39,8 @@ namespace zmq own, attach, bind, - revive, - reader_info, + activate_reader, + activate_writer, pipe_term, pipe_term_ack, term_req, @@ -83,14 +83,13 @@ namespace zmq // Sent by pipe writer to inform dormant pipe reader that there // are messages in the pipe. struct { - } revive; + } activate_reader; - // Sent by pipe reader to inform pipe writer - // about how many messages it has read so far. - // Used to implement the flow control. + // Sent by pipe reader to inform pipe writer about how many + // messages it has read so far. struct { uint64_t msgs_read; - } reader_info; + } activate_writer; // Sent by pipe reader to pipe writer to ask it to terminate // its end of the pipe. diff --git a/src/fq.cpp b/src/fq.cpp index 8f6485f0..5673796e 100644 --- a/src/fq.cpp +++ b/src/fq.cpp @@ -103,8 +103,8 @@ int zmq::fq_t::recv (zmq_msg_t *msg_, int flags_) // without blocking. zmq_assert (!(more && !fetched)); - // Note that when message is not fetched, current pipe is killed and - // replaced by another active pipe. Thus we don't have to increase + // Note that when message is not fetched, current pipe is deactivated + // and replaced by another active pipe. Thus we don't have to increase // the 'current' pointer. if (fetched) { more = msg_->flags & ZMQ_MSG_MORE; diff --git a/src/object.cpp b/src/object.cpp index 3466431f..5f4a94ef 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -57,8 +57,12 @@ void zmq::object_t::process_command (command_t &cmd_) { switch (cmd_.type) { - case command_t::revive: - process_revive (); + case command_t::activate_reader: + process_activate_reader (); + break; + + case command_t::activate_writer: + process_activate_writer (cmd_.args.activate_writer.msgs_read); break; case command_t::stop: @@ -89,10 +93,6 @@ void zmq::object_t::process_command (command_t &cmd_) process_seqnum (); break; - case command_t::reader_info: - process_reader_info (cmd_.args.reader_info.msgs_read); - break; - case command_t::pipe_term: process_pipe_term (); return; @@ -248,18 +248,18 @@ void zmq::object_t::send_bind (own_t *destination_, reader_t *in_pipe_, send_command (cmd); } -void zmq::object_t::send_revive (object_t *destination_) +void zmq::object_t::send_activate_reader (reader_t *destination_) { command_t cmd; #if defined ZMQ_MAKE_VALGRIND_HAPPY memset (&cmd, 0, sizeof (cmd)); #endif cmd.destination = destination_; - cmd.type = command_t::revive; + cmd.type = command_t::activate_reader; send_command (cmd); } -void zmq::object_t::send_reader_info (writer_t *destination_, +void zmq::object_t::send_activate_writer (writer_t *destination_, uint64_t msgs_read_) { command_t cmd; @@ -267,8 +267,8 @@ void zmq::object_t::send_reader_info (writer_t *destination_, memset (&cmd, 0, sizeof (cmd)); #endif cmd.destination = destination_; - cmd.type = command_t::reader_info; - cmd.args.reader_info.msgs_read = msgs_read_; + cmd.type = command_t::activate_writer; + cmd.args.activate_writer.msgs_read = msgs_read_; send_command (cmd); } @@ -356,12 +356,12 @@ void zmq::object_t::process_bind (reader_t *in_pipe_, writer_t *out_pipe_, zmq_assert (false); } -void zmq::object_t::process_revive () +void zmq::object_t::process_activate_reader () { zmq_assert (false); } -void zmq::object_t::process_reader_info (uint64_t msgs_read_) +void zmq::object_t::process_activate_writer (uint64_t msgs_read_) { zmq_assert (false); } diff --git a/src/object.hpp b/src/object.hpp index e083ce3c..8652a86a 100644 --- a/src/object.hpp +++ b/src/object.hpp @@ -68,8 +68,8 @@ namespace zmq void send_bind (class own_t *destination_, class reader_t *in_pipe_, class writer_t *out_pipe_, const blob_t &peer_identity_, bool inc_seqnum_ = true); - void send_revive (class object_t *destination_); - void send_reader_info (class writer_t *destination_, + void send_activate_reader (class reader_t *destination_); + void send_activate_writer (class writer_t *destination_, uint64_t msgs_read_); void send_pipe_term (class writer_t *destination_); void send_pipe_term_ack (class reader_t *destination_); @@ -87,8 +87,8 @@ namespace zmq const blob_t &peer_identity_); virtual void process_bind (class reader_t *in_pipe_, class writer_t *out_pipe_, const blob_t &peer_identity_); - virtual void process_revive (); - virtual void process_reader_info (uint64_t msgs_read_); + virtual void process_activate_reader (); + virtual void process_activate_writer (uint64_t msgs_read_); virtual void process_pipe_term (); virtual void process_pipe_term_ack (); virtual void process_term_req (class own_t *object_); diff --git a/src/pipe.cpp b/src/pipe.cpp index 8785330d..7fa7133c 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -112,7 +112,7 @@ bool zmq::reader_t::read (zmq_msg_t *msg_) msgs_read++; if (lwm > 0 && msgs_read % lwm == 0) - send_reader_info (writer, msgs_read); + send_activate_writer (writer, msgs_read); return true; } @@ -127,7 +127,7 @@ void zmq::reader_t::terminate () send_pipe_term (writer); } -void zmq::reader_t::process_revive () +void zmq::reader_t::process_activate_reader () { // Forward the event to the sink (either socket or session). sink->activated (this); @@ -258,7 +258,7 @@ void zmq::writer_t::rollback () void zmq::writer_t::flush () { if (!pipe->flush ()) - send_revive (reader); + send_activate_reader (reader); } void zmq::writer_t::terminate () @@ -288,7 +288,7 @@ void zmq::writer_t::write_delimiter () flush (); } -void zmq::writer_t::process_reader_info (uint64_t msgs_read_) +void zmq::writer_t::process_activate_writer (uint64_t msgs_read_) { zmq_msg_t msg; diff --git a/src/pipe.hpp b/src/pipe.hpp index 421ebc93..dcdd927d 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -81,7 +81,7 @@ namespace zmq void set_writer (class writer_t *writer_); // Command handlers. - void process_revive (); + void process_activate_reader (); void process_pipe_term_ack (); // Returns true if the message is delimiter; false otherwise. @@ -150,7 +150,7 @@ namespace zmq uint64_t hwm_, int64_t swap_size_); ~writer_t (); - void process_reader_info (uint64_t msgs_read_); + void process_activate_writer (uint64_t msgs_read_); // Command handlers. void process_pipe_term (); diff --git a/src/socket_base.cpp b/src/socket_base.cpp index fe06d2fa..0dae1b26 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -495,8 +495,9 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_) errno = err; // If the message cannot be fetched immediately, there are two scenarios. - // For non-blocking recv, commands are processed in case there's a revive - // command already waiting int a command pipe. If it's not, return EAGAIN. + // For non-blocking recv, commands are processed in case there's an + // activate_reader command already waiting int a command pipe. + // If it's not, return EAGAIN. if (flags_ & ZMQ_NOBLOCK) { if (errno != EAGAIN) return -1;