diff --git a/doc/zmq_socket.txt b/doc/zmq_socket.txt index 9a435870..0f665d64 100644 --- a/doc/zmq_socket.txt +++ b/doc/zmq_socket.txt @@ -224,7 +224,8 @@ ZMQ_XPUB Same as ZMQ_PUB except that you can receive subscriptions from the peers in form of incoming messages. Subscription message is a byte 1 (for subscriptions) or byte 0 (for unsubscriptions) followed by the subscription -body. +body. Messages without a sub/unsub prefix are also received, but have no +effect on subscription status. [horizontal] .Summary of ZMQ_XPUB characteristics @@ -240,7 +241,8 @@ ZMQ_XSUB ^^^^^^^^ Same as ZMQ_SUB except that you subscribe by sending subscription messages to the socket. Subscription message is a byte 1 (for subscriptions) or byte 0 -(for unsubscriptions) followed by the subscription body. +(for unsubscriptions) followed by the subscription body. Messages without a +sub/unsub prefix may also be sent, but have no effect on subscription status. [horizontal] .Summary of ZMQ_XSUB characteristics diff --git a/src/xpub.cpp b/src/xpub.cpp index e724a2de..0f2a3266 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -74,6 +74,9 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) if (options.type == ZMQ_XPUB && (unique || (*data && verbose))) pending.push_back (blob_t (data, size)); } + else /*process message unrelated to sub/unsub*/ { + pending.push_back (blob_t (data, size)); + } sub.close (); } diff --git a/src/xsub.cpp b/src/xsub.cpp index 9794ef6a..8f74b960 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -87,12 +87,6 @@ int zmq::xsub_t::xsend (msg_t *msg_) size_t size = msg_->size (); unsigned char *data = (unsigned char*) msg_->data (); - // Malformed subscriptions. - if (size < 1 || (*data != 0 && *data != 1)) { - errno = EINVAL; - return -1; - } - // Process the subscription. if (*data == 1) { // this used to filter out duplicate subscriptions, @@ -102,10 +96,13 @@ int zmq::xsub_t::xsend (msg_t *msg_) subscriptions.add (data + 1, size - 1); return dist.send_to_all (msg_); } - else { + else if (*data == 0) { if (subscriptions.rm (data + 1, size - 1)) return dist.send_to_all (msg_); } + else /*upstream message unrelated to sub/unsub*/ { + return dist.send_to_all (msg_); + } int rc = msg_->close (); errno_assert (rc == 0);