diff --git a/doc/zmq_getsockopt.txt b/doc/zmq_getsockopt.txt index a61d727c..eb16ef0e 100644 --- a/doc/zmq_getsockopt.txt +++ b/doc/zmq_getsockopt.txt @@ -892,6 +892,18 @@ Default value:: -1 Applicable socket types:: all, when using VMCI transport +ZMQ_MULTICAST_LOOP: Retrieve multicast local loopback configuration +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Retrieve the current multicast loopback configuration. A value of `1` +means that the multicast packets sent on this socket will be looped +back to local listening interface. + +[horizontal] +Option value type:: int +Option value unit:: 0, 1 +Default value:: 1 +Applicable socket types:: ZMQ_RADIO, when using UDP multicast transport + RETURN VALUE ------------ The _zmq_getsockopt()_ function shall return zero if successful. Otherwise it diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index 6627322f..480a21e9 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -1273,6 +1273,17 @@ Default value:: -1 Applicable socket types:: all, when using VMCI transport +ZMQ_MULTICAST_LOOP: Control multicast local loopback +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +For multicast UDP sender sockets this option sets whether the data +sent should be looped back on local listening sockets. + +[horizontal] +Option value type:: int +Option value unit:: 0, 1 +Default value:: 1 +Applicable socket types:: ZMQ_RADIO, when using UDP multicast transport + RETURN VALUE ------------ The _zmq_setsockopt()_ function shall return zero if successful. Otherwise it diff --git a/include/zmq.h b/include/zmq.h index 418acc8f..7bb5fd87 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -596,6 +596,7 @@ ZMQ_EXPORT void zmq_threadclose (void *thread); #define ZMQ_ZAP_ENFORCE_DOMAIN 93 #define ZMQ_LOOPBACK_FASTPATH 94 #define ZMQ_METADATA 95 +#define ZMQ_MULTICAST_LOOP 96 /* DRAFT 0MQ socket events and monitoring */ /* Unspecified system errors during handshake. Event value is an errno. */ diff --git a/src/options.cpp b/src/options.cpp index 1eea4087..27702317 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -232,6 +232,7 @@ zmq::options_t::options_t () : use_fd (-1), zap_enforce_domain (false), loopback_fastpath (false), + multicast_loop (true), zero_copy (true) { memset (curve_public_key, 0, CURVE_KEYSIZE); @@ -726,6 +727,11 @@ int zmq::options_t::setsockopt (int option_, errno = EINVAL; return -1; break; + + case ZMQ_MULTICAST_LOOP: + return do_setsockopt_int_as_bool_relaxed (optval_, optvallen_, + &multicast_loop); + default: #if defined(ZMQ_ACT_MILITANT) // There are valid scenarios for probing with unknown socket option @@ -1120,6 +1126,13 @@ int zmq::options_t::getsockopt (int option_, } break; + case ZMQ_MULTICAST_LOOP: + if (is_int) { + *value = multicast_loop; + return 0; + } + break; + default: #if defined(ZMQ_ACT_MILITANT) malformed = false; diff --git a/src/options.hpp b/src/options.hpp index 4228353a..00a0f219 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -257,6 +257,9 @@ struct options_t // Use of loopback fastpath. bool loopback_fastpath; + // Loop sent multicast packets to local sockets + bool multicast_loop; + // Use zero copy strategy for storing message content when decoding. bool zero_copy; diff --git a/src/udp_engine.cpp b/src/udp_engine.cpp index 19120e90..75091a3f 100644 --- a/src/udp_engine.cpp +++ b/src/udp_engine.cpp @@ -119,6 +119,29 @@ void zmq::udp_engine_t::plug (io_thread_t *io_thread_, session_base_t *session_) const ip_addr_t *out = address->resolved.udp_addr->target_addr (); out_address = out->as_sockaddr (); out_addrlen = out->sockaddr_len (); + + if (out->is_multicast ()) { + int level; + int optname; + + if (out->family () == AF_INET6) { + level = IPPROTO_IPV6; + optname = IPV6_MULTICAST_LOOP; + } else { + level = IPPROTO_IP; + optname = IP_MULTICAST_LOOP; + } + + int loop = options.multicast_loop; + int rc = setsockopt (fd, level, optname, (char *) &loop, + sizeof (loop)); + +#ifdef ZMQ_HAVE_WINDOWS + wsa_assert (rc != SOCKET_ERROR); +#else + errno_assert (rc == 0); +#endif + } } else { /// XXX fixme ? out_address = (sockaddr *) &raw_address; diff --git a/src/zmq_draft.h b/src/zmq_draft.h index 10734890..779faabc 100644 --- a/src/zmq_draft.h +++ b/src/zmq_draft.h @@ -54,6 +54,7 @@ unsigned long zmq_stopwatch_intermediate (void *watch_); #define ZMQ_ZAP_ENFORCE_DOMAIN 93 #define ZMQ_LOOPBACK_FASTPATH 94 #define ZMQ_METADATA 95 +#define ZMQ_MULTICAST_LOOP 96 /* DRAFT 0MQ socket events and monitoring */ /* Unspecified system errors during handshake. Event value is an errno. */ diff --git a/tests/test_radio_dish.cpp b/tests/test_radio_dish.cpp index 6ae20da5..cd8b6dc3 100644 --- a/tests/test_radio_dish.cpp +++ b/tests/test_radio_dish.cpp @@ -251,6 +251,238 @@ void test_radio_dish_udp (int ipv6_) } MAKE_TEST_V4V6 (test_radio_dish_udp) +#define MCAST_IPV4 "226.8.5.5" +#define MCAST_IPV6 "ff02::7a65:726f:6df1:0a01" + +static const char *mcast_url (int ipv6_) +{ + if (ipv6_) { + return "udp://[" MCAST_IPV6 "]:5555"; + } else { + return "udp://[" MCAST_IPV4 "]:5555"; + } +} + +// OSX uses a different name for this socket option +#ifndef IPV6_ADD_MEMBERSHIP +#define IPV6_ADD_MEMBERSHIP IPV6_JOIN_GROUP +#endif + +// Test if multicast is available on this machine by attempting to +// send a receive a multicast datagram +static bool is_multicast_available (int ipv6_) +{ + int family = ipv6_ ? AF_INET6 : AF_INET; + int bind_sock = -1; + int send_sock = -1; + int port = 5555; + bool success = false; + const char *msg = "it works"; + char buf[32]; + struct sockaddr_storage any; + struct sockaddr_storage mcast; + socklen_t sl; + int rc; + + if (ipv6_) { + struct sockaddr_in6 *any_ipv6 = (struct sockaddr_in6 *) &any; + struct sockaddr_in6 *mcast_ipv6 = (struct sockaddr_in6 *) &mcast; + + any_ipv6->sin6_family = AF_INET6; + any_ipv6->sin6_port = htons (port); + any_ipv6->sin6_flowinfo = 0; + any_ipv6->sin6_scope_id = 0; + + rc = inet_pton (AF_INET6, "::", &any_ipv6->sin6_addr); + if (rc == 0) { + goto out; + } + + *mcast_ipv6 = *any_ipv6; + + rc = inet_pton (AF_INET6, MCAST_IPV6, &mcast_ipv6->sin6_addr); + if (rc == 0) { + goto out; + } + + sl = sizeof (*any_ipv6); + } else { + struct sockaddr_in *any_ipv4 = (struct sockaddr_in *) &any; + struct sockaddr_in *mcast_ipv4 = (struct sockaddr_in *) &mcast; + + any_ipv4->sin_family = AF_INET; + any_ipv4->sin_port = htons (5555); + + rc = inet_pton (AF_INET, "0.0.0.0", &any_ipv4->sin_addr); + if (rc == 0) { + goto out; + } + + *mcast_ipv4 = *any_ipv4; + + rc = inet_pton (AF_INET, MCAST_IPV4, &mcast_ipv4->sin_addr); + if (rc == 0) { + goto out; + } + + sl = sizeof (*any_ipv4); + } + + bind_sock = socket (family, SOCK_DGRAM, IPPROTO_UDP); + if (bind_sock < 0) { + goto out; + } + + send_sock = socket (family, SOCK_DGRAM, IPPROTO_UDP); + if (bind_sock < 0) { + goto out; + } + + rc = bind (bind_sock, (struct sockaddr *) &any, sl); + if (rc < 0) { + goto out; + } + + if (ipv6_) { + struct ipv6_mreq mreq; + struct sockaddr_in6 *mcast_ipv6 = (struct sockaddr_in6 *) &mcast; + + mreq.ipv6mr_multiaddr = mcast_ipv6->sin6_addr; + mreq.ipv6mr_interface = 0; + + rc = setsockopt (bind_sock, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, &mreq, + sizeof (mreq)); + if (rc < 0) { + goto out; + } + + int loop = 1; + rc = setsockopt (send_sock, IPPROTO_IPV6, IPV6_MULTICAST_LOOP, &loop, + sizeof (loop)); + if (rc < 0) { + goto out; + } + } else { + struct ip_mreq mreq; + struct sockaddr_in *mcast_ipv4 = (struct sockaddr_in *) &mcast; + + mreq.imr_multiaddr = mcast_ipv4->sin_addr; + mreq.imr_interface.s_addr = htonl (INADDR_ANY); + + rc = setsockopt (bind_sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, + sizeof (mreq)); + if (rc < 0) { + goto out; + } + + int loop = 1; + rc = setsockopt (send_sock, IPPROTO_IP, IP_MULTICAST_LOOP, &loop, + sizeof (loop)); + if (rc < 0) { + goto out; + } + } + + msleep (SETTLE_TIME); + + rc = + sendto (send_sock, msg, strlen (msg), 0, (struct sockaddr *) &mcast, sl); + if (rc < 0) { + goto out; + } + + msleep (SETTLE_TIME); + + rc = recvfrom (bind_sock, buf, sizeof (buf) - 1, 0, NULL, 0); + if (rc < 0) { + goto out; + } + + buf[rc] = '\0'; + + success = (strcmp (msg, buf) == 0); + +out: + if (bind_sock >= 0) { + close (bind_sock); + } + + if (send_sock >= 0) { + close (send_sock); + } + + return success; +} + +static void test_radio_dish_mcast (int ipv6_) +{ + void *radio = test_context_socket (ZMQ_RADIO); + void *dish = test_context_socket (ZMQ_DISH); + + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (radio, ZMQ_IPV6, &ipv6_, sizeof (int))); + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (dish, ZMQ_IPV6, &ipv6_, sizeof (int))); + + const char *url = mcast_url (ipv6_); + + TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (dish, url)); + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (radio, url)); + + msleep (SETTLE_TIME); + + TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, "TV")); + + msg_send_expect_success (radio, "TV", "Friends"); + msg_recv_cmp (dish, "TV", "Friends"); + + test_context_socket_close (dish); + test_context_socket_close (radio); +} +MAKE_TEST_V4V6 (test_radio_dish_mcast) + +static void test_radio_dish_no_loop (int ipv6_) +{ + void *radio = test_context_socket (ZMQ_RADIO); + void *dish = test_context_socket (ZMQ_DISH); + + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (radio, ZMQ_IPV6, &ipv6_, sizeof (int))); + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (dish, ZMQ_IPV6, &ipv6_, sizeof (int))); + + // Disable multicast loop + int loop = 0; + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (radio, ZMQ_MULTICAST_LOOP, &loop, sizeof (int))); + + const char *url = mcast_url (ipv6_); + + TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (dish, url)); + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (radio, url)); + + msleep (SETTLE_TIME); + + TEST_ASSERT_SUCCESS_ERRNO (zmq_join (dish, "TV")); + + msg_send_expect_success (radio, "TV", "Friends"); + + // Looping is disabled, we shouldn't receive anything + msleep (SETTLE_TIME); + zmq_msg_t msg; + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg)); + + int rc = zmq_msg_recv (&msg, dish, ZMQ_DONTWAIT); + zmq_msg_close (&msg); + + TEST_ASSERT_EQUAL_INT (rc, -1); + TEST_ASSERT_EQUAL_INT (errno, EAGAIN); + + test_context_socket_close (dish); + test_context_socket_close (radio); +} +MAKE_TEST_V4V6 (test_radio_dish_no_loop) + int main (void) { setup_test_environment (); @@ -268,5 +500,18 @@ int main (void) RUN_TEST (test_radio_dish_udp_ipv4); RUN_TEST (test_radio_dish_udp_ipv6); + bool ipv4_mcast = is_multicast_available (false); + bool ipv6_mcast = is_ipv6_available () && is_multicast_available (true); + + if (ipv4_mcast) { + RUN_TEST (test_radio_dish_mcast_ipv4); + RUN_TEST (test_radio_dish_no_loop_ipv4); + } + + if (ipv6_mcast) { + RUN_TEST (test_radio_dish_mcast_ipv6); + RUN_TEST (test_radio_dish_no_loop_ipv6); + } + return UNITY_END (); }