diff --git a/src/dist.cpp b/src/dist.cpp index f7f0488b..707b9c11 100644 --- a/src/dist.cpp +++ b/src/dist.cpp @@ -25,6 +25,7 @@ #include "likely.hpp" zmq::dist_t::dist_t () : + matching (0), active (0), eligible (0), more (false) @@ -54,10 +55,27 @@ void zmq::dist_t::attach (pipe_t *pipe_) } } +void zmq::dist_t::match (pipe_t *pipe_) +{ + // If pipe is already matching do nothing. + if (pipes.index (pipe_) < matching) + return; + + // If the pipe isn't eligible, ignore it. + if (pipes.index (pipe_) >= eligible) + return; + + // Mark the pipe as matching. + pipes.swap (pipes.index (pipe_), matching); + matching++; +} + void zmq::dist_t::terminated (pipe_t *pipe_) { - // Remove the pipe from the list; adjust number of active and/or + // Remove the pipe from the list; adjust number of matching, active and/or // eligible pipes accordingly. + if (pipes.index (pipe_) < matching) + matching--; if (pipes.index (pipe_) < active) active--; if (pipes.index (pipe_) < eligible) @@ -79,18 +97,27 @@ void zmq::dist_t::activated (pipe_t *pipe_) } } -int zmq::dist_t::send (msg_t *msg_, int flags_) +int zmq::dist_t::send_to_all (msg_t *msg_, int flags_) +{ + matching = active; + return send_to_matching (msg_, flags_); +} + +int zmq::dist_t::send_to_matching (msg_t *msg_, int flags_) { // Is this end of a multipart message? bool msg_more = msg_->flags () & msg_t::more; - // Push the message to active pipes. + // Push the message to matching pipes. distribute (msg_, flags_); // If mutlipart message is fully sent, activate all the eligible pipes. if (!msg_more) active = eligible; + // Mark all the pipes as non-matching. + matching = 0; + more = msg_more; return 0; @@ -98,8 +125,8 @@ int zmq::dist_t::send (msg_t *msg_, int flags_) void zmq::dist_t::distribute (msg_t *msg_, int flags_) { - // If there are no active pipes available, simply drop the message. - if (active == 0) { + // If there are no matching pipes available, simply drop the message. + if (matching == 0) { int rc = msg_->close (); errno_assert (rc == 0); rc = msg_->init (); @@ -107,12 +134,12 @@ void zmq::dist_t::distribute (msg_t *msg_, int flags_) return; } - // Add active-1 references to the message. We already hold one reference, + // Add matching-1 references to the message. We already hold one reference, // that's why -1. - msg_->add_refs (active - 1); + msg_->add_refs (matching - 1); - // Push copy of the message to each active pipe. - for (pipes_t::size_type i = 0; i < active;) { + // Push copy of the message to each matching pipe. + for (pipes_t::size_type i = 0; i < matching;) { if (!write (pipes [i], msg_)) msg_->rm_refs (1); else @@ -133,6 +160,8 @@ bool zmq::dist_t::has_out () bool zmq::dist_t::write (pipe_t *pipe_, msg_t *msg_) { if (!pipe_->write (msg_)) { + pipes.swap (pipes.index (pipe_), matching - 1); + matching--; pipes.swap (pipes.index (pipe_), active - 1); active--; pipes.swap (active, eligible - 1); diff --git a/src/dist.hpp b/src/dist.hpp index 10613c11..005bb601 100644 --- a/src/dist.hpp +++ b/src/dist.hpp @@ -38,11 +38,26 @@ namespace zmq dist_t (); ~dist_t (); + // Adds the pipe to the distributor object. void attach (class pipe_t *pipe_); + + // Activates pipe that have previously reached high watermark. void activated (class pipe_t *pipe_); + + // Mark the pipe as matching. Subsequent call to send_to_matching + // will send message also to this pipe. + void match (class pipe_t *pipe_); + + // Removes the pipe from the distributor object. void terminated (class pipe_t *pipe_); - int send (class msg_t *msg_, int flags_); + // Send the message to all the outbound pipes. After the call all the + // pipes are marked as non-matching. + int send_to_matching (class msg_t *msg_, int flags_); + + // Send the message to the matching outbound pipes. + int send_to_all (class msg_t *msg_, int flags_); + bool has_out (); private: @@ -58,6 +73,9 @@ namespace zmq typedef array_t pipes_t; pipes_t pipes; + // Number of all the pipes to send the next message to. + pipes_t::size_type matching; + // Number of active pipes. All the active pipes are located at the // beginning of the pipes array. These are the pipes the messages // can be sent to at the moment. diff --git a/src/mtrie.cpp b/src/mtrie.cpp index 91f68522..fafac2d6 100644 --- a/src/mtrie.cpp +++ b/src/mtrie.cpp @@ -206,10 +206,21 @@ bool zmq::mtrie_t::rm_helper (unsigned char *prefix_, size_t size_, return next_node->rm_helper (prefix_ + 1, size_ - 1, pipe_); } -void zmq::mtrie_t::match (unsigned char *data_, size_t size_, pipes_t &pipes_) +void zmq::mtrie_t::match (unsigned char *data_, size_t size_, + void (*func_) (pipe_t *pipe_, void *arg_), void *arg_) { - // Merge the subscriptions from this node to the resultset. - pipes_.insert (pipes.begin (), pipes.end ()); + match_helper (data_, size_, func_, arg_); +} + +void zmq::mtrie_t::match_helper (unsigned char *data_, size_t size_, + void (*func_) (pipe_t *pipe_, void *arg_), void *arg_) +{ + // TODO: This function is on critical path. Rewrite it as iteration + // rather than recursion. + + // Signal the pipes attached to this node. + for (pipes_t::iterator it = pipes.begin (); it != pipes.end (); ++it) + func_ (*it, arg_); // If there are no subnodes in the trie, return. if (count == 0) @@ -217,14 +228,14 @@ void zmq::mtrie_t::match (unsigned char *data_, size_t size_, pipes_t &pipes_) // If there's one subnode (optimisation). if (count == 1) { - next.node->match (data_ + 1, size_ - 1, pipes_); + next.node->match (data_ + 1, size_ - 1, func_, arg_); return; } // If there are multiple subnodes. for (unsigned char c = 0; c != count; c++) { if (next.table [c]) - next.table [c]->match (data_ + 1, size_ - 1, pipes_); + next.table [c]->match (data_ + 1, size_ - 1, func_, arg_); } } diff --git a/src/mtrie.hpp b/src/mtrie.hpp index cd470295..68a3f2ce 100644 --- a/src/mtrie.hpp +++ b/src/mtrie.hpp @@ -35,8 +35,6 @@ namespace zmq { public: - typedef std::set pipes_t; - mtrie_t (); ~mtrie_t (); @@ -55,8 +53,9 @@ namespace zmq // actually removed rather than de-duplicated. bool rm (unsigned char *prefix_, size_t size_, class pipe_t *pipe_); - // Get all matching pipes. - void match (unsigned char *data_, size_t size_, pipes_t &pipes_); + // Signal all the matching pipes. + void match (unsigned char *data_, size_t size_, + void (*func_) (class pipe_t *pipe_, void *arg_), void *arg_); private: @@ -68,8 +67,12 @@ namespace zmq void *arg_); bool rm_helper (unsigned char *prefix_, size_t size_, class pipe_t *pipe_); + void match_helper (unsigned char *data_, size_t size_, + void (*func_) (class pipe_t *pipe_, void *arg_), void *arg_); + typedef std::set pipes_t; pipes_t pipes; + unsigned char min; unsigned short count; union { diff --git a/src/xpub.cpp b/src/xpub.cpp index 4b416963..9078de37 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -91,9 +91,21 @@ void zmq::xpub_t::xterminated (pipe_t *pipe_) dist.terminated (pipe_); } +void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_) +{ + xpub_t *self = (xpub_t*) arg_; + self->dist.match (pipe_); +} + int zmq::xpub_t::xsend (msg_t *msg_, int flags_) -{ - return dist.send (msg_, flags_); +{ + // Find the matching pipes. + subscriptions.match ((unsigned char*) msg_->data (), msg_->size (), + mark_as_matching, this); + + // Send the message to all the pipes that were marked as matching + // in the previous step. + return dist.send_to_matching (msg_, flags_); } bool zmq::xpub_t::xhas_out () diff --git a/src/xpub.hpp b/src/xpub.hpp index c5d64b5d..740d1e27 100644 --- a/src/xpub.hpp +++ b/src/xpub.hpp @@ -57,6 +57,9 @@ namespace zmq static void send_unsubscription (unsigned char *data_, size_t size_, void *arg_); + // Function to be applied to each matching pipes. + static void mark_as_matching (class pipe_t *pipe_, void *arg_); + // List of all subscriptions mapped to corresponding pipes. mtrie_t subscriptions; diff --git a/src/xsub.cpp b/src/xsub.cpp index 729f6a41..a847d7fc 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -87,13 +87,13 @@ int zmq::xsub_t::xsend (msg_t *msg_, int flags_) // Process the subscription. if (*data == 1) { if (subscriptions.add (data + 1, size - 1)) - return dist.send (msg_, flags_); + return dist.send_to_all (msg_, flags_); else return 0; } else if (*data == 0) { if (subscriptions.rm (data + 1, size - 1)) - return dist.send (msg_, flags_); + return dist.send_to_all (msg_, flags_); else return 0; }