diff --git a/doc/zmq_getsockopt.txt b/doc/zmq_getsockopt.txt index b8bb20a2..5213c0ea 100644 --- a/doc/zmq_getsockopt.txt +++ b/doc/zmq_getsockopt.txt @@ -266,6 +266,29 @@ Default value:: 0 (false) Applicable socket types:: all, primarily when using TCP/IPC transports. +ZMQ_INVERT_MATCHING: Retrieve inverted filtering status +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Returns the value of the 'ZMQ_INVERT_MATCHING' option. A value of `1` +means the socket uses inverted prefix matching. + +On 'PUB' and 'XPUB' sockets, this causes messages to be sent to all +connected sockets 'except' those subscribed to a prefix that matches +the message. On 'SUB' sockets, this causes only incoming messages that +do 'not' match any of the socket's subscriptions to be received by the user. + +Whenever 'ZMQ_INVERT_MATCHING' is set to 1 on a 'PUB' socket, all 'SUB' +sockets connecting to it must also have the option set to 1. Failure to +do so will have the 'SUB' sockets reject everything the 'PUB' socket sends +them. 'XSUB' sockets do not need to do this because they do not filter +incoming messages. + +[horizontal] +Option value type:: int +Option value unit:: 0,1 +Default value:: 0 +Applicable socket types:: ZMQ_PUB, ZMQ_XPUB, ZMQ_SUB + + ZMQ_IPV4ONLY: Retrieve IPv4-only socket override status ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Retrieve the IPv4-only option for the socket. This option is deprecated. diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index cca37c49..52ee0a82 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -937,6 +937,29 @@ Option value unit:: boolean Default value:: 1 (true) Applicable socket types:: all, when using TCP transports. + +ZMQ_INVERT_MATCHING: Invert message filtering +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Reverses the filtering behavior of PUB-SUB sockets, when set to 1. + +On 'PUB' and 'XPUB' sockets, this causes messages to be sent to all +connected sockets 'except' those subscribed to a prefix that matches +the message. On 'SUB' sockets, this causes only incoming messages that +do 'not' match any of the socket's subscriptions to be received by the user. + +Whenever 'ZMQ_INVERT_MATCHING' is set to 1 on a 'PUB' socket, all 'SUB' +sockets connecting to it must also have the option set to 1. Failure to +do so will have the 'SUB' sockets reject everything the 'PUB' socket sends +them. 'XSUB' sockets do not need to do this because they do not filter +incoming messages. + +[horizontal] +Option value type:: int +Option value unit:: 0,1 +Default value:: 0 +Applicable socket types:: ZMQ_PUB, ZMQ_XPUB, ZMQ_SUB + + RETURN VALUE ------------ The _zmq_setsockopt()_ function shall return zero if successful. Otherwise it diff --git a/include/zmq.h b/include/zmq.h index 83b42ccb..43955205 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -298,6 +298,7 @@ ZMQ_EXPORT const char *zmq_msg_gets (zmq_msg_t *msg, const char *property); #define ZMQ_XPUB_MANUAL 71 #define ZMQ_XPUB_WELCOME_MSG 72 #define ZMQ_STREAM_NOTIFY 73 +#define ZMQ_INVERT_MATCHING 74 /* Message options */ #define ZMQ_MORE 1 diff --git a/src/dist.cpp b/src/dist.cpp index 20322104..35cdb445 100644 --- a/src/dist.cpp +++ b/src/dist.cpp @@ -69,6 +69,22 @@ void zmq::dist_t::match (pipe_t *pipe_) matching++; } +void zmq::dist_t::reverse_match () +{ + pipes_t::size_type prev_matching = matching; + + // Reset matching to 0 + unmatch(); + + // Mark all matching pipes as not matching and vice-versa. + // To do this, push all pipes that are eligible but not + // matched - i.e. between "matching" and "eligible" - + // to the beginning of the queue. + for (pipes_t::size_type i = prev_matching; i < eligible; ++i) { + pipes.swap(i, matching++); + } +} + void zmq::dist_t::unmatch () { matching = 0; diff --git a/src/dist.hpp b/src/dist.hpp index b5d0982c..cff530e7 100644 --- a/src/dist.hpp +++ b/src/dist.hpp @@ -50,6 +50,9 @@ namespace zmq // will send message also to this pipe. void match (zmq::pipe_t *pipe_); + // Marks all pipes that are not matched as matched and vice-versa. + void reverse_match(); + // Mark all pipes as non-matching. void unmatch (); diff --git a/src/options.cpp b/src/options.cpp index 2d6e95b0..f08c595d 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -45,6 +45,7 @@ zmq::options_t::options_t () : ipv6 (0), immediate (0), filter (false), + invert_matching(false), recv_identity (false), raw_socket (false), raw_notify (false), @@ -500,6 +501,13 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, } break; + case ZMQ_INVERT_MATCHING: + if (is_int) { + invert_matching = (value != 0); + return 0; + } + break; + default: #if defined (ZMQ_ACT_MILITANT) // There are valid scenarios for probing with unknown socket option @@ -846,6 +854,13 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) } break; + case ZMQ_INVERT_MATCHING: + if (is_int) { + *value = invert_matching; + return 0; + } + break; + default: #if defined (ZMQ_ACT_MILITANT) malformed = false; diff --git a/src/options.hpp b/src/options.hpp index c6475745..eae018ce 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -108,6 +108,11 @@ namespace zmq // If 1, (X)SUB socket should filter the messages. If 0, it should not. bool filter; + // If true, the subscription matching on (X)PUB and (X)SUB sockets + // is reversed. Messages are sent to and received by non-matching + // sockets. + bool invert_matching; + // If true, the identity message is forwarded to the socket. bool recv_identity; diff --git a/src/xpub.cpp b/src/xpub.cpp index 1088da5b..a1f36ab9 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -179,9 +179,14 @@ int zmq::xpub_t::xsend (msg_t *msg_) bool msg_more = msg_->flags () & msg_t::more ? true : false; // For the first part of multi-part message, find the matching pipes. - if (!more) + if (!more) { subscriptions.match ((unsigned char*) msg_->data (), msg_->size (), mark_as_matching, this); + // If inverted matching is used, reverse the selection now + if (options.invert_matching) { + dist.reverse_match(); + } + } int rc = -1; // Assume we fail if (lossy || dist.check_hwm ()) { diff --git a/src/xsub.cpp b/src/xsub.cpp index 5401e2ae..ddbc8f9f 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -206,7 +206,9 @@ zmq::blob_t zmq::xsub_t::get_credential () const bool zmq::xsub_t::match (msg_t *msg_) { - return subscriptions.check ((unsigned char*) msg_->data (), msg_->size ()); + bool matching = subscriptions.check ((unsigned char*) msg_->data (), msg_->size ()); + + return matching ^ options.invert_matching; } void zmq::xsub_t::send_subscription (unsigned char *data_, size_t size_, diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 54b50da9..27ac4653 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -44,6 +44,7 @@ set(tests test_diffserv test_connect_rid test_xpub_nodrop + test_pub_invert_matching ) if(NOT WIN32) list(APPEND tests diff --git a/tests/test_pub_invert_matching.cpp b/tests/test_pub_invert_matching.cpp new file mode 100644 index 00000000..a42efdf5 --- /dev/null +++ b/tests/test_pub_invert_matching.cpp @@ -0,0 +1,126 @@ +/* + Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "testutil.hpp" + +int main (void) +{ + setup_test_environment(); + void *ctx = zmq_ctx_new (); + assert (ctx); + + // Create a publisher + void *pub = zmq_socket (ctx, ZMQ_PUB); + assert (pub); + int rc = zmq_bind (pub, "inproc://soname"); + assert (rc == 0); + + // Create two subscribers + void *sub1 = zmq_socket (ctx, ZMQ_SUB); + assert (sub1); + rc = zmq_connect (sub1, "inproc://soname"); + assert (rc == 0); + + void *sub2 = zmq_socket (ctx, ZMQ_SUB); + assert (sub2); + rc = zmq_connect (sub2, "inproc://soname"); + assert (rc == 0); + + // Subscribe pub1 to one prefix + // and pub2 to another prefix. + const char PREFIX1[] = "prefix1"; + const char PREFIX2[] = "p2"; + + rc = zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, PREFIX1, sizeof(PREFIX1)); + assert (rc == 0); + + rc = zmq_setsockopt (sub2, ZMQ_SUBSCRIBE, PREFIX2, sizeof(PREFIX2)); + assert (rc == 0); + + // Send a message with the first prefix + rc = zmq_send_const(pub, PREFIX1, sizeof(PREFIX1), 0); + assert (rc == sizeof(PREFIX1)); + + // sub1 should receive it, but not sub2 + rc = zmq_recv (sub1, NULL, 0, ZMQ_DONTWAIT); + assert (rc == sizeof(PREFIX1)); + + rc = zmq_recv (sub2, NULL, 0, ZMQ_DONTWAIT); + assert (rc == -1); + assert (errno == EAGAIN); + + // Send a message with the second prefix + rc = zmq_send_const(pub, PREFIX2, sizeof(PREFIX2), 0); + assert (rc == sizeof(PREFIX2)); + + // sub2 should receive it, but not sub1 + rc = zmq_recv (sub2, NULL, 0, ZMQ_DONTWAIT); + assert (rc == sizeof(PREFIX2)); + + rc = zmq_recv (sub1, NULL, 0, ZMQ_DONTWAIT); + assert (rc == -1); + assert (errno == EAGAIN); + + // Now invert the matching + int invert = 1; + rc = zmq_setsockopt (pub, ZMQ_INVERT_MATCHING, &invert, sizeof(invert)); + assert (rc == 0); + + // ... on both sides, otherwise the SUB socket will filter the messages out + rc = zmq_setsockopt (sub1, ZMQ_INVERT_MATCHING, &invert, sizeof(invert)); + rc = zmq_setsockopt (sub2, ZMQ_INVERT_MATCHING, &invert, sizeof(invert)); + assert (rc == 0); + + // Send a message with the first prefix + rc = zmq_send_const(pub, PREFIX1, sizeof(PREFIX1), 0); + assert (rc == sizeof(PREFIX1)); + + // sub2 should receive it, but not sub1 + rc = zmq_recv (sub2, NULL, 0, ZMQ_DONTWAIT); + assert (rc == sizeof(PREFIX1)); + + rc = zmq_recv (sub1, NULL, 0, ZMQ_DONTWAIT); + assert (rc == -1); + assert (errno == EAGAIN); + + // Send a message with the second prefix + rc = zmq_send_const(pub, PREFIX2, sizeof(PREFIX2), 0); + assert (rc == sizeof(PREFIX2)); + + // sub1 should receive it, but not sub2 + rc = zmq_recv (sub1, NULL, 0, ZMQ_DONTWAIT); + assert (rc == sizeof(PREFIX2)); + + rc = zmq_recv (sub2, NULL, 0, ZMQ_DONTWAIT); + assert (rc == -1); + assert (errno == EAGAIN); + + + // Clean up. + rc = zmq_close (pub); + assert (rc == 0); + rc = zmq_close (sub1); + assert (rc == 0); + rc = zmq_close (sub2); + assert (rc == 0); + rc = zmq_ctx_term (ctx); + assert (rc == 0); + + return 0 ; +}