From 5ebfd1728f2276b9aa4f3c1bac6b489d7eae20ae Mon Sep 17 00:00:00 2001 From: somdoron Date: Fri, 29 Jan 2016 21:17:11 +0200 Subject: [PATCH] make udp support for radio-dish --- .gitignore | 1 + CMakeLists.txt | 4 +- Makefile.am | 9 +- src/address.cpp | 10 ++ src/address.hpp | 2 + src/radio.cpp | 13 ++- src/radio.hpp | 5 + src/session_base.cpp | 29 +++++- src/socket_base.cpp | 24 ++++- src/udp_address.cpp | 166 ++++++++++++++++++++++++++++++++ src/udp_address.hpp | 84 ++++++++++++++++ src/udp_engine.cpp | 222 +++++++++++++++++++++++++++++++++++++++++++ src/udp_engine.hpp | 62 ++++++++++++ tests/test_udp.cpp | 121 +++++++++++++++++++++++ 14 files changed, 744 insertions(+), 8 deletions(-) create mode 100644 src/udp_address.cpp create mode 100644 src/udp_address.hpp create mode 100644 src/udp_engine.cpp create mode 100644 src/udp_engine.hpp create mode 100644 tests/test_udp.cpp diff --git a/.gitignore b/.gitignore index 31374c67..1a47b83e 100644 --- a/.gitignore +++ b/.gitignore @@ -123,6 +123,7 @@ test_stream_exceeds_buffer test_poller test_timers test_radio_dish +test_udp test_large_msg tests/test*.log tests/test*.trs diff --git a/CMakeLists.txt b/CMakeLists.txt index f6eb3c35..8ba88e32 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -460,7 +460,9 @@ set(cxx-sources timers.cpp config.hpp radio.cpp - dish.cpp) + dish.cpp + udp_engine.cpp + udp_address.cpp) set(rc-sources version.rc) diff --git a/Makefile.am b/Makefile.am index f92ae23c..586a8fa6 100644 --- a/Makefile.am +++ b/Makefile.am @@ -193,6 +193,10 @@ src_libzmq_la_SOURCES = \ src/tipc_listener.hpp \ src/trie.cpp \ src/trie.hpp \ + src/udp_address.cpp \ + src/udp_address.hpp \ + src/udp_engine.cpp \ + src/udp_engine.hpp \ src/v1_decoder.cpp \ src/v1_decoder.hpp \ src/v2_decoder.cpp \ @@ -385,7 +389,8 @@ test_apps = \ tests/test_stream_exceeds_buffer \ tests/test_poller \ tests/test_timers \ - tests/test_radio_dish + tests/test_radio_dish \ + tests/test_udp tests_test_system_SOURCES = tests/test_system.cpp tests_test_system_LDADD = src/libzmq.la @@ -605,6 +610,8 @@ tests_test_timers_LDADD = src/libzmq.la tests_test_radio_dish_SOURCES = tests/test_radio_dish.cpp tests_test_radio_dish_LDADD = src/libzmq.la +tests_test_udp_SOURCES = tests/test_udp.cpp +tests_test_udp_LDADD = src/libzmq.la if !ON_MINGW if !ON_CYGWIN diff --git a/src/address.cpp b/src/address.cpp index cc42deef..9c2096a9 100644 --- a/src/address.cpp +++ b/src/address.cpp @@ -33,6 +33,7 @@ #include "ctx.hpp" #include "err.hpp" #include "tcp_address.hpp" +#include "udp_address.hpp" #include "ipc_address.hpp" #include "tipc_address.hpp" @@ -59,6 +60,11 @@ zmq::address_t::~address_t () LIBZMQ_DELETE(resolved.tcp_addr); } } + if (protocol == "udp") { + if (resolved.udp_addr) { + LIBZMQ_DELETE(resolved.udp_addr); + } + } #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS else if (protocol == "ipc") { @@ -91,6 +97,10 @@ int zmq::address_t::to_string (std::string &addr_) const if (resolved.tcp_addr) return resolved.tcp_addr->to_string (addr_); } + if (protocol == "udp") { + if (resolved.udp_addr) + return resolved.udp_addr->to_string (addr_); + } #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS else if (protocol == "ipc") { diff --git a/src/address.hpp b/src/address.hpp index 578f025b..61feb428 100644 --- a/src/address.hpp +++ b/src/address.hpp @@ -36,6 +36,7 @@ namespace zmq { class ctx_t; class tcp_address_t; + class udp_address_t; #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS class ipc_address_t; #endif @@ -57,6 +58,7 @@ namespace zmq // Protocol specific resolved address union { tcp_address_t *tcp_addr; + udp_address_t *udp_addr; #if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS ipc_address_t *ipc_addr; #endif diff --git a/src/radio.cpp b/src/radio.cpp index 51e654c9..8623d460 100644 --- a/src/radio.cpp +++ b/src/radio.cpp @@ -57,9 +57,12 @@ void zmq::radio_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) dist.attach (pipe_); + if (subscribe_to_all_) + udp_pipes.push_back (pipe_); // The pipe is active when attached. Let's read the subscriptions from // it, if any. - xread_activated (pipe_); + else + xread_activated (pipe_); } void zmq::radio_t::xread_activated (pipe_t *pipe_) @@ -102,6 +105,11 @@ void zmq::radio_t::xpipe_terminated (pipe_t *pipe_) } } + udp_pipes_t::iterator it = std::find(udp_pipes.begin(), + udp_pipes.end (), pipe_); + if (it != udp_pipes.end ()) + udp_pipes.erase (it); + dist.pipe_terminated (pipe_); } @@ -121,6 +129,9 @@ int zmq::radio_t::xsend (msg_t *msg_) for (subscriptions_t::iterator it = range.first; it != range.second; ++it) dist.match (it-> second); + for (udp_pipes_t::iterator it = udp_pipes.begin (); it != udp_pipes.end (); ++it) + dist.match (*it); + int rc = dist.send_to_matching (msg_); return rc; diff --git a/src/radio.hpp b/src/radio.hpp index 8e76a550..eb0dbf58 100644 --- a/src/radio.hpp +++ b/src/radio.hpp @@ -32,6 +32,7 @@ #include #include +#include #include "socket_base.hpp" #include "session_base.hpp" @@ -70,6 +71,10 @@ namespace zmq typedef std::multimap subscriptions_t; subscriptions_t subscriptions; + // List of udp pipes + typedef std::vector udp_pipes_t; + udp_pipes_t udp_pipes; + // Distributor of messages holding the list of outbound pipes. dist_t dist; diff --git a/src/session_base.cpp b/src/session_base.cpp index 515f2947..9da1b656 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -42,6 +42,7 @@ #include "pgm_receiver.hpp" #include "address.hpp" #include "norm_engine.hpp" +#include "udp_engine.hpp" #include "ctx.hpp" #include "req.hpp" @@ -498,7 +499,7 @@ void zmq::session_base_t::reconnect () // and reestablish later on if (pipe && options.immediate == 1 && addr->protocol != "pgm" && addr->protocol != "epgm" - && addr->protocol != "norm") { + && addr->protocol != "norm" && addr->protocol != "udp") { pipe->hiccup (); pipe->terminate (false); terminating_pipes.insert (pipe); @@ -567,6 +568,32 @@ void zmq::session_base_t::start_connecting (bool wait_) } #endif +if (addr->protocol == "udp") { + zmq_assert (options.type == ZMQ_DISH || options.type == ZMQ_RADIO); + + udp_engine_t* engine = new (std::nothrow) udp_engine_t (); + alloc_assert (engine); + + bool recv = false; + bool send = false; + + if (options.type == ZMQ_RADIO) { + send = true; + recv = false; + } + else if (options.type == ZMQ_DISH) { + send = false; + recv = true; + } + + int rc = engine->init (addr, send, recv); + errno_assert (rc == 0); + + send_attach (this, engine); + + return; +} + #ifdef ZMQ_HAVE_OPENPGM // Both PGM and EPGM transports are using the same infrastructure. diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 7ff71106..8df6fec5 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -65,6 +65,7 @@ #include "address.hpp" #include "ipc_address.hpp" #include "tcp_address.hpp" +#include "udp_address.hpp" #include "tipc_address.hpp" #include "mailbox.hpp" #include "mailbox_safe.hpp" @@ -260,7 +261,8 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_) && protocol_ != "epgm" && protocol_ != "tipc" && protocol_ != "norm" - && protocol_ != "vmci") { + && protocol_ != "vmci" + && protocol_ != "udp") { errno = EPROTONOSUPPORT; return -1; } @@ -314,6 +316,9 @@ int zmq::socket_base_t::check_protocol (const std::string &protocol_) return -1; } + if (protocol_ == "udp" && (options.type != ZMQ_DISH && options.type != ZMQ_RADIO)) + return -1; + // Protocol is available. return 0; } @@ -556,9 +561,9 @@ int zmq::socket_base_t::bind (const char *addr_) return rc; } - if (protocol == "pgm" || protocol == "epgm" || protocol == "norm") { + if (protocol == "pgm" || protocol == "epgm" || protocol == "norm" || protocol == "udp") { // For convenience's sake, bind can be used interchangeable with - // connect for PGM, EPGM and NORM transports. + // connect for PGM, EPGM, NORM and UDP transports. EXIT_MUTEX (); rc = connect (addr_); if (rc != -1) @@ -878,6 +883,17 @@ int zmq::socket_base_t::connect (const char *addr_) } #endif +if (protocol == "udp") { + paddr->resolved.udp_addr = new (std::nothrow) udp_address_t (); + alloc_assert (paddr->resolved.udp_addr); + int rc = paddr->resolved.udp_addr->resolve (address.c_str()); + if (rc != 0) { + LIBZMQ_DELETE(paddr); + EXIT_MUTEX (); + return -1; + } +} + // TBD - Should we check address for ZMQ_HAVE_NORM??? #ifdef ZMQ_HAVE_OPENPGM @@ -927,7 +943,7 @@ int zmq::socket_base_t::connect (const char *addr_) // PGM does not support subscription forwarding; ask for all data to be // sent to this pipe. (same for NORM, currently?) - bool subscribe_to_all = protocol == "pgm" || protocol == "epgm" || protocol == "norm"; + bool subscribe_to_all = protocol == "pgm" || protocol == "epgm" || protocol == "norm" || protocol == "udp"; pipe_t *newpipe = NULL; if (options.immediate != 1 || subscribe_to_all) { diff --git a/src/udp_address.cpp b/src/udp_address.cpp new file mode 100644 index 00000000..cfa48e45 --- /dev/null +++ b/src/udp_address.cpp @@ -0,0 +1,166 @@ +/* + Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include +#include + +#include "macros.hpp" +#include "udp_address.hpp" +#include "platform.hpp" +#include "stdint.hpp" +#include "err.hpp" +#include "ip.hpp" + +#ifdef ZMQ_HAVE_WINDOWS +#include "windows.hpp" +#else +#include +#include +#include +#include +#endif + +zmq::udp_address_t::udp_address_t () +{ + memset (&bind_address, 0, sizeof bind_address); +} + +zmq::udp_address_t::~udp_address_t () +{ +} + +int zmq::udp_address_t::resolve (const char *name_) +{ + // Find the ':' at end that separates address from the port number. + const char *delimiter = strrchr (name_, ':'); + if (!delimiter) { + errno = EINVAL; + return -1; + } + + // Separate the address/port. + std::string addr_str (name_, delimiter - name_); + std::string port_str (delimiter + 1); + + // Remove square brackets around the address, if any, as used in IPv6 + if (addr_str.size () >= 2 && addr_str [0] == '[' && + addr_str [addr_str.size () - 1] == ']') + addr_str = addr_str.substr (1, addr_str.size () - 2); + + // Parse the port number (0 is not a valid port). + uint16_t port = (uint16_t) atoi (port_str.c_str ()); + if (port == 0) { + errno = EINVAL; + return -1; + } + + dest_address.sin_family = AF_INET; + dest_address.sin_port = htons (port); + dest_address.sin_addr.s_addr = inet_addr (addr_str.c_str ()); + + if (dest_address.sin_addr.s_addr == INADDR_NONE) { + errno = EINVAL; + return -1; + } + + // we will check only first byte of IP + // and if it from 224 to 239, then it can + // represent multicast IP. + int i = dest_address.sin_addr.s_addr & 0xFF; + if(i >= 224 && i <= 239) { + multicast = dest_address.sin_addr; + is_mutlicast = true; + } + else + is_mutlicast = false; + + interface.s_addr = htons (INADDR_ANY); + if (interface.s_addr == INADDR_NONE) { + errno = EINVAL; + return -1; + } + + bind_address.sin_family = AF_INET; + bind_address.sin_port = htons (port); + bind_address.sin_addr.s_addr = htons (INADDR_ANY); + + address = name_; + + return 0; +} + +int zmq::udp_address_t::to_string (std::string &addr_) +{ + addr_ = address; + return 0; +} + +bool zmq::udp_address_t::is_mcast () const +{ + return is_mutlicast; +} + +const sockaddr* zmq::udp_address_t::bind_addr () const +{ + return (sockaddr *) &bind_address; +} + +socklen_t zmq::udp_address_t::bind_addrlen () const +{ + return sizeof (sockaddr_in); +} + +const sockaddr* zmq::udp_address_t::dest_addr () const +{ + return (sockaddr *) &dest_address; +} + +socklen_t zmq::udp_address_t::dest_addrlen () const +{ + return sizeof (sockaddr_in); +} + +const in_addr zmq::udp_address_t::multicast_ip () const +{ + return multicast; +} + +const in_addr zmq::udp_address_t::interface_ip () const +{ + return interface; +} + +#if defined ZMQ_HAVE_WINDOWS +unsigned short zmq::udp_address_t::family () const +#else +sa_family_t zmq::udp_address_t::family () const +#endif +{ + return AF_INET; +} diff --git a/src/udp_address.hpp b/src/udp_address.hpp new file mode 100644 index 00000000..676bc645 --- /dev/null +++ b/src/udp_address.hpp @@ -0,0 +1,84 @@ +/* + Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_UDP_ADDRESS_HPP_INCLUDED__ +#define __ZMQ_UDP_ADDRESS_HPP_INCLUDED__ + +#include "platform.hpp" + +#if defined ZMQ_HAVE_WINDOWS +#include "windows.hpp" +#else +#include +#include +#endif + +namespace zmq +{ + + class udp_address_t + { + public: + + udp_address_t (); + virtual ~udp_address_t (); + + int resolve (const char *name_); + + // The opposite to resolve() + virtual int to_string (std::string &addr_); + +#if defined ZMQ_HAVE_WINDOWS + unsigned short family () const; +#else + sa_family_t family () const; +#endif + const sockaddr *bind_addr () const; + socklen_t bind_addrlen () const; + + const sockaddr *dest_addr () const; + socklen_t dest_addrlen () const; + + bool is_mcast () const; + + const in_addr multicast_ip () const; + const in_addr interface_ip () const; + + private: + + in_addr multicast; + in_addr interface; + sockaddr_in bind_address; + sockaddr_in dest_address; + bool is_mutlicast; + std::string address; + }; +} + +#endif diff --git a/src/udp_engine.cpp b/src/udp_engine.cpp new file mode 100644 index 00000000..e45f8c12 --- /dev/null +++ b/src/udp_engine.cpp @@ -0,0 +1,222 @@ +#include "platform.hpp" + +#if defined ZMQ_HAVE_WINDOWS +#include "windows.hpp" +#else +#include +#include +#include +#include +#endif + +#include "udp_engine.hpp" +#include "session_base.hpp" +#include "v2_protocol.hpp" +#include "err.hpp" +#include "ip.hpp" + +zmq::udp_engine_t::udp_engine_t() : + plugged (false), + session(NULL) +{ +} + +zmq::udp_engine_t::~udp_engine_t() +{ + zmq_assert (!plugged); + + if (fd != retired_fd) { +#ifdef ZMQ_HAVE_WINDOWS + int rc = closesocket (fd); + wsa_assert (rc != SOCKET_ERROR); +#else + int rc = close (fd); + errno_assert (rc == 0); +#endif + fd = retired_fd; + } +} + +int zmq::udp_engine_t::init (address_t* address_, bool send_, bool recv_) +{ + zmq_assert (address_); + zmq_assert (send_ || recv_); + send_enabled = send_; + recv_enabled = recv_; + address = address_; + + fd = open_socket (address->resolved.udp_addr->family (), SOCK_DGRAM, IPPROTO_UDP); + if (fd == retired_fd) + return -1; + + unblock_socket (fd); + + return 0; +} + +void zmq::udp_engine_t::plug (io_thread_t* io_thread_, session_base_t *session_) +{ + zmq_assert (!plugged); + plugged = true; + + zmq_assert (!session); + zmq_assert (session_); + session = session_; + + // Connect to I/O threads poller object. + io_object_t::plug (io_thread_); + handle = add_fd (fd); + + if (send_enabled) + set_pollout (handle); + + if (recv_enabled) { + int on = 1; + int rc = setsockopt (fd, SOL_SOCKET, SO_REUSEADDR, (char *) &on, sizeof (on)); + #ifdef ZMQ_HAVE_WINDOWS + wsa_assert (rc != SOCKET_ERROR); + #else + errno_assert (rc == 0); + #endif + + rc = bind (fd, address->resolved.udp_addr->bind_addr (), address->resolved.udp_addr->bind_addrlen ()); + #ifdef ZMQ_HAVE_WINDOWS + wsa_assert (rc != SOCKET_ERROR); + #else + errno_assert (rc == 0); + #endif + + if (address->resolved.udp_addr->is_mcast ()) { + struct ip_mreq mreq; + mreq.imr_multiaddr = address->resolved.udp_addr->multicast_ip (); + mreq.imr_interface = address->resolved.udp_addr->interface_ip (); + + int rc = setsockopt (fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, &mreq, sizeof (mreq)); + +#ifdef ZMQ_HAVE_WINDOWS + wsa_assert (rc != SOCKET_ERROR); +#else + errno_assert (rc == 0); +#endif + } + + set_pollin (handle); + + // Call restart output to drop all join/leave commands + restart_output (); + } +} + +void zmq::udp_engine_t::terminate() +{ + zmq_assert (plugged); + plugged = false; + + rm_fd (handle); + + // Disconnect from I/O threads poller object. + io_object_t::unplug (); + + delete this; +} + +void zmq::udp_engine_t::out_event() +{ + msg_t group_msg; + int rc = session->pull_msg (&group_msg); + errno_assert (rc == 0 || (rc == -1 && errno == EAGAIN)); + + if (rc == 0) { + msg_t body_msg; + rc = session->pull_msg (&body_msg); + + size_t group_size = group_msg.size (); + size_t body_size = body_msg.size (); + size_t size = group_size + body_size + 1; + + // TODO: check if larger than maximum size + out_buffer[0] = (unsigned char) group_size; + memcpy (out_buffer + 1, group_msg.data (), group_size); + memcpy (out_buffer + 1 + group_size, body_msg.data (), body_size); + + rc = group_msg.close (); + errno_assert (rc == 0); + + body_msg.close (); + errno_assert (rc == 0); + + rc = sendto (fd, out_buffer, size, 0, + address->resolved.udp_addr->dest_addr (), + address->resolved.udp_addr->dest_addrlen ()); + errno_assert (rc != -1); + } + else + reset_pollout (handle); +} + +void zmq::udp_engine_t::restart_output() +{ + // If we don't support send we just drop all messages + if (!send_enabled) { + msg_t msg; + while (session->pull_msg (&msg) == 0) + msg.close (); + } + else { + set_pollout(handle); + out_event (); + } +} + +void zmq::udp_engine_t::in_event() +{ + size_t read = recv (fd, in_buffer, MAX_UDP_MSG, 0); + + if (read > 0) { + size_t group_size = in_buffer[0]; + + // This doesn't fit, just ingore + if (read - 1 < group_size) + return; + + size_t body_size = read -1 - group_size; + + msg_t msg; + int rc = msg.init_size (group_size); + errno_assert (rc == 0); + msg.set_flags (msg_t::more); + memcpy (msg.data (), in_buffer + 1, group_size); + + rc = session->push_msg (&msg); + errno_assert (rc == 0 || (rc == -1 && errno == EAGAIN)); + + // Pipe is full + if (rc != 0) { + rc = msg.close (); + errno_assert (rc == 0); + + reset_pollin (handle); + return; + } + + rc = msg.close (); + errno_assert (rc == 0); + rc = msg.init_size (body_size); + errno_assert (rc == 0); + memcpy (msg.data (), in_buffer + 1 + group_size, body_size); + rc = session->push_msg (&msg); + errno_assert (rc == 0); + rc = msg.close (); + errno_assert (rc == 0); + session->flush (); + } +} + +void zmq::udp_engine_t::restart_input() +{ + if (!recv_enabled) + return; + + set_pollin (handle); + in_event (); +} diff --git a/src/udp_engine.hpp b/src/udp_engine.hpp new file mode 100644 index 00000000..f6339ff0 --- /dev/null +++ b/src/udp_engine.hpp @@ -0,0 +1,62 @@ + +#ifndef __ZMQ_UDP_ENGINE_HPP_INCLUDED__ +#define __ZMQ_UDP_ENGINE_HPP_INCLUDED__ + +#include "io_object.hpp" +#include "i_engine.hpp" +#include "address.hpp" +#include "udp_address.hpp" + +#define MAX_UDP_MSG 8192 + +namespace zmq +{ + class io_thread_t; + class session_base_t; + + class udp_engine_t : public io_object_t, public i_engine + { + public: + udp_engine_t (); + ~udp_engine_t (); + + int init (address_t *address_, bool send_, bool recv_); + + // i_engine interface implementation. + // Plug the engine to the session. + void plug (zmq::io_thread_t *io_thread_, class session_base_t *session_); + + // Terminate and deallocate the engine. Note that 'detached' + // events are not fired on termination. + void terminate (); + + // This method is called by the session to signalise that more + // messages can be written to the pipe. + void restart_input (); + + // This method is called by the session to signalise that there + // are messages to send available. + void restart_output (); + + void zap_msg_available () {}; + + void in_event (); + void out_event (); + + private: + + bool plugged; + + fd_t fd; + session_base_t* session; + handle_t handle; + address_t *address; + + unsigned char out_buffer[MAX_UDP_MSG]; + unsigned char in_buffer[MAX_UDP_MSG]; + bool send_enabled; + bool recv_enabled; + }; +} + +#endif diff --git a/tests/test_udp.cpp b/tests/test_udp.cpp new file mode 100644 index 00000000..05a78d75 --- /dev/null +++ b/tests/test_udp.cpp @@ -0,0 +1,121 @@ +/* + Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "testutil.hpp" + +int msg_send (zmq_msg_t *msg_, void *s_, const char* group_, const char* body_) +{ + int rc = zmq_msg_init_size (msg_, strlen (body_)); + if (rc != 0) + return rc; + + memcpy (zmq_msg_data (msg_), body_, strlen (body_)); + + rc = zmq_msg_set_group (msg_, group_); + if (rc != 0) { + zmq_msg_close (msg_); + return rc; + } + + rc = zmq_msg_send (msg_, s_, 0); + + zmq_msg_close (msg_); + + return rc; +} + +int msg_recv_cmp (zmq_msg_t *msg_, void *s_, const char* group_, const char* body_) +{ + int rc = zmq_msg_init (msg_); + if (rc != 0) + return -1; + + int recv_rc = zmq_msg_recv (msg_, s_, 0); + if (recv_rc == -1) + return -1; + + if (strcmp (zmq_msg_group (msg_), group_) != 0) + { + zmq_msg_close (msg_); + return -1; + } + + char * body = (char*) malloc (sizeof(char) * (zmq_msg_size (msg_) + 1)); + memcpy (body, zmq_msg_data (msg_), zmq_msg_size (msg_)); + body [zmq_msg_size (msg_)] = '\0'; + + if (strcmp (body, body_) != 0) + { + zmq_msg_close (msg_); + return -1; + } + + zmq_msg_close (msg_); + return recv_rc; +} + +int main (void) +{ + setup_test_environment (); + void *ctx = zmq_ctx_new (); + assert (ctx); + + zmq_msg_t msg; + + void *radio = zmq_socket (ctx, ZMQ_RADIO); + void *dish = zmq_socket (ctx, ZMQ_DISH); + + int rc = zmq_bind (radio, "udp://127.0.0.1:5556"); + assert (rc == 0); + + rc = zmq_connect (dish, "udp://127.0.0.1:5556"); + assert (rc == 0); + + zmq_sleep (1); + + rc = zmq_join (dish, "TV"); + assert (rc == 0); + + rc = msg_send (&msg, radio, "TV", "Friends"); + assert (rc != -1); + + rc = msg_recv_cmp (&msg, dish, "TV", "Friends"); + assert (rc != -1); + + rc = zmq_close (dish); + assert (rc == 0); + + rc = zmq_close (radio); + assert (rc == 0); + + rc = zmq_ctx_term (ctx); + assert (rc == 0); + + return 0 ; +}