radio-dish pattern

This commit is contained in:
somdoron 2016-01-27 18:19:14 +02:00
parent 511d701de3
commit b8425a25cf
16 changed files with 831 additions and 219 deletions

2
.gitignore vendored
View File

@ -122,6 +122,8 @@ test_setsockopt
test_stream_exceeds_buffer
test_poller
test_timers
test_radio_dish
test_large_msg
tests/test*.log
tests/test*.trs
src/platform.hpp*

View File

@ -458,7 +458,9 @@ set(cxx-sources
decoder_allocators.cpp
socket_poller.cpp
timers.cpp
config.hpp)
config.hpp
radio.cpp
dish.cpp)
set(rc-sources version.rc)

View File

@ -45,6 +45,8 @@ src_libzmq_la_SOURCES = \
src/decoder.hpp \
src/devpoll.cpp \
src/devpoll.hpp \
src/dish.cpp \
src/dish.hpp \
src/dist.cpp \
src/dist.hpp \
src/encoder.hpp \
@ -134,6 +136,8 @@ src_libzmq_la_SOURCES = \
src/pull.hpp \
src/push.cpp \
src/push.hpp \
src/radio.cpp \
src/radio.hpp \
src/random.cpp \
src/random.hpp \
src/raw_decoder.cpp \
@ -380,7 +384,8 @@ test_apps = \
tests/test_heartbeats \
tests/test_stream_exceeds_buffer \
tests/test_poller \
tests/test_timers
tests/test_timers \
tests/test_radio_dish
tests_test_system_SOURCES = tests/test_system.cpp
tests_test_system_LDADD = src/libzmq.la
@ -597,6 +602,9 @@ tests_test_poller_LDADD = src/libzmq.la
tests_test_timers_SOURCES = tests/test_timers.cpp
tests_test_timers_LDADD = src/libzmq.la
tests_test_radio_dish_SOURCES = tests/test_radio_dish.cpp
tests_test_radio_dish_LDADD = src/libzmq.la
if !ON_MINGW
if !ON_CYGWIN

View File

@ -260,6 +260,8 @@ ZMQ_EXPORT uint32_t zmq_msg_routing_id (zmq_msg_t *msg);
#define ZMQ_STREAM 11
#define ZMQ_SERVER 12
#define ZMQ_CLIENT 13
#define ZMQ_RADIO 14
#define ZMQ_DISH 15
/* Deprecated aliases */
#define ZMQ_XREQ ZMQ_DEALER
@ -356,6 +358,9 @@ ZMQ_EXPORT uint32_t zmq_msg_routing_id (zmq_msg_t *msg);
#define ZMQ_CURVE 2
#define ZMQ_GSSAPI 3
/* RADIO-DISH protocol */
#define ZMQ_GROUP_MAX_LENGTH 255
/* Deprecated options and aliases */
#define ZMQ_TCP_ACCEPT_FILTER 38
#define ZMQ_IPC_FILTER_PID 58
@ -400,6 +405,9 @@ ZMQ_EXPORT int zmq_send (void *s, const void *buf, size_t len, int flags);
ZMQ_EXPORT int zmq_send_const (void *s, const void *buf, size_t len, int flags);
ZMQ_EXPORT int zmq_recv (void *s, void *buf, size_t len, int flags);
ZMQ_EXPORT int zmq_socket_monitor (void *s, const char *addr, int events);
ZMQ_EXPORT int zmq_join (void *s, const char *group);
ZMQ_EXPORT int zmq_leave (void *s, const char *group);
/******************************************************************************/
/* I/O multiplexing. */

246
src/dish.cpp Normal file
View File

