diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index a1a1551b..1a1b5b2d 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -401,6 +401,14 @@ bool zmq::stream_engine_t::handshake () // Make sure the decoder sees the data we have already received. inpos = greeting; insize = greeting_bytes_read; + + // To allow for interoperability with peers that do not forward + // their subscriptions, we inject a phony subsription + // message into the incomming message stream. To put this + // message right after the identity message, we temporarily + // divert the message stream from session to ourselves. + if (options.type == ZMQ_PUB || options.type == ZMQ_XPUB) + decoder.set_msg_sink (this); } // Start polling for output if necessary. @@ -414,6 +422,30 @@ bool zmq::stream_engine_t::handshake () return true; } +int zmq::stream_engine_t::push_msg (msg_t *msg_) +{ + zmq_assert (options.type == ZMQ_PUB || options.type == ZMQ_XPUB); + + // The first message is identity. + // Let the session process it. + int rc = session->push_msg (msg_); + errno_assert (rc == 0); + + // Inject the subscription message so that the ZMQ 2.x peer + // receives our messages. + rc = msg_->init_size (1); + errno_assert (rc == 0); + *(unsigned char*) msg_->data () = 1; + rc = session->push_msg (msg_); + session->flush (); + + // Once we have injected the subscription message, we can + // Divert the message flow back to the session. + decoder.set_msg_sink (session); + + return rc; +} + void zmq::stream_engine_t::error () { zmq_assert (session); diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp index 90b9221f..86b3fef4 100644 --- a/src/stream_engine.hpp +++ b/src/stream_engine.hpp @@ -26,6 +26,7 @@ #include "fd.hpp" #include "i_engine.hpp" +#include "i_msg_sink.hpp" #include "io_object.hpp" #include "encoder.hpp" #include "decoder.hpp" @@ -41,7 +42,7 @@ namespace zmq // This engine handles any socket with SOCK_STREAM semantics, // e.g. TCP socket or an UNIX domain socket. - class stream_engine_t : public io_object_t, public i_engine + class stream_engine_t : public io_object_t, public i_engine, public i_msg_sink { public: @@ -55,6 +56,9 @@ namespace zmq void activate_in (); void activate_out (); + // i_msg_sink interface implementation. + virtual int push_msg (msg_t *msg_); + // i_poll_events interface implementation. void in_event (); void out_event ();