diff --git a/.gitignore b/.gitignore index 8cf102d8..31374c67 100644 --- a/.gitignore +++ b/.gitignore @@ -122,6 +122,8 @@ test_setsockopt test_stream_exceeds_buffer test_poller test_timers +test_radio_dish +test_large_msg tests/test*.log tests/test*.trs src/platform.hpp* diff --git a/CMakeLists.txt b/CMakeLists.txt index 8dcec895..f6eb3c35 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -458,7 +458,9 @@ set(cxx-sources decoder_allocators.cpp socket_poller.cpp timers.cpp - config.hpp) + config.hpp + radio.cpp + dish.cpp) set(rc-sources version.rc) diff --git a/Makefile.am b/Makefile.am index a77bee9f..5bcc0ccb 100644 --- a/Makefile.am +++ b/Makefile.am @@ -45,6 +45,8 @@ src_libzmq_la_SOURCES = \ src/decoder.hpp \ src/devpoll.cpp \ src/devpoll.hpp \ + src/dish.cpp \ + src/dish.hpp \ src/dist.cpp \ src/dist.hpp \ src/encoder.hpp \ @@ -134,6 +136,8 @@ src_libzmq_la_SOURCES = \ src/pull.hpp \ src/push.cpp \ src/push.hpp \ + src/radio.cpp \ + src/radio.hpp \ src/random.cpp \ src/random.hpp \ src/raw_decoder.cpp \ @@ -380,7 +384,8 @@ test_apps = \ tests/test_heartbeats \ tests/test_stream_exceeds_buffer \ tests/test_poller \ - tests/test_timers + tests/test_timers \ + tests/test_radio_dish tests_test_system_SOURCES = tests/test_system.cpp tests_test_system_LDADD = src/libzmq.la @@ -597,6 +602,9 @@ tests_test_poller_LDADD = src/libzmq.la tests_test_timers_SOURCES = tests/test_timers.cpp tests_test_timers_LDADD = src/libzmq.la +tests_test_radio_dish_SOURCES = tests/test_radio_dish.cpp +tests_test_radio_dish_LDADD = src/libzmq.la + if !ON_MINGW if !ON_CYGWIN diff --git a/include/zmq.h b/include/zmq.h index 353c33aa..6afbf17c 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -260,6 +260,8 @@ ZMQ_EXPORT uint32_t zmq_msg_routing_id (zmq_msg_t *msg); #define ZMQ_STREAM 11 #define ZMQ_SERVER 12 #define ZMQ_CLIENT 13 +#define ZMQ_RADIO 14 +#define ZMQ_DISH 15 /* Deprecated aliases */ #define ZMQ_XREQ ZMQ_DEALER @@ -356,6 +358,9 @@ ZMQ_EXPORT uint32_t zmq_msg_routing_id (zmq_msg_t *msg); #define ZMQ_CURVE 2 #define ZMQ_GSSAPI 3 +/* RADIO-DISH protocol */ +#define ZMQ_GROUP_MAX_LENGTH 255 + /* Deprecated options and aliases */ #define ZMQ_TCP_ACCEPT_FILTER 38 #define ZMQ_IPC_FILTER_PID 58 @@ -400,6 +405,9 @@ ZMQ_EXPORT int zmq_send (void *s, const void *buf, size_t len, int flags); ZMQ_EXPORT int zmq_send_const (void *s, const void *buf, size_t len, int flags); ZMQ_EXPORT int zmq_recv (void *s, void *buf, size_t len, int flags); ZMQ_EXPORT int zmq_socket_monitor (void *s, const char *addr, int events); +ZMQ_EXPORT int zmq_join (void *s, const char *group); +ZMQ_EXPORT int zmq_leave (void *s, const char *group); + /******************************************************************************/ /* I/O multiplexing. */ diff --git a/src/dish.cpp b/src/dish.cpp new file mode 100644 index 00000000..00ce29dd --- /dev/null +++ b/src/dish.cpp @@ -0,0 +1,246 @@ +/* + Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include + +#include "macros.hpp" +#include "dish.hpp" +#include "err.hpp" + +zmq::dish_t::dish_t (class ctx_t *parent_, uint32_t tid_, int sid_) : + socket_base_t (parent_, tid_, sid_, true), + has_message (false) +{ + options.type = ZMQ_DISH; + + // When socket is being closed down we don't want to wait till pending + // subscription commands are sent to the wire. + options.linger = 0; + + int rc = message.init (); + errno_assert (rc == 0); +} + +zmq::dish_t::~dish_t () +{ + int rc = message.close (); + errno_assert (rc == 0); +} + +void zmq::dish_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) +{ + LIBZMQ_UNUSED (subscribe_to_all_); + + zmq_assert (pipe_); + fq.attach (pipe_); + dist.attach (pipe_); + + // Send all the cached subscriptions to the new upstream peer. + send_subscriptions (pipe_); +} + +void zmq::dish_t::xread_activated (pipe_t *pipe_) +{ + fq.activated (pipe_); +} + +void zmq::dish_t::xwrite_activated (pipe_t *pipe_) +{ + dist.activated (pipe_); +} + +void zmq::dish_t::xpipe_terminated (pipe_t *pipe_) +{ + fq.pipe_terminated (pipe_); + dist.pipe_terminated (pipe_); +} + +void zmq::dish_t::xhiccuped (pipe_t *pipe_) +{ + // Send all the cached subscriptions to the hiccuped pipe. + send_subscriptions (pipe_); +} + +int zmq::dish_t::xjoin (const char* group_) +{ + if (strlen (group_) > ZMQ_GROUP_MAX_LENGTH) { + errno = EINVAL; + return -1; + } + + subscriptions_t::iterator it = + std::find (subscriptions.begin (), subscriptions.end (), std::string(group_)); + + // User cannot join same group twice + if (it != subscriptions.end ()) { + errno = EINVAL; + return -1; + } + + subscriptions.push_back (std::string (group_)); + + size_t len = strlen (group_); + msg_t msg; + int rc = msg.init_size (len + 1); + errno_assert (rc == 0); + + char *data = (char*) msg.data (); + data[0] = 'J'; + memcpy (data + 1, group_, len); + + int err = 0; + rc = dist.send_to_all (&msg); + if (rc != 0) + err = errno; + int rc2 = msg.close (); + errno_assert (rc2 == 0); + if (rc != 0) + errno = err; + return rc; +} + +int zmq::dish_t::xleave (const char* group_) +{ + if (strlen (group_) > ZMQ_GROUP_MAX_LENGTH) { + errno = EINVAL; + return -1; + } + + subscriptions_t::iterator it = std::find (subscriptions.begin (), subscriptions.end (), std::string (group_)); + + if (it == subscriptions.end ()) { + errno = EINVAL; + return -1; + } + + subscriptions.erase (it); + + size_t len = strlen (group_); + msg_t msg; + int rc = msg.init_size (len + 1); + errno_assert (rc == 0); + + char *data = (char*) msg.data (); + data[0] = 'L'; + memcpy (data + 1, group_, len); + + int err = 0; + rc = dist.send_to_all (&msg); + if (rc != 0) + err = errno; + int rc2 = msg.close (); + errno_assert (rc2 == 0); + if (rc != 0) + errno = err; + return rc; +} + +int zmq::dish_t::xsend (msg_t *msg_) +{ + errno = ENOTSUP; + return -1; +} + +bool zmq::dish_t::xhas_out () +{ + // Subscription can be added/removed anytime. + return true; +} + +int zmq::dish_t::xrecv (msg_t *msg_) +{ + // If there's already a message prepared by a previous call to zmq_poll, + // return it straight ahead. + if (has_message) { + int rc = msg_->move (message); + errno_assert (rc == 0); + has_message = false; + return 0; + } + + // Get a message using fair queueing algorithm. + int rc = fq.recv (msg_); + + // If there's no message available, return immediately. + // The same when error occurs. + if (rc != 0) + return -1; + + return 0; +} + +bool zmq::dish_t::xhas_in () +{ + // If there's already a message prepared by a previous call to zmq_poll, + // return straight ahead. + if (has_message) + return true; + + // Get a message using fair queueing algorithm. + int rc = fq.recv (&message); + + // If there's no message available, return immediately. + // The same when error occurs. + if (rc != 0) { + errno_assert (errno == EAGAIN); + return false; + } + + has_message = true; + return true; +} + +zmq::blob_t zmq::dish_t::get_credential () const +{ + return fq.get_credential (); +} + +void zmq::dish_t::send_subscriptions (pipe_t *pipe_) +{ + for (subscriptions_t::iterator it = subscriptions.begin (); it != subscriptions.end (); ++it) { + msg_t msg; + int rc = msg.init_size (it->length () + 1); + errno_assert (rc == 0); + char *data = (char*) msg.data (); + data [0] = 'J'; + it->copy (data + 1, it->length ()); + + // Send it to the pipe. + bool sent = pipe_->write (&msg); + + // If we reached the SNDHWM, and thus cannot send the subscription, drop + // the subscription message instead. This matches the behaviour of + // zmq_setsockopt(ZMQ_SUBSCRIBE, ...), which also drops subscriptions + // when the SNDHWM is reached. + if (!sent) + msg.close (); + } + + pipe_->flush (); +} diff --git a/src/dish.hpp b/src/dish.hpp new file mode 100644 index 00000000..17a634e0 --- /dev/null +++ b/src/dish.hpp @@ -0,0 +1,98 @@ +/* + Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_DISH_HPP_INCLUDED__ +#define __ZMQ_DISH_HPP_INCLUDED__ + +#include +#include + +#include "socket_base.hpp" +#include "session_base.hpp" +#include "dist.hpp" +#include "fq.hpp" +#include "trie.hpp" + +namespace zmq +{ + + class ctx_t; + class pipe_t; + class io_thread_t; + + class dish_t : + public socket_base_t + { + public: + + dish_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_); + ~dish_t (); + + protected: + + // Overrides of functions from socket_base_t. + void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_); + int xsend (zmq::msg_t *msg_); + bool xhas_out (); + int xrecv (zmq::msg_t *msg_); + bool xhas_in (); + blob_t get_credential () const; + void xread_activated (zmq::pipe_t *pipe_); + void xwrite_activated (zmq::pipe_t *pipe_); + void xhiccuped (pipe_t *pipe_); + void xpipe_terminated (zmq::pipe_t *pipe_); + int xjoin (const char *group_); + int xleave (const char *group_); + private: + + // Send subscriptions to a pipe + void send_subscriptions (pipe_t *pipe_); + + // Fair queueing object for inbound pipes. + fq_t fq; + + // Object for distributing the subscriptions upstream. + dist_t dist; + + // The repository of subscriptions. + typedef std::vector subscriptions_t; + subscriptions_t subscriptions; + + // If true, 'message' contains a matching message to return on the + // next recv call. + bool has_message; + msg_t message; + + dish_t (const dish_t&); + const dish_t &operator = (const dish_t&); + }; + +} + +#endif diff --git a/src/mechanism.cpp b/src/mechanism.cpp index 8dd7f104..e192f3bd 100644 --- a/src/mechanism.cpp +++ b/src/mechanism.cpp @@ -74,9 +74,10 @@ const char *zmq::mechanism_t::socket_type_string (int socket_type) const { static const char *names [] = {"PAIR", "PUB", "SUB", "REQ", "REP", "DEALER", "ROUTER", "PULL", "PUSH", - "XPUB", "XSUB", "STREAM", - "SERVER", "CLIENT"}; - zmq_assert (socket_type >= 0 && socket_type <= 13); + "XPUB", "XSUB", "STREAM", + "SERVER", "CLIENT", + "RADIO", "DISH"}; + zmq_assert (socket_type >= 0 && socket_type <= 15); return names [socket_type]; } @@ -192,6 +193,10 @@ bool zmq::mechanism_t::check_socket_type (const std::string& type_) const return type_ == "CLIENT"; case ZMQ_CLIENT: return type_ == "SERVER"; + case ZMQ_RADIO: + return type_ == "DISH"; + case ZMQ_DISH: + return type_ == "RADIO"; default: break; } diff --git a/src/radio.cpp b/src/radio.cpp new file mode 100644 index 00000000..84878e0b --- /dev/null +++ b/src/radio.cpp @@ -0,0 +1,170 @@ +/* + Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include + +#include "radio.hpp" +#include "macros.hpp" +#include "pipe.hpp" +#include "err.hpp" +#include "msg.hpp" + +zmq::radio_t::radio_t (class ctx_t *parent_, uint32_t tid_, int sid_) : + socket_base_t (parent_, tid_, sid_, true) +{ + options.type = ZMQ_RADIO; +} + +zmq::radio_t::~radio_t () +{ +} + +void zmq::radio_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) +{ + LIBZMQ_UNUSED (subscribe_to_all_); + + zmq_assert (pipe_); + + // Don't delay pipe termination as there is no one + // to receive the delimiter. + pipe_->set_nodelay (); + + dist.attach (pipe_); + + // The pipe is active when attached. Let's read the subscriptions from + // it, if any. + xread_activated (pipe_); +} + +void zmq::radio_t::xread_activated (pipe_t *pipe_) +{ + // There are some subscriptions waiting. Let's process them. + msg_t sub; + while (pipe_->read (&sub)) { + // Apply the subscription to the trie + const char * data = (char *) sub.data (); + const size_t size = sub.size (); + if (size > 0 && (*data == 'J' || *data == 'L')) { + std::string group = std::string (data + 1, sub. size() - 1); + + if (*data == 'J') + subscriptions.insert (subscriptions_t::value_type (group, pipe_)); + else { + std::pair range = + subscriptions.equal_range (group); + + for (subscriptions_t::iterator it = range.first; it != range.second; ++it) { + if (it->second == pipe_) { + subscriptions.erase (it); + break; + } + } + } + } + sub.close (); + } +} + +void zmq::radio_t::xwrite_activated (pipe_t *pipe_) +{ + dist.activated (pipe_); +} + +void zmq::radio_t::xpipe_terminated (pipe_t *pipe_) +{ + for (subscriptions_t::iterator it = subscriptions.begin (); it != subscriptions.end (); ++it) { + if (it->second == pipe_) { + subscriptions.erase (it); + } + } + + dist.pipe_terminated (pipe_); +} + +int zmq::radio_t::xsend (msg_t *msg_) +{ + // Radio sockets do not allow multipart data (ZMQ_SNDMORE) + if (msg_->flags () & msg_t::more) { + errno = EINVAL; + return -1; + } + + size_t size = msg_->size (); + char *group = (char*) msg_->data(); + + // Maximum allowed group length is 255 + if (size > ZMQ_GROUP_MAX_LENGTH) + size = ZMQ_GROUP_MAX_LENGTH; + + // Check if NULL terminated + bool terminated = false; + + for (size_t index = 0; index < size; index++) { + if (group[index] == '\0') { + terminated = true; + break; + } + } + + if (!terminated) { + // User didn't include a group in the message + errno = EINVAL; + return -1; + } + + dist.unmatch (); + + std::pair range = + subscriptions.equal_range (std::string(group)); + + for (subscriptions_t::iterator it = range.first; it != range.second; ++it) { + dist.match (it-> second); + } + + int rc = dist.send_to_matching (msg_); + + return rc; +} + +bool zmq::radio_t::xhas_out () +{ + return dist.has_out (); +} + +int zmq::radio_t::xrecv (msg_t *msg_) +{ + // Messages cannot be received from PUB socket. + errno = ENOTSUP; + return -1; +} + +bool zmq::radio_t::xhas_in () +{ + return false; +} diff --git a/src/radio.hpp b/src/radio.hpp new file mode 100644 index 00000000..5858476b --- /dev/null +++ b/src/radio.hpp @@ -0,0 +1,82 @@ +/* + Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_RADIO_HPP_INCLUDED__ +#define __ZMQ_RADIO_HPP_INCLUDED__ + +#include +#include + +#include "socket_base.hpp" +#include "session_base.hpp" +#include "mtrie.hpp" +#include "array.hpp" +#include "dist.hpp" + +namespace zmq +{ + + class ctx_t; + class msg_t; + class pipe_t; + class io_thread_t; + + class radio_t : + public socket_base_t + { + public: + + radio_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_); + ~radio_t (); + + // Implementations of virtual functions from socket_base_t. + void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_ = false); + int xsend (zmq::msg_t *msg_); + bool xhas_out (); + int xrecv (zmq::msg_t *msg_); + bool xhas_in (); + void xread_activated (zmq::pipe_t *pipe_); + void xwrite_activated (zmq::pipe_t *pipe_); + void xpipe_terminated (zmq::pipe_t *pipe_); + + private: + // List of all subscriptions mapped to corresponding pipes. + typedef std::multimap subscriptions_t; + subscriptions_t subscriptions; + + // Distributor of messages holding the list of outbound pipes. + dist_t dist; + + radio_t (const radio_t&); + const radio_t &operator = (const radio_t&); + }; + +} + +#endif diff --git a/src/session_base.cpp b/src/session_base.cpp index 231534f4..9df22b13 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -69,6 +69,8 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_, case ZMQ_STREAM: case ZMQ_SERVER: case ZMQ_CLIENT: + case ZMQ_RADIO: + case ZMQ_DISH: s = new (std::nothrow) session_base_t (io_thread_, active_, socket_, options_, addr_); break; @@ -643,4 +645,3 @@ void zmq::session_base_t::start_connecting (bool wait_) zmq_assert (false); } - diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 32ddeb5a..334502cb 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -92,6 +92,8 @@ #include "stream.hpp" #include "server.hpp" #include "client.hpp" +#include "radio.hpp" +#include "dish.hpp" #define ENTER_MUTEX() \ if (thread_safe) \ @@ -153,6 +155,12 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_, case ZMQ_CLIENT: s = new (std::nothrow) client_t (parent_, tid_, sid_); break; + case ZMQ_RADIO: + s = new (std::nothrow) radio_t (parent_, tid_, sid_); + break; + case ZMQ_DISH: + s = new (std::nothrow) dish_t (parent_, tid_, sid_); + break; default: errno = EINVAL; return NULL; @@ -457,6 +465,28 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_, return rc; } +int zmq::socket_base_t::join (const char* group_) +{ + ENTER_MUTEX (); + + int rc = xjoin (group_); + + EXIT_MUTEX(); + + return rc; +} + +int zmq::socket_base_t::leave (const char* group_) +{ + ENTER_MUTEX (); + + int rc = xleave (group_); + + EXIT_MUTEX(); + + return rc; +} + int zmq::socket_base_t::add_signaler(signaler_t *s_) { ENTER_MUTEX (); @@ -1396,6 +1426,18 @@ bool zmq::socket_base_t::xhas_in () return false; } +int zmq::socket_base_t::xjoin (const char *group_) +{ + errno = ENOTSUP; + return -1; +} + +int zmq::socket_base_t::xleave (const char *group_) +{ + errno = ENOTSUP; + return -1; +} + int zmq::socket_base_t::xrecv (msg_t *) { errno = ENOTSUP; diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 4e0571ae..73f5ba37 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -99,6 +99,10 @@ namespace zmq bool has_in (); bool has_out (); + // Joining and leaving groups + int join (const char *group); + int leave (const char *group); + // Using this function reaper thread ask the socket to register with // its poller. void start_reaping (poller_t *poller_); @@ -168,6 +172,10 @@ namespace zmq virtual void xhiccuped (pipe_t *pipe_); virtual void xpipe_terminated (pipe_t *pipe_) = 0; + // the default implementation assumes that joub and leave are not supported. + virtual int xjoin (const char *group_); + virtual int xleave (const char *group_); + // Delay actual destruction of the socket. void process_destroy (); @@ -176,7 +184,7 @@ namespace zmq // Monitor socket cleanup void stop_monitor (bool send_monitor_stopped_event_ = true); - + // Next assigned name on a zmq_connect() call used by ROUTER and STREAM socket types std::string connect_rid; @@ -281,10 +289,9 @@ namespace zmq mutex_t sync; socket_base_t (const socket_base_t&); - const socket_base_t &operator = (const socket_base_t&); + const socket_base_t &operator = (const socket_base_t&); }; } #endif - diff --git a/src/zmq.cpp b/src/zmq.cpp index d4480c93..45a244a3 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -300,6 +300,28 @@ int zmq_socket_monitor (void *s_, const char *addr_, int events_) return result; } +int zmq_join (void *s_, const char* group_) +{ + if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { + errno = ENOTSOCK; + return -1; + } + zmq::socket_base_t *s = (zmq::socket_base_t *) s_; + int result = s->join (group_); + return result; +} + +int zmq_leave (void *s_, const char* group_) +{ + if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { + errno = ENOTSOCK; + return -1; + } + zmq::socket_base_t *s = (zmq::socket_base_t *) s_; + int result = s->leave (group_); + return result; +} + int zmq_bind (void *s_, const char *addr_) { if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) { diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 17bbb054..47f451fa 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -70,6 +70,7 @@ set(tests test_xpub_manual test_xpub_welcome_msg test_timers + test_radio_dish ) if(NOT WIN32) list(APPEND tests diff --git a/tests/test_large_msg b/tests/test_large_msg deleted file mode 100755 index 9ede070b..00000000 --- a/tests/test_large_msg +++ /dev/null @@ -1,210 +0,0 @@ -#! /bin/sh - -# tests/test_large_msg - temporary wrapper script for .libs/test_large_msg -# Generated by libtool (GNU libtool) 2.4.6 -# -# The tests/test_large_msg program cannot be directly executed until all the libtool -# libraries that it depends on are installed. -# -# This wrapper script should never be moved out of the build directory. -# If it is, it will not operate correctly. - -# Sed substitution that helps us do robust quoting. It backslashifies -# metacharacters that are still active within double-quoted strings. -sed_quote_subst='s|\([`"$\\]\)|\\\1|g' - -# Be Bourne compatible -if test -n "${ZSH_VERSION+set}" && (emulate sh) >/dev/null 2>&1; then - emulate sh - NULLCMD=: - # Zsh 3.x and 4.x performs word splitting on ${1+"$@"}, which - # is contrary to our usage. Disable this feature. - alias -g '${1+"$@"}'='"$@"' - setopt NO_GLOB_SUBST -else - case `(set -o) 2>/dev/null` in *posix*) set -o posix;; esac -fi -BIN_SH=xpg4; export BIN_SH # for Tru64 -DUALCASE=1; export DUALCASE # for MKS sh - -# The HP-UX ksh and POSIX shell print the target directory to stdout -# if CDPATH is set. -(unset CDPATH) >/dev/null 2>&1 && unset CDPATH - -relink_command="" - -# This environment variable determines our operation mode. -if test "$libtool_install_magic" = "%%%MAGIC variable%%%"; then - # install mode needs the following variables: - generated_by_libtool_version='2.4.6' - notinst_deplibs=' src/libzmq.la' -else - # When we are sourced in execute mode, $file and $ECHO are already set. - if test "$libtool_execute_magic" != "%%%MAGIC variable%%%"; then - file="$0" - -# A function that is used when there is no print builtin or printf. -func_fallback_echo () -{ - eval 'cat <<_LTECHO_EOF -$1 -_LTECHO_EOF' -} - ECHO="printf %s\\n" - fi - -# Very basic option parsing. These options are (a) specific to -# the libtool wrapper, (b) are identical between the wrapper -# /script/ and the wrapper /executable/ that is used only on -# windows platforms, and (c) all begin with the string --lt- -# (application programs are unlikely to have options that match -# this pattern). -# -# There are only two supported options: --lt-debug and -# --lt-dump-script. There is, deliberately, no --lt-help. -# -# The first argument to this parsing function should be the -# script's ./libtool value, followed by no. -lt_option_debug= -func_parse_lt_options () -{ - lt_script_arg0=$0 - shift - for lt_opt - do - case "$lt_opt" in - --lt-debug) lt_option_debug=1 ;; - --lt-dump-script) - lt_dump_D=`$ECHO "X$lt_script_arg0" | /usr/bin/sed -e 's/^X//' -e 's%/[^/]*$%%'` - test "X$lt_dump_D" = "X$lt_script_arg0" && lt_dump_D=. - lt_dump_F=`$ECHO "X$lt_script_arg0" | /usr/bin/sed -e 's/^X//' -e 's%^.*/%%'` - cat "$lt_dump_D/$lt_dump_F" - exit 0 - ;; - --lt-*) - $ECHO "Unrecognized --lt- option: '$lt_opt'" 1>&2 - exit 1 - ;; - esac - done - - # Print the debug banner immediately: - if test -n "$lt_option_debug"; then - echo "test_large_msg:tests/test_large_msg:$LINENO: libtool wrapper (GNU libtool) 2.4.6" 1>&2 - fi -} - -# Used when --lt-debug. Prints its arguments to stdout -# (redirection is the responsibility of the caller) -func_lt_dump_args () -{ - lt_dump_args_N=1; - for lt_arg - do - $ECHO "test_large_msg:tests/test_large_msg:$LINENO: newargv[$lt_dump_args_N]: $lt_arg" - lt_dump_args_N=`expr $lt_dump_args_N + 1` - done -} - -# Core function for launching the target application -func_exec_program_core () -{ - - if test -n "$lt_option_debug"; then - $ECHO "test_large_msg:tests/test_large_msg:$LINENO: newargv[0]: $progdir/$program" 1>&2 - func_lt_dump_args ${1+"$@"} 1>&2 - fi - exec "$progdir/$program" ${1+"$@"} - - $ECHO "$0: cannot exec $program $*" 1>&2 - exit 1 -} - -# A function to encapsulate launching the target application -# Strips options in the --lt-* namespace from $@ and -# launches target application with the remaining arguments. -func_exec_program () -{ - case " $* " in - *\ --lt-*) - for lt_wr_arg - do - case $lt_wr_arg in - --lt-*) ;; - *) set x "$@" "$lt_wr_arg"; shift;; - esac - shift - done ;; - esac - func_exec_program_core ${1+"$@"} -} - - # Parse options - func_parse_lt_options "$0" ${1+"$@"} - - # Find the directory that this script lives in. - thisdir=`$ECHO "$file" | /usr/bin/sed 's%/[^/]*$%%'` - test "x$thisdir" = "x$file" && thisdir=. - - # Follow symbolic links until we get to the real thisdir. - file=`ls -ld "$file" | /usr/bin/sed -n 's/.*-> //p'` - while test -n "$file"; do - destdir=`$ECHO "$file" | /usr/bin/sed 's%/[^/]*$%%'` - - # If there was a directory component, then change thisdir. - if test "x$destdir" != "x$file"; then - case "$destdir" in - [\\/]* | [A-Za-z]:[\\/]*) thisdir="$destdir" ;; - *) thisdir="$thisdir/$destdir" ;; - esac - fi - - file=`$ECHO "$file" | /usr/bin/sed 's%^.*/%%'` - file=`ls -ld "$thisdir/$file" | /usr/bin/sed -n 's/.*-> //p'` - done - - # Usually 'no', except on cygwin/mingw when embedded into - # the cwrapper. - WRAPPER_SCRIPT_BELONGS_IN_OBJDIR=no - if test "$WRAPPER_SCRIPT_BELONGS_IN_OBJDIR" = "yes"; then - # special case for '.' - if test "$thisdir" = "."; then - thisdir=`pwd` - fi - # remove .libs from thisdir - case "$thisdir" in - *[\\/].libs ) thisdir=`$ECHO "$thisdir" | /usr/bin/sed 's%[\\/][^\\/]*$%%'` ;; - .libs ) thisdir=. ;; - esac - fi - - # Try to get the absolute directory name. - absdir=`cd "$thisdir" && pwd` - test -n "$absdir" && thisdir="$absdir" - - program='test_large_msg' - progdir="$thisdir/.libs" - - - if test -f "$progdir/$program"; then - # Add our own library path to DYLD_LIBRARY_PATH - DYLD_LIBRARY_PATH="/Users/benjaminrk/dev/zmq/libzmq/src/.libs:$DYLD_LIBRARY_PATH" - - # Some systems cannot cope with colon-terminated DYLD_LIBRARY_PATH - # The second colon is a workaround for a bug in BeOS R4 sed - DYLD_LIBRARY_PATH=`$ECHO "$DYLD_LIBRARY_PATH" | /usr/bin/sed 's/::*$//'` - - export DYLD_LIBRARY_PATH - - if test "$libtool_execute_magic" != "%%%MAGIC variable%%%"; then - # Run the actual program with our arguments. - func_exec_program ${1+"$@"} - fi - else - # The program doesn't exist. - $ECHO "$0: error: '$progdir/$program' does not exist" 1>&2 - $ECHO "This script is just a wrapper for $program." 1>&2 - $ECHO "See the libtool documentation for more information." 1>&2 - exit 1 - fi -fi diff --git a/tests/test_radio_dish.cpp b/tests/test_radio_dish.cpp new file mode 100644 index 00000000..ff0cfed0 --- /dev/null +++ b/tests/test_radio_dish.cpp @@ -0,0 +1,128 @@ +/* + Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "testutil.hpp" + +int main (void) +{ + setup_test_environment (); + void *ctx = zmq_ctx_new (); + assert (ctx); + + void *radio = zmq_socket (ctx, ZMQ_RADIO); + void *dish = zmq_socket (ctx, ZMQ_DISH); + + int rc = zmq_bind (radio, "inproc://test-radio-dish"); + assert (rc == 0); + + // Leaving a group which we didn't join + rc = zmq_leave (dish, "World"); + assert (rc == -1); + + // Joining too long group + char too_long_group[ZMQ_GROUP_MAX_LENGTH + 2]; + for (int index = 0; index < ZMQ_GROUP_MAX_LENGTH + 2; index++) + too_long_group[index] = 'A'; + too_long_group[ZMQ_GROUP_MAX_LENGTH + 1] = '\0'; + rc = zmq_join (dish, too_long_group); + assert (rc == -1); + + // Joining + rc = zmq_join (dish, "World"); + assert (rc == 0); + + // Duplicate Joining + rc = zmq_join (dish, "World"); + assert (rc == -1); + + // Connecting + rc = zmq_connect (dish, "inproc://test-radio-dish"); + assert (rc == 0); + + zmq_sleep (1); + + // This is not going to be sent as dish only subscribe to "World" + rc = zmq_send (radio, "Hello\0Message", 13, 0); + assert (rc == 13); + + // This is going to be sent to the dish + rc = zmq_send (radio, "World\0Message", 13, 0); + assert (rc == 13); + + char* data = (char*) malloc (sizeof(char) * 13); + + rc = zmq_recv (dish, data, 13, 0); + assert (rc == 13); + assert (strcmp (data, "World") == 0); + + // Join group during connection optvallen + rc = zmq_join (dish, "Hello"); + assert (rc == 0); + + zmq_sleep (1); + + // This should arrive now as we joined the group + rc = zmq_send (radio, "Hello\0Message", 13, 0); + assert (rc == 13); + + rc = zmq_recv (dish, data, 13, 0); + assert (rc == 13); + assert (strcmp (data, "Hello") == 0); + + // Leaving group + rc = zmq_leave (dish, "Hello"); + assert (rc == 0); + + zmq_sleep (1); + + // This is not going to be sent as dish only subscribe to "World" + rc = zmq_send (radio, "Hello\0Message", 13, 0); + assert (rc == 13); + + // This is going to be sent to the dish + rc = zmq_send (radio, "World\0Message", 13, 0); + assert (rc == 13); + + rc = zmq_recv (dish, data, 13, 0); + assert (rc == 13); + assert (strcmp (data, "World") == 0); + + free (data); + + rc = zmq_close (dish); + assert (rc == 0); + + rc = zmq_close (radio); + assert (rc == 0); + + rc = zmq_ctx_term (ctx); + assert (rc == 0); + + return 0 ; +}