@ -0,0 +1,246 @@
/*
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <string.h>
#include "macros.hpp"
#include "dish.hpp"
#include "err.hpp"
zmq::dish_t::dish_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_, true),
has_message (false)
{
options.type = ZMQ_DISH;
// When socket is being closed down we don't want to wait till pending
// subscription commands are sent to the wire.
options.linger = 0;
int rc = message.init ();
errno_assert (rc == 0);
}
zmq::dish_t::~dish_t ()
{
int rc = message.close ();
errno_assert (rc == 0);
}
void zmq::dish_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
{
LIBZMQ_UNUSED (subscribe_to_all_);
zmq_assert (pipe_);
fq.attach (pipe_);
dist.attach (pipe_);
// Send all the cached subscriptions to the new upstream peer.
send_subscriptions (pipe_);
}
void zmq::dish_t::xread_activated (pipe_t *pipe_)
{
fq.activated (pipe_);
}
void zmq::dish_t::xwrite_activated (pipe_t *pipe_)
{
dist.activated (pipe_);
}
void zmq::dish_t::xpipe_terminated (pipe_t *pipe_)
{
fq.pipe_terminated (pipe_);
dist.pipe_terminated (pipe_);
}
void zmq::dish_t::xhiccuped (pipe_t *pipe_)
{
// Send all the cached subscriptions to the hiccuped pipe.
send_subscriptions (pipe_);
}
int zmq::dish_t::xjoin (const char* group_)
{
if (strlen (group_) > ZMQ_GROUP_MAX_LENGTH) {
errno = EINVAL;
return -1;
}
subscriptions_t::iterator it =
std::find (subscriptions.begin (), subscriptions.end (), std::string(group_));
// User cannot join same group twice
if (it != subscriptions.end ()) {
errno = EINVAL;
return -1;
}
subscriptions.push_back (std::string (group_));
size_t len = strlen (group_);
msg_t msg;
int rc = msg.init_size (len + 1);
errno_assert (rc == 0);
char *data = (char*) msg.data ();
data[0] = 'J';
memcpy (data + 1, group_, len);
int err = 0;
rc = dist.send_to_all (&msg);
if (rc != 0)
err = errno;
int rc2 = msg.close ();
errno_assert (rc2 == 0);
if (rc != 0)
errno = err;
return rc;
}
int zmq::dish_t::xleave (const char* group_)
{
if (strlen (group_) > ZMQ_GROUP_MAX_LENGTH) {
errno = EINVAL;
return -1;
}
subscriptions_t::iterator it = std::find (subscriptions.begin (), subscriptions.end (), std::string (group_));
if (it == subscriptions.end ()) {
errno = EINVAL;
return -1;
}
subscriptions.erase (it);
size_t len = strlen (group_);
msg_t msg;
int rc = msg.init_size (len + 1);
errno_assert (rc == 0);
char *data = (char*) msg.data ();
data[0] = 'L';
memcpy (data + 1, group_, len);
int err = 0;
rc = dist.send_to_all (&msg);
if (rc != 0)
err = errno;
int rc2 = msg.close ();
errno_assert (rc2 == 0);
if (rc != 0)
errno = err;
return rc;
}
int zmq::dish_t::xsend (msg_t *msg_)
{
errno = ENOTSUP;
return -1;
}
bool zmq::dish_t::xhas_out ()
{
// Subscription can be added/removed anytime.
return true;
}
int zmq::dish_t::xrecv (msg_t *msg_)
{
// If there's already a message prepared by a previous call to zmq_poll,
// return it straight ahead.
if (has_message) {
int rc = msg_->move (message);
errno_assert (rc == 0);
has_message = false;
return 0;
}
// Get a message using fair queueing algorithm.
int rc = fq.recv (msg_);
// If there's no message available, return immediately.
// The same when error occurs.
if (rc != 0)
return -1;
return 0;
}
bool zmq::dish_t::xhas_in ()
{
// If there's already a message prepared by a previous call to zmq_poll,
// return straight ahead.
if (has_message)
return true;
// Get a message using fair queueing algorithm.
int rc = fq.recv (&message);
// If there's no message available, return immediately.
// The same when error occurs.
if (rc != 0) {
errno_assert (errno == EAGAIN);
return false;
}
has_message = true;
return true;
}
zmq::blob_t zmq::dish_t::get_credential () const
{
return fq.get_credential ();
}
void zmq::dish_t::send_subscriptions (pipe_t *pipe_)
{
for (subscriptions_t::iterator it = subscriptions.begin (); it != subscriptions.end (); ++it) {
msg_t msg;
int rc = msg.init_size (it->length () + 1);
errno_assert (rc == 0);
char *data = (char*) msg.data ();
data [0] = 'J';
it->copy (data + 1, it->length ());
// Send it to the pipe.
bool sent = pipe_->write (&msg);
// If we reached the SNDHWM, and thus cannot send the subscription, drop
// the subscription message instead. This matches the behaviour of
// zmq_setsockopt(ZMQ_SUBSCRIBE, ...), which also drops subscriptions
// when the SNDHWM is reached.
if (!sent)
msg.close ();
}
pipe_->flush ();
}

98
src/dish.hpp Normal file
View File

@ -0,0 +1,98 @@
/*
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_DISH_HPP_INCLUDED__
#define __ZMQ_DISH_HPP_INCLUDED__
#include <string>
#include <vector>
#include "socket_base.hpp"
#include "session_base.hpp"
#include "dist.hpp"
#include "fq.hpp"
#include "trie.hpp"
namespace zmq
{
class ctx_t;
class pipe_t;
class io_thread_t;
class dish_t :
public socket_base_t
{
public:
dish_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
~dish_t ();
protected:
// Overrides of functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_);
int xsend (zmq::msg_t *msg_);
bool xhas_out ();
int xrecv (zmq::msg_t *msg_);
bool xhas_in ();
blob_t get_credential () const;
void xread_activated (zmq::pipe_t *pipe_);
void xwrite_activated (zmq::pipe_t *pipe_);
void xhiccuped (pipe_t *pipe_);
void xpipe_terminated (zmq::pipe_t *pipe_);
int xjoin (const char *group_);
int xleave (const char *group_);
private:
// Send subscriptions to a pipe
void send_subscriptions (pipe_t *pipe_);
// Fair queueing object for inbound pipes.
fq_t fq;
// Object for distributing the subscriptions upstream.
dist_t dist;
// The repository of subscriptions.
typedef std::vector<std::string> subscriptions_t;
subscriptions_t subscriptions;
// If true, 'message' contains a matching message to return on the
// next recv call.
bool has_message;
msg_t message;
dish_t (const dish_t&);
const dish_t &operator = (const dish_t&);
};
}
#endif

View File

@ -75,8 +75,9 @@ const char *zmq::mechanism_t::socket_type_string (int socket_type) const
static const char *names [] = {"PAIR", "PUB", "SUB", "REQ", "REP",
"DEALER", "ROUTER", "PULL", "PUSH",
"XPUB", "XSUB", "STREAM",
"SERVER", "CLIENT"};
zmq_assert (socket_type >= 0 && socket_type <= 13);
"SERVER", "CLIENT",
"RADIO", "DISH"};
zmq_assert (socket_type >= 0 && socket_type <= 15);
return names [socket_type];
}
@ -192,6 +193,10 @@ bool zmq::mechanism_t::check_socket_type (const std::string& type_) const
return type_ == "CLIENT";
case ZMQ_CLIENT:
return type_ == "SERVER";
case ZMQ_RADIO:
return type_ == "DISH";
case ZMQ_DISH:
return type_ == "RADIO";
default:
break;
}

170
src/radio.cpp Normal file
View File

@ -0,0 +1,170 @@
/*
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <string.h>
#include "radio.hpp"
#include "macros.hpp"
#include "pipe.hpp"
#include "err.hpp"
#include "msg.hpp"
zmq::radio_t::radio_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
socket_base_t (parent_, tid_, sid_, true)
{
options.type = ZMQ_RADIO;
}
zmq::radio_t::~radio_t ()
{
}
void zmq::radio_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_)
{
LIBZMQ_UNUSED (subscribe_to_all_);
zmq_assert (pipe_);
// Don't delay pipe termination as there is no one
// to receive the delimiter.
pipe_->set_nodelay ();
dist.attach (pipe_);
// The pipe is active when attached. Let's read the subscriptions from
// it, if any.
xread_activated (pipe_);
}
void zmq::radio_t::xread_activated (pipe_t *pipe_)
{
// There are some subscriptions waiting. Let's process them.
msg_t sub;
while (pipe_->read (&sub)) {
// Apply the subscription to the trie
const char * data = (char *) sub.data ();
const size_t size = sub.size ();
if (size > 0 && (*data == 'J' || *data == 'L')) {
std::string group = std::string (data + 1, sub. size() - 1);
if (*data == 'J')
subscriptions.insert (subscriptions_t::value_type (group, pipe_));
else {
std::pair<subscriptions_t::iterator, subscriptions_t::iterator> range =
subscriptions.equal_range (group);
for (subscriptions_t::iterator it = range.first; it != range.second; ++it) {
if (it->second == pipe_) {
subscriptions.erase (it);
break;
}
}
}
}
sub.close ();
}
}
void zmq::radio_t::xwrite_activated (pipe_t *pipe_)
{
dist.activated (pipe_);
}
void zmq::radio_t::xpipe_terminated (pipe_t *pipe_)
{
for (subscriptions_t::iterator it = subscriptions.begin (); it != subscriptions.end (); ++it) {
if (it->second == pipe_) {
subscriptions.erase (it);
}
}
dist.pipe_terminated (pipe_);
}
int zmq::radio_t::xsend (msg_t *msg_)
{
// Radio sockets do not allow multipart data (ZMQ_SNDMORE)
if (msg_->flags () & msg_t::more) {
errno = EINVAL;
return -1;
}
size_t size = msg_->size ();
char *group = (char*) msg_->data();
// Maximum allowed group length is 255
if (size > ZMQ_GROUP_MAX_LENGTH)
size = ZMQ_GROUP_MAX_LENGTH;
// Check if NULL terminated
bool terminated = false;
for (size_t index = 0; index < size; index++) {
if (group[index] == '\0') {
terminated = true;
break;
}
}
if (!terminated) {
// User didn't include a group in the message
errno = EINVAL;
return -1;
}
dist.unmatch ();
std::pair<subscriptions_t::iterator, subscriptions_t::iterator> range =
subscriptions.equal_range (std::string(group));
for (subscriptions_t::iterator it = range.first; it != range.second; ++it) {
dist.match (it-> second);
}
int rc = dist.send_to_matching (msg_);
return rc;
}
bool zmq::radio_t::xhas_out ()
{
return dist.has_out ();
}
int zmq::radio_t::xrecv (msg_t *msg_)
{
// Messages cannot be received from PUB socket.
errno = ENOTSUP;
return -1;
}
bool zmq::radio_t::xhas_in ()
{
return false;
}

82
src/radio.hpp Normal file
View File

@ -0,0 +1,82 @@
/*
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_RADIO_HPP_INCLUDED__
#define __ZMQ_RADIO_HPP_INCLUDED__
#include <map>
#include <string>
#include "socket_base.hpp"
#include "session_base.hpp"
#include "mtrie.hpp"
#include "array.hpp"
#include "dist.hpp"
namespace zmq
{
class ctx_t;
class msg_t;
class pipe_t;
class io_thread_t;
class radio_t :
public socket_base_t
{
public:
radio_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
~radio_t ();
// Implementations of virtual functions from socket_base_t.
void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_ = false);
int xsend (zmq::msg_t *msg_);
bool xhas_out ();
int xrecv (zmq::msg_t *msg_);
bool xhas_in ();
void xread_activated (zmq::pipe_t *pipe_);
void xwrite_activated (zmq::pipe_t *pipe_);
void xpipe_terminated (zmq::pipe_t *pipe_);
private:
// List of all subscriptions mapped to corresponding pipes.
typedef std::multimap<std::string, pipe_t*> subscriptions_t;
subscriptions_t subscriptions;
// Distributor of messages holding the list of outbound pipes.
dist_t dist;
radio_t (const radio_t&);
const radio_t &operator = (const radio_t&);
};
}
#endif

View File

@ -69,6 +69,8 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
case ZMQ_STREAM:
case ZMQ_SERVER:
case ZMQ_CLIENT:
case ZMQ_RADIO:
case ZMQ_DISH:
s = new (std::nothrow) session_base_t (io_thread_, active_,
socket_, options_, addr_);
break;
@ -643,4 +645,3 @@ void zmq::session_base_t::start_connecting (bool wait_)
zmq_assert (false);
}

View File

@ -92,6 +92,8 @@
#include "stream.hpp"
#include "server.hpp"
#include "client.hpp"
#include "radio.hpp"
#include "dish.hpp"
#define ENTER_MUTEX() \
if (thread_safe) \
@ -153,6 +155,12 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
case ZMQ_CLIENT:
s = new (std::nothrow) client_t (parent_, tid_, sid_);
break;
case ZMQ_RADIO:
s = new (std::nothrow) radio_t (parent_, tid_, sid_);
break;
case ZMQ_DISH:
s = new (std::nothrow) dish_t (parent_, tid_, sid_);
break;
default:
errno = EINVAL;
return NULL;
@ -457,6 +465,28 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
return rc;
}
int zmq::socket_base_t::join (const char* group_)
{
ENTER_MUTEX ();
int rc = xjoin (group_);
EXIT_MUTEX();
return rc;
}
int zmq::socket_base_t::leave (const char* group_)
{
ENTER_MUTEX ();
int rc = xleave (group_);
EXIT_MUTEX();
return rc;
}
int zmq::socket_base_t::add_signaler(signaler_t *s_)
{
ENTER_MUTEX ();
@ -1396,6 +1426,18 @@ bool zmq::socket_base_t::xhas_in ()
return false;
}
int zmq::socket_base_t::xjoin (const char *group_)
{
errno = ENOTSUP;
return -1;
}
int zmq::socket_base_t::xleave (const char *group_)
{
errno = ENOTSUP;
return -1;
}
int zmq::socket_base_t::xrecv (msg_t *)
{
errno = ENOTSUP;

View File

@ -99,6 +99,10 @@ namespace zmq
bool has_in ();
bool has_out ();
// Joining and leaving groups
int join (const char *group);
int leave (const char *group);
// Using this function reaper thread ask the socket to register with
// its poller.
void start_reaping (poller_t *poller_);
@ -168,6 +172,10 @@ namespace zmq
virtual void xhiccuped (pipe_t *pipe_);
virtual void xpipe_terminated (pipe_t *pipe_) = 0;
// the default implementation assumes that joub and leave are not supported.
virtual int xjoin (const char *group_);
virtual int xleave (const char *group_);
// Delay actual destruction of the socket.
void process_destroy ();
@ -287,4 +295,3 @@ namespace zmq
}
#endif

View File

@ -300,6 +300,28 @@ int zmq_socket_monitor (void *s_, const char *addr_, int events_)
return result;
}
int zmq_join (void *s_, const char* group_)
{
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
int result = s->join (group_);
return result;
}
int zmq_leave (void *s_, const char* group_)
{
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
int result = s->leave (group_);
return result;
}
int zmq_bind (void *s_, const char *addr_)
{
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {

View File

@ -70,6 +70,7 @@ set(tests
test_xpub_manual
test_xpub_welcome_msg
test_timers
test_radio_dish
)
if(NOT WIN32)
list(APPEND tests

View File

@ -1,210 +0,0 @@
#! /bin/sh
# tests/test_large_msg - temporary wrapper script for .libs/test_large_msg
# Generated by libtool (GNU libtool) 2.4.6
#
# The tests/test_large_msg program cannot be directly executed until all the libtool
# libraries that it depends on are installed.
#
# This wrapper script should never be moved out of the build directory.
# If it is, it will not operate correctly.
# Sed substitution that helps us do robust quoting. It backslashifies
# metacharacters that are still active within double-quoted strings.
sed_quote_subst='s|\([`"$\\]\)|\\\1|g'
# Be Bourne compatible
if test -n "${ZSH_VERSION+set}" && (emulate sh) >/dev/null 2>&1; then
emulate sh
NULLCMD=:
# Zsh 3.x and 4.x performs word splitting on ${1+"$@"}, which
# is contrary to our usage. Disable this feature.
alias -g '${1+"$@"}'='"$@"'
setopt NO_GLOB_SUBST
else
case `(set -o) 2>/dev/null` in *posix*) set -o posix;; esac
fi
BIN_SH=xpg4; export BIN_SH # for Tru64
DUALCASE=1; export DUALCASE # for MKS sh
# The HP-UX ksh and POSIX shell print the target directory to stdout
# if CDPATH is set.
(unset CDPATH) >/dev/null 2>&1 && unset CDPATH
relink_command=""
# This environment variable determines our operation mode.
if test "$libtool_install_magic" = "%%%MAGIC variable%%%"; then
# install mode needs the following variables:
generated_by_libtool_version='2.4.6'
notinst_deplibs=' src/libzmq.la'
else
# When we are sourced in execute mode, $file and $ECHO are already set.
if test "$libtool_execute_magic" != "%%%MAGIC variable%%%"; then
file="$0"
# A function that is used when there is no print builtin or printf.
func_fallback_echo ()
{
eval 'cat <<_LTECHO_EOF
$1
_LTECHO_EOF'
}
ECHO="printf %s\\n"
fi
# Very basic option parsing. These options are (a) specific to
# the libtool wrapper, (b) are identical between the wrapper
# /script/ and the wrapper /executable/ that is used only on
# windows platforms, and (c) all begin with the string --lt-
# (application programs are unlikely to have options that match
# this pattern).
#
# There are only two supported options: --lt-debug and
# --lt-dump-script. There is, deliberately, no --lt-help.
#
# The first argument to this parsing function should be the
# script's ./libtool value, followed by no.
lt_option_debug=
func_parse_lt_options ()
{
lt_script_arg0=$0
shift
for lt_opt
do
case "$lt_opt" in
--lt-debug) lt_option_debug=1 ;;
--lt-dump-script)
lt_dump_D=`$ECHO "X$lt_script_arg0" | /usr/bin/sed -e 's/^X//' -e 's%/[^/]*$%%'`
test "X$lt_dump_D" = "X$lt_script_arg0" && lt_dump_D=.
lt_dump_F=`$ECHO "X$lt_script_arg0" | /usr/bin/sed -e 's/^X//' -e 's%^.*/%%'`
cat "$lt_dump_D/$lt_dump_F"
exit 0
;;
--lt-*)
$ECHO "Unrecognized --lt- option: '$lt_opt'" 1>&2
exit 1
;;
esac
done
# Print the debug banner immediately:
if test -n "$lt_option_debug"; then
echo "test_large_msg:tests/test_large_msg:$LINENO: libtool wrapper (GNU libtool) 2.4.6" 1>&2
fi
}
# Used when --lt-debug. Prints its arguments to stdout
# (redirection is the responsibility of the caller)
func_lt_dump_args ()
{
lt_dump_args_N=1;
for lt_arg
do
$ECHO "test_large_msg:tests/test_large_msg:$LINENO: newargv[$lt_dump_args_N]: $lt_arg"
lt_dump_args_N=`expr $lt_dump_args_N + 1`
done
}
# Core function for launching the target application
func_exec_program_core ()
{
if test -n "$lt_option_debug"; then
$ECHO "test_large_msg:tests/test_large_msg:$LINENO: newargv[0]: $progdir/$program" 1>&2
func_lt_dump_args ${1+"$@"} 1>&2
fi
exec "$progdir/$program" ${1+"$@"}
$ECHO "$0: cannot exec $program $*" 1>&2
exit 1
}
# A function to encapsulate launching the target application
# Strips options in the --lt-* namespace from $@ and
# launches target application with the remaining arguments.
func_exec_program ()
{
case " $* " in
*\ --lt-*)
for lt_wr_arg
do
case $lt_wr_arg in
--lt-*) ;;
*) set x "$@" "$lt_wr_arg"; shift;;
esac
shift
done ;;
esac
func_exec_program_core ${1+"$@"}
}
# Parse options
func_parse_lt_options "$0" ${1+"$@"}
# Find the directory that this script lives in.
thisdir=`$ECHO "$file" | /usr/bin/sed 's%/[^/]*$%%'`
test "x$thisdir" = "x$file" && thisdir=.
# Follow symbolic links until we get to the real thisdir.
file=`ls -ld "$file" | /usr/bin/sed -n 's/.*-> //p'`
while test -n "$file"; do
destdir=`$ECHO "$file" | /usr/bin/sed 's%/[^/]*$%%'`
# If there was a directory component, then change thisdir.
if test "x$destdir" != "x$file"; then
case "$destdir" in
[\\/]* | [A-Za-z]:[\\/]*) thisdir="$destdir" ;;
*) thisdir="$thisdir/$destdir" ;;
esac
fi
file=`$ECHO "$file" | /usr/bin/sed 's%^.*/%%'`
file=`ls -ld "$thisdir/$file" | /usr/bin/sed -n 's/.*-> //p'`
done
# Usually 'no', except on cygwin/mingw when embedded into
# the cwrapper.
WRAPPER_SCRIPT_BELONGS_IN_OBJDIR=no
if test "$WRAPPER_SCRIPT_BELONGS_IN_OBJDIR" = "yes"; then
# special case for '.'
if test "$thisdir" = "."; then
thisdir=`pwd`
fi
# remove .libs from thisdir
case "$thisdir" in
*[\\/].libs ) thisdir=`$ECHO "$thisdir" | /usr/bin/sed 's%[\\/][^\\/]*$%%'` ;;
.libs ) thisdir=. ;;
esac
fi
# Try to get the absolute directory name.
absdir=`cd "$thisdir" && pwd`
test -n "$absdir" && thisdir="$absdir"
program='test_large_msg'
progdir="$thisdir/.libs"
if test -f "$progdir/$program"; then
# Add our own library path to DYLD_LIBRARY_PATH
DYLD_LIBRARY_PATH="/Users/benjaminrk/dev/zmq/libzmq/src/.libs:$DYLD_LIBRARY_PATH"
# Some systems cannot cope with colon-terminated DYLD_LIBRARY_PATH
# The second colon is a workaround for a bug in BeOS R4 sed
DYLD_LIBRARY_PATH=`$ECHO "$DYLD_LIBRARY_PATH" | /usr/bin/sed 's/::*$//'`
export DYLD_LIBRARY_PATH
if test "$libtool_execute_magic" != "%%%MAGIC variable%%%"; then
# Run the actual program with our arguments.
func_exec_program ${1+"$@"}
fi
else
# The program doesn't exist.
$ECHO "$0: error: '$progdir/$program' does not exist" 1>&2
$ECHO "This script is just a wrapper for $program." 1>&2
$ECHO "See the libtool documentation for more information." 1>&2
exit 1
fi
fi

