Fixed issue LIBZMQ-333

- reverted commit 941be8d2175332cb720f390f93d07a0870db8824.
 - fixed zmq_device implementation for latest socket_base class
 - added back zmq_device.3 man page
This commit is contained in:
Pieter Hintjens 2012-03-16 16:39:11 -05:00
parent 32c85e0ea3
commit 9ac40c47d7
11 changed files with 650 additions and 4 deletions

View File

@ -90,7 +90,14 @@ This package contains ZeroMQ related development libraries and header files.
%{_libdir}/libzmq.so.1
%{_libdir}/libzmq.so.1.0.0
%attr(0755,root,root) %{_bindir}/zmq_forwarder
%attr(0755,root,root) %{_bindir}/zmq_queue
%attr(0755,root,root) %{_bindir}/zmq_streamer
%{_mandir}/man7/zmq.7.gz
%{_mandir}/man1/zmq_forwarder.1.gz
%{_mandir}/man1/zmq_queue.1.gz
%{_mandir}/man1/zmq_streamer.1.gz
%files devel
%defattr(-,root,root,-)

View File

@ -1,4 +1,4 @@
MAN3 = zmq_bind.3 zmq_close.3 zmq_connect.3 zmq_init.3 \
MAN3 = zmq_bind.3 zmq_close.3 zmq_connect.3 zmq_device.3 zmq_init.3 \
zmq_msg_close.3 zmq_msg_copy.3 zmq_msg_data.3 zmq_msg_init.3 \
zmq_msg_init_data.3 zmq_msg_init_size.3 zmq_msg_move.3 zmq_msg_size.3 \
zmq_msg_send.3 zmq_msg_recv.3 \

View File

@ -154,6 +154,16 @@ Local in-process (inter-thread) communication transport::
linkzmq:zmq_inproc[7]
Devices
~~~~~~~
0MQ provides 'devices', which are building blocks that act as intermediate
nodes in complex messaging topologies. Devices can act as brokers that other
nodes connect to, proxies that connect through to other nodes, or any mix of
these two models.
You can start a device in an application thread, see linkzmq:zmq_device[3].
ERROR HANDLING
--------------
The 0MQ library functions handle errors using the standard conventions found on

125
doc/zmq_device.txt Normal file
View File

@ -0,0 +1,125 @@
zmq_device(3)
=============
NAME
----
zmq_device - start built-in 0MQ device
SYNOPSIS
--------
*int zmq_device (int 'device', const void '*frontend', const void '*backend');*
DESCRIPTION
-----------
The _zmq_device()_ function starts a built-in 0MQ device. The 'device' argument
is one of:
'ZMQ_QUEUE'::
starts a queue device
'ZMQ_FORWARDER'::
starts a forwarder device
'ZMQ_STREAMER'::
starts a streamer device
The device connects a frontend socket to a backend socket. Conceptually, data
flows from frontend to backend. Depending on the socket types, replies may flow
in the opposite direction.
Before calling _zmq_device()_ you must set any socket options, and connect or
bind both frontend and backend sockets. The two conventional device models are:
*proxy*::
bind frontend socket to an endpoint, and connect backend socket to
downstream components. A proxy device model does not require changes to
the downstream topology but that topology is static (any changes require
reconfiguring the device).
*broker*::
bind frontend socket to one endpoint and bind backend socket to a second
endpoint. Downstream components must now connect into the device. A broker
device model allows a dynamic downstream topology (components can come and
go at any time).
_zmq_device()_ runs in the current thread and returns only if/when the current
context is closed.
QUEUE DEVICE
------------
'ZMQ_QUEUE' creates a shared queue that collects requests from a set of clients,
and distributes these fairly among a set of services. Requests are fair-queued
from frontend connections and load-balanced between backend connections.
Replies automatically return to the client that made the original request.
This device is part of the 'request-reply' pattern. The frontend speaks to
clients and the backend speaks to services. You should use 'ZMQ_QUEUE' with a
'ZMQ_ROUTER' socket for the frontend and a 'ZMQ_DEALER' socket for the backend.
Other combinations are not documented.
Refer to linkzmq:zmq_socket[3] for a description of these socket types.
FORWARDER DEVICE
----------------
'ZMQ_FORWARDER' collects messages from a set of publishers and forwards these to
a set of subscribers. You will generally use this to bridge networks, e.g. read
on TCP unicast and forward on multicast.
This device is part of the 'publish-subscribe' pattern. The frontend speaks to
publishers and the backend speaks to subscribers. You should use
'ZMQ_FORWARDER' with a 'ZMQ_SUB' socket for the frontend and a 'ZMQ_PUB' socket
for the backend. Other combinations are not documented.
Refer to linkzmq:zmq_socket[3] for a description of these socket types.
STREAMER DEVICE
---------------
'ZMQ_STREAMER' collects tasks from a set of pushers and forwards these to a set
of pullers. You will generally use this to bridge networks. Messages are
fair-queued from pushers and load-balanced to pullers.
This device is part of the 'pipeline' pattern. The frontend speaks to pushers
and the backend speaks to pullers. You should use 'ZMQ_STREAMER' with a
'ZMQ_PULL' socket for the frontend and a 'ZMQ_PUSH' socket for the backend.
Other combinations are not documented.
Refer to linkzmq:zmq_socket[3] for a description of these socket types.
RETURN VALUE
------------
The _zmq_device()_ function always returns `-1` and 'errno' set to *ETERM* (the
0MQ 'context' associated with either of the specified sockets was terminated).
EXAMPLE
-------
.Creating a queue broker
----
// Create frontend and backend sockets
void *frontend = zmq_socket (context, ZMQ_ROUTER);
assert (backend);
void *backend = zmq_socket (context, ZMQ_DEALER);
assert (frontend);
// Bind both sockets to TCP ports
assert (zmq_bind (frontend, "tcp://*:5555") == 0);
assert (zmq_bind (backend, "tcp://*:5556") == 0);
// Start a queue device
zmq_device (ZMQ_QUEUE, frontend, backend);
----
SEE ALSO
--------
linkzmq:zmq_bind[3]
linkzmq:zmq_connect[3]
linkzmq:zmq_socket[3]
linkzmq:zmq[7]
AUTHORS
-------
This 0MQ manual page was written by Pieter Hintjens <ph@imatix.com>

