diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 6cac0b08..6bbc72b0 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -63,12 +63,10 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, cons endpoint (endpoint_), plugged (false), terminating (false), + read_msg (&stream_engine_t::read_identity), + write_msg (&stream_engine_t::write_identity), io_error (false), congested (false), - identity_received (false), - identity_sent (false), - rx_initialized (false), - tx_initialized (false), subscription_required (false), socket (NULL) { @@ -157,6 +155,9 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_, // disable handshaking for raw socket handshaking = false; + + read_msg = &stream_engine_t::pull_msg_from_session; + write_msg = &stream_engine_t::push_msg_to_session; } else { // Send the 'length' and 'flags' fields of the identity message. @@ -248,7 +249,7 @@ void zmq::stream_engine_t::in_event () insize -= processed; if (rc == 0 || rc == -1) break; - rc = write_msg (decoder->msg ()); + rc = (this->*write_msg) (decoder->msg ()); if (rc == -1) break; } @@ -286,7 +287,7 @@ void zmq::stream_engine_t::out_event () outsize = encoder->encode (&outpos, 0); while (outsize < out_batch_size) { - if (read_msg (&tx_msg) == -1) + if ((this->*read_msg) (&tx_msg) == -1) break; encoder->load_msg (&tx_msg); unsigned char *bufptr = outpos + outsize; @@ -355,7 +356,7 @@ void zmq::stream_engine_t::activate_in () zmq_assert (session != NULL); zmq_assert (decoder != NULL); - int rc = write_msg (decoder->msg ()); + int rc = (this->*write_msg) (decoder->msg ()); if (rc == -1) { if (errno == EAGAIN) session->flush (); @@ -372,7 +373,7 @@ void zmq::stream_engine_t::activate_in () insize -= processed; if (rc == 0 || rc == -1) break; - rc = write_msg (decoder->msg ()); + rc = (this->*write_msg) (decoder->msg ()); if (rc == -1) break; } @@ -499,60 +500,63 @@ bool zmq::stream_engine_t::handshake () return true; } -int zmq::stream_engine_t::read_msg (msg_t *msg_) +int zmq::stream_engine_t::read_identity (msg_t *msg_) { - if (likely (tx_initialized || options.raw_sock)) - return session->pull_msg (msg_); - - if (!identity_sent) { - int rc = msg_->init_size (options.identity_size); - errno_assert (rc == 0); + int rc = msg_->init_size (options.identity_size); + errno_assert (rc == 0); + if (options.identity_size > 0) memcpy (msg_->data (), options.identity, options.identity_size); - identity_sent = true; - tx_initialized = true; - return 0; - } - - tx_initialized = true; + read_msg = &stream_engine_t::pull_msg_from_session; return 0; } -int zmq::stream_engine_t::write_msg (msg_t *msg_) +int zmq::stream_engine_t::write_identity (msg_t *msg_) { - if (likely (rx_initialized || options.raw_sock)) - return session->push_msg (msg_); - - if (!identity_received) { - if (options.recv_identity) { - msg_->set_flags (msg_t::identity); - int rc = session->push_msg (msg_); - if (rc == -1) - return -1; - } - else { - int rc = msg_->close (); - errno_assert (rc == 0); - rc = msg_->init (); - errno_assert (rc == 0); - } - - identity_received = true; + if (options.recv_identity) { + msg_->set_flags (msg_t::identity); + int rc = session->push_msg (msg_); + errno_assert (rc == 0); } + else { + int rc = msg_->close (); + errno_assert (rc == 0); + rc = msg_->init (); + errno_assert (rc == 0); + } + + if (subscription_required) + write_msg = &stream_engine_t::write_subscription_msg; + else + write_msg = &stream_engine_t::push_msg_to_session; + + return 0; +} + +int zmq::stream_engine_t::pull_msg_from_session (msg_t *msg_) +{ + return session->pull_msg (msg_); +} + +int zmq::stream_engine_t::push_msg_to_session (msg_t *msg_) +{ + return session->push_msg (msg_); +} + +int zmq::stream_engine_t::write_subscription_msg (msg_t *msg_) +{ + msg_t subscription; // Inject the subscription message, so that also // ZMQ 2.x peers receive published messages. - if (subscription_required) { - int rc = msg_->init_size (1); - errno_assert (rc == 0); - *(unsigned char*) msg_->data () = 1; - rc = session->push_msg (msg_); - if (rc == -1) - return -1; - subscription_required = false; - } + int rc = subscription.init_size (1); + errno_assert (rc == 0); + *(unsigned char*) subscription.data () = 1; + rc = session->push_msg (&subscription); + if (rc == -1) + return -1; - rx_initialized = true; - return 0; + write_msg = &stream_engine_t::push_msg_to_session; + return push_msg_to_session (msg_); } void zmq::stream_engine_t::error () diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp index 3829f925..64e2a4a2 100644 --- a/src/stream_engine.hpp +++ b/src/stream_engine.hpp @@ -91,9 +91,13 @@ namespace zmq // peer -1 is returned. int read (void *data_, size_t size_); - int read_msg (msg_t *msg_); + int read_identity (msg_t *msg_); + int write_identity (msg_t *msg_); - int write_msg (msg_t *msg_); + int pull_msg_from_session (msg_t *msg_); + int push_msg_to_session (msg_t *msg); + + int write_subscription_msg (msg_t *msg_); // Underlying socket. fd_t s; @@ -137,24 +141,16 @@ namespace zmq bool plugged; bool terminating; + int (stream_engine_t::*read_msg) (msg_t *msg_); + + int (stream_engine_t::*write_msg) (msg_t *msg_); + bool io_error; // True iff the session could not accept more // messages due to flow control. bool congested; - // True iff the engine has received identity message. - bool identity_received; - - // True iff the engine has sent identity message. - bool identity_sent; - - // True iff the engine has received all ZMTP control messages. - bool rx_initialized; - - // True iff the engine has sent all ZMTP control messages. - bool tx_initialized; - // Indicates whether the engine is to inject a phony // subscription message into the incomming stream. // Needed to support old peers.