diff --git a/src/xpub.cpp b/src/xpub.cpp index afc0ba2a..9c4f1e2f 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -89,6 +89,7 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) // Apply the subscription to the trie unsigned char *const data = (unsigned char *) sub.data (); const size_t size = sub.size (); + metadata_t* metadata = sub.metadata(); if (size > 0 && (*data == 0 || *data == 1)) { if (manual) { @@ -100,7 +101,9 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) pending_pipes.push_back(pipe_); pending_data.push_back(blob_t(data, size)); - pending_metadata.push_back(sub.metadata()); + if (metadata) + metadata->add_ref(); + pending_metadata.push_back(metadata); pending_flags.push_back(0); } else @@ -117,7 +120,9 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) if (options.type == ZMQ_XPUB && (unique || (*data == 1 && verbose_subs) || (*data == 0 && verbose_unsubs && verbose_subs))) { pending_data.push_back(blob_t(data, size)); - pending_metadata.push_back(sub.metadata()); + if (metadata) + metadata->add_ref(); + pending_metadata.push_back(metadata); pending_flags.push_back(0); } } @@ -125,7 +130,9 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) else { // Process user message coming upstream from xsub socket pending_data.push_back (blob_t (data, size)); - pending_metadata.push_back (sub.metadata ()); + if (metadata) + metadata->add_ref(); + pending_metadata.push_back (metadata); pending_flags.push_back (sub.flags ()); } sub.close (); @@ -280,6 +287,8 @@ int zmq::xpub_t::xrecv (msg_t *msg_) // set metadata only if there is some if (metadata_t* metadata = pending_metadata.front ()) { msg_->set_metadata (metadata); + // Remove ref corresponding to vector placement + metadata->drop_ref(); } msg_->set_flags (pending_flags.front ());