View File

@ -149,9 +149,6 @@ remove the first part of the message and use it to determine the _identity_ of
the peer the message shall be routed to. If the peer does not exist anymore
the message shall be silently discarded.
Previously this socket was called 'ZMQ_XREP' and that name remains available
for backwards compatibility.
When a 'ZMQ_ROUTER' socket enters an exceptional state due to having reached the
high water mark for all peers, or if there are no peers at all, then any
messages sent to the socket shall be dropped until the exceptional state ends.

View File

@ -279,6 +279,16 @@ typedef struct
ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout);
/******************************************************************************/
/* Devices - Experimental. */
/******************************************************************************/
#define ZMQ_STREAMER 1
#define ZMQ_FORWARDER 2
#define ZMQ_QUEUE 3
ZMQ_EXPORT int zmq_device (int device, void * insocket, void* outsocket);
#undef ZMQ_EXPORT
#ifdef __cplusplus

296
include/zmq.hpp Normal file
View File

@ -0,0 +1,296 @@
/*
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_HPP_INCLUDED__
#define __ZMQ_HPP_INCLUDED__
#include "zmq.h"
#include <cassert>
#include <cstring>
#include <exception>
namespace zmq
{
typedef zmq_free_fn free_fn;
typedef zmq_pollitem_t pollitem_t;
class error_t : public std::exception
{
public:
error_t () : errnum (zmq_errno ()) {}
virtual const char *what () const throw ()
{
return zmq_strerror (errnum);
}
int num () const
{
return errnum;
}
private:
int errnum;
};
inline int poll (zmq_pollitem_t *items_, int nitems_, long timeout_ = -1)
{
int rc = zmq_poll (items_, nitems_, timeout_);
if (rc < 0)
throw error_t ();
return rc;
}
inline void device (int device_, void * insocket_, void* outsocket_)
{
int rc = zmq_device (device_, insocket_, outsocket_);
if (rc != 0)
throw error_t ();
}
class message_t : private zmq_msg_t
{
friend class socket_t;
public:
inline message_t ()
{
int rc = zmq_msg_init (this);
if (rc != 0)
throw error_t ();
}
inline message_t (size_t size_)
{
int rc = zmq_msg_init_size (this, size_);
if (rc != 0)
throw error_t ();
}
inline message_t (void *data_, size_t size_, free_fn *ffn_,
void *hint_ = NULL)
{
int rc = zmq_msg_init_data (this, data_, size_, ffn_, hint_);
if (rc != 0)
throw error_t ();
}
inline ~message_t ()
{
int rc = zmq_msg_close (this);
assert (rc == 0);
}
inline void rebuild ()
{
int rc = zmq_msg_close (this);
if (rc != 0)
throw error_t ();
rc = zmq_msg_init (this);
if (rc != 0)
throw error_t ();
}
inline void rebuild (size_t size_)
{
int rc = zmq_msg_close (this);
if (rc != 0)
throw error_t ();
rc = zmq_msg_init_size (this, size_);
if (rc != 0)
throw error_t ();
}
inline void rebuild (void *data_, size_t size_, free_fn *ffn_,
void *hint_ = NULL)
{
int rc = zmq_msg_close (this);
if (rc != 0)
throw error_t ();
rc = zmq_msg_init_data (this, data_, size_, ffn_, hint_);
if (rc != 0)
throw error_t ();
}
inline void move (message_t *msg_)
{
int rc = zmq_msg_move (this, (zmq_msg_t*) msg_);
if (rc != 0)
throw error_t ();
}
inline void copy (message_t *msg_)
{
int rc = zmq_msg_copy (this, (zmq_msg_t*) msg_);
if (rc != 0)
throw error_t ();
}
inline void *data ()
{
return zmq_msg_data (this);
}
inline size_t size ()
{
return zmq_msg_size (this);
}
private:
// Disable implicit message copying, so that users won't use shared
// messages (less efficient) without being aware of the fact.
message_t (const message_t&);
void operator = (const message_t&);
};
class context_t
{
friend class socket_t;
public:
inline context_t (int io_threads_)
{
ptr = zmq_init (io_threads_);
if (ptr == NULL)
throw error_t ();
}
inline ~context_t ()
{
int rc = zmq_term (ptr);
assert (rc == 0);
}
// Be careful with this, it's probably only useful for
// using the C api together with an existing C++ api.
// Normally you should never need to use this.
inline operator void* ()
{
return ptr;
}
private:
void *ptr;
context_t (const context_t&);
void operator = (const context_t&);
};
class socket_t
{
public:
inline socket_t (context_t &context_, int type_)
{
ptr = zmq_socket (context_.ptr, type_);
if (ptr == NULL)
throw error_t ();
}
inline ~socket_t ()
{
close();
}
inline operator void* ()
{
return ptr;
}
inline void close()
{
if(ptr == NULL)
// already closed
return ;
int rc = zmq_close (ptr);
if (rc != 0)
throw error_t ();
ptr = 0 ;
}
inline void setsockopt (int option_, const void *optval_,
size_t optvallen_)
{
int rc = zmq_setsockopt (ptr, option_, optval_, optvallen_);
if (rc != 0)
throw error_t ();
}
inline void getsockopt (int option_, void *optval_,
size_t *optvallen_)
{
int rc = zmq_getsockopt (ptr, option_, optval_, optvallen_);
if (rc != 0)
throw error_t ();
}
inline void bind (const char *addr_)
{
int rc = zmq_bind (ptr, addr_);
if (rc != 0)
throw error_t ();
}
inline void connect (const char *addr_)
{
int rc = zmq_connect (ptr, addr_);
if (rc != 0)
throw error_t ();
}
inline bool send (message_t &msg_, int flags_ = 0)
{
int rc = zmq_send (ptr, &msg_, flags_);
if (rc == 0)
return true;
if (rc == -1 && zmq_errno () == EAGAIN)
return false;
throw error_t ();
}
inline bool recv (message_t *msg_, int flags_ = 0)
{
int rc = zmq_recv (ptr, msg_, flags_);
if (rc == 0)
return true;
if (rc == -1 && zmq_errno () == EAGAIN)
return false;
throw error_t ();
}
private:
void *ptr;
socket_t (const socket_t&);
void operator = (const socket_t&);
};
}
#endif

View File

@ -16,6 +16,7 @@ libzmq_la_SOURCES = \
config.hpp \
ctx.hpp \
decoder.hpp \
device.hpp \
devpoll.hpp \
dist.hpp \
encoder.hpp \
@ -81,6 +82,7 @@ libzmq_la_SOURCES = \
clock.cpp \
ctx.cpp \
decoder.cpp \
device.cpp \
devpoll.cpp \
dist.cpp \
encoder.cpp \

120
src/device.cpp Normal file
View File

@ -0,0 +1,120 @@
/*
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stddef.h>
#include "../include/zmq.h"
#include "device.hpp"
#include "socket_base.hpp"
#include "likely.hpp"
#include "err.hpp"
int zmq::device (class socket_base_t *insocket_,
class socket_base_t *outsocket_)
{
msg_t msg;
int rc = msg.init ();
if (rc != 0) {
return -1;
}
int64_t more;
size_t moresz;
zmq_pollitem_t items [2];
items [0].socket = insocket_;
items [0].fd = 0;
items [0].events = ZMQ_POLLIN;
items [0].revents = 0;
items [1].socket = outsocket_;
items [1].fd = 0;
items [1].events = ZMQ_POLLIN;
items [1].revents = 0;
while (true) {
// Wait while there are either requests or replies to process.
rc = zmq_poll (&items [0], 2, -1);
if (unlikely (rc < 0)) {
return -1;
}
// The algorithm below asumes ratio of request and replies processed
// under full load to be 1:1. Although processing requests replies
// first is tempting it is suspectible to DoS attacks (overloading
// the system with unsolicited replies).
// Process a request.
if (items [0].revents & ZMQ_POLLIN) {
while (true) {
rc = insocket_->recv (&msg, 0);
if (unlikely (rc < 0)) {
return -1;
}
moresz = sizeof (more);
rc = insocket_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
if (unlikely (rc < 0)) {
return -1;
}
rc = outsocket_->send (&msg, more ? ZMQ_SNDMORE : 0);
if (unlikely (rc < 0)) {
return -1;
}
if (!more)
break;
}
}
// Process a reply.
if (items [1].revents & ZMQ_POLLIN) {
while (true) {
rc = outsocket_->recv (&msg, 0);
if (unlikely (rc < 0)) {
return -1;
}
moresz = sizeof (more);
rc = outsocket_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
if (unlikely (rc < 0)) {
return -1;
}
rc = insocket_->send (&msg, more ? ZMQ_SNDMORE : 0);
if (unlikely (rc < 0)) {
return -1;
}
if (!more)
break;
}
}
}
return 0;
}

32
src/device.hpp Normal file
View File

@ -0,0 +1,32 @@
/*
Copyright (c) 2007-2011 iMatix Corporation
Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __ZMQ_DEVICE_HPP_INCLUDED__
#define __ZMQ_DEVICE_HPP_INCLUDED__
namespace zmq
{
int device (class socket_base_t *insocket_,
class socket_base_t *outsocket_);
}
#endif

View File

@ -68,6 +68,7 @@ struct iovec {
#include <stdlib.h>
#include <new>
#include "device.hpp"
#include "socket_base.hpp"
#include "stdint.hpp"
#include "config.hpp"
@ -910,3 +911,49 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
#if defined ZMQ_POLL_BASED_ON_POLL
#undef ZMQ_POLL_BASED_ON_POLL
#endif
int zmq_device (int device_, void *insocket_, void *outsocket_)
{
if (!insocket_ || !outsocket_) {
errno = EFAULT;
return -1;
}
if (device_ != ZMQ_FORWARDER && device_ != ZMQ_QUEUE &&
device_ != ZMQ_STREAMER) {
errno = EINVAL;
return -1;
}
return zmq::device ((zmq::socket_base_t*) insocket_,
(zmq::socket_base_t*) outsocket_);
}
////////////////////////////////////////////////////////////////////////////////
// 0MQ utils - to be used by perf tests
////////////////////////////////////////////////////////////////////////////////
void zmq_sleep (int seconds_)
{
#if defined ZMQ_HAVE_WINDOWS
Sleep (seconds_ * 1000);
#else
sleep (seconds_);
#endif
}
void *zmq_stopwatch_start ()
{
uint64_t *watch = (uint64_t*) malloc (sizeof (uint64_t));
alloc_assert (watch);
*watch = zmq::clock_t::now_us ();
return (void*) watch;
}
unsigned long zmq_stopwatch_stop (void *watch_)
{
uint64_t end = zmq::clock_t::now_us ();
uint64_t start = *(uint64_t*) watch_;
free (watch_);
return (unsigned long) (end - start);
}