diff --git a/.gitignore b/.gitignore index 539751da..6c839e07 100644 --- a/.gitignore +++ b/.gitignore @@ -125,6 +125,8 @@ test_timers test_radio_dish test_udp test_large_msg +test_pre_allocated_fd_ipc +test_pre_allocated_fd_tcp tests/test*.log tests/test*.trs src/platform.hpp* diff --git a/AUTHORS b/AUTHORS index a8d8fbf4..ec947e5f 100644 --- a/AUTHORS +++ b/AUTHORS @@ -61,6 +61,7 @@ Jon Dyte Kamil Shakirov Ken Steele Laurent Alebarde +Luca Boccassi Marc Rossi Mark Barbisan Martin Hurton diff --git a/Makefile.am b/Makefile.am index fd86a8dd..6c966199 100644 --- a/Makefile.am +++ b/Makefile.am @@ -617,6 +617,8 @@ test_apps += \ tests/test_shutdown_stress \ tests/test_pair_ipc \ tests/test_reqrep_ipc \ + tests/test_pre_allocated_fd_ipc \ + tests/test_pre_allocated_fd_tcp \ tests/test_timeo \ tests/test_filter_ipc @@ -639,6 +641,16 @@ tests_test_timeo_LDADD = src/libzmq.la tests_test_filter_ipc_SOURCES = tests/test_filter_ipc.cpp tests_test_filter_ipc_LDADD = src/libzmq.la +tests_test_pre_allocated_fd_ipc_SOURCES = \ + tests/test_pre_allocated_fd_ipc.cpp \ + tests/testutil.hpp +tests_test_pre_allocated_fd_ipc_LDADD = src/libzmq.la + +tests_test_pre_allocated_fd_tcp_SOURCES = \ + tests/test_pre_allocated_fd_tcp.cpp \ + tests/testutil.hpp +tests_test_pre_allocated_fd_tcp_LDADD = src/libzmq.la + if HAVE_FORK test_apps += tests/test_fork diff --git a/doc/zmq_getsockopt.txt b/doc/zmq_getsockopt.txt index 2ffd95b0..0589ceee 100644 --- a/doc/zmq_getsockopt.txt +++ b/doc/zmq_getsockopt.txt @@ -464,6 +464,19 @@ Default value:: null string Applicable socket types:: all, when using TCP or IPC transports +ZMQ_PRE_ALLOCATED_FD: Retrieve the pre-allocated socket file descriptor +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_PRE_ALLOCATED_FD' option shall retrieve the pre-allocated file +descriptor that has been assigned to a ZMQ socket, if any. -1 shall be +returned if a pre-allocated file descriptor was not set for the socket. + +[horizontal] +Option value type:: int +Option value unit:: file descriptor +Default value:: -1 +Applicable socket types:: all bound sockets, when using IPC or TCP transport + + ZMQ_RATE: Retrieve multicast data rate ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ The 'ZMQ_RATE' option shall retrieve the maximum send or receive data rate for diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index 220a54c2..7948f526 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -492,6 +492,28 @@ Default value:: not set Applicable socket types:: all, when using TCP transport +ZMQ_PRE_ALLOCATED_FD: Set the pre-allocated socket file descriptor +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +When set to a positive integer value before zmq_bind is called on the socket, +the socket shall use the corresponding file descriptor for connections over +TCP or IPC instead of allocating a new file descriptor. +Useful for writing systemd socket activated services. If set to -1 (default), +a new file descriptor will be allocated instead (default behaviour). + +NOTE: if set after calling zmq_bind, this option shall have no effect. +NOTE: the file descriptor passed through MUST have been ran through the "bind" + and "listen" system calls beforehand. Also, socket option that would + normally be passed through zmq_setsockopt like TCP buffers length, + IP_TOS or SO_REUSEADDR MUST be set beforehand by the caller, as they + must be set before the socket is bound. + +[horizontal] +Option value type:: int +Option value unit:: file descriptor +Default value:: -1 +Applicable socket types:: all bound sockets, when using IPC or TCP transport + + ZMQ_PROBE_ROUTER: bootstrap connections to ROUTER sockets ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ When set to 1, the socket will automatically send an empty message when a diff --git a/include/zmq.h b/include/zmq.h index 96bbe1d4..8ac82b06 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -344,6 +344,7 @@ ZMQ_EXPORT const char *zmq_msg_group (zmq_msg_t *msg); #define ZMQ_VMCI_BUFFER_MIN_SIZE 86 #define ZMQ_VMCI_BUFFER_MAX_SIZE 87 #define ZMQ_VMCI_CONNECT_TIMEOUT 88 +#define ZMQ_PRE_ALLOCATED_FD 89 /* Message options */ #define ZMQ_MORE 1 diff --git a/src/ipc_listener.cpp b/src/ipc_listener.cpp index 6571f194..b4268ac7 100644 --- a/src/ipc_listener.cpp +++ b/src/ipc_listener.cpp @@ -155,7 +155,12 @@ int zmq::ipc_listener_t::set_address (const char *addr_) // Get rid of the file associated with the UNIX domain socket that // may have been left behind by the previous run of the application. - ::unlink (addr.c_str()); + // MUST NOT unlink if the FD is managed by the user, or it will stop + // working after the first client connects. The user will take care of + // cleaning up the file after the service is stopped. + if (options.pre_allocated_fd == -1) { + ::unlink (addr.c_str()); + } filename.clear (); // Initialise the address structure. @@ -164,26 +169,30 @@ int zmq::ipc_listener_t::set_address (const char *addr_) if (rc != 0) return -1; - // Create a listening socket. - s = open_socket (AF_UNIX, SOCK_STREAM, 0); - if (s == -1) - return -1; - address.to_string (endpoint); - // Bind the socket to the file path. - rc = bind (s, address.addr (), address.addrlen ()); - if (rc != 0) - goto error; + if (options.pre_allocated_fd != -1) { + s = options.pre_allocated_fd; + } else { + // Create a listening socket. + s = open_socket (AF_UNIX, SOCK_STREAM, 0); + if (s == -1) + return -1; + + // Bind the socket to the file path. + rc = bind (s, address.addr (), address.addrlen ()); + if (rc != 0) + goto error; + + // Listen for incoming connections. + rc = listen (s, options.backlog); + if (rc != 0) + goto error; + } filename.assign (addr.c_str()); has_file = true; - // Listen for incoming connections. - rc = listen (s, options.backlog); - if (rc != 0) - goto error; - socket->event_listening (endpoint, s); return 0; @@ -204,7 +213,10 @@ int zmq::ipc_listener_t::close () // If there's an underlying UNIX domain socket, get rid of the file it // is associated with. - if (has_file && !filename.empty ()) { + // MUST NOT unlink if the FD is managed by the user, or it will stop + // working after the first client connects. The user will take care of + // cleaning up the file after the service is stopped. + if (has_file && !filename.empty () && options.pre_allocated_fd == -1) { rc = ::unlink(filename.c_str ()); if (rc != 0) { socket->event_close_failed (endpoint, zmq_errno()); diff --git a/src/options.cpp b/src/options.cpp index f13b187c..1acabae1 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -77,7 +77,8 @@ zmq::options_t::options_t () : connected (false), heartbeat_ttl (0), heartbeat_interval (0), - heartbeat_timeout (-1) + heartbeat_timeout (-1), + pre_allocated_fd (-1) { #if defined ZMQ_HAVE_VMCI vmci_buffer_size = 0; @@ -621,6 +622,13 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, break; # endif + case ZMQ_PRE_ALLOCATED_FD: + if (is_int && value >= -1) { + pre_allocated_fd = value; + return 0; + } + break; + default: #if defined (ZMQ_ACT_MILITANT) // There are valid scenarios for probing with unknown socket option @@ -1031,6 +1039,13 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) } break; + case ZMQ_PRE_ALLOCATED_FD: + if (is_int) { + *value = pre_allocated_fd; + return 0; + } + break; + default: #if defined (ZMQ_ACT_MILITANT) malformed = false; diff --git a/src/options.hpp b/src/options.hpp index 54a8a301..7116461c 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -231,6 +231,11 @@ namespace zmq uint64_t vmci_buffer_max_size; int vmci_connect_timeout; # endif + + // When creating a new ZMQ socket, if this option is set the value + // will be used as the File Descriptor instead of allocating a new + // one via the socket () system call. + int pre_allocated_fd; }; } diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp index ad382715..5c8f7e4e 100644 --- a/src/tcp_listener.cpp +++ b/src/tcp_listener.cpp @@ -166,6 +166,14 @@ int zmq::tcp_listener_t::set_address (const char *addr_) if (rc != 0) return -1; + address.to_string (endpoint); + + if (options.pre_allocated_fd != -1) { + s = options.pre_allocated_fd; + socket->event_listening (endpoint, (int) s); + return 0; + } + // Create a listening socket. s = open_socket (address.family (), SOCK_STREAM, IPPROTO_TCP); #ifdef ZMQ_HAVE_WINDOWS @@ -224,8 +232,6 @@ int zmq::tcp_listener_t::set_address (const char *addr_) errno_assert (rc == 0); #endif - address.to_string (endpoint); - // Bind the socket to the network interface and port. rc = bind (s, address.addr (), address.addrlen ()); #ifdef ZMQ_HAVE_WINDOWS diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 13a4de64..4a8b4a7e 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -90,6 +90,8 @@ if(NOT WIN32) test_stream_exceeds_buffer test_router_mandatory_hwm test_term_endpoint_tipc + test_pre_allocated_fd_ipc + test_pre_allocated_fd_tcp ) if(HAVE_FORK) list(APPEND tests test_fork) diff --git a/tests/test_pre_allocated_fd_ipc.cpp b/tests/test_pre_allocated_fd_ipc.cpp new file mode 100644 index 00000000..c3358b27 --- /dev/null +++ b/tests/test_pre_allocated_fd_ipc.cpp @@ -0,0 +1,213 @@ +/* + Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include +#include +#include +#include "testutil.hpp" + +void pre_allocate_sock (void *zmq_socket, const char *path) +{ + struct sockaddr_un addr; + addr.sun_family = AF_UNIX; + strcpy (addr.sun_path, path); + + unlink (path); + + int s_pre = socket (AF_UNIX, SOCK_STREAM, 0); + assert (s_pre != -1); + + int rc = bind (s_pre, (struct sockaddr *) &addr, + sizeof (struct sockaddr_un)); + assert (rc == 0); + + rc = listen (s_pre, SOMAXCONN); + assert (rc == 0); + + rc = zmq_setsockopt (zmq_socket, ZMQ_PRE_ALLOCATED_FD, &s_pre, + sizeof (s_pre)); + assert(rc == 0); +} + +void test_req_rep () +{ + void *ctx = zmq_ctx_new (); + assert (ctx); + + void *sb = zmq_socket (ctx, ZMQ_REP); + assert (sb); + + pre_allocate_sock(sb, "/tmp/tester"); + + int rc = zmq_bind (sb, "ipc:///tmp/tester"); + assert (rc == 0); + + void *sc = zmq_socket (ctx, ZMQ_REQ); + assert (sc); + rc = zmq_connect (sc, "ipc:///tmp/tester"); + assert (rc == 0); + + bounce (sb, sc); + + rc = zmq_close (sc); + assert (rc == 0); + + rc = zmq_close (sb); + assert (rc == 0); + + rc = zmq_ctx_term (ctx); + assert (rc == 0); + + rc = unlink ("/tmp/tester"); + assert (rc == 0); +} + +void test_pair () +{ + void *ctx = zmq_ctx_new (); + assert (ctx); + + void *sb = zmq_socket (ctx, ZMQ_PAIR); + assert (sb); + + pre_allocate_sock(sb, "/tmp/tester"); + + int rc = zmq_bind (sb, "ipc:///tmp/tester"); + assert (rc == 0); + + void *sc = zmq_socket (ctx, ZMQ_PAIR); + assert (sc); + rc = zmq_connect (sc, "ipc:///tmp/tester"); + assert (rc == 0); + + bounce (sb, sc); + + rc = zmq_close (sc); + assert (rc == 0); + + rc = zmq_close (sb); + assert (rc == 0); + + rc = zmq_ctx_term (ctx); + assert (rc == 0); + + rc = unlink ("/tmp/tester"); + assert (rc == 0); +} + +void test_client_server () +{ + void *ctx = zmq_ctx_new (); + assert (ctx); + + void *sb = zmq_socket (ctx, ZMQ_SERVER); + assert (sb); + + pre_allocate_sock(sb, "/tmp/tester"); + + int rc = zmq_bind (sb, "ipc:///tmp/tester"); + assert (rc == 0); + + void *sc = zmq_socket (ctx, ZMQ_CLIENT); + assert (sc); + rc = zmq_connect (sc, "ipc:///tmp/tester"); + assert (rc == 0); + + zmq_msg_t msg; + rc = zmq_msg_init_size (&msg, 1); + assert (rc == 0); + + char *data = (char *) zmq_msg_data (&msg); + data [0] = 1; + + rc = zmq_msg_send (&msg, sc, ZMQ_SNDMORE); + assert (rc == -1); + + rc = zmq_msg_send (&msg, sc, 0); + assert (rc == 1); + + rc = zmq_msg_init (&msg); + assert (rc == 0); + + rc = zmq_msg_recv (&msg, sb, 0); + assert (rc == 1); + + uint32_t routing_id = zmq_msg_routing_id (&msg); + assert (routing_id != 0); + + rc = zmq_msg_close (&msg); + assert (rc == 0); + + rc = zmq_msg_init_size (&msg, 1); + assert (rc == 0); + + data = (char *)zmq_msg_data (&msg); + data[0] = 2; + + rc = zmq_msg_set_routing_id (&msg, routing_id); + assert (rc == 0); + + rc = zmq_msg_send (&msg, sb, ZMQ_SNDMORE); + assert (rc == -1); + + rc = zmq_msg_send (&msg, sb, 0); + assert (rc == 1); + + rc = zmq_msg_recv (&msg, sc, 0); + assert (rc == 1); + + routing_id = zmq_msg_routing_id (&msg); + assert (routing_id == 0); + + rc = zmq_msg_close (&msg); + assert (rc == 0); + + rc = zmq_close (sc); + assert (rc == 0); + + rc = zmq_close (sb); + assert (rc == 0); + + rc = zmq_ctx_term (ctx); + assert (rc == 0); + + rc = unlink ("/tmp/tester"); + assert (rc == 0); +} + +int main (void) +{ + setup_test_environment(); + + test_req_rep(); + test_pair(); + test_client_server(); + + return 0 ; +} diff --git a/tests/test_pre_allocated_fd_tcp.cpp b/tests/test_pre_allocated_fd_tcp.cpp new file mode 100644 index 00000000..701ff62f --- /dev/null +++ b/tests/test_pre_allocated_fd_tcp.cpp @@ -0,0 +1,206 @@ +/* + Copyright (c) 2016 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include +#include +#include +#include "testutil.hpp" + +void pre_allocate_sock (void *zmq_socket, const char *address, + const char *port) +{ + struct addrinfo *addr; + int rc = getaddrinfo (address, port, NULL, &addr); + assert (rc == 0); + + int s_pre = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); + assert (s_pre != -1); + + int flag = 1; + rc = setsockopt (s_pre, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int)); + assert (rc == 0); + + rc = bind (s_pre, addr->ai_addr, addr->ai_addrlen); + assert (rc == 0); + + rc = listen (s_pre, SOMAXCONN); + assert (rc == 0); + + rc = zmq_setsockopt (zmq_socket, ZMQ_PRE_ALLOCATED_FD, &s_pre, + sizeof (s_pre)); + assert(rc == 0); +} + +void test_req_rep () +{ + void *ctx = zmq_ctx_new (); + assert (ctx); + + void *sb = zmq_socket (ctx, ZMQ_REP); + assert (sb); + + pre_allocate_sock(sb, "127.0.0.1", "5560"); + + int rc = zmq_bind (sb, "tcp://127.0.0.1:5560"); + assert (rc == 0); + + void *sc = zmq_socket (ctx, ZMQ_REQ); + assert (sc); + rc = zmq_connect (sc, "tcp://127.0.0.1:5560"); + assert (rc == 0); + + bounce (sb, sc); + + rc = zmq_close (sc); + assert (rc == 0); + + rc = zmq_close (sb); + assert (rc == 0); + + rc = zmq_ctx_term (ctx); + assert (rc == 0); +} + +void test_pair () +{ + void *ctx = zmq_ctx_new (); + assert (ctx); + + void *sb = zmq_socket (ctx, ZMQ_PAIR); + assert (sb); + + pre_allocate_sock(sb, "127.0.0.1", "5560"); + + int rc = zmq_bind (sb, "tcp://127.0.0.1:5560"); + assert (rc == 0); + + void *sc = zmq_socket (ctx, ZMQ_PAIR); + assert (sc); + rc = zmq_connect (sc, "tcp://127.0.0.1:5560"); + assert (rc == 0); + + bounce (sb, sc); + + rc = zmq_close (sc); + assert (rc == 0); + + rc = zmq_close (sb); + assert (rc == 0); + + rc = zmq_ctx_term (ctx); + assert (rc == 0); +} + +void test_client_server () +{ + void *ctx = zmq_ctx_new (); + assert (ctx); + + void *sb = zmq_socket (ctx, ZMQ_SERVER); + assert (sb); + + pre_allocate_sock(sb, "127.0.0.1", "5560"); + + int rc = zmq_bind (sb, "tcp://127.0.0.1:5560"); + assert (rc == 0); + + void *sc = zmq_socket (ctx, ZMQ_CLIENT); + assert (sc); + rc = zmq_connect (sc, "tcp://127.0.0.1:5560"); + assert (rc == 0); + + zmq_msg_t msg; + rc = zmq_msg_init_size (&msg, 1); + assert (rc == 0); + + char *data = (char *) zmq_msg_data (&msg); + data [0] = 1; + + rc = zmq_msg_send (&msg, sc, ZMQ_SNDMORE); + assert (rc == -1); + + rc = zmq_msg_send (&msg, sc, 0); + assert (rc == 1); + + rc = zmq_msg_init (&msg); + assert (rc == 0); + + rc = zmq_msg_recv (&msg, sb, 0); + assert (rc == 1); + + uint32_t routing_id = zmq_msg_routing_id (&msg); + assert (routing_id != 0); + + rc = zmq_msg_close (&msg); + assert (rc == 0); + + rc = zmq_msg_init_size (&msg, 1); + assert (rc == 0); + + data = (char *)zmq_msg_data (&msg); + data[0] = 2; + + rc = zmq_msg_set_routing_id (&msg, routing_id); + assert (rc == 0); + + rc = zmq_msg_send (&msg, sb, ZMQ_SNDMORE); + assert (rc == -1); + + rc = zmq_msg_send (&msg, sb, 0); + assert (rc == 1); + + rc = zmq_msg_recv (&msg, sc, 0); + assert (rc == 1); + + routing_id = zmq_msg_routing_id (&msg); + assert (routing_id == 0); + + rc = zmq_msg_close (&msg); + assert (rc == 0); + + rc = zmq_close (sc); + assert (rc == 0); + + rc = zmq_close (sb); + assert (rc == 0); + + rc = zmq_ctx_term (ctx); + assert (rc == 0); +} + +int main (void) +{ + setup_test_environment(); + + test_req_rep(); + test_pair(); + test_client_server(); + + return 0 ; +} diff --git a/tests/test_setsockopt.cpp b/tests/test_setsockopt.cpp index 2a91df61..3b092fef 100644 --- a/tests/test_setsockopt.cpp +++ b/tests/test_setsockopt.cpp @@ -70,9 +70,37 @@ void test_setsockopt_tcp_send_buffer() zmq_ctx_term(ctx); } +void test_setsockopt_pre_allocated_fd() +{ + int rc; + void *ctx = zmq_ctx_new(); + void *socket = zmq_socket(ctx, ZMQ_PUSH); + + int val = 0; + size_t placeholder = sizeof(val); + + rc = zmq_getsockopt(socket, ZMQ_PRE_ALLOCATED_FD, &val, &placeholder); + assert(rc == 0); + assert(val == -1); + + val = 3; + + rc = zmq_setsockopt(socket, ZMQ_PRE_ALLOCATED_FD, &val, sizeof(val)); + assert(rc == 0); + assert(val == 3); + + rc = zmq_getsockopt(socket, ZMQ_PRE_ALLOCATED_FD, &val, &placeholder); + assert(rc == 0); + assert(val == 3); + + zmq_close(socket); + zmq_ctx_term(ctx); +} + int main() { test_setsockopt_tcp_recv_buffer(); test_setsockopt_tcp_send_buffer(); + test_setsockopt_pre_allocated_fd(); }