mirror of
https://github.com/zeromq/libzmq.git
synced 2025-07-01 08:23:32 +02:00
Publisher-side filtering for multi-part messages fixed
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
This commit is contained in:
parent
bd86def1c7
commit
e080e3e8b6
@ -70,6 +70,11 @@ void zmq::dist_t::match (pipe_t *pipe_)
|
|||||||
matching++;
|
matching++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void zmq::dist_t::unmatch ()
|
||||||
|
{
|
||||||
|
matching = 0;
|
||||||
|
}
|
||||||
|
|
||||||
void zmq::dist_t::terminated (pipe_t *pipe_)
|
void zmq::dist_t::terminated (pipe_t *pipe_)
|
||||||
{
|
{
|
||||||
// Remove the pipe from the list; adjust number of matching, active and/or
|
// Remove the pipe from the list; adjust number of matching, active and/or
|
||||||
@ -115,9 +120,6 @@ int zmq::dist_t::send_to_matching (msg_t *msg_, int flags_)
|
|||||||
if (!msg_more)
|
if (!msg_more)
|
||||||
active = eligible;
|
active = eligible;
|
||||||
|
|
||||||
// Mark all the pipes as non-matching.
|
|
||||||
matching = 0;
|
|
||||||
|
|
||||||
more = msg_more;
|
more = msg_more;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -48,14 +48,16 @@ namespace zmq
|
|||||||
// will send message also to this pipe.
|
// will send message also to this pipe.
|
||||||
void match (class pipe_t *pipe_);
|
void match (class pipe_t *pipe_);
|
||||||
|
|
||||||
|
// Mark all pipes as non-matching.
|
||||||
|
void unmatch ();
|
||||||
|
|
||||||
// Removes the pipe from the distributor object.
|
// Removes the pipe from the distributor object.
|
||||||
void terminated (class pipe_t *pipe_);
|
void terminated (class pipe_t *pipe_);
|
||||||
|
|
||||||
// Send the message to all the outbound pipes. After the call all the
|
// Send the message to the matching outbound pipes.
|
||||||
// pipes are marked as non-matching.
|
|
||||||
int send_to_matching (class msg_t *msg_, int flags_);
|
int send_to_matching (class msg_t *msg_, int flags_);
|
||||||
|
|
||||||
// Send the message to the matching outbound pipes.
|
// Send the message to all the outbound pipes.
|
||||||
int send_to_all (class msg_t *msg_, int flags_);
|
int send_to_all (class msg_t *msg_, int flags_);
|
||||||
|
|
||||||
bool has_out ();
|
bool has_out ();
|
||||||
|
21
src/xpub.cpp
21
src/xpub.cpp
@ -26,7 +26,8 @@
|
|||||||
#include "msg.hpp"
|
#include "msg.hpp"
|
||||||
|
|
||||||
zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_) :
|
zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_) :
|
||||||
socket_base_t (parent_, tid_)
|
socket_base_t (parent_, tid_),
|
||||||
|
more (false)
|
||||||
{
|
{
|
||||||
options.type = ZMQ_XPUB;
|
options.type = ZMQ_XPUB;
|
||||||
}
|
}
|
||||||
@ -99,13 +100,27 @@ void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_)
|
|||||||
|
|
||||||
int zmq::xpub_t::xsend (msg_t *msg_, int flags_)
|
int zmq::xpub_t::xsend (msg_t *msg_, int flags_)
|
||||||
{
|
{
|
||||||
// Find the matching pipes.
|
bool msg_more = msg_->flags () & msg_t::more;
|
||||||
|
|
||||||
|
// For the first part of multi-part message, find the matching pipes.
|
||||||
|
if (!more)
|
||||||
subscriptions.match ((unsigned char*) msg_->data (), msg_->size (),
|
subscriptions.match ((unsigned char*) msg_->data (), msg_->size (),
|
||||||
mark_as_matching, this);
|
mark_as_matching, this);
|
||||||
|
|
||||||
// Send the message to all the pipes that were marked as matching
|
// Send the message to all the pipes that were marked as matching
|
||||||
// in the previous step.
|
// in the previous step.
|
||||||
return dist.send_to_matching (msg_, flags_);
|
int rc = dist.send_to_matching (msg_, flags_);
|
||||||
|
if (rc != 0)
|
||||||
|
return rc;
|
||||||
|
|
||||||
|
// If we are at the end of multi-part message we can mark all the pipes
|
||||||
|
// as non-matching.
|
||||||
|
if (!msg_more)
|
||||||
|
dist.unmatch ();
|
||||||
|
|
||||||
|
more = msg_more;
|
||||||
|
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool zmq::xpub_t::xhas_out ()
|
bool zmq::xpub_t::xhas_out ()
|
||||||
|
@ -66,6 +66,9 @@ namespace zmq
|
|||||||
// Distributor of messages holding the list of outbound pipes.
|
// Distributor of messages holding the list of outbound pipes.
|
||||||
dist_t dist;
|
dist_t dist;
|
||||||
|
|
||||||
|
// True if we are in the middle of sending a multi-part message.
|
||||||
|
bool more;
|
||||||
|
|
||||||
// List of pending (un)subscriptions, ie. those that were already
|
// List of pending (un)subscriptions, ie. those that were already
|
||||||
// applied to the trie, but not yet received by the user.
|
// applied to the trie, but not yet received by the user.
|
||||||
typedef std::deque <blob_t> pending_t;
|
typedef std::deque <blob_t> pending_t;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user