mirror of
https://github.com/zeromq/libzmq.git
synced 2024-12-15 03:21:11 +01:00
Problem: lingering subscriptions on XPUB sockets (#1566)
The patch fixes lingering subscriptions that occur upon disconnection on XPUB sockets with option XPUB_MANUAL when used in a XPUB-XSUB proxies.
This commit is contained in:
parent
9e8e81cf9a
commit
dd35e1db0f
13
src/xpub.cpp
13
src/xpub.cpp
@ -153,11 +153,17 @@ int zmq::xpub_t::xsetsockopt (int option_, const void *optval_,
|
|||||||
manual = (*static_cast <const int*> (optval_) != 0);
|
manual = (*static_cast <const int*> (optval_) != 0);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
if (option_ == ZMQ_SUBSCRIBE && manual && last_pipe != NULL)
|
if (option_ == ZMQ_SUBSCRIBE && manual) {
|
||||||
|
if (last_pipe != NULL) {
|
||||||
subscriptions.add((unsigned char *)optval_, optvallen_, last_pipe);
|
subscriptions.add((unsigned char *)optval_, optvallen_, last_pipe);
|
||||||
|
}
|
||||||
|
}
|
||||||
else
|
else
|
||||||
if (option_ == ZMQ_UNSUBSCRIBE && manual && last_pipe != NULL)
|
if (option_ == ZMQ_UNSUBSCRIBE && manual) {
|
||||||
|
if (last_pipe != NULL) {
|
||||||
subscriptions.rm((unsigned char *)optval_, optvallen_, last_pipe);
|
subscriptions.rm((unsigned char *)optval_, optvallen_, last_pipe);
|
||||||
|
}
|
||||||
|
}
|
||||||
else
|
else
|
||||||
if (option_ == ZMQ_XPUB_WELCOME_MSG) {
|
if (option_ == ZMQ_XPUB_WELCOME_MSG) {
|
||||||
welcome_msg.close();
|
welcome_msg.close();
|
||||||
@ -183,7 +189,7 @@ void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_)
|
|||||||
// Remove the pipe from the trie. If there are topics that nobody
|
// Remove the pipe from the trie. If there are topics that nobody
|
||||||
// is interested in anymore, send corresponding unsubscriptions
|
// is interested in anymore, send corresponding unsubscriptions
|
||||||
// upstream.
|
// upstream.
|
||||||
subscriptions.rm (pipe_, send_unsubscription, this, !verbose_unsubs);
|
subscriptions.rm (pipe_, send_unsubscription, this, !(verbose_unsubs || manual));
|
||||||
|
|
||||||
dist.pipe_terminated (pipe_);
|
dist.pipe_terminated (pipe_);
|
||||||
}
|
}
|
||||||
@ -274,6 +280,7 @@ void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_,
|
|||||||
unsub [0] = 0;
|
unsub [0] = 0;
|
||||||
if (size_ > 0)
|
if (size_ > 0)
|
||||||
memcpy (&unsub [1], data_, size_);
|
memcpy (&unsub [1], data_, size_);
|
||||||
|
self->last_pipe = NULL;
|
||||||
self->pending_data.push_back (unsub);
|
self->pending_data.push_back (unsub);
|
||||||
self->pending_metadata.push_back (NULL);
|
self->pending_metadata.push_back (NULL);
|
||||||
self->pending_flags.push_back (0);
|
self->pending_flags.push_back (0);
|
||||||
|
@ -29,9 +29,8 @@
|
|||||||
|
|
||||||
#include "testutil.hpp"
|
#include "testutil.hpp"
|
||||||
|
|
||||||
int main (void)
|
int test_basic()
|
||||||
{
|
{
|
||||||
setup_test_environment();
|
|
||||||
void *ctx = zmq_ctx_new ();
|
void *ctx = zmq_ctx_new ();
|
||||||
assert (ctx);
|
assert (ctx);
|
||||||
|
|
||||||
@ -90,3 +89,150 @@ int main (void)
|
|||||||
|
|
||||||
return 0 ;
|
return 0 ;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int test_xpub_proxy_unsubscribe_on_disconnect()
|
||||||
|
{
|
||||||
|
const char* frontend = "ipc://frontend";
|
||||||
|
const char* backend = "ipc://backend";
|
||||||
|
const char* topic = "1";
|
||||||
|
const char* payload = "X";
|
||||||
|
|
||||||
|
int manual = 1;
|
||||||
|
|
||||||
|
void *ctx = zmq_ctx_new ();
|
||||||
|
assert (ctx);
|
||||||
|
|
||||||
|
// proxy frontend
|
||||||
|
void *xsub_proxy = zmq_socket (ctx, ZMQ_XSUB);
|
||||||
|
assert (xsub_proxy);
|
||||||
|
assert (zmq_bind (xsub_proxy, frontend) == 0);
|
||||||
|
|
||||||
|
// proxy backend
|
||||||
|
void *xpub_proxy = zmq_socket (ctx, ZMQ_XPUB);
|
||||||
|
assert (xpub_proxy);
|
||||||
|
assert (zmq_setsockopt (xpub_proxy, ZMQ_XPUB_MANUAL, &manual, 4) == 0);
|
||||||
|
assert (zmq_bind (xpub_proxy, backend) == 0);
|
||||||
|
|
||||||
|
// publisher
|
||||||
|
void *pub = zmq_socket (ctx, ZMQ_PUB);
|
||||||
|
assert (zmq_connect (pub, frontend) == 0);
|
||||||
|
|
||||||
|
// first subscriber subscribes
|
||||||
|
void *sub1 = zmq_socket (ctx, ZMQ_SUB);
|
||||||
|
assert (sub1);
|
||||||
|
assert (zmq_connect (sub1, backend) == 0);
|
||||||
|
assert (zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, topic, 1) == 0);
|
||||||
|
|
||||||
|
// wait
|
||||||
|
assert (zmq_poll (0, 0, 100) == 0);
|
||||||
|
|
||||||
|
// proxy reroutes and confirms subscriptions
|
||||||
|
char sub_buff[2];
|
||||||
|
assert (zmq_recv (xpub_proxy, sub_buff, 2, ZMQ_DONTWAIT) == 2);
|
||||||
|
assert (sub_buff [0] == 1);
|
||||||
|
assert (sub_buff [1] == *topic);
|
||||||
|
assert (zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic, 1) == 0);
|
||||||
|
assert (zmq_send (xsub_proxy, sub_buff, 2, 0) == 2);
|
||||||
|
|
||||||
|
// second subscriber subscribes
|
||||||
|
void *sub2 = zmq_socket (ctx, ZMQ_SUB);
|
||||||
|
assert (sub2);
|
||||||
|
assert (zmq_connect (sub2, backend) == 0);
|
||||||
|
assert (zmq_setsockopt (sub2, ZMQ_SUBSCRIBE, topic, 1) == 0);
|
||||||
|
|
||||||
|
// wait
|
||||||
|
assert (zmq_poll (0, 0, 100) == 0);
|
||||||
|
|
||||||
|
// proxy reroutes
|
||||||
|
assert (zmq_recv (xpub_proxy, sub_buff, 2, ZMQ_DONTWAIT) == 2);
|
||||||
|
assert (sub_buff [0] == 1);
|
||||||
|
assert (sub_buff [1] == *topic);
|
||||||
|
assert (zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic, 1) == 0);
|
||||||
|
assert (zmq_send (xsub_proxy, sub_buff, 2, 0) == 2);
|
||||||
|
|
||||||
|
// wait
|
||||||
|
assert (zmq_poll (0, 0, 100) == 0);
|
||||||
|
|
||||||
|
// let publisher send a msg
|
||||||
|
assert (zmq_send (pub, topic, 1, ZMQ_SNDMORE) == 1);
|
||||||
|
assert (zmq_send (pub, payload, 1, 0) == 1);
|
||||||
|
|
||||||
|
// wait
|
||||||
|
assert (zmq_poll (0, 0, 100) == 0);
|
||||||
|
|
||||||
|
// proxy reroutes data messages to subscribers
|
||||||
|
char topic_buff[1];
|
||||||
|
char data_buff[1];
|
||||||
|
assert (zmq_recv (xsub_proxy, topic_buff, 1, ZMQ_DONTWAIT) == 1);
|
||||||
|
assert (topic_buff [0] == *topic);
|
||||||
|
assert (zmq_recv (xsub_proxy, data_buff, 1, ZMQ_DONTWAIT) == 1);
|
||||||
|
assert (data_buff [0] == *payload);
|
||||||
|
assert (zmq_send (xpub_proxy, topic_buff, 1, ZMQ_SNDMORE) == 1);
|
||||||
|
assert (zmq_send (xpub_proxy, data_buff, 1, 0) == 1);
|
||||||
|
|
||||||
|
// wait
|
||||||
|
assert (zmq_poll (0, 0, 100) == 0);
|
||||||
|
|
||||||
|
// each subscriber should now get a message
|
||||||
|
assert (zmq_recv (sub2, topic_buff, 1, ZMQ_DONTWAIT) == 1);
|
||||||
|
assert (topic_buff [0] == *topic);
|
||||||
|
assert (zmq_recv (sub2, data_buff, 1, ZMQ_DONTWAIT) == 1);
|
||||||
|
assert (data_buff [0] == *payload);
|
||||||
|
|
||||||
|
assert (zmq_recv (sub1, topic_buff, 1, ZMQ_DONTWAIT) == 1);
|
||||||
|
assert (topic_buff [0] == *topic);
|
||||||
|
assert (zmq_recv (sub1, data_buff, 1, ZMQ_DONTWAIT) == 1);
|
||||||
|
assert (data_buff [0] == *payload);
|
||||||
|
|
||||||
|
// Disconnect both subscribers
|
||||||
|
assert (zmq_close (sub1) == 0);
|
||||||
|
assert (zmq_close (sub2) == 0);
|
||||||
|
|
||||||
|
// wait
|
||||||
|
assert (zmq_poll (0, 0, 100) == 0);
|
||||||
|
|
||||||
|
// unsubscribe messages are passed from proxy to publisher
|
||||||
|
assert (zmq_recv (xpub_proxy, sub_buff, 2, 0) == 2);
|
||||||
|
assert (sub_buff [0] == 0);
|
||||||
|
assert (sub_buff [1] == *topic);
|
||||||
|
assert (zmq_setsockopt (xpub_proxy, ZMQ_UNSUBSCRIBE, topic, 1) == 0);
|
||||||
|
assert (zmq_send (xsub_proxy, sub_buff, 2, 0) == 2);
|
||||||
|
|
||||||
|
// should receive another unsubscribe msg
|
||||||
|
assert (zmq_recv (xpub_proxy, sub_buff, 2, ZMQ_DONTWAIT) == 2
|
||||||
|
&& "Should receive the second unsubscribe message.");
|
||||||
|
assert (sub_buff [0] == 0);
|
||||||
|
assert (sub_buff [1] == *topic);
|
||||||
|
assert (zmq_setsockopt (xpub_proxy, ZMQ_UNSUBSCRIBE, topic, 1) == 0);
|
||||||
|
assert (zmq_send (xsub_proxy, sub_buff, 2, 0) == 2);
|
||||||
|
|
||||||
|
// wait
|
||||||
|
assert (zmq_poll (0, 0, 100) == 0);
|
||||||
|
|
||||||
|
// let publisher send a msg
|
||||||
|
assert (zmq_send (pub, topic, 1, ZMQ_SNDMORE) == 1);
|
||||||
|
assert (zmq_send (pub, payload, 1, 0) == 1);
|
||||||
|
|
||||||
|
// wait
|
||||||
|
assert (zmq_poll (0, 0, 100) == 0);
|
||||||
|
|
||||||
|
// nothing should come to the proxy
|
||||||
|
assert (zmq_recv (xsub_proxy, topic_buff, 1, ZMQ_DONTWAIT) == -1);
|
||||||
|
assert (errno == EAGAIN);
|
||||||
|
|
||||||
|
assert (zmq_close (pub) == 0);
|
||||||
|
assert (zmq_close (xpub_proxy) == 0);
|
||||||
|
assert (zmq_close (xsub_proxy) == 0);
|
||||||
|
assert (zmq_ctx_term (ctx) == 0);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(void)
|
||||||
|
{
|
||||||
|
setup_test_environment ();
|
||||||
|
test_basic ();
|
||||||
|
test_xpub_proxy_unsubscribe_on_disconnect ();
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user