128
tests/test_radio_dish.cpp Normal file
View File

@ -0,0 +1,128 @@
/*
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "testutil.hpp"
int main (void)
{
setup_test_environment ();
void *ctx = zmq_ctx_new ();
assert (ctx);
void *radio = zmq_socket (ctx, ZMQ_RADIO);
void *dish = zmq_socket (ctx, ZMQ_DISH);
int rc = zmq_bind (radio, "inproc://test-radio-dish");
assert (rc == 0);
// Leaving a group which we didn't join
rc = zmq_leave (dish, "World");
assert (rc == -1);
// Joining too long group
char too_long_group[ZMQ_GROUP_MAX_LENGTH + 2];
for (int index = 0; index < ZMQ_GROUP_MAX_LENGTH + 2; index++)
too_long_group[index] = 'A';
too_long_group[ZMQ_GROUP_MAX_LENGTH + 1] = '\0';
rc = zmq_join (dish, too_long_group);
assert (rc == -1);
// Joining
rc = zmq_join (dish, "World");
assert (rc == 0);
// Duplicate Joining
rc = zmq_join (dish, "World");
assert (rc == -1);
// Connecting
rc = zmq_connect (dish, "inproc://test-radio-dish");
assert (rc == 0);
zmq_sleep (1);
// This is not going to be sent as dish only subscribe to "World"
rc = zmq_send (radio, "Hello\0Message", 13, 0);
assert (rc == 13);
// This is going to be sent to the dish
rc = zmq_send (radio, "World\0Message", 13, 0);
assert (rc == 13);
char* data = (char*) malloc (sizeof(char) * 13);
rc = zmq_recv (dish, data, 13, 0);
assert (rc == 13);
assert (strcmp (data, "World") == 0);
// Join group during connection optvallen
rc = zmq_join (dish, "Hello");
assert (rc == 0);
zmq_sleep (1);
// This should arrive now as we joined the group
rc = zmq_send (radio, "Hello\0Message", 13, 0);
assert (rc == 13);
rc = zmq_recv (dish, data, 13, 0);
assert (rc == 13);
assert (strcmp (data, "Hello") == 0);
// Leaving group
rc = zmq_leave (dish, "Hello");
assert (rc == 0);
zmq_sleep (1);
// This is not going to be sent as dish only subscribe to "World"
rc = zmq_send (radio, "Hello\0Message", 13, 0);
assert (rc == 13);
// This is going to be sent to the dish
rc = zmq_send (radio, "World\0Message", 13, 0);
assert (rc == 13);
rc = zmq_recv (dish, data, 13, 0);
assert (rc == 13);
assert (strcmp (data, "World") == 0);
free (data);
rc = zmq_close (dish);
assert (rc == 0);
rc = zmq_close (radio);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0 ;
}