diff --git a/CMakeLists.txt b/CMakeLists.txt index dbfe5788..972b5f31 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -844,6 +844,7 @@ endif() set(cxx-sources precompiled.cpp address.cpp + channel.cpp client.cpp clock.cpp ctx.cpp @@ -947,6 +948,7 @@ set(cxx-sources atomic_counter.hpp atomic_ptr.hpp blob.hpp + channel.hpp client.hpp clock.hpp command.hpp diff --git a/Makefile.am b/Makefile.am index 9f3ccc30..4d1a7819 100755 --- a/Makefile.am +++ b/Makefile.am @@ -26,6 +26,8 @@ src_libzmq_la_SOURCES = \ src/atomic_counter.hpp \ src/atomic_ptr.hpp \ src/blob.hpp \ + src/channel.cpp \ + src/channel.hpp \ src/client.cpp \ src/client.hpp \ src/clock.cpp \ @@ -1047,7 +1049,8 @@ test_apps += tests/test_poller \ tests/test_reconnect_options \ tests/test_msg_init \ tests/test_hello_msg \ - tests/test_disconnect_msg + tests/test_disconnect_msg \ + tests/test_channel tests_test_poller_SOURCES = tests/test_poller.cpp tests_test_poller_LDADD = ${TESTUTIL_LIBS} src/libzmq.la @@ -1108,6 +1111,10 @@ tests_test_hello_msg_CPPFLAGS = ${TESTUTIL_CPPFLAGS} tests_test_disconnect_msg_SOURCES = tests/test_disconnect_msg.cpp tests_test_disconnect_msg_LDADD = ${TESTUTIL_LIBS} src/libzmq.la tests_test_disconnect_msg_CPPFLAGS = ${TESTUTIL_CPPFLAGS} + +tests_test_channel_SOURCES = tests/test_channel.cpp +tests_test_channel_LDADD = ${TESTUTIL_LIBS} src/libzmq.la +tests_test_channel_CPPFLAGS = ${TESTUTIL_CPPFLAGS} endif if FUZZING_ENGINE_LIB diff --git a/doc/zmq_bind.txt b/doc/zmq_bind.txt index 6e3f4cc1..6305cfc6 100644 --- a/doc/zmq_bind.txt +++ b/doc/zmq_bind.txt @@ -30,7 +30,7 @@ The 'endpoint' is a string consisting of a 'transport'`://` followed by an 'vmci':: virtual machine communications interface (VMCI), see linkzmq:zmq_vmci[7] 'udp':: unreliable unicast and multicast using UDP, see linkzmq:zmq_udp[7] -Every 0MQ socket type except 'ZMQ_PAIR' supports one-to-many and many-to-one +Every 0MQ socket type except 'ZMQ_PAIR' and 'ZMQ_CHANNEL' supports one-to-many and many-to-one semantics. The precise semantics depend on the socket type and are defined in linkzmq:zmq_socket[3]. diff --git a/doc/zmq_connect.txt b/doc/zmq_connect.txt index bf21736d..b06e8261 100644 --- a/doc/zmq_connect.txt +++ b/doc/zmq_connect.txt @@ -30,7 +30,7 @@ The 'endpoint' is a string consisting of a 'transport'`://` followed by an 'vmci':: virtual machine communications interface (VMCI), see linkzmq:zmq_vmci[7] 'udp':: unreliable unicast and multicast using UDP, see linkzmq:zmq_udp[7] -Every 0MQ socket type except 'ZMQ_PAIR' supports one-to-many and many-to-one +Every 0MQ socket type except 'ZMQ_PAIR' and 'ZMQ_CHANNEL' supports one-to-many and many-to-one semantics. The precise semantics depend on the socket type and are defined in linkzmq:zmq_socket[3]. @@ -39,7 +39,7 @@ immediately but as needed by 0MQ. Thus a successful call to _zmq_connect()_ does not mean that the connection was or could actually be established. Because of this, for most transports and socket types the order in which a 'server' socket is bound and a 'client' socket is connected to it does not -matter. The _ZMQ_PAIR_ sockets are an exception, as they do not automatically +matter. The _ZMQ_PAIR_ and _ZMQ_CHANNEL_ sockets are an exception, as they do not automatically reconnect to endpoints. NOTE: following a _zmq_connect()_, for socket types except for ZMQ_ROUTER, diff --git a/doc/zmq_socket.txt b/doc/zmq_socket.txt index af247e92..6f53186c 100644 --- a/doc/zmq_socket.txt +++ b/doc/zmq_socket.txt @@ -41,7 +41,7 @@ the event that a peer is unavailable to receive them. Conventional sockets allow only strict one-to-one (two peers), many-to-one (many clients, one server), or in some cases one-to-many (multicast) -relationships. With the exception of 'ZMQ_PAIR', 0MQ sockets may be connected +relationships. With the exception of 'ZMQ_PAIR' and 'ZMQ_CHANNEL', 0MQ sockets may be connected *to multiple endpoints* using _zmq_connect()_, while simultaneously accepting incoming connections *from multiple endpoints* bound to the socket using _zmq_bind()_, thus allowing many-to-many relationships. @@ -60,6 +60,7 @@ Following are the thread safe sockets: * ZMQ_SCATTER * ZMQ_GATHER * ZMQ_PEER +* ZMQ_CHANNEL .Socket types The following sections present the socket types defined by 0MQ, grouped by the @@ -476,6 +477,47 @@ Outgoing routing strategy:: See text Incoming routing strategy:: Fair-queued Action in mute state:: Return EAGAIN +Channel pattern +~~~~~~~~~~~~~~~~~~~~~~ +The channel pattern is the thread-safe version of the exclusive pair pattern. +The channel pattern is used to connect a peer to precisely one other +peer. This pattern is used for inter-thread communication across the inproc +transport. + +NOTE: Channel is still in draft phase. + +ZMQ_CHANNEL +^^^^^^^^ +A socket of type 'ZMQ_CHANNEL' can only be connected to a single peer at any one +time. No message routing or filtering is performed on messages sent over a +'ZMQ_CHANNEL' socket. + +When a 'ZMQ_CHANNEL' socket enters the 'mute' state due to having reached the +high water mark for the connected peer, or, for connection-oriented transports, +if the ZMQ_IMMEDIATE option is set and there is no connected peer, then +any linkzmq:zmq_send[3] operations on the socket shall block until the peer +becomes available for sending; messages are not discarded. + +While 'ZMQ_CHANNEL' sockets can be used over transports other than linkzmq:zmq_inproc[7], +their inability to auto-reconnect coupled with the fact new incoming connections will +be terminated while any previous connections (including ones in a closing state) +exist makes them unsuitable for TCP in most cases. + +NOTE: 'ZMQ_CHANNEL' sockets are designed for inter-thread communication across +the linkzmq:zmq_inproc[7] transport and do not implement functionality such +as auto-reconnection. + +NOTE: 'ZMQ_CHANNEL' sockets are threadsafe. They do not accept ZMQ_RCVMORE on receives. +This limits them to single part data. + +[horizontal] +.Summary of ZMQ_CHANNEL characteristics +Compatible peer sockets:: 'ZMQ_CHANNEL' +Direction:: Bidirectional +Send/receive pattern:: Unrestricted +Incoming routing strategy:: N/A +Outgoing routing strategy:: N/A +Action in mute state:: Block Native Pattern ~~~~~~~~~~~~~~ diff --git a/include/zmq.h b/include/zmq.h index 2935822d..df403c8f 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -659,6 +659,7 @@ ZMQ_EXPORT void zmq_threadclose (void *thread_); #define ZMQ_SCATTER 17 #define ZMQ_DGRAM 18 #define ZMQ_PEER 19 +#define ZMQ_CHANNEL 20 /* DRAFT Socket options. */ #define ZMQ_ZAP_ENFORCE_DOMAIN 93 diff --git a/src/channel.cpp b/src/channel.cpp new file mode 100644 index 00000000..6a396228 --- /dev/null +++ b/src/channel.cpp @@ -0,0 +1,160 @@ +/* + Copyright (c) 2007-2020 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "precompiled.hpp" +#include "macros.hpp" +#include "channel.hpp" +#include "err.hpp" +#include "pipe.hpp" +#include "msg.hpp" + +zmq::channel_t::channel_t (class ctx_t *parent_, uint32_t tid_, int sid_) : + socket_base_t (parent_, tid_, sid_, true), + _pipe (NULL) +{ + options.type = ZMQ_CHANNEL; +} + +zmq::channel_t::~channel_t () +{ + zmq_assert (!_pipe); +} + +void zmq::channel_t::xattach_pipe (pipe_t *pipe_, + bool subscribe_to_all_, + bool locally_initiated_) +{ + LIBZMQ_UNUSED (subscribe_to_all_); + LIBZMQ_UNUSED (locally_initiated_); + + zmq_assert (pipe_ != NULL); + + // ZMQ_PAIR socket can only be connected to a single peer. + // The socket rejects any further connection requests. + if (_pipe == NULL) + _pipe = pipe_; + else + pipe_->terminate (false); +} + +void zmq::channel_t::xpipe_terminated (pipe_t *pipe_) +{ + if (pipe_ == _pipe) + _pipe = NULL; +} + +void zmq::channel_t::xread_activated (pipe_t *) +{ + // There's just one pipe. No lists of active and inactive pipes. + // There's nothing to do here. +} + +void zmq::channel_t::xwrite_activated (pipe_t *) +{ + // There's just one pipe. No lists of active and inactive pipes. + // There's nothing to do here. +} + +int zmq::channel_t::xsend (msg_t *msg_) +{ + // CHANNEL sockets do not allow multipart data (ZMQ_SNDMORE) + if (msg_->flags () & msg_t::more) { + errno = EINVAL; + return -1; + } + + if (!_pipe || !_pipe->write (msg_)) { + errno = EAGAIN; + return -1; + } + + _pipe->flush (); + + // Detach the original message from the data buffer. + const int rc = msg_->init (); + errno_assert (rc == 0); + + return 0; +} + +int zmq::channel_t::xrecv (msg_t *msg_) +{ + // Deallocate old content of the message. + int rc = msg_->close (); + errno_assert (rc == 0); + + if (!_pipe) { + // Initialise the output parameter to be a 0-byte message. + rc = msg_->init (); + errno_assert (rc == 0); + + errno = EAGAIN; + return -1; + } + + // Drop any messages with more flag + bool read = _pipe->read (msg_); + while (read && msg_->flags () & msg_t::more) { + // drop all frames of the current multi-frame message + read = _pipe->read (msg_); + while (read && msg_->flags () & msg_t::more) + read = _pipe->read (msg_); + + // get the new message + if (read) + read = _pipe->read (msg_); + } + + if (!read) { + // Initialise the output parameter to be a 0-byte message. + rc = msg_->init (); + errno_assert (rc == 0); + + errno = EAGAIN; + return -1; + } + + return 0; +} + +bool zmq::channel_t::xhas_in () +{ + if (!_pipe) + return false; + + return _pipe->check_read (); +} + +bool zmq::channel_t::xhas_out () +{ + if (!_pipe) + return false; + + return _pipe->check_write (); +} diff --git a/src/channel.hpp b/src/channel.hpp new file mode 100644 index 00000000..0eb360a0 --- /dev/null +++ b/src/channel.hpp @@ -0,0 +1,69 @@ +/* + Copyright (c) 2007-2020 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_CHANNEL_HPP_INCLUDED__ +#define __ZMQ_CHANNEL_HPP_INCLUDED__ + +#include "blob.hpp" +#include "socket_base.hpp" +#include "session_base.hpp" + +namespace zmq +{ +class ctx_t; +class msg_t; +class pipe_t; +class io_thread_t; + +class channel_t ZMQ_FINAL : public socket_base_t +{ + public: + channel_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_); + ~channel_t (); + + // Overrides of functions from socket_base_t. + void xattach_pipe (zmq::pipe_t *pipe_, + bool subscribe_to_all_, + bool locally_initiated_); + int xsend (zmq::msg_t *msg_); + int xrecv (zmq::msg_t *msg_); + bool xhas_in (); + bool xhas_out (); + void xread_activated (zmq::pipe_t *pipe_); + void xwrite_activated (zmq::pipe_t *pipe_); + void xpipe_terminated (zmq::pipe_t *pipe_); + + private: + zmq::pipe_t *_pipe; + + ZMQ_NON_COPYABLE_NOR_MOVABLE (channel_t) +}; +} + +#endif diff --git a/src/mechanism.cpp b/src/mechanism.cpp index 004e388c..0d46c11e 100644 --- a/src/mechanism.cpp +++ b/src/mechanism.cpp @@ -94,20 +94,24 @@ const char socket_type_gather[] = "GATHER"; const char socket_type_scatter[] = "SCATTER"; const char socket_type_dgram[] = "DGRAM"; const char socket_type_peer[] = "PEER"; +const char socket_type_channel[] = "CHANNEL"; #endif const char *zmq::mechanism_t::socket_type_string (int socket_type_) { // TODO the order must of the names must correspond to the values resp. order of ZMQ_* socket type definitions in zmq.h! - static const char *names[] = { - socket_type_pair, socket_type_pub, socket_type_sub, - socket_type_req, socket_type_rep, socket_type_dealer, - socket_type_router, socket_type_pull, socket_type_push, - socket_type_xpub, socket_type_xsub, socket_type_stream, + static const char *names[] = {socket_type_pair, socket_type_pub, + socket_type_sub, socket_type_req, + socket_type_rep, socket_type_dealer, + socket_type_router, socket_type_pull, + socket_type_push, socket_type_xpub, + socket_type_xsub, socket_type_stream, #ifdef ZMQ_BUILD_DRAFT_API - socket_type_server, socket_type_client, socket_type_radio, - socket_type_dish, socket_type_gather, socket_type_scatter, - socket_type_dgram, socket_type_peer + socket_type_server, socket_type_client, + socket_type_radio, socket_type_dish, + socket_type_gather, socket_type_scatter, + socket_type_dgram, socket_type_peer, + socket_type_channel #endif }; static const size_t names_count = sizeof (names) / sizeof (names[0]); @@ -356,6 +360,8 @@ bool zmq::mechanism_t::check_socket_type (const char *type_, return strequals (type_, len_, socket_type_dgram); case ZMQ_PEER: return strequals (type_, len_, socket_type_peer); + case ZMQ_CHANNEL: + return strequals (type_, len_, socket_type_channel); #endif default: break; diff --git a/src/session_base.cpp b/src/session_base.cpp index 6154da1f..3c5a1cd7 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -88,6 +88,7 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_, case ZMQ_SCATTER: case ZMQ_DGRAM: case ZMQ_PEER: + case ZMQ_CHANNEL: #ifdef ZMQ_BUILD_DRAFT_API if (options_.can_send_hello_msg && options_.hello_msg.size () > 0) s = new (std::nothrow) hello_msg_session_t ( diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 792538fb..e724b3f9 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -105,6 +105,7 @@ #include "scatter.hpp" #include "dgram.hpp" #include "peer.hpp" +#include "channel.hpp" void zmq::socket_base_t::inprocs_t::emplace (const char *endpoint_uri_, pipe_t *pipe_) @@ -217,6 +218,9 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, case ZMQ_PEER: s = new (std::nothrow) peer_t (parent_, tid_, sid_); break; + case ZMQ_CHANNEL: + s = new (std::nothrow) channel_t (parent_, tid_, sid_); + break; default: errno = EINVAL; return NULL; diff --git a/src/zmq_draft.h b/src/zmq_draft.h index 97a32a67..63fd508a 100644 --- a/src/zmq_draft.h +++ b/src/zmq_draft.h @@ -46,6 +46,7 @@ #define ZMQ_SCATTER 17 #define ZMQ_DGRAM 18 #define ZMQ_PEER 19 +#define ZMQ_CHANNEL 20 /* DRAFT Socket options. */ #define ZMQ_ZAP_ENFORCE_DOMAIN 93 diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 213022f8..cbd2ecc8 100755 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -156,7 +156,11 @@ if(ENABLE_DRAFTS) test_xpub_manual_last_value test_peer test_reconnect_options - test_msg_init) + test_msg_init + test_channel + test_hello_msg + test_disconnect_msg + ) endif() if(ZMQ_HAVE_WS) diff --git a/tests/test_channel.cpp b/tests/test_channel.cpp new file mode 100644 index 00000000..42777b43 --- /dev/null +++ b/tests/test_channel.cpp @@ -0,0 +1,81 @@ +/* + Copyright (c) 2007-2020 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "testutil.hpp" +#include "testutil_unity.hpp" + +#include + +void *sb; +void *sc; + +void setUp () +{ + setup_test_context (); + + sb = test_context_socket (ZMQ_CHANNEL); + TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (sb, "inproc://a")); + + sc = test_context_socket (ZMQ_CHANNEL); + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sc, "inproc://a")); +} + +void tearDown () +{ + test_context_socket_close (sc); + test_context_socket_close (sb); + + teardown_test_context (); +} + +void test_roundtrip () +{ + send_string_expect_success (sb, "HELLO", 0); + recv_string_expect_success (sc, "HELLO", 0); + + send_string_expect_success (sc, "WORLD", 0); + recv_string_expect_success (sb, "WORLD", 0); +} + +void test_sndmore_fails () +{ + int rc = zmq_send (sc, "X", 1, ZMQ_SNDMORE); + TEST_ASSERT_EQUAL_INT (-1, rc); + TEST_ASSERT_EQUAL_INT (EINVAL, errno); +} + +int main () +{ + setup_test_environment (); + + UNITY_BEGIN (); + RUN_TEST (test_roundtrip); + RUN_TEST (test_sndmore_fails); + return UNITY_END (); +}