mirror of
https://github.com/zeromq/libzmq.git
synced 2025-03-01 10:57:59 +01:00
Merge pull request #3659 from bluca/xpub_user_msg
Problem: can no longer send user data from XSUB to XPUB
This commit is contained in:
commit
826e7db7be
11
src/xpub.cpp
11
src/xpub.cpp
@ -104,13 +104,10 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
|
|||||||
data = static_cast<unsigned char *> (sub.command_body ());
|
data = static_cast<unsigned char *> (sub.command_body ());
|
||||||
size = sub.command_body_size ();
|
size = sub.command_body_size ();
|
||||||
subscribe = sub.is_subscribe ();
|
subscribe = sub.is_subscribe ();
|
||||||
} else if (sub.size () > 0) {
|
} else if (sub.size () > 0 && (*msg_data == 0 || *msg_data == 1)) {
|
||||||
unsigned char first = *msg_data;
|
data = msg_data + 1;
|
||||||
if (first == 0 || first == 1) {
|
size = sub.size () - 1;
|
||||||
data = msg_data + 1;
|
subscribe = *msg_data == 1;
|
||||||
size = sub.size () - 1;
|
|
||||||
subscribe = first == 1;
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
// Process user message coming upstream from xsub socket
|
// Process user message coming upstream from xsub socket
|
||||||
_pending_data.push_back (blob_t (msg_data, sub.size ()));
|
_pending_data.push_back (blob_t (msg_data, sub.size ()));
|
||||||
|
@ -434,6 +434,28 @@ void test_unsubscribe_cleanup ()
|
|||||||
test_context_socket_close (sub);
|
test_context_socket_close (sub);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void test_user_message ()
|
||||||
|
{
|
||||||
|
// Create a publisher
|
||||||
|
void *pub = test_context_socket (ZMQ_XPUB);
|
||||||
|
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, "inproc://soname"));
|
||||||
|
|
||||||
|
// Create a subscriber
|
||||||
|
void *sub = test_context_socket (ZMQ_XSUB);
|
||||||
|
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, "inproc://soname"));
|
||||||
|
|
||||||
|
// Send some data that is neither sub nor unsub
|
||||||
|
const char subscription[] = {2, 'A', 0};
|
||||||
|
send_string_expect_success (sub, subscription, 0);
|
||||||
|
|
||||||
|
// Receive subscriptions from subscriber
|
||||||
|
recv_string_expect_success (pub, subscription, 0);
|
||||||
|
|
||||||
|
// Clean up.
|
||||||
|
test_context_socket_close (pub);
|
||||||
|
test_context_socket_close (sub);
|
||||||
|
}
|
||||||
|
|
||||||
int main ()
|
int main ()
|
||||||
{
|
{
|
||||||
setup_test_environment ();
|
setup_test_environment ();
|
||||||
@ -444,6 +466,7 @@ int main ()
|
|||||||
RUN_TEST (test_xpub_proxy_unsubscribe_on_disconnect);
|
RUN_TEST (test_xpub_proxy_unsubscribe_on_disconnect);
|
||||||
RUN_TEST (test_missing_subscriptions);
|
RUN_TEST (test_missing_subscriptions);
|
||||||
RUN_TEST (test_unsubscribe_cleanup);
|
RUN_TEST (test_unsubscribe_cleanup);
|
||||||
|
RUN_TEST (test_user_message);
|
||||||
|
|
||||||
return UNITY_END ();
|
return UNITY_END ();
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user