Fixed issue #525 - multipart upstreaming from xsub to xpub

This commit is contained in:
Pieter Hintjens
2013-04-18 17:23:57 +02:00
parent 9df7c70aba
commit f0cf4095b5
3 changed files with 24 additions and 18 deletions

View File

@@ -56,9 +56,8 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
// There are some subscriptions waiting. Let's process them.
msg_t sub;
while (pipe_->read (&sub)) {
// Apply the subscription to the trie.
unsigned char *const data = (unsigned char*) sub.data ();
// Apply the subscription to the trie
unsigned char *const data = (unsigned char *) sub.data ();
const size_t size = sub.size ();
if (size > 0 && (*data == 0 || *data == 1)) {
bool unique;
@@ -69,13 +68,16 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
// If the subscription is not a duplicate store it so that it can be
// passed to used on next recv call. (Unsubscribe is not verbose.)
if (options.type == ZMQ_XPUB && (unique || (*data && verbose)))
pending.push_back (blob_t (data, size));
if (options.type == ZMQ_XPUB && (unique || (*data && verbose))) {
pending_data.push_back (blob_t (data, size));
pending_flags.push_back (0);
}
}
else
else {
// Process user message coming upstream from xsub socket
pending.push_back (blob_t (data, size));
pending_data.push_back (blob_t (data, size));
pending_flags.push_back (sub.flags ());
}
sub.close ();
}
}
@@ -149,24 +151,27 @@ bool zmq::xpub_t::xhas_out ()
int zmq::xpub_t::xrecv (msg_t *msg_)
{
// If there is at least one
if (pending.empty ()) {
if (pending_data.empty ()) {
errno = EAGAIN;
return -1;
}
int rc = msg_->close ();
errno_assert (rc == 0);
rc = msg_->init_size (pending.front ().size ());
rc = msg_->init_size (pending_data.front ().size ());
errno_assert (rc == 0);
memcpy (msg_->data (), pending.front ().data (),
pending.front ().size ());
pending.pop_front ();
memcpy (msg_->data (),
pending_data.front ().data (),
pending_data.front ().size ());
msg_->set_flags (pending_flags.front ());
pending_data.pop_front ();
pending_flags.pop_front ();
return 0;
}
bool zmq::xpub_t::xhas_in ()
{
return !pending.empty ();
return !pending_data.empty ();
}
void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_,
@@ -180,7 +185,8 @@ void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_,
blob_t unsub (size_ + 1, 0);
unsub [0] = 0;
memcpy (&unsub [1], data_, size_);
self->pending.push_back (unsub);
self->pending_data.push_back (unsub);
self->pending_flags.push_back (0);
}
}