mirror of
https://github.com/zeromq/libzmq.git
synced 2024-12-13 10:52:56 +01:00
Merge pull request #3823 from somdoron/ZMQ_PEER
problem: zeromq doesn't has a thread-safe peer to peer socket
This commit is contained in:
commit
52044b38f7
4
.gitignore
vendored
4
.gitignore
vendored
@ -156,12 +156,14 @@ test_ws_transport
|
||||
test_wss_transport
|
||||
test_socks
|
||||
test_xpub_manual_last_value
|
||||
test_peer
|
||||
unittest_ip_resolver
|
||||
unittest_mtrie
|
||||
unittest_poller
|
||||
unittest_radix_tree
|
||||
unittest_udp_address
|
||||
unittest_ypipe
|
||||
unittests/unittest_curve_encoding
|
||||
tests/test*.log
|
||||
tests/test*.trs
|
||||
unittests/unittest*.log
|
||||
@ -205,4 +207,4 @@ core
|
||||
build
|
||||
test-suite.log
|
||||
.idea/
|
||||
cmake-build-debug/
|
||||
cmake-build-debug/
|
||||
|
@ -822,6 +822,7 @@ set(cxx-sources
|
||||
own.cpp
|
||||
null_mechanism.cpp
|
||||
pair.cpp
|
||||
peer.cpp
|
||||
pgm_receiver.cpp
|
||||
pgm_sender.cpp
|
||||
pgm_socket.cpp
|
||||
@ -948,6 +949,7 @@ set(cxx-sources
|
||||
options.hpp
|
||||
own.hpp
|
||||
pair.hpp
|
||||
peer.hpp
|
||||
pgm_receiver.hpp
|
||||
pgm_sender.hpp
|
||||
pgm_socket.hpp
|
||||
|
@ -126,6 +126,8 @@ src_libzmq_la_SOURCES = \
|
||||
src/own.hpp \
|
||||
src/pair.cpp \
|
||||
src/pair.hpp \
|
||||
src/peer.cpp \
|
||||
src/peer.hpp \
|
||||
src/pgm_receiver.cpp \
|
||||
src/pgm_receiver.hpp \
|
||||
src/pgm_sender.cpp \
|
||||
@ -1036,7 +1038,8 @@ test_apps += tests/test_poller \
|
||||
tests/test_dgram \
|
||||
tests/test_app_meta \
|
||||
tests/test_xpub_manual_last_value \
|
||||
tests/test_router_notify
|
||||
tests/test_router_notify \
|
||||
tests/test_peer
|
||||
|
||||
tests_test_poller_SOURCES = tests/test_poller.cpp
|
||||
tests_test_poller_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
|
||||
@ -1077,6 +1080,10 @@ tests_test_app_meta_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
|
||||
tests_test_router_notify_SOURCES = tests/test_router_notify.cpp
|
||||
tests_test_router_notify_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
|
||||
tests_test_router_notify_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
|
||||
|
||||
tests_test_peer_SOURCES = tests/test_peer.cpp
|
||||
tests_test_peer_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
|
||||
tests_test_peer_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
|
||||
endif
|
||||
|
||||
if ENABLE_STATIC
|
||||
|
@ -1,7 +1,7 @@
|
||||
#
|
||||
# documentation
|
||||
#
|
||||
MAN3 = zmq_bind.3 zmq_unbind.3 zmq_connect.3 zmq_disconnect.3 zmq_close.3 \
|
||||
MAN3 = zmq_bind.3 zmq_unbind.3 zmq_connect.3 zmq_connect_peer.3 zmq_disconnect.3 zmq_close.3 \
|
||||
zmq_ctx_new.3 zmq_ctx_term.3 zmq_ctx_get.3 zmq_ctx_set.3 zmq_ctx_shutdown.3 \
|
||||
zmq_msg_init.3 zmq_msg_init_data.3 zmq_msg_init_size.3 \
|
||||
zmq_msg_move.3 zmq_msg_copy.3 zmq_msg_size.3 zmq_msg_data.3 zmq_msg_close.3 \
|
||||
|
92
doc/zmq_connect_peer.txt
Normal file
92
doc/zmq_connect_peer.txt
Normal file
@ -0,0 +1,92 @@
|
||||
zmq_connect_peer(3)
|
||||
===================
|
||||
|
||||
|
||||
NAME
|
||||
----
|
||||
zmq_connect_peer - create outgoing connection from socket and return the connection routing id in thread-safe and atomic way.
|
||||
|
||||
|
||||
SYNOPSIS
|
||||
--------
|
||||
*uint32_t zmq_connect_peer (void '*socket', const char '*endpoint');*
|
||||
|
||||
|
||||
DESCRIPTION
|
||||
-----------
|
||||
The _zmq_connect_peer()_ function connects a 'ZMQ_PEER' socket to an 'endpoint' and then returns the endpoint 'routing_id'.
|
||||
|
||||
The 'endpoint' is a string consisting of a 'transport'`://` followed by an
|
||||
'address'. The 'transport' specifies the underlying protocol to use. The
|
||||
'address' specifies the transport-specific address to connect to.
|
||||
|
||||
The function is supported only on the 'ZMQ_PEER' socket type and would return `0` with 'errno' set to 'ENOTSUP' otherwise.
|
||||
|
||||
The _zmq_connect_peer()_ support the following transports:
|
||||
|
||||
'tcp':: unicast transport using TCP, see linkzmq:zmq_tcp[7]
|
||||
'ipc':: local inter-process communication transport, see linkzmq:zmq_ipc[7]
|
||||
'inproc':: local in-process (inter-thread) communication transport, see linkzmq:zmq_inproc[7]
|
||||
'ws':: unicast transport using WebSockets, see linkzmq:zmq_ws[7]
|
||||
'wss':: unicast transport using WebSockets over TLS, see linkzmq:zmq_wss[7]
|
||||
|
||||
RETURN VALUE
|
||||
------------
|
||||
The _zmq_connect_peer()_ function returns the peer 'routing_id' if successful. Otherwise it returns
|
||||
`0` and sets 'errno' to one of the values defined below.
|
||||
|
||||
|
||||
ERRORS
|
||||
------
|
||||
*EINVAL*::
|
||||
The endpoint supplied is invalid.
|
||||
*EPROTONOSUPPORT*::
|
||||
The requested 'transport' protocol is not supported with 'ZMQ_PEER'.
|
||||
*ENOCOMPATPROTO*::
|
||||
The requested 'transport' protocol is not compatible with the socket type.
|
||||
*ETERM*::
|
||||
The 0MQ 'context' associated with the specified 'socket' was terminated.
|
||||
*ENOTSOCK*::
|
||||
The provided 'socket' was invalid.
|
||||
*EMTHREAD*::
|
||||
No I/O thread is available to accomplish the task.
|
||||
*ENOTSUP*::
|
||||
The socket is not of type 'ZMQ_PEER'.
|
||||
*EFAULT*::
|
||||
The 'ZMQ_IMMEDIATE' option is set on the socket.
|
||||
|
||||
EXAMPLE
|
||||
-------
|
||||
.Connecting a peer socket to a TCP transport and sending a message
|
||||
----
|
||||
/* Create a ZMQ_SUB socket */
|
||||
void *socket = zmq_socket (context, ZMQ_PEER);
|
||||
assert (socket);
|
||||
/* Connect it to the host server001, port 5555 using a TCP transport */
|
||||
uint32_t routing_id = zmq_connect (socket, "tcp://server001:5555");
|
||||
assert (routing_id == 0);
|
||||
/* Sending a message to the peer */
|
||||
zmq_msg_t msg;
|
||||
int rc = zmq_msg_init_data (&msg, "HELLO", 5, NULL, NULL);
|
||||
assert (rc == 0);
|
||||
rc = zmq_msg_set_routing_id (&msg, routing_id);
|
||||
assert (rc == 0);
|
||||
rc = zmq_msg_send (&msg, socket, 0);
|
||||
assert (rc == 5);
|
||||
rc = zmq_msg_close (&msg);
|
||||
assert (rc == 0);
|
||||
----
|
||||
|
||||
|
||||
SEE ALSO
|
||||
--------
|
||||
linkzmq:zmq_connect[3]
|
||||
linkzmq:zmq_bind[3]
|
||||
linkzmq:zmq_socket[3]
|
||||
linkzmq:zmq[7]
|
||||
|
||||
|
||||
AUTHORS
|
||||
-------
|
||||
This page was written by the 0MQ community. To make a change please
|
||||
read the 0MQ Contribution Policy at <http://www.zeromq.org/docs:contributing>.
|
@ -59,6 +59,7 @@ Following are the thread safe sockets:
|
||||
* ZMQ_RADIO
|
||||
* ZMQ_SCATTER
|
||||
* ZMQ_GATHER
|
||||
* ZMQ_PEER
|
||||
|
||||
.Socket types
|
||||
The following sections present the socket types defined by 0MQ, grouped by the
|
||||
@ -434,6 +435,48 @@ Outgoing routing strategy:: N/A
|
||||
Action in mute state:: Block
|
||||
|
||||
|
||||
Peer-to-peer pattern
|
||||
~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
The peer-to-peer pattern is used to connect a peer to multiple peers.
|
||||
Peer can both connect and bind and mix both of them with the same socket.
|
||||
The peer-to-peer pattern is useful to build peer-to-peer networks (e.g zyre, bitcoin, torrent)
|
||||
where a peer can both accept connections from other peers or connect to them.
|
||||
|
||||
NOTE: Peer-to-peer is still in draft phase.
|
||||
|
||||
ZMQ_PEER
|
||||
^^^^^^^^
|
||||
A 'ZMQ_PEER' socket talks to a set of 'ZMQ_PEER' sockets.
|
||||
|
||||
To connect and fetch the 'routing_id' of the peer use linkzmq:zmq_connect_peer[3].
|
||||
|
||||
Each received message has a 'routing_id' that is a 32-bit unsigned integer.
|
||||
The application can fetch this with linkzmq:zmq_msg_routing_id[3].
|
||||
|
||||
To send a message to a given 'ZMQ_PEER' peer the application must set the peer's
|
||||
'routing_id' on the message, using linkzmq:zmq_msg_set_routing_id[3].
|
||||
|
||||
If the 'routing_id' is not specified, or does not refer to a connected client
|
||||
peer, the send call will fail with EHOSTUNREACH. If the outgoing buffer for
|
||||
the peer is full, the send call shall block, unless ZMQ_DONTWAIT is
|
||||
used in the send, in which case it shall fail with EAGAIN. The 'ZMQ_PEER'
|
||||
socket shall not drop messages in any case.
|
||||
|
||||
NOTE: 'ZMQ_PEER' sockets are threadsafe. They do not accept the ZMQ_SNDMORE
|
||||
option on sends not ZMQ_RCVMORE on receives. This limits them to single part
|
||||
data.
|
||||
|
||||
[horizontal]
|
||||
.Summary of ZMQ_PEER characteristics
|
||||
Compatible peer sockets:: 'ZMQ_PEER'
|
||||
Direction:: Bidirectional
|
||||
Send/receive pattern:: Unrestricted
|
||||
Outgoing routing strategy:: See text
|
||||
Incoming routing strategy:: Fair-queued
|
||||
Action in mute state:: Return EAGAIN
|
||||
|
||||
|
||||
Native Pattern
|
||||
~~~~~~~~~~~~~~
|
||||
The native pattern is used for communicating with TCP peers and allows
|
||||
|
@ -263,7 +263,7 @@ typedef struct zmq_msg_t
|
||||
#endif
|
||||
} zmq_msg_t;
|
||||
|
||||
typedef void(zmq_free_fn) (void *data_, void *hint_);
|
||||
typedef void (zmq_free_fn) (void *data_, void *hint_);
|
||||
|
||||
ZMQ_EXPORT int zmq_msg_init (zmq_msg_t *msg_);
|
||||
ZMQ_EXPORT int zmq_msg_init_size (zmq_msg_t *msg_, size_t size_);
|
||||
@ -597,7 +597,7 @@ ZMQ_EXPORT void zmq_atomic_counter_destroy (void **counter_p_);
|
||||
|
||||
#define ZMQ_HAVE_TIMERS
|
||||
|
||||
typedef void(zmq_timer_fn) (int timer_id, void *arg);
|
||||
typedef void (zmq_timer_fn) (int timer_id, void *arg);
|
||||
|
||||
ZMQ_EXPORT void *zmq_timers_new (void);
|
||||
ZMQ_EXPORT int zmq_timers_destroy (void **timers_p);
|
||||
@ -634,7 +634,7 @@ ZMQ_EXPORT unsigned long zmq_stopwatch_stop (void *watch_);
|
||||
/* Sleeps for specified number of seconds. */
|
||||
ZMQ_EXPORT void zmq_sleep (int seconds_);
|
||||
|
||||
typedef void(zmq_thread_fn) (void *);
|
||||
typedef void (zmq_thread_fn) (void *);
|
||||
|
||||
/* Start a thread. Returns a handle to the thread. */
|
||||
ZMQ_EXPORT void *zmq_threadstart (zmq_thread_fn *func_, void *arg_);
|
||||
@ -658,6 +658,7 @@ ZMQ_EXPORT void zmq_threadclose (void *thread_);
|
||||
#define ZMQ_GATHER 16
|
||||
#define ZMQ_SCATTER 17
|
||||
#define ZMQ_DGRAM 18
|
||||
#define ZMQ_PEER 19
|
||||
|
||||
/* DRAFT Socket options. */
|
||||
#define ZMQ_ZAP_ENFORCE_DOMAIN 93
|
||||
@ -694,6 +695,7 @@ ZMQ_EXPORT int zmq_ctx_get_ext (void *context_,
|
||||
/* DRAFT Socket methods. */
|
||||
ZMQ_EXPORT int zmq_join (void *s, const char *group);
|
||||
ZMQ_EXPORT int zmq_leave (void *s, const char *group);
|
||||
ZMQ_EXPORT uint32_t zmq_connect_peer (void *s_, const char *addr_);
|
||||
|
||||
/* DRAFT Msg methods. */
|
||||
ZMQ_EXPORT int zmq_msg_set_routing_id (zmq_msg_t *msg, uint32_t routing_id);
|
||||
|
@ -93,6 +93,7 @@ const char socket_type_dish[] = "DISH";
|
||||
const char socket_type_gather[] = "GATHER";
|
||||
const char socket_type_scatter[] = "SCATTER";
|
||||
const char socket_type_dgram[] = "DGRAM";
|
||||
const char socket_type_peer[] = "PEER";
|
||||
#endif
|
||||
|
||||
const char *zmq::mechanism_t::socket_type_string (int socket_type_)
|
||||
@ -106,7 +107,7 @@ const char *zmq::mechanism_t::socket_type_string (int socket_type_)
|
||||
#ifdef ZMQ_BUILD_DRAFT_API
|
||||
socket_type_server, socket_type_client, socket_type_radio,
|
||||
socket_type_dish, socket_type_gather, socket_type_scatter,
|
||||
socket_type_dgram
|
||||
socket_type_dgram, socket_type_peer
|
||||
#endif
|
||||
};
|
||||
static const size_t names_count = sizeof (names) / sizeof (names[0]);
|
||||
@ -353,6 +354,8 @@ bool zmq::mechanism_t::check_socket_type (const char *type_,
|
||||
return strequals (type_, len_, socket_type_gather);
|
||||
case ZMQ_DGRAM:
|
||||
return strequals (type_, len_, socket_type_dgram);
|
||||
case ZMQ_PEER:
|
||||
return strequals (type_, len_, socket_type_peer);
|
||||
#endif
|
||||
default:
|
||||
break;
|
||||
|
68
src/peer.cpp
Normal file
68
src/peer.cpp
Normal file
@ -0,0 +1,68 @@
|
||||
/*
|
||||
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
|
||||
|
||||
This file is part of libzmq, the ZeroMQ core engine in C++.
|
||||
|
||||
libzmq is free software; you can redistribute it and/or modify it under
|
||||
the terms of the GNU Lesser General Public License (LGPL) as published
|
||||
by the Free Software Foundation; either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
As a special exception, the Contributors give you permission to link
|
||||
this library with independent modules to produce an executable,
|
||||
regardless of the license terms of these independent modules, and to
|
||||
copy and distribute the resulting executable under terms of your choice,
|
||||
provided that you also meet, for each linked independent module, the
|
||||
terms and conditions of the license of that module. An independent
|
||||
module is a module which is not derived from or based on this library.
|
||||
If you modify this library, you must extend this exception to your
|
||||
version of the library.
|
||||
|
||||
libzmq is distributed in the hope that it will be useful, but WITHOUT
|
||||
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
|
||||
License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "precompiled.hpp"
|
||||
#include "macros.hpp"
|
||||
#include "peer.hpp"
|
||||
#include "pipe.hpp"
|
||||
#include "wire.hpp"
|
||||
#include "random.hpp"
|
||||
#include "likely.hpp"
|
||||
#include "err.hpp"
|
||||
|
||||
zmq::peer_t::peer_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
|
||||
server_t (parent_, tid_, sid_)
|
||||
{
|
||||
options.type = ZMQ_PEER;
|
||||
}
|
||||
|
||||
uint32_t zmq::peer_t::connect_peer (const char *endpoint_uri_)
|
||||
{
|
||||
scoped_optional_lock_t sync_lock (&_sync);
|
||||
|
||||
// connect_peer cannot work with immediate enabled
|
||||
if (options.immediate == 1) {
|
||||
errno = EFAULT;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int rc = socket_base_t::connect_internal (endpoint_uri_);
|
||||
if (rc != 0)
|
||||
return 0;
|
||||
|
||||
return _peer_last_routing_id;
|
||||
}
|
||||
|
||||
void zmq::peer_t::xattach_pipe (pipe_t *pipe_,
|
||||
bool subscribe_to_all_,
|
||||
bool locally_initiated_)
|
||||
{
|
||||
server_t::xattach_pipe (pipe_, subscribe_to_all_, locally_initiated_);
|
||||
_peer_last_routing_id = pipe_->get_server_socket_routing_id ();
|
||||
}
|
67
src/peer.hpp
Normal file
67
src/peer.hpp
Normal file
@ -0,0 +1,67 @@
|
||||
/*
|
||||
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
|
||||
|
||||
This file is part of libzmq, the ZeroMQ core engine in C++.
|
||||
|
||||
libzmq is free software; you can redistribute it and/or modify it under
|
||||
the terms of the GNU Lesser General Public License (LGPL) as published
|
||||
by the Free Software Foundation; either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
As a special exception, the Contributors give you permission to link
|
||||
this library with independent modules to produce an executable,
|
||||
regardless of the license terms of these independent modules, and to
|
||||
copy and distribute the resulting executable under terms of your choice,
|
||||
provided that you also meet, for each linked independent module, the
|
||||
terms and conditions of the license of that module. An independent
|
||||
module is a module which is not derived from or based on this library.
|
||||
If you modify this library, you must extend this exception to your
|
||||
version of the library.
|
||||
|
||||
libzmq is distributed in the hope that it will be useful, but WITHOUT
|
||||
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
|
||||
License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef __ZMQ_PEER_HPP_INCLUDED__
|
||||
#define __ZMQ_PEER_HPP_INCLUDED__
|
||||
|
||||
#include <map>
|
||||
|
||||
#include "socket_base.hpp"
|
||||
#include "server.hpp"
|
||||
#include "session_base.hpp"
|
||||
#include "stdint.hpp"
|
||||
#include "blob.hpp"
|
||||
#include "fq.hpp"
|
||||
|
||||
namespace zmq
|
||||
{
|
||||
class ctx_t;
|
||||
class msg_t;
|
||||
class pipe_t;
|
||||
|
||||
class peer_t ZMQ_FINAL : public server_t
|
||||
{
|
||||
public:
|
||||
peer_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
|
||||
|
||||
// Overrides of functions from socket_base_t.
|
||||
void xattach_pipe (zmq::pipe_t *pipe_,
|
||||
bool subscribe_to_all_,
|
||||
bool locally_initiated_);
|
||||
|
||||
uint32_t connect_peer (const char *endpoint_uri_);
|
||||
|
||||
private:
|
||||
uint32_t _peer_last_routing_id;
|
||||
|
||||
ZMQ_NON_COPYABLE_NOR_MOVABLE (peer_t)
|
||||
};
|
||||
}
|
||||
|
||||
#endif
|
@ -45,7 +45,7 @@ class msg_t;
|
||||
class pipe_t;
|
||||
|
||||
// TODO: This class uses O(n) scheduling. Rewrite it to use O(1) algorithm.
|
||||
class server_t ZMQ_FINAL : public socket_base_t
|
||||
class server_t : public socket_base_t
|
||||
{
|
||||
public:
|
||||
server_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_);
|
||||
|
@ -87,6 +87,7 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
|
||||
case ZMQ_GATHER:
|
||||
case ZMQ_SCATTER:
|
||||
case ZMQ_DGRAM:
|
||||
case ZMQ_PEER:
|
||||
s = new (std::nothrow)
|
||||
session_base_t (io_thread_, active_, socket_, options_, addr_);
|
||||
break;
|
||||
|
@ -100,6 +100,7 @@
|
||||
#include "gather.hpp"
|
||||
#include "scatter.hpp"
|
||||
#include "dgram.hpp"
|
||||
#include "peer.hpp"
|
||||
|
||||
void zmq::socket_base_t::inprocs_t::emplace (const char *endpoint_uri_,
|
||||
pipe_t *pipe_)
|
||||
@ -207,6 +208,9 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_,
|
||||
case ZMQ_DGRAM:
|
||||
s = new (std::nothrow) dgram_t (parent_, tid_, sid_);
|
||||
break;
|
||||
case ZMQ_PEER:
|
||||
s = new (std::nothrow) peer_t (parent_, tid_, sid_);
|
||||
break;
|
||||
default:
|
||||
errno = EINVAL;
|
||||
return NULL;
|
||||
@ -228,6 +232,7 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_,
|
||||
int sid_,
|
||||
bool thread_safe_) :
|
||||
own_t (parent_, tid_),
|
||||
_sync (),
|
||||
_tag (0xbaddecaf),
|
||||
_ctx_terminated (false),
|
||||
_destroyed (false),
|
||||
@ -240,7 +245,6 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_,
|
||||
_monitor_events (0),
|
||||
_thread_safe (thread_safe_),
|
||||
_reaper_signaler (NULL),
|
||||
_sync (),
|
||||
_monitor_sync ()
|
||||
{
|
||||
options.socket_id = sid_;
|
||||
@ -740,7 +744,11 @@ int zmq::socket_base_t::bind (const char *endpoint_uri_)
|
||||
int zmq::socket_base_t::connect (const char *endpoint_uri_)
|
||||
{
|
||||
scoped_optional_lock_t sync_lock (_thread_safe ? &_sync : NULL);
|
||||
return connect_internal (endpoint_uri_);
|
||||
}
|
||||
|
||||
int zmq::socket_base_t::connect_internal (const char *endpoint_uri_)
|
||||
{
|
||||
if (unlikely (_ctx_terminated)) {
|
||||
errno = ETERM;
|
||||
return -1;
|
||||
|
@ -205,6 +205,11 @@ class socket_base_t : public own_t,
|
||||
// Delay actual destruction of the socket.
|
||||
void process_destroy () ZMQ_FINAL;
|
||||
|
||||
int connect_internal (const char *endpoint_uri_);
|
||||
|
||||
// Mutex for synchronize access to the socket in thread safe mode
|
||||
mutex_t _sync;
|
||||
|
||||
private:
|
||||
// test if event should be sent and then dispatch it
|
||||
void event (const endpoint_uri_pair_t &endpoint_uri_pair_,
|
||||
@ -336,9 +341,6 @@ class socket_base_t : public own_t,
|
||||
// Signaler to be used in the reaping stage
|
||||
signaler_t *_reaper_signaler;
|
||||
|
||||
// Mutex for synchronize access to the socket in thread safe mode
|
||||
mutex_t _sync;
|
||||
|
||||
// Mutex to synchronize access to the monitor Pair socket
|
||||
mutex_t _monitor_sync;
|
||||
|
||||
|
23
src/zmq.cpp
23
src/zmq.cpp
@ -40,6 +40,7 @@
|
||||
|
||||
#include "macros.hpp"
|
||||
#include "poller.hpp"
|
||||
#include "peer.hpp"
|
||||
|
||||
#if !defined ZMQ_HAVE_POLLER
|
||||
// On AIX platform, poll.h has to be included first to get consistent
|
||||
@ -336,6 +337,28 @@ int zmq_connect (void *s_, const char *addr_)
|
||||
return s->connect (addr_);
|
||||
}
|
||||
|
||||
uint32_t zmq_connect_peer (void *s_, const char *addr_)
|
||||
{
|
||||
zmq::peer_t *s = static_cast<zmq::peer_t *> (s_);
|
||||
if (!s_ || !s->check_tag ()) {
|
||||
errno = ENOTSOCK;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int socket_type;
|
||||
size_t socket_type_size = sizeof (socket_type);
|
||||
if (s->getsockopt (ZMQ_TYPE, &socket_type, &socket_type_size) != 0)
|
||||
return 0;
|
||||
|
||||
if (socket_type != ZMQ_PEER) {
|
||||
errno = ENOTSUP;
|
||||
return 0;
|
||||
}
|
||||
|
||||
return s->connect_peer (addr_);
|
||||
}
|
||||
|
||||
|
||||
int zmq_unbind (void *s_, const char *addr_)
|
||||
{
|
||||
zmq::socket_base_t *s = as_socket_base_t (s_);
|
||||
|
@ -45,6 +45,7 @@
|
||||
#define ZMQ_GATHER 16
|
||||
#define ZMQ_SCATTER 17
|
||||
#define ZMQ_DGRAM 18
|
||||
#define ZMQ_PEER 19
|
||||
|
||||
/* DRAFT Socket options. */
|
||||
#define ZMQ_ZAP_ENFORCE_DOMAIN 93
|
||||
|
@ -162,6 +162,7 @@ if(ENABLE_DRAFTS)
|
||||
test_app_meta
|
||||
test_router_notify
|
||||
test_xpub_manual_last_value
|
||||
test_peer
|
||||
)
|
||||
endif()
|
||||
|
||||
|
113
tests/test_peer.cpp
Normal file
113
tests/test_peer.cpp
Normal file
@ -0,0 +1,113 @@
|
||||
/*
|
||||
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
|
||||
|
||||
This file is part of libzmq, the ZeroMQ core engine in C++.
|
||||
|
||||
libzmq is free software; you can redistribute it and/or modify it under
|
||||
the terms of the GNU Lesser General Public License (LGPL) as published
|
||||
by the Free Software Foundation; either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
As a special exception, the Contributors give you permission to link
|
||||
this library with independent modules to produce an executable,
|
||||
regardless of the license terms of these independent modules, and to
|
||||
copy and distribute the resulting executable under terms of your choice,
|
||||
provided that you also meet, for each linked independent module, the
|
||||
terms and conditions of the license of that module. An independent
|
||||
module is a module which is not derived from or based on this library.
|
||||
If you modify this library, you must extend this exception to your
|
||||
version of the library.
|
||||
|
||||
libzmq is distributed in the hope that it will be useful, but WITHOUT
|
||||
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
|
||||
License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "testutil.hpp"
|
||||
#include "testutil_unity.hpp"
|
||||
|
||||
SETUP_TEARDOWN_TESTCONTEXT
|
||||
|
||||
void test_peer ()
|
||||
{
|
||||
size_t len = MAX_SOCKET_STRING;
|
||||
char my_endpoint[MAX_SOCKET_STRING];
|
||||
|
||||
void *peer1 = test_context_socket (ZMQ_PEER);
|
||||
bind_loopback (peer1, false, my_endpoint, len);
|
||||
|
||||
void *peer2 = test_context_socket (ZMQ_PEER);
|
||||
uint32_t peer1_routing_id = zmq_connect_peer (peer2, my_endpoint);
|
||||
TEST_ASSERT_NOT_EQUAL (0, peer1_routing_id);
|
||||
|
||||
{
|
||||
zmq_msg_t msg;
|
||||
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_size (&msg, 1));
|
||||
|
||||
char *data = static_cast<char *> (zmq_msg_data (&msg));
|
||||
data[0] = 1;
|
||||
|
||||
TEST_ASSERT_SUCCESS_ERRNO (
|
||||
zmq_msg_set_routing_id (&msg, peer1_routing_id));
|
||||
|
||||
int rc = zmq_msg_send (&msg, peer2, 0);
|
||||
TEST_ASSERT_EQUAL_INT (1, rc);
|
||||
}
|
||||
|
||||
uint32_t peer2_routing_id;
|
||||
{
|
||||
zmq_msg_t msg;
|
||||
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg));
|
||||
|
||||
int rc = TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&msg, peer1, 0));
|
||||
TEST_ASSERT_EQUAL_INT (1, rc);
|
||||
|
||||
peer2_routing_id = zmq_msg_routing_id (&msg);
|
||||
TEST_ASSERT_NOT_EQUAL (0, peer2_routing_id);
|
||||
|
||||
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
|
||||
}
|
||||
|
||||
{
|
||||
zmq_msg_t msg;
|
||||
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init_size (&msg, 1));
|
||||
|
||||
char *data = static_cast<char *> (zmq_msg_data (&msg));
|
||||
data[0] = 2;
|
||||
|
||||
TEST_ASSERT_SUCCESS_ERRNO (
|
||||
zmq_msg_set_routing_id (&msg, peer2_routing_id));
|
||||
|
||||
int rc = zmq_msg_send (&msg, peer1, 0);
|
||||
TEST_ASSERT_EQUAL_INT (1, rc);
|
||||
}
|
||||
|
||||
{
|
||||
zmq_msg_t msg;
|
||||
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&msg));
|
||||
|
||||
int rc = zmq_msg_recv (&msg, peer2, 0);
|
||||
TEST_ASSERT_EQUAL_INT (1, rc);
|
||||
|
||||
uint32_t routing_id = zmq_msg_routing_id (&msg);
|
||||
TEST_ASSERT_EQUAL_UINT32 (peer1_routing_id, routing_id);
|
||||
|
||||
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
|
||||
}
|
||||
|
||||
test_context_socket_close (peer1);
|
||||
test_context_socket_close (peer2);
|
||||
}
|
||||
|
||||
int main (void)
|
||||
{
|
||||
setup_test_environment ();
|
||||
|
||||
UNITY_BEGIN ();
|
||||
RUN_TEST (test_peer);
|
||||
return UNITY_END ();
|
||||
}
|
Loading…
Reference in New Issue
Block a user