Actual message filtering happens in XPUB socket

Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
This commit is contained in:
Martin Sustrik 2011-06-11 20:29:56 +02:00
parent 3935258b82
commit bd86def1c7
7 changed files with 99 additions and 23 deletions

View File

@ -25,6 +25,7 @@
#include "likely.hpp" #include "likely.hpp"
zmq::dist_t::dist_t () : zmq::dist_t::dist_t () :
matching (0),
active (0), active (0),
eligible (0), eligible (0),
more (false) more (false)
@ -54,10 +55,27 @@ void zmq::dist_t::attach (pipe_t *pipe_)
} }
} }
void zmq::dist_t::match (pipe_t *pipe_)
{
// If pipe is already matching do nothing.
if (pipes.index (pipe_) < matching)
return;
// If the pipe isn't eligible, ignore it.
if (pipes.index (pipe_) >= eligible)
return;
// Mark the pipe as matching.
pipes.swap (pipes.index (pipe_), matching);
matching++;
}
void zmq::dist_t::terminated (pipe_t *pipe_) void zmq::dist_t::terminated (pipe_t *pipe_)
{ {
// Remove the pipe from the list; adjust number of active and/or // Remove the pipe from the list; adjust number of matching, active and/or
// eligible pipes accordingly. // eligible pipes accordingly.
if (pipes.index (pipe_) < matching)
matching--;
if (pipes.index (pipe_) < active) if (pipes.index (pipe_) < active)
active--; active--;
if (pipes.index (pipe_) < eligible) if (pipes.index (pipe_) < eligible)
@ -79,18 +97,27 @@ void zmq::dist_t::activated (pipe_t *pipe_)
} }
} }
int zmq::dist_t::send (msg_t *msg_, int flags_) int zmq::dist_t::send_to_all (msg_t *msg_, int flags_)
{
matching = active;
return send_to_matching (msg_, flags_);
}
int zmq::dist_t::send_to_matching (msg_t *msg_, int flags_)
{ {
// Is this end of a multipart message? // Is this end of a multipart message?
bool msg_more = msg_->flags () & msg_t::more; bool msg_more = msg_->flags () & msg_t::more;
// Push the message to active pipes. // Push the message to matching pipes.
distribute (msg_, flags_); distribute (msg_, flags_);
// If mutlipart message is fully sent, activate all the eligible pipes. // If mutlipart message is fully sent, activate all the eligible pipes.
if (!msg_more) if (!msg_more)
active = eligible; active = eligible;
// Mark all the pipes as non-matching.
matching = 0;
more = msg_more; more = msg_more;
return 0; return 0;
@ -98,8 +125,8 @@ int zmq::dist_t::send (msg_t *msg_, int flags_)
void zmq::dist_t::distribute (msg_t *msg_, int flags_) void zmq::dist_t::distribute (msg_t *msg_, int flags_)
{ {
// If there are no active pipes available, simply drop the message. // If there are no matching pipes available, simply drop the message.
if (active == 0) { if (matching == 0) {
int rc = msg_->close (); int rc = msg_->close ();
errno_assert (rc == 0); errno_assert (rc == 0);
rc = msg_->init (); rc = msg_->init ();
@ -107,12 +134,12 @@ void zmq::dist_t::distribute (msg_t *msg_, int flags_)
return; return;
} }
// Add active-1 references to the message. We already hold one reference, // Add matching-1 references to the message. We already hold one reference,
// that's why -1. // that's why -1.
msg_->add_refs (active - 1); msg_->add_refs (matching - 1);
// Push copy of the message to each active pipe. // Push copy of the message to each matching pipe.
for (pipes_t::size_type i = 0; i < active;) { for (pipes_t::size_type i = 0; i < matching;) {
if (!write (pipes [i], msg_)) if (!write (pipes [i], msg_))
msg_->rm_refs (1); msg_->rm_refs (1);
else else
@ -133,6 +160,8 @@ bool zmq::dist_t::has_out ()
bool zmq::dist_t::write (pipe_t *pipe_, msg_t *msg_) bool zmq::dist_t::write (pipe_t *pipe_, msg_t *msg_)
{ {
if (!pipe_->write (msg_)) { if (!pipe_->write (msg_)) {
pipes.swap (pipes.index (pipe_), matching - 1);
matching--;
pipes.swap (pipes.index (pipe_), active - 1); pipes.swap (pipes.index (pipe_), active - 1);
active--; active--;
pipes.swap (active, eligible - 1); pipes.swap (active, eligible - 1);

View File

@ -38,11 +38,26 @@ namespace zmq
dist_t (); dist_t ();
~dist_t (); ~dist_t ();
// Adds the pipe to the distributor object.
void attach (class pipe_t *pipe_); void attach (class pipe_t *pipe_);
// Activates pipe that have previously reached high watermark.
void activated (class pipe_t *pipe_); void activated (class pipe_t *pipe_);
// Mark the pipe as matching. Subsequent call to send_to_matching
// will send message also to this pipe.
void match (class pipe_t *pipe_);
// Removes the pipe from the distributor object.
void terminated (class pipe_t *pipe_); void terminated (class pipe_t *pipe_);
int send (class msg_t *msg_, int flags_); // Send the message to all the outbound pipes. After the call all the
// pipes are marked as non-matching.
int send_to_matching (class msg_t *msg_, int flags_);
// Send the message to the matching outbound pipes.
int send_to_all (class msg_t *msg_, int flags_);
bool has_out (); bool has_out ();
private: private:
@ -58,6 +73,9 @@ namespace zmq
typedef array_t <class pipe_t, 2> pipes_t; typedef array_t <class pipe_t, 2> pipes_t;
pipes_t pipes; pipes_t pipes;
// Number of all the pipes to send the next message to.
pipes_t::size_type matching;
// Number of active pipes. All the active pipes are located at the // Number of active pipes. All the active pipes are located at the
// beginning of the pipes array. These are the pipes the messages // beginning of the pipes array. These are the pipes the messages
// can be sent to at the moment. // can be sent to at the moment.

View File

@ -206,10 +206,21 @@ bool zmq::mtrie_t::rm_helper (unsigned char *prefix_, size_t size_,
return next_node->rm_helper (prefix_ + 1, size_ - 1, pipe_); return next_node->rm_helper (prefix_ + 1, size_ - 1, pipe_);
} }
void zmq::mtrie_t::match (unsigned char *data_, size_t size_, pipes_t &pipes_) void zmq::mtrie_t::match (unsigned char *data_, size_t size_,
void (*func_) (pipe_t *pipe_, void *arg_), void *arg_)
{ {
// Merge the subscriptions from this node to the resultset. match_helper (data_, size_, func_, arg_);
pipes_.insert (pipes.begin (), pipes.end ()); }
void zmq::mtrie_t::match_helper (unsigned char *data_, size_t size_,
void (*func_) (pipe_t *pipe_, void *arg_), void *arg_)
{
// TODO: This function is on critical path. Rewrite it as iteration
// rather than recursion.
// Signal the pipes attached to this node.
for (pipes_t::iterator it = pipes.begin (); it != pipes.end (); ++it)
func_ (*it, arg_);
// If there are no subnodes in the trie, return. // If there are no subnodes in the trie, return.
if (count == 0) if (count == 0)
@ -217,14 +228,14 @@ void zmq::mtrie_t::match (unsigned char *data_, size_t size_, pipes_t &pipes_)
// If there's one subnode (optimisation). // If there's one subnode (optimisation).
if (count == 1) { if (count == 1) {
next.node->match (data_ + 1, size_ - 1, pipes_); next.node->match (data_ + 1, size_ - 1, func_, arg_);
return; return;
} }
// If there are multiple subnodes. // If there are multiple subnodes.
for (unsigned char c = 0; c != count; c++) { for (unsigned char c = 0; c != count; c++) {
if (next.table [c]) if (next.table [c])
next.table [c]->match (data_ + 1, size_ - 1, pipes_); next.table [c]->match (data_ + 1, size_ - 1, func_, arg_);
} }
} }

View File

@ -35,8 +35,6 @@ namespace zmq
{ {
public: public:
typedef std::set <class pipe_t*> pipes_t;
mtrie_t (); mtrie_t ();
~mtrie_t (); ~mtrie_t ();
@ -55,8 +53,9 @@ namespace zmq
// actually removed rather than de-duplicated. // actually removed rather than de-duplicated.
bool rm (unsigned char *prefix_, size_t size_, class pipe_t *pipe_); bool rm (unsigned char *prefix_, size_t size_, class pipe_t *pipe_);
// Get all matching pipes. // Signal all the matching pipes.
void match (unsigned char *data_, size_t size_, pipes_t &pipes_); void match (unsigned char *data_, size_t size_,
void (*func_) (class pipe_t *pipe_, void *arg_), void *arg_);
private: private:
@ -68,8 +67,12 @@ namespace zmq
void *arg_); void *arg_);
bool rm_helper (unsigned char *prefix_, size_t size_, bool rm_helper (unsigned char *prefix_, size_t size_,
class pipe_t *pipe_); class pipe_t *pipe_);
void match_helper (unsigned char *data_, size_t size_,
void (*func_) (class pipe_t *pipe_, void *arg_), void *arg_);
typedef std::set <class pipe_t*> pipes_t;
pipes_t pipes; pipes_t pipes;
unsigned char min; unsigned char min;
unsigned short count; unsigned short count;
union { union {

View File

@ -91,9 +91,21 @@ void zmq::xpub_t::xterminated (pipe_t *pipe_)
dist.terminated (pipe_); dist.terminated (pipe_);
} }
void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_)
{
xpub_t *self = (xpub_t*) arg_;
self->dist.match (pipe_);
}
int zmq::xpub_t::xsend (msg_t *msg_, int flags_) int zmq::xpub_t::xsend (msg_t *msg_, int flags_)
{ {
return dist.send (msg_, flags_); // Find the matching pipes.
subscriptions.match ((unsigned char*) msg_->data (), msg_->size (),
mark_as_matching, this);
// Send the message to all the pipes that were marked as matching
// in the previous step.
return dist.send_to_matching (msg_, flags_);
} }
bool zmq::xpub_t::xhas_out () bool zmq::xpub_t::xhas_out ()

View File

@ -57,6 +57,9 @@ namespace zmq
static void send_unsubscription (unsigned char *data_, size_t size_, static void send_unsubscription (unsigned char *data_, size_t size_,
void *arg_); void *arg_);
// Function to be applied to each matching pipes.
static void mark_as_matching (class pipe_t *pipe_, void *arg_);
// List of all subscriptions mapped to corresponding pipes. // List of all subscriptions mapped to corresponding pipes.
mtrie_t subscriptions; mtrie_t subscriptions;

View File

@ -87,13 +87,13 @@ int zmq::xsub_t::xsend (msg_t *msg_, int flags_)
// Process the subscription. // Process the subscription.
if (*data == 1) { if (*data == 1) {
if (subscriptions.add (data + 1, size - 1)) if (subscriptions.add (data + 1, size - 1))
return dist.send (msg_, flags_); return dist.send_to_all (msg_, flags_);
else else
return 0; return 0;
} }
else if (*data == 0) { else if (*data == 0) {
if (subscriptions.rm (data + 1, size - 1)) if (subscriptions.rm (data + 1, size - 1))
return dist.send (msg_, flags_); return dist.send_to_all (msg_, flags_);
else else
return 0; return 0;
} }