libzmq/src/dist.cpp
Luca Boccassi da31917f4f Relicense from LGPL3 + exceptions to Mozilla Public License version 2.0
Relicense permission collected from all relevant authors as tallied at:
https://github.com/rlenferink/libzmq-relicense/blob/master/checklist.md
The relicense grants are collected under RELICENSE/ and will be moved
to the above repository in a later commit.

Fixes https://github.com/zeromq/libzmq/issues/2376
2023-06-05 20:31:47 +01:00

220 lines
5.5 KiB
C++

/* SPDX-License-Identifier: MPL-2.0 */
#include "precompiled.hpp"
#include "dist.hpp"
#include "pipe.hpp"
#include "err.hpp"
#include "msg.hpp"
#include "likely.hpp"
zmq::dist_t::dist_t () :
_matching (0), _active (0), _eligible (0), _more (false)
{
}
zmq::dist_t::~dist_t ()
{
zmq_assert (_pipes.empty ());
}
void zmq::dist_t::attach (pipe_t *pipe_)
{
// If we are in the middle of sending a message, we'll add new pipe
// into the list of eligible pipes. Otherwise we add it to the list
// of active pipes.
if (_more) {
_pipes.push_back (pipe_);
_pipes.swap (_eligible, _pipes.size () - 1);
_eligible++;
} else {
_pipes.push_back (pipe_);
_pipes.swap (_active, _pipes.size () - 1);
_active++;
_eligible++;
}
}
bool zmq::dist_t::has_pipe (pipe_t *pipe_)
{
std::size_t claimed_index = _pipes.index (pipe_);
// If pipe claims to be outside the available index space it can't be in the distributor.
if (claimed_index >= _pipes.size ()) {
return false;
}
return _pipes[claimed_index] == pipe_;
}
void zmq::dist_t::match (pipe_t *pipe_)
{
// If pipe is already matching do nothing.
if (_pipes.index (pipe_) < _matching)
return;
// If the pipe isn't eligible, ignore it.
if (_pipes.index (pipe_) >= _eligible)
return;
// Mark the pipe as matching.
_pipes.swap (_pipes.index (pipe_), _matching);
_matching++;
}
void zmq::dist_t::reverse_match ()
{
const pipes_t::size_type prev_matching = _matching;
// Reset matching to 0
unmatch ();
// Mark all matching pipes as not matching and vice-versa.
// To do this, push all pipes that are eligible but not
// matched - i.e. between "matching" and "eligible" -
// to the beginning of the queue.
for (pipes_t::size_type i = prev_matching; i < _eligible; ++i) {
_pipes.swap (i, _matching++);
}
}
void zmq::dist_t::unmatch ()
{
_matching = 0;
}
void zmq::dist_t::pipe_terminated (pipe_t *pipe_)
{
// Remove the pipe from the list; adjust number of matching, active and/or
// eligible pipes accordingly.
if (_pipes.index (pipe_) < _matching) {
_pipes.swap (_pipes.index (pipe_), _matching - 1);
_matching--;
}
if (_pipes.index (pipe_) < _active) {
_pipes.swap (_pipes.index (pipe_), _active - 1);
_active--;
}
if (_pipes.index (pipe_) < _eligible) {
_pipes.swap (_pipes.index (pipe_), _eligible - 1);
_eligible--;
}
_pipes.erase (pipe_);
}
void zmq::dist_t::activated (pipe_t *pipe_)
{
// Move the pipe from passive to eligible state.
if (_eligible < _pipes.size ()) {
_pipes.swap (_pipes.index (pipe_), _eligible);
_eligible++;
}
// If there's no message being sent at the moment, move it to
// the active state.
if (!_more && _active < _pipes.size ()) {
_pipes.swap (_eligible - 1, _active);
_active++;
}
}
int zmq::dist_t::send_to_all (msg_t *msg_)
{
_matching = _active;
return send_to_matching (msg_);
}
int zmq::dist_t::send_to_matching (msg_t *msg_)
{
// Is this end of a multipart message?
const bool msg_more = (msg_->flags () & msg_t::more) != 0;
// Push the message to matching pipes.
distribute (msg_);
// If multipart message is fully sent, activate all the eligible pipes.
if (!msg_more)
_active = _eligible;
_more = msg_more;
return 0;
}
void zmq::dist_t::distribute (msg_t *msg_)
{
// If there are no matching pipes available, simply drop the message.
if (_matching == 0) {
int rc = msg_->close ();
errno_assert (rc == 0);
rc = msg_->init ();
errno_assert (rc == 0);
return;
}
if (msg_->is_vsm ()) {
for (pipes_t::size_type i = 0; i < _matching;) {
if (!write (_pipes[i], msg_)) {
// Use same index again because entry will have been removed.
} else {
++i;
}
}
int rc = msg_->init ();
errno_assert (rc == 0);
return;
}
// Add matching-1 references to the message. We already hold one reference,
// that's why -1.
msg_->add_refs (static_cast<int> (_matching) - 1);
// Push copy of the message to each matching pipe.
int failed = 0;
for (pipes_t::size_type i = 0; i < _matching;) {
if (!write (_pipes[i], msg_)) {
++failed;
// Use same index again because entry will have been removed.
} else {
++i;
}
}
if (unlikely (failed))
msg_->rm_refs (failed);
// Detach the original message from the data buffer. Note that we don't
// close the message. That's because we've already used all the references.
const int rc = msg_->init ();
errno_assert (rc == 0);
}
bool zmq::dist_t::has_out ()
{
return true;
}
bool zmq::dist_t::write (pipe_t *pipe_, msg_t *msg_)
{
if (!pipe_->write (msg_)) {
_pipes.swap (_pipes.index (pipe_), _matching - 1);
_matching--;
_pipes.swap (_pipes.index (pipe_), _active - 1);
_active--;
_pipes.swap (_active, _eligible - 1);
_eligible--;
return false;
}
if (!(msg_->flags () & msg_t::more))
pipe_->flush ();
return true;
}
bool zmq::dist_t::check_hwm ()
{
for (pipes_t::size_type i = 0; i < _matching; ++i)
if (!_pipes[i]->check_hwm ())
return false;
return true;
}