ZMQ_FILTER socket option added

This option is a performance tweak. In devices XSUB socket filters
the messages just to send them to XPUB socket which filters them
once more. Setting ZMQ_FILTER option to 0 allows to switch the
filtering in XSUB socket off.

Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
This commit is contained in:
Martin Sustrik 2011-06-12 15:24:08 +02:00
parent e080e3e8b6
commit ff93f54653
6 changed files with 62 additions and 3 deletions

View File

@ -297,7 +297,7 @@ Applicable socket types:: all
ZMQ_MULTICAST_HOPS: Maximum network hops for multicast packets
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The option shell retrieve time-to-live used for outbound multicast packets.
The option shall retrieve time-to-live used for outbound multicast packets.
The default of 1 means that the multicast packets don't leave the local network.
[horizontal]
@ -307,6 +307,23 @@ Default value:: 1
Applicable socket types:: all, when using multicast transports
ZMQ_FILTER: Switches message filtering on or off
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The option shall retrieve the filtering behaiour of the socket.
If 1, messages are filtered according to subcriptions as expected.
If 0, messages are not filtered. This is a performance tweak. If a device
receives a message from XSUB socket and it is about to send it to XPUB socket
immediately, filtering would be done twice. We can thus turn off filtering in
XSUB socket and rely on filtering in XPUB socket.
[horizontal]
Option value type:: int
Option value unit:: boolean
Default value:: 1
Applicable socket types:: ZMQ_SUB, ZMQ_XSUB
ZMQ_FD: Retrieve file descriptor associated with the socket
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
The 'ZMQ_FD' option shall retrieve the file descriptor associated with the

View File

@ -284,6 +284,7 @@ Option value unit:: connections
Default value:: 100
Applicable socket types:: all, only for connection-oriented transports.
ZMQ_MAXMSGSIZE: Maximum acceptable inbound message size
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@ -296,6 +297,7 @@ Option value unit:: bytes
Default value:: -1
Applicable socket types:: all
ZMQ_MULTICAST_HOPS: Maximum network hops for multicast packets
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@ -309,6 +311,23 @@ Option value unit:: network hops
Default value:: 1
Applicable socket types:: all, when using multicast transports
ZMQ_FILTER: Switches message filtering on or off
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
If set to 1, messages are filtered according to subcriptions as expected.
If set to 0, messages are not filtered. This is a performance tweak. If a device
receives a message from XSUB socket and it is about to send it to XPUB socket
immediately, filtering would be done twice. We can thus turn off filtering in
XSUB socket and rely on filtering in XPUB socket.
[horizontal]
Option value type:: int
Option value unit:: boolean
Default value:: 1
Applicable socket types:: ZMQ_SUB, ZMQ_XSUB
RETURN VALUE
------------
The _zmq_setsockopt()_ function shall return zero if successful. Otherwise it

View File

@ -180,6 +180,7 @@ ZMQ_EXPORT int zmq_term (void *context);
#define ZMQ_SNDHWM 23
#define ZMQ_RCVHWM 24
#define ZMQ_MULTICAST_HOPS 25
#define ZMQ_FILTER 26
/* Send/recv options. */
#define ZMQ_DONTWAIT 1

View File

@ -38,6 +38,7 @@ zmq::options_t::options_t () :
reconnect_ivl_max (0),
backlog (100),
maxmsgsize (-1),
filter (1),
immediate_connect (true)
{
}
@ -172,6 +173,15 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
multicast_hops = *((int*) optval_);
return 0;
case ZMQ_FILTER:
if (optvallen_ != sizeof (int) || (*((int*) optval_) != 0 &&
*((int*) optval_) != 1)) {
errno = EINVAL;
return -1;
}
filter = *((int*) optval_);
return 0;
}
errno = EINVAL;
@ -317,6 +327,15 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
*optvallen_ = sizeof (int);
return 0;
case ZMQ_FILTER:
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
return -1;
}
*((int*) optval_) = filter;
*optvallen_ = sizeof (int);
return 0;
}
errno = EINVAL;

View File

@ -75,6 +75,9 @@ namespace zmq
// Maximal size of message to handle.
int64_t maxmsgsize;
// If 1, (X)SUB socket should filter the messages. If 0, it should not.
int filter;
// If true, when connecting, pipes are created immediately without
// waiting for the connection to be established. That way the socket
// is not aware of the peer's identity, however, it is able to send

View File

@ -133,7 +133,7 @@ int zmq::xsub_t::xrecv (msg_t *msg_, int flags_)
// Check whether the message matches at least one subscription.
// Non-initial parts of the message are passed
if (more || match (msg_)) {
if (more || !options.filter || match (msg_)) {
more = msg_->flags () & msg_t::more;
return 0;
}
@ -173,7 +173,7 @@ bool zmq::xsub_t::xhas_in ()
}
// Check whether the message matches at least one subscription.
if (match (&message)) {
if (!options.filter || match (&message)) {
has_message = true;
return true;
}