diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index fdc4217d..03023d4b 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -385,6 +385,20 @@ Default value:: 0 Applicable socket types:: ZMQ_ROUTER +ZMQ_XPUB_VERBOSE: Set the XPUB socket behavior +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Sets the 'XPUB' socket behavior on new subscriptions. A value of '0' is the default +and passes only new subscription messages to upstream. A value of '1' passes all +subscription messages upstream. + +[horizontal] +Option value type:: int +Option value unit:: 0, 1 +Default value:: 0 +Applicable socket types:: ZMQ_XPUB + + ZMQ_TCP_KEEPALIVE: Override SO_KEEPALIVE socket option ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Override 'SO_KEEPALIVE' socket option(where supported by OS). diff --git a/include/zmq.h b/include/zmq.h index 26d89e84..26beeca8 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -249,6 +249,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval); #define ZMQ_TCP_KEEPALIVE_INTVL 37 #define ZMQ_TCP_ACCEPT_FILTER 38 #define ZMQ_DELAY_ATTACH_ON_CONNECT 39 +#define ZMQ_XPUB_VERBOSE 40 /* Message options */ #define ZMQ_MORE 1 diff --git a/src/xpub.cpp b/src/xpub.cpp index 2081545c..e446f0f0 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -28,6 +28,7 @@ zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : socket_base_t (parent_, tid_, sid_), + verbose(false), more (false) { options.type = ZMQ_XPUB; @@ -70,7 +71,7 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) // If the subscription is not a duplicate store it so that it can be // passed to used on next recv call. - if (unique && options.type != ZMQ_PUB) + if (options.type == ZMQ_XPUB && (unique || verbose)) pending.push_back (blob_t (data, size)); } @@ -83,6 +84,21 @@ void zmq::xpub_t::xwrite_activated (pipe_t *pipe_) dist.activated (pipe_); } +int zmq::xpub_t::xsetsockopt (int option_, const void *optval_, + size_t optvallen_) +{ + if (option_ != ZMQ_XPUB_VERBOSE) { + errno = EINVAL; + return -1; + } + if (optvallen_ != sizeof (int) || *static_cast (optval_) < 0) { + errno = EINVAL; + return -1; + } + verbose = *static_cast (optval_); + return 0; +} + void zmq::xpub_t::xterminated (pipe_t *pipe_) { // Remove the pipe from the trie. If there are topics that nobody diff --git a/src/xpub.hpp b/src/xpub.hpp index 92a589a5..045253d5 100644 --- a/src/xpub.hpp +++ b/src/xpub.hpp @@ -54,6 +54,7 @@ namespace zmq bool xhas_in (); void xread_activated (zmq::pipe_t *pipe_); void xwrite_activated (zmq::pipe_t *pipe_); + int xsetsockopt (int option_, const void *optval_, size_t optvallen_); void xterminated (zmq::pipe_t *pipe_); private: @@ -72,6 +73,10 @@ namespace zmq // Distributor of messages holding the list of outbound pipes. dist_t dist; + // If true, send all subscription messages upstream, not just + // unique ones + bool verbose; + // True if we are in the middle of sending a multi-part message. bool more;