diff --git a/.gitignore b/.gitignore index 6ccb0a72..b37f80ff 100644 --- a/.gitignore +++ b/.gitignore @@ -39,6 +39,7 @@ tests/test_invalid_rep tests/test_msg_flags tests/test_ts_context tests/test_connect_resolve +tests/test_connect_delay tests/test_term_endpoint src/platform.hpp* src/stamp-h1 diff --git a/doc/zmq_getsockopt.txt b/doc/zmq_getsockopt.txt index 0eae8d91..e224ffcd 100644 --- a/doc/zmq_getsockopt.txt +++ b/doc/zmq_getsockopt.txt @@ -342,6 +342,21 @@ Default value:: 1 (true) Applicable socket types:: all, when using TCP transports. +ZMQ_DELAY_ATTACH_ON_CONNECT +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Retrieve the state of the attach on connect value. If set to `1`, will delay the +attachment of a pipe on connect until the underlying connection has completed. +This will cause the socket to block if there are no other connections, but will +prevent queues from filling on pipes awaiting connection. + +[horizontal] +Option value type:: int +Option value unit:: boolean +Default value:: 0 (false) +Applicable socket types:: all, primarily when using TCP/IPC transports. + + ZMQ_FD: Retrieve file descriptor associated with the socket ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ The 'ZMQ_FD' option shall retrieve the file descriptor associated with the diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index fee30cc8..7fa2c49e 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -352,6 +352,20 @@ Default value:: 1 (true) Applicable socket types:: all, when using TCP transports. +ZMQ_DELAY_ATTACH_ON_CONNECT +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +If set to `1`, will delay the attachment of a pipe on connect until the underlying +connection has completed. This will cause the socket to block if there are no other +connections, but will prevent queues from filling on pipes awaiting connection. + +[horizontal] +Option value type:: int +Option value unit:: boolean +Default value:: 0 (false) +Applicable socket types:: all, primarily when using TCP/IPC transports. + + ZMQ_FAIL_UNROUTABLE: Set unroutable message behavior ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/include/zmq.h b/include/zmq.h index 58e4017a..b8813947 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -227,6 +227,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval); #define ZMQ_TCP_KEEPALIVE_IDLE 36 #define ZMQ_TCP_KEEPALIVE_INTVL 37 #define ZMQ_TCP_ACCEPT_FILTER 38 +#define ZMQ_DELAY_ATTACH_ON_CONNECT 39 /* Message options */ #define ZMQ_MORE 1 diff --git a/src/lb.cpp b/src/lb.cpp index 6732f274..d5f2f001 100644 --- a/src/lb.cpp +++ b/src/lb.cpp @@ -41,8 +41,7 @@ zmq::lb_t::~lb_t () void zmq::lb_t::attach (pipe_t *pipe_) { pipes.push_back (pipe_); - pipes.swap (active, pipes.size () - 1); - active++; + activated (pipe_); } void zmq::lb_t::terminated (pipe_t *pipe_) diff --git a/src/options.cpp b/src/options.cpp index fdd5299d..d61ad146 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -44,6 +44,7 @@ zmq::options_t::options_t () : rcvtimeo (-1), sndtimeo (-1), ipv4only (1), + delay_attach_on_connect (0), delay_on_close (true), delay_on_disconnect (true), filter (false), @@ -218,6 +219,8 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, ipv4only = val; return 0; } + + case ZMQ_TCP_KEEPALIVE: { @@ -236,6 +239,21 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, return 0; } + case ZMQ_DELAY_ATTACH_ON_CONNECT: + { + if (optvallen_ != sizeof (int)) { + errno = EINVAL; + return -1; + } + int val = *((int*) optval_); + if (val != 0 && val != 1) { + errno = EINVAL; + return -1; + } + delay_attach_on_connect = val; + return 0; + } + case ZMQ_TCP_KEEPALIVE_CNT: { if (optvallen_ != sizeof (int)) { @@ -483,6 +501,15 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) *((int*) optval_) = ipv4only; *optvallen_ = sizeof (int); return 0; + + case ZMQ_DELAY_ATTACH_ON_CONNECT: + if (*optvallen_ < sizeof (int)) { + errno = EINVAL; + return -1; + } + *((int*) optval_) = delay_attach_on_connect; + *optvallen_ = sizeof (int); + return 0; case ZMQ_TCP_KEEPALIVE: if (*optvallen_ < sizeof (int)) { diff --git a/src/options.hpp b/src/options.hpp index 2f6e6283..fe435b16 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -96,6 +96,10 @@ namespace zmq // possible to communicate with IPv6-only hosts. If 0, the socket can // connect to and accept connections from both IPv4 and IPv6 hosts. int ipv4only; + + // If 1, connecting pipes are not attached immediately, meaning a send() + // on a socket with only connecting pipes would block + int delay_attach_on_connect; // If true, session reads all the pending messages from the pipe and // sends them to the network when socket is closed. diff --git a/src/session_base.cpp b/src/session_base.cpp index 8a879584..d4e9855c 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -229,20 +229,30 @@ void zmq::session_base_t::clean_pipes () void zmq::session_base_t::terminated (pipe_t *pipe_) { - // Drop the reference to the deallocated pipe. - zmq_assert (pipe == pipe_); - pipe = NULL; + // Drop the reference to the deallocated pipe if required. + zmq_assert (pipe == pipe_ || terminating_pipes.count (pipe_) == 1); - // If we are waiting for pending messages to be sent, at this point - // we are sure that there will be no more messages and we can proceed - // with termination safely. - if (pending) + if (pipe == pipe_) + // If this is our current pipe, remove it + pipe = NULL; + else + // Remove the pipe from the detached pipes set + terminating_pipes.erase (pipe_); + + // If we are waiting for pending messages to be sent, at this point + // we are sure that there will be no more messages and we can proceed + // with termination safely. + if (pending && !pipe && terminating_pipes.size () == 0) proceed_with_term (); } void zmq::session_base_t::read_activated (pipe_t *pipe_) { - zmq_assert (pipe == pipe_); + // Skip activating if we're detaching this pipe + if (pipe != pipe_) { + zmq_assert (terminating_pipes.count (pipe_) == 1); + return; + } if (likely (engine != NULL)) engine->activate_out (); @@ -252,7 +262,11 @@ void zmq::session_base_t::read_activated (pipe_t *pipe_) void zmq::session_base_t::write_activated (pipe_t *pipe_) { - zmq_assert (pipe == pipe_); + // Skip activating if we're detaching this pipe + if (pipe != pipe_) { + zmq_assert (terminating_pipes.count (pipe_) == 1); + return; + } if (engine) engine->activate_in (); @@ -395,6 +409,16 @@ void zmq::session_base_t::detached () return; } + // For delayed connect situations, terminate the pipe + // and reestablish later on + if (pipe && options.delay_attach_on_connect == 1 + && addr->protocol != "pgm" && addr->protocol != "epgm") { + pipe->hiccup (); + pipe->terminate (false); + terminating_pipes.insert (pipe); + pipe = NULL; + } + reset (); // Reconnect. diff --git a/src/session_base.hpp b/src/session_base.hpp index 8244cb59..fa765ba3 100644 --- a/src/session_base.hpp +++ b/src/session_base.hpp @@ -103,6 +103,9 @@ namespace zmq // Pipe connecting the session to its socket. zmq::pipe_t *pipe; + + // This set is added to with pipes we are disconnecting, but haven't yet completed + std::set terminating_pipes; // This flag is true if the remainder of the message being processed // is still in the in pipe. diff --git a/src/socket_base.cpp b/src/socket_base.cpp index eaddced6..60c7ceb9 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -530,27 +530,29 @@ int zmq::socket_base_t::connect (const char *addr_) options, paddr); errno_assert (session); - // Create a bi-directional pipe. - object_t *parents [2] = {this, session}; - pipe_t *pipes [2] = {NULL, NULL}; - int hwms [2] = {options.sndhwm, options.rcvhwm}; - bool delays [2] = {options.delay_on_disconnect, options.delay_on_close}; - rc = pipepair (parents, pipes, hwms, delays); - errno_assert (rc == 0); - // PGM does not support subscription forwarding; ask for all data to be // sent to this pipe. bool icanhasall = false; if (protocol == "pgm" || protocol == "epgm") icanhasall = true; - // Attach local end of the pipe to the socket object. - attach_pipe (pipes [0], icanhasall); + if (options.delay_attach_on_connect != 1 || icanhasall) { + // Create a bi-directional pipe. + object_t *parents [2] = {this, session}; + pipe_t *pipes [2] = {NULL, NULL}; + int hwms [2] = {options.sndhwm, options.rcvhwm}; + bool delays [2] = {options.delay_on_disconnect, options.delay_on_close}; + rc = pipepair (parents, pipes, hwms, delays); + errno_assert (rc == 0); - // Attach remote end of the pipe to the session object later on. - session->attach_pipe (pipes [1]); + // Attach local end of the pipe to the socket object. + attach_pipe (pipes [0], icanhasall); - // Save last endpoint URI + // Attach remote end of the pipe to the session object later on. + session->attach_pipe (pipes [1]); + } + + // Save last endpoint URI paddr->to_string (options.last_endpoint); add_endpoint (addr_, (own_t *) session); @@ -968,7 +970,11 @@ void zmq::socket_base_t::write_activated (pipe_t *pipe_) void zmq::socket_base_t::hiccuped (pipe_t *pipe_) { - xhiccuped (pipe_); + if (options.delay_attach_on_connect == 1) + pipe_->terminate (false); + else + // Notify derived sockets of the hiccup + xhiccuped (pipe_); } void zmq::socket_base_t::terminated (pipe_t *pipe_) diff --git a/tests/Makefile.am b/tests/Makefile.am index aa2c529a..024867d2 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -13,6 +13,7 @@ noinst_PROGRAMS = test_pair_inproc \ test_invalid_rep \ test_msg_flags \ test_connect_resolve \ + test_connect_delay \ test_last_endpoint \ test_term_endpoint \ test_monitor @@ -34,6 +35,7 @@ test_sub_forward_SOURCES = test_sub_forward.cpp test_invalid_rep_SOURCES = test_invalid_rep.cpp test_msg_flags_SOURCES = test_msg_flags.cpp test_connect_resolve_SOURCES = test_connect_resolve.cpp +test_connect_delay_SOURCES = test_connect_delay.cpp test_last_endpoint_SOURCES = test_last_endpoint.cpp test_term_endpoint_SOURCES = test_term_endpoint.cpp test_monitor_SOURCES = test_monitor.cpp diff --git a/tests/test_connect_delay.cpp b/tests/test_connect_delay.cpp new file mode 100644 index 00000000..570292c6 --- /dev/null +++ b/tests/test_connect_delay.cpp @@ -0,0 +1,260 @@ +/* +Copyright (c) 2012 Ian Barber +Copyright (c) 2012 Other contributors as noted in the AUTHORS file + +This file is part of 0MQ. + +0MQ is free software; you can redistribute it and/or modify it under +the terms of the GNU Lesser General Public License as published by +the Free Software Foundation; either version 3 of the License, or +(at your option) any later version. + +0MQ is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Lesser General Public License for more details. + +You should have received a copy of the GNU Lesser General Public License +along with this program. If not, see . +*/ + + +#include +#include +#include +#include +#include + +#include "../include/zmq.h" + +static void *server (void *c) +{ + void *socket, *context; + char buffer[16]; + int rc, val; + + context = zmq_init (1); + assert (context); + + socket = zmq_socket (context, ZMQ_PULL); + assert (socket); + + val = 0; + rc = zmq_setsockopt(socket, ZMQ_LINGER, &val, sizeof(val)); + assert (rc == 0); + + rc = zmq_bind (socket, "ipc:///tmp/recon"); + assert (rc == 0); + + memset (&buffer, 0, sizeof(buffer)); + rc = zmq_recv (socket, &buffer, sizeof(buffer), 0); + + // Intentionally bail out + rc = zmq_close (socket); + assert (rc == 0); + + rc = zmq_term (context); + assert (rc == 0); + + usleep (200000); + + context = zmq_init (1); + assert (context); + + socket = zmq_socket (context, ZMQ_PULL); + assert (socket); + + val = 0; + rc = zmq_setsockopt(socket, ZMQ_LINGER, &val, sizeof(val)); + assert (rc == 0); + + rc = zmq_bind (socket, "ipc:///tmp/recon"); + assert (rc == 0); + + usleep (200000); + + memset (&buffer, 0, sizeof(buffer)); + rc = zmq_recv (socket, &buffer, sizeof(buffer), ZMQ_DONTWAIT); + assert (rc != -1); + + // Start closing the socket while the connecting process is underway. + rc = zmq_close (socket); + assert (rc == 0); + + rc = zmq_term (context); + assert (rc == 0); + + pthread_exit(NULL); +} + +static void *worker (void *n) +{ + void *socket, *context; + int rc, hadone, val; + + context = zmq_init (1); + assert (context); + + socket = zmq_socket (context, ZMQ_PUSH); + assert (socket); + + val = 0; + rc = zmq_setsockopt(socket, ZMQ_LINGER, &val, sizeof(val)); + assert (rc == 0); + + val = 1; + rc = zmq_setsockopt (socket, ZMQ_DELAY_ATTACH_ON_CONNECT, &val, sizeof(val)); + assert (rc == 0); + + rc = zmq_connect (socket, "ipc:///tmp/recon"); + assert (rc == 0); + + hadone = 0; + // Not checking RC as some may be -1 + for (int i = 0; i < 4; i++) { + usleep(200000); + rc = zmq_send (socket, "hi", 2, ZMQ_DONTWAIT); + if (rc != -1) + hadone ++; + } + + assert (hadone >= 2); + assert (hadone < 4); + + rc = zmq_close (socket); + assert (rc == 0); + + rc = zmq_term (context); + assert (rc == 0); + + pthread_exit(NULL); +} + +int main (int argc, char *argv []) +{ + fprintf (stderr, "test_connect_delay running...\n"); + int val; + int rc; + char buffer[16]; + int seen = 0; + + void *context = zmq_ctx_new(); + assert (context); + void *to = zmq_socket(context, ZMQ_PULL); + assert (to); + + val = 0; + rc = zmq_setsockopt(to, ZMQ_LINGER, &val, sizeof(val)); + assert (rc == 0); + rc = zmq_bind(to, "tcp://*:5555"); + assert (rc == 0); + + // Create a socket pushing to two endpoints - only 1 message should arrive. + void *from = zmq_socket (context, ZMQ_PUSH); + assert(from); + + val = 0; + zmq_setsockopt (from, ZMQ_LINGER, &val, sizeof(val)); + rc = zmq_connect (from, "tcp://localhost:5556"); + assert (rc == 0); + rc = zmq_connect (from, "tcp://localhost:5555"); + assert (rc == 0); + + for (int i = 0; i < 10; ++i) + { + std::string message("message "); + message += ('0' + i); + rc = zmq_send (from, message.data(), message.size(), 0); + assert(rc >= 0); + } + + sleep(1); + seen = 0; + for (int i = 0; i < 10; ++i) + { + memset (&buffer, 0, sizeof(buffer)); + rc = zmq_recv (to, &buffer, sizeof(buffer), ZMQ_DONTWAIT); + if( rc == -1) + break; + seen++; + } + assert (seen == 5); + + rc = zmq_close (from); + assert (rc == 0); + + rc = zmq_close (to); + assert (rc == 0); + + rc = zmq_ctx_destroy(context); + assert (rc == 0); + + context = zmq_ctx_new(); + fprintf (stderr, " Rerunning with DELAY_ATTACH_ON_CONNECT\n"); + + to = zmq_socket (context, ZMQ_PULL); + assert (to); + rc = zmq_bind (to, "tcp://*:5560"); + assert (rc == 0); + + val = 0; + rc = zmq_setsockopt (to, ZMQ_LINGER, &val, sizeof(val)); + assert (rc == 0); + + // Create a socket pushing to two endpoints - all messages should arrive. + from = zmq_socket (context, ZMQ_PUSH); + assert (from); + + val = 0; + rc = zmq_setsockopt (from, ZMQ_LINGER, &val, sizeof(val)); + assert (rc == 0); + + val = 1; + rc = zmq_setsockopt (from, ZMQ_DELAY_ATTACH_ON_CONNECT, &val, sizeof(val)); + assert (rc == 0); + + rc = zmq_connect (from, "tcp://localhost:5561"); + assert (rc == 0); + + rc = zmq_connect (from, "tcp://localhost:5560"); + assert (rc == 0); + + for (int i = 0; i < 10; ++i) + { + std::string message("message "); + message += ('0' + i); + rc = zmq_send (from, message.data(), message.size(), 0); + assert (rc >= 0); + } + + sleep(1); + + seen = 0; + for (int i = 0; i < 10; ++i) + { + memset(&buffer, 0, sizeof(buffer)); + rc = zmq_recv (to, &buffer, sizeof(buffer), ZMQ_DONTWAIT); + assert (rc != -1); + } + + rc = zmq_close (from); + assert (rc == 0); + + rc = zmq_close (to); + assert (rc == 0); + + rc = zmq_ctx_destroy(context); + assert (rc == 0); + + fprintf (stderr, " Running DELAY_ATTACH_ON_CONNECT with disconnect\n"); + + pthread_t serv, work; + + rc = pthread_create (&serv, NULL, server, NULL); + assert (rc == 0); + + rc = pthread_create (&work, NULL, worker, NULL); + assert (rc == 0); + + pthread_exit(NULL); +} \ No newline at end of file