diff --git a/.gitignore b/.gitignore index e49d0dc4..6bec86fa 100644 --- a/.gitignore +++ b/.gitignore @@ -126,6 +126,8 @@ test_poller test_timers test_radio_dish test_udp +test_scatter_gather +test_socketopt_hwm test_use_fd_ipc test_use_fd_tcp tests/test*.log diff --git a/CMakeLists.txt b/CMakeLists.txt index 90a30421..e8030f0e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -491,7 +491,9 @@ set (cxx-sources radio.cpp dish.cpp udp_engine.cpp - udp_address.cpp) + udp_address.cpp + scatter.cpp + gather.cpp) set (rc-sources version.rc) diff --git a/Makefile.am b/Makefile.am index 21140d89..aa0e7ec0 100644 --- a/Makefile.am +++ b/Makefile.am @@ -57,6 +57,8 @@ src_libzmq_la_SOURCES = \ src/fd.hpp \ src/fq.cpp \ src/fq.hpp \ + src/gather.cpp \ + src/gather.hpp \ src/gssapi_mechanism_base.cpp \ src/gssapi_mechanism_base.hpp \ src/gssapi_client.cpp \ @@ -153,6 +155,8 @@ src_libzmq_la_SOURCES = \ src/req.hpp \ src/router.cpp \ src/router.hpp \ + src/scatter.cpp \ + src/scatter.hpp \ src/select.cpp \ src/select.hpp \ src/server.cpp \ @@ -396,7 +400,8 @@ test_apps = \ tests/test_poller \ tests/test_timers \ tests/test_radio_dish \ - tests/test_udp + tests/test_udp \ + tests/test_scatter_gather tests_test_system_SOURCES = tests/test_system.cpp tests_test_system_LDADD = src/libzmq.la @@ -616,6 +621,9 @@ tests_test_radio_dish_LDADD = src/libzmq.la tests_test_udp_SOURCES = tests/test_udp.cpp tests_test_udp_LDADD = src/libzmq.la +tests_test_scatter_gather_SOURCES = tests/test_scatter_gather.cpp +tests_test_scatter_gather_LDADD = src/libzmq.la + if !ON_MINGW if !ON_CYGWIN test_apps += \ diff --git a/include/zmq.h b/include/zmq.h index bafbd84d..28d6f900 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -264,6 +264,8 @@ ZMQ_EXPORT const char *zmq_msg_group (zmq_msg_t *msg); #define ZMQ_CLIENT 13 #define ZMQ_RADIO 14 #define ZMQ_DISH 15 +#define ZMQ_GATHER 16 +#define ZMQ_SCATTER 17 /* Deprecated aliases */ #define ZMQ_XREQ ZMQ_DEALER diff --git a/src/gather.cpp b/src/gather.cpp new file mode 100644 index 00000000..6379afef --- /dev/null +++ b/src/gather.cpp @@ -0,0 +1,94 @@ +/* + Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "precompiled.hpp" +#include "macros.hpp" +#include "gather.hpp" +#include "err.hpp" +#include "msg.hpp" +#include "pipe.hpp" + +zmq::gather_t::gather_t (class ctx_t *parent_, uint32_t tid_, int sid_) : + socket_base_t (parent_, tid_, sid_, true) +{ + options.type = ZMQ_GATHER; +} + +zmq::gather_t::~gather_t () +{ +} + +void zmq::gather_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) +{ + LIBZMQ_UNUSED (subscribe_to_all_); + + zmq_assert (pipe_); + fq.attach (pipe_); +} + +void zmq::gather_t::xread_activated (pipe_t *pipe_) +{ + fq.activated (pipe_); +} + +void zmq::gather_t::xpipe_terminated (pipe_t *pipe_) +{ + fq.pipe_terminated (pipe_); +} + +int zmq::gather_t::xrecv (msg_t *msg_) +{ + int rc = fq.recvpipe (msg_, NULL); + + // Drop any messages with more flag + while (rc == 0 && msg_->flags () & msg_t::more) { + + // drop all frames of the current multi-frame message + rc = fq.recvpipe (msg_, NULL); + + while (rc == 0 && msg_->flags () & msg_t::more) + rc = fq.recvpipe (msg_, NULL); + + // get the new message + if (rc == 0) + rc = fq.recvpipe (msg_, NULL); + } + + return rc; +} + +bool zmq::gather_t::xhas_in () +{ + return fq.has_in (); +} + +zmq::blob_t zmq::gather_t::get_credential () const +{ + return fq.get_credential (); +} diff --git a/src/gather.hpp b/src/gather.hpp new file mode 100644 index 00000000..605a9278 --- /dev/null +++ b/src/gather.hpp @@ -0,0 +1,75 @@ +/* + Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_GATHER_HPP_INCLUDED__ +#define __ZMQ_GATHER_HPP_INCLUDED__ + +#include "socket_base.hpp" +#include "session_base.hpp" +#include "fq.hpp" + +namespace zmq +{ + + class ctx_t; + class pipe_t; + class msg_t; + class io_thread_t; + + class gather_t : + public socket_base_t + { + public: + + gather_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_); + ~gather_t (); + + protected: + + // Overrides of functions from socket_base_t. + void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_); + int xrecv (zmq::msg_t *msg_); + bool xhas_in (); + blob_t get_credential () const; + void xread_activated (zmq::pipe_t *pipe_); + void xpipe_terminated (zmq::pipe_t *pipe_); + + private: + + // Fair queueing object for inbound pipes. + fq_t fq; + + gather_t (const gather_t&); + const gather_t &operator = (const gather_t&); + + }; + +} + +#endif diff --git a/src/mechanism.cpp b/src/mechanism.cpp index eff01dd6..2df803ce 100644 --- a/src/mechanism.cpp +++ b/src/mechanism.cpp @@ -77,8 +77,9 @@ const char *zmq::mechanism_t::socket_type_string (int socket_type) const "DEALER", "ROUTER", "PULL", "PUSH", "XPUB", "XSUB", "STREAM", "SERVER", "CLIENT", - "RADIO", "DISH"}; - zmq_assert (socket_type >= 0 && socket_type <= 15); + "RADIO", "DISH", + "GATHER", "SCATTER"}; + zmq_assert (socket_type >= 0 && socket_type <= 17); return names [socket_type]; } @@ -198,6 +199,10 @@ bool zmq::mechanism_t::check_socket_type (const std::string& type_) const return type_ == "DISH"; case ZMQ_DISH: return type_ == "RADIO"; + case ZMQ_GATHER: + return type_ == "SCATTER"; + case ZMQ_SCATTER: + return type_ == "GATHER"; default: break; } diff --git a/src/scatter.cpp b/src/scatter.cpp new file mode 100644 index 00000000..dde3a799 --- /dev/null +++ b/src/scatter.cpp @@ -0,0 +1,83 @@ +/* + Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "precompiled.hpp" +#include "macros.hpp" +#include "scatter.hpp" +#include "pipe.hpp" +#include "err.hpp" +#include "msg.hpp" + +zmq::scatter_t::scatter_t (class ctx_t *parent_, uint32_t tid_, int sid_) : + socket_base_t (parent_, tid_, sid_, true) +{ + options.type = ZMQ_SCATTER; +} + +zmq::scatter_t::~scatter_t () +{ +} + +void zmq::scatter_t::xattach_pipe (pipe_t *pipe_, bool subscribe_to_all_) +{ + LIBZMQ_UNUSED (subscribe_to_all_); + + // Don't delay pipe termination as there is no one + // to receive the delimiter. + pipe_->set_nodelay (); + + zmq_assert (pipe_); + lb.attach (pipe_); +} + +void zmq::scatter_t::xwrite_activated (pipe_t *pipe_) +{ + lb.activated (pipe_); +} + +void zmq::scatter_t::xpipe_terminated (pipe_t *pipe_) +{ + lb.pipe_terminated (pipe_); +} + +int zmq::scatter_t::xsend (msg_t *msg_) +{ + // SCATTER sockets do not allow multipart data (ZMQ_SNDMORE) + if (msg_->flags () & msg_t::more) { + errno = EINVAL; + return -1; + } + + return lb.send (msg_); +} + +bool zmq::scatter_t::xhas_out () +{ + return lb.has_out (); +} diff --git a/src/scatter.hpp b/src/scatter.hpp new file mode 100644 index 00000000..8c651480 --- /dev/null +++ b/src/scatter.hpp @@ -0,0 +1,73 @@ +/* + Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#ifndef __ZMQ_SCATTER_HPP_INCLUDED__ +#define __ZMQ_SCATTER_HPP_INCLUDED__ + +#include "socket_base.hpp" +#include "session_base.hpp" +#include "lb.hpp" + +namespace zmq +{ + + class ctx_t; + class pipe_t; + class msg_t; + class io_thread_t; + + class scatter_t : + public socket_base_t + { + public: + + scatter_t (zmq::ctx_t *parent_, uint32_t tid_, int sid_); + ~scatter_t (); + + protected: + + // Overrides of functions from socket_base_t. + void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_); + int xsend (zmq::msg_t *msg_); + bool xhas_out (); + void xwrite_activated (zmq::pipe_t *pipe_); + void xpipe_terminated (zmq::pipe_t *pipe_); + + private: + + // Load balancer managing the outbound pipes. + lb_t lb; + + scatter_t (const scatter_t&); + const scatter_t &operator = (const scatter_t&); + }; + +} + +#endif diff --git a/src/session_base.cpp b/src/session_base.cpp index 342d3ad8..cc15cb84 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -81,6 +81,8 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_, case ZMQ_STREAM: case ZMQ_SERVER: case ZMQ_CLIENT: + case ZMQ_GATHER: + case ZMQ_SCATTER: s = new (std::nothrow) session_base_t (io_thread_, active_, socket_, options_, addr_); break; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index c4f39e2f..75b3758d 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -96,6 +96,8 @@ #include "client.hpp" #include "radio.hpp" #include "dish.hpp" +#include "gather.hpp" +#include "scatter.hpp" #define ENTER_MUTEX() \ if (thread_safe) \ @@ -163,6 +165,12 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_, case ZMQ_DISH: s = new (std::nothrow) dish_t (parent_, tid_, sid_); break; + case ZMQ_GATHER: + s = new (std::nothrow) gather_t (parent_, tid_, sid_); + break; + case ZMQ_SCATTER: + s = new (std::nothrow) scatter_t (parent_, tid_, sid_); + break; default: errno = EINVAL; return NULL; diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index ad1e90a9..1d6e5a60 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -71,6 +71,7 @@ set(tests test_timers test_radio_dish test_udp + test_scatter_gather ) if(NOT WIN32) list(APPEND tests @@ -162,4 +163,3 @@ foreach(TEST_SOURCE ${ALL_TEST_SOURCES}) message(AUTHOR_WARNING "Test '${TESTNAME}' is not known to CTest.") endif() endforeach() - diff --git a/tests/test_scatter_gather.cpp b/tests/test_scatter_gather.cpp new file mode 100644 index 00000000..a9ac3073 --- /dev/null +++ b/tests/test_scatter_gather.cpp @@ -0,0 +1,84 @@ +/* + Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "testutil.hpp" + +int main (void) +{ + setup_test_environment (); + void *ctx = zmq_ctx_new (); + assert (ctx); + + void *scatter = zmq_socket (ctx, ZMQ_SCATTER); + void *gather = zmq_socket (ctx, ZMQ_GATHER); + void *gather2 = zmq_socket (ctx, ZMQ_GATHER); + + int rc = zmq_bind (scatter, "inproc://test-scatter-gather"); + assert (rc == 0); + + rc = zmq_connect (gather, "inproc://test-scatter-gather"); + assert (rc == 0); + + rc = zmq_connect (gather2, "inproc://test-scatter-gather"); + assert (rc == 0); + + // Should fail, multipart is not supported + rc = s_sendmore (scatter, "1"); + assert (rc == -1); + + rc = s_send (scatter, "1"); + assert (rc == 1); + + rc = s_send (scatter, "2"); + assert (rc == 1); + + char* message = s_recv (gather); + assert (message); + assert (streq(message, "1")); + free(message); + + message = s_recv (gather2); + assert (message); + assert (streq(message, "2")); + free(message); + + rc = zmq_close (scatter); + assert (rc == 0); + + rc = zmq_close (gather); + assert (rc == 0); + + rc = zmq_close (gather2); + assert (rc == 0); + + rc = zmq_ctx_term (ctx); + assert (rc == 0); + + return 0 ; +}