mirror of
https://github.com/zeromq/libzmq.git
synced 2025-07-03 17:17:14 +02:00
Merge pull request #4137 from tarmo/xpub-manual-subscription-race
Problem: XPUB socket allows manual subscription on terminated pipe
This commit is contained in:
commit
8432cc37f8
12
src/dist.cpp
12
src/dist.cpp
@ -64,6 +64,18 @@ void zmq::dist_t::attach (pipe_t *pipe_)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool zmq::dist_t::has_pipe (pipe_t *pipe_)
|
||||||
|
{
|
||||||
|
std::size_t claimed_index = _pipes.index (pipe_);
|
||||||
|
|
||||||
|
// If pipe claims to be outside the available index space it can't be in the distributor.
|
||||||
|
if (claimed_index >= _pipes.size ()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return _pipes[claimed_index] == pipe_;
|
||||||
|
}
|
||||||
|
|
||||||
void zmq::dist_t::match (pipe_t *pipe_)
|
void zmq::dist_t::match (pipe_t *pipe_)
|
||||||
{
|
{
|
||||||
// If pipe is already matching do nothing.
|
// If pipe is already matching do nothing.
|
||||||
|
@ -51,6 +51,9 @@ class dist_t
|
|||||||
// Adds the pipe to the distributor object.
|
// Adds the pipe to the distributor object.
|
||||||
void attach (zmq::pipe_t *pipe_);
|
void attach (zmq::pipe_t *pipe_);
|
||||||
|
|
||||||
|
// Checks if this pipe is present in the distributor.
|
||||||
|
bool has_pipe (zmq::pipe_t *pipe_);
|
||||||
|
|
||||||
// Activates pipe that have previously reached high watermark.
|
// Activates pipe that have previously reached high watermark.
|
||||||
void activated (zmq::pipe_t *pipe_);
|
void activated (zmq::pipe_t *pipe_);
|
||||||
|
|
||||||
|
12
src/xpub.cpp
12
src/xpub.cpp
@ -272,6 +272,12 @@ void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_)
|
|||||||
// care of by the manual call above. subscriptions is the real mtrie,
|
// care of by the manual call above. subscriptions is the real mtrie,
|
||||||
// so the pipe must be removed from there or it will be left over.
|
// so the pipe must be removed from there or it will be left over.
|
||||||
_subscriptions.rm (pipe_, stub, static_cast<void *> (NULL), false);
|
_subscriptions.rm (pipe_, stub, static_cast<void *> (NULL), false);
|
||||||
|
|
||||||
|
// In case the pipe is currently set as last we must clear it to prevent
|
||||||
|
// subscriptions from being re-added.
|
||||||
|
if (pipe_ == _last_pipe) {
|
||||||
|
_last_pipe = NULL;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
// Remove the pipe from the trie. If there are topics that nobody
|
// Remove the pipe from the trie. If there are topics that nobody
|
||||||
// is interested in anymore, send corresponding unsubscriptions
|
// is interested in anymore, send corresponding unsubscriptions
|
||||||
@ -348,6 +354,12 @@ int zmq::xpub_t::xrecv (msg_t *msg_)
|
|||||||
if (_manual && !_pending_pipes.empty ()) {
|
if (_manual && !_pending_pipes.empty ()) {
|
||||||
_last_pipe = _pending_pipes.front ();
|
_last_pipe = _pending_pipes.front ();
|
||||||
_pending_pipes.pop_front ();
|
_pending_pipes.pop_front ();
|
||||||
|
|
||||||
|
// If the distributor doesn't know about this pipe it must have already
|
||||||
|
// been terminated and thus we can't allow manual subscriptions.
|
||||||
|
if (_last_pipe != NULL && !_dist.has_pipe (_last_pipe)) {
|
||||||
|
_last_pipe = NULL;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int rc = msg_->close ();
|
int rc = msg_->close ();
|
||||||
|
Loading…
x
Reference in New Issue
Block a user