From 409d5e8fff0dc0ebdefe3da2a866021ce2aca18d Mon Sep 17 00:00:00 2001 From: Ian Barber Date: Tue, 12 Jun 2012 15:31:23 +0100 Subject: [PATCH 1/3] Allow blocking while connect() is completing This patch, salvaged from a trainwreck accidental merge earlier, adds a new sockopt, ZMQ_DELAY_ATTACH_ON_CONNECT which prevents a end point being available to push messages to until it has fully connected, making connect work more like bind. This also applies to reconnecting sockets, which may cause message loss of in-queue messages, so it is sensible to use this in conjunction with a low HWM and potentially an alternative acknowledgement path. Notes on most of the individual commits can be found the repository log. --- tests/test_connect_delay.cpp | 260 +++++++++++++++++++++++++++++++++++ 1 file changed, 260 insertions(+) create mode 100644 tests/test_connect_delay.cpp diff --git a/tests/test_connect_delay.cpp b/tests/test_connect_delay.cpp new file mode 100644 index 00000000..570292c6 --- /dev/null +++ b/tests/test_connect_delay.cpp @@ -0,0 +1,260 @@ +/* +Copyright (c) 2012 Ian Barber +Copyright (c) 2012 Other contributors as noted in the AUTHORS file + +This file is part of 0MQ. + +0MQ is free software; you can redistribute it and/or modify it under +the terms of the GNU Lesser General Public License as published by +the Free Software Foundation; either version 3 of the License, or +(at your option) any later version. + +0MQ is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Lesser General Public License for more details. + +You should have received a copy of the GNU Lesser General Public License +along with this program. If not, see . +*/ + + +#include +#include +#include +#include +#include + +#include "../include/zmq.h" + +static void *server (void *c) +{ + void *socket, *context; + char buffer[16]; + int rc, val; + + context = zmq_init (1); + assert (context); + + socket = zmq_socket (context, ZMQ_PULL); + assert (socket); + + val = 0; + rc = zmq_setsockopt(socket, ZMQ_LINGER, &val, sizeof(val)); + assert (rc == 0); + + rc = zmq_bind (socket, "ipc:///tmp/recon"); + assert (rc == 0); + + memset (&buffer, 0, sizeof(buffer)); + rc = zmq_recv (socket, &buffer, sizeof(buffer), 0); + + // Intentionally bail out + rc = zmq_close (socket); + assert (rc == 0); + + rc = zmq_term (context); + assert (rc == 0); + + usleep (200000); + + context = zmq_init (1); + assert (context); + + socket = zmq_socket (context, ZMQ_PULL); + assert (socket); + + val = 0; + rc = zmq_setsockopt(socket, ZMQ_LINGER, &val, sizeof(val)); + assert (rc == 0); + + rc = zmq_bind (socket, "ipc:///tmp/recon"); + assert (rc == 0); + + usleep (200000); + + memset (&buffer, 0, sizeof(buffer)); + rc = zmq_recv (socket, &buffer, sizeof(buffer), ZMQ_DONTWAIT); + assert (rc != -1); + + // Start closing the socket while the connecting process is underway. + rc = zmq_close (socket); + assert (rc == 0); + + rc = zmq_term (context); + assert (rc == 0); + + pthread_exit(NULL); +} + +static void *worker (void *n) +{ + void *socket, *context; + int rc, hadone, val; + + context = zmq_init (1); + assert (context); + + socket = zmq_socket (context, ZMQ_PUSH); + assert (socket); + + val = 0; + rc = zmq_setsockopt(socket, ZMQ_LINGER, &val, sizeof(val)); + assert (rc == 0); + + val = 1; + rc = zmq_setsockopt (socket, ZMQ_DELAY_ATTACH_ON_CONNECT, &val, sizeof(val)); + assert (rc == 0); + + rc = zmq_connect (socket, "ipc:///tmp/recon"); + assert (rc == 0); + + hadone = 0; + // Not checking RC as some may be -1 + for (int i = 0; i < 4; i++) { + usleep(200000); + rc = zmq_send (socket, "hi", 2, ZMQ_DONTWAIT); + if (rc != -1) + hadone ++; + } + + assert (hadone >= 2); + assert (hadone < 4); + + rc = zmq_close (socket); + assert (rc == 0); + + rc = zmq_term (context); + assert (rc == 0); + + pthread_exit(NULL); +} + +int main (int argc, char *argv []) +{ + fprintf (stderr, "test_connect_delay running...\n"); + int val; + int rc; + char buffer[16]; + int seen = 0; + + void *context = zmq_ctx_new(); + assert (context); + void *to = zmq_socket(context, ZMQ_PULL); + assert (to); + + val = 0; + rc = zmq_setsockopt(to, ZMQ_LINGER, &val, sizeof(val)); + assert (rc == 0); + rc = zmq_bind(to, "tcp://*:5555"); + assert (rc == 0); + + // Create a socket pushing to two endpoints - only 1 message should arrive. + void *from = zmq_socket (context, ZMQ_PUSH); + assert(from); + + val = 0; + zmq_setsockopt (from, ZMQ_LINGER, &val, sizeof(val)); + rc = zmq_connect (from, "tcp://localhost:5556"); + assert (rc == 0); + rc = zmq_connect (from, "tcp://localhost:5555"); + assert (rc == 0); + + for (int i = 0; i < 10; ++i) + { + std::string message("message "); + message += ('0' + i); + rc = zmq_send (from, message.data(), message.size(), 0); + assert(rc >= 0); + } + + sleep(1); + seen = 0; + for (int i = 0; i < 10; ++i) + { + memset (&buffer, 0, sizeof(buffer)); + rc = zmq_recv (to, &buffer, sizeof(buffer), ZMQ_DONTWAIT); + if( rc == -1) + break; + seen++; + } + assert (seen == 5); + + rc = zmq_close (from); + assert (rc == 0); + + rc = zmq_close (to); + assert (rc == 0); + + rc = zmq_ctx_destroy(context); + assert (rc == 0); + + context = zmq_ctx_new(); + fprintf (stderr, " Rerunning with DELAY_ATTACH_ON_CONNECT\n"); + + to = zmq_socket (context, ZMQ_PULL); + assert (to); + rc = zmq_bind (to, "tcp://*:5560"); + assert (rc == 0); + + val = 0; + rc = zmq_setsockopt (to, ZMQ_LINGER, &val, sizeof(val)); + assert (rc == 0); + + // Create a socket pushing to two endpoints - all messages should arrive. + from = zmq_socket (context, ZMQ_PUSH); + assert (from); + + val = 0; + rc = zmq_setsockopt (from, ZMQ_LINGER, &val, sizeof(val)); + assert (rc == 0); + + val = 1; + rc = zmq_setsockopt (from, ZMQ_DELAY_ATTACH_ON_CONNECT, &val, sizeof(val)); + assert (rc == 0); + + rc = zmq_connect (from, "tcp://localhost:5561"); + assert (rc == 0); + + rc = zmq_connect (from, "tcp://localhost:5560"); + assert (rc == 0); + + for (int i = 0; i < 10; ++i) + { + std::string message("message "); + message += ('0' + i); + rc = zmq_send (from, message.data(), message.size(), 0); + assert (rc >= 0); + } + + sleep(1); + + seen = 0; + for (int i = 0; i < 10; ++i) + { + memset(&buffer, 0, sizeof(buffer)); + rc = zmq_recv (to, &buffer, sizeof(buffer), ZMQ_DONTWAIT); + assert (rc != -1); + } + + rc = zmq_close (from); + assert (rc == 0); + + rc = zmq_close (to); + assert (rc == 0); + + rc = zmq_ctx_destroy(context); + assert (rc == 0); + + fprintf (stderr, " Running DELAY_ATTACH_ON_CONNECT with disconnect\n"); + + pthread_t serv, work; + + rc = pthread_create (&serv, NULL, server, NULL); + assert (rc == 0); + + rc = pthread_create (&work, NULL, worker, NULL); + assert (rc == 0); + + pthread_exit(NULL); +} \ No newline at end of file From e5904e63cebc96048bac7c30ae91c16edfff5922 Mon Sep 17 00:00:00 2001 From: Ian Barber Date: Tue, 12 Jun 2012 15:34:48 +0100 Subject: [PATCH 2/3] Allow blocking while connect() is completing This patch, salvaged from a trainwreck accidental merge earlier, adds a new sockopt, ZMQ_DELAY_ATTACH_ON_CONNECT which prevents a end point being available to push messages to until it has fully connected, making connect work more like bind. This also applies to reconnecting sockets, which may cause message loss of in-queue messages, so it is sensible to use this in conjunction with a low HWM and potentially an alternative acknowledgement path. Notes on most of the individual commits can be found the repository log. --- .gitignore | 1 + doc/zmq_getsockopt.txt | 15 +++++++++++++++ doc/zmq_setsockopt.txt | 14 ++++++++++++++ include/zmq.h | 1 + src/lb.cpp | 3 +-- src/options.cpp | 27 +++++++++++++++++++++++++++ src/options.hpp | 4 ++++ src/session_base.cpp | 38 +++++++++++++++++++++++++++++--------- src/session_base.hpp | 3 +++ src/socket_base.cpp | 34 ++++++++++++++++++++-------------- tests/Makefile.am | 2 ++ 11 files changed, 117 insertions(+), 25 deletions(-) diff --git a/.gitignore b/.gitignore index 6ccb0a72..b37f80ff 100644 --- a/.gitignore +++ b/.gitignore @@ -39,6 +39,7 @@ tests/test_invalid_rep tests/test_msg_flags tests/test_ts_context tests/test_connect_resolve +tests/test_connect_delay tests/test_term_endpoint src/platform.hpp* src/stamp-h1 diff --git a/doc/zmq_getsockopt.txt b/doc/zmq_getsockopt.txt index 0eae8d91..e224ffcd 100644 --- a/doc/zmq_getsockopt.txt +++ b/doc/zmq_getsockopt.txt @@ -342,6 +342,21 @@ Default value:: 1 (true) Applicable socket types:: all, when using TCP transports. +ZMQ_DELAY_ATTACH_ON_CONNECT +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Retrieve the state of the attach on connect value. If set to `1`, will delay the +attachment of a pipe on connect until the underlying connection has completed. +This will cause the socket to block if there are no other connections, but will +prevent queues from filling on pipes awaiting connection. + +[horizontal] +Option value type:: int +Option value unit:: boolean +Default value:: 0 (false) +Applicable socket types:: all, primarily when using TCP/IPC transports. + + ZMQ_FD: Retrieve file descriptor associated with the socket ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ The 'ZMQ_FD' option shall retrieve the file descriptor associated with the diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index fee30cc8..7fa2c49e 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -352,6 +352,20 @@ Default value:: 1 (true) Applicable socket types:: all, when using TCP transports. +ZMQ_DELAY_ATTACH_ON_CONNECT +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +If set to `1`, will delay the attachment of a pipe on connect until the underlying +connection has completed. This will cause the socket to block if there are no other +connections, but will prevent queues from filling on pipes awaiting connection. + +[horizontal] +Option value type:: int +Option value unit:: boolean +Default value:: 0 (false) +Applicable socket types:: all, primarily when using TCP/IPC transports. + + ZMQ_FAIL_UNROUTABLE: Set unroutable message behavior ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/include/zmq.h b/include/zmq.h index 58e4017a..b8813947 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -227,6 +227,7 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval); #define ZMQ_TCP_KEEPALIVE_IDLE 36 #define ZMQ_TCP_KEEPALIVE_INTVL 37 #define ZMQ_TCP_ACCEPT_FILTER 38 +#define ZMQ_DELAY_ATTACH_ON_CONNECT 39 /* Message options */ #define ZMQ_MORE 1 diff --git a/src/lb.cpp b/src/lb.cpp index 6732f274..d5f2f001 100644 --- a/src/lb.cpp +++ b/src/lb.cpp @@ -41,8 +41,7 @@ zmq::lb_t::~lb_t () void zmq::lb_t::attach (pipe_t *pipe_) { pipes.push_back (pipe_); - pipes.swap (active, pipes.size () - 1); - active++; + activated (pipe_); } void zmq::lb_t::terminated (pipe_t *pipe_) diff --git a/src/options.cpp b/src/options.cpp index fdd5299d..d61ad146 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -44,6 +44,7 @@ zmq::options_t::options_t () : rcvtimeo (-1), sndtimeo (-1), ipv4only (1), + delay_attach_on_connect (0), delay_on_close (true), delay_on_disconnect (true), filter (false), @@ -218,6 +219,8 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, ipv4only = val; return 0; } + + case ZMQ_TCP_KEEPALIVE: { @@ -236,6 +239,21 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, return 0; } + case ZMQ_DELAY_ATTACH_ON_CONNECT: + { + if (optvallen_ != sizeof (int)) { + errno = EINVAL; + return -1; + } + int val = *((int*) optval_); + if (val != 0 && val != 1) { + errno = EINVAL; + return -1; + } + delay_attach_on_connect = val; + return 0; + } + case ZMQ_TCP_KEEPALIVE_CNT: { if (optvallen_ != sizeof (int)) { @@ -483,6 +501,15 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) *((int*) optval_) = ipv4only; *optvallen_ = sizeof (int); return 0; + + case ZMQ_DELAY_ATTACH_ON_CONNECT: + if (*optvallen_ < sizeof (int)) { + errno = EINVAL; + return -1; + } + *((int*) optval_) = delay_attach_on_connect; + *optvallen_ = sizeof (int); + return 0; case ZMQ_TCP_KEEPALIVE: if (*optvallen_ < sizeof (int)) { diff --git a/src/options.hpp b/src/options.hpp index 2f6e6283..fe435b16 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -96,6 +96,10 @@ namespace zmq // possible to communicate with IPv6-only hosts. If 0, the socket can // connect to and accept connections from both IPv4 and IPv6 hosts. int ipv4only; + + // If 1, connecting pipes are not attached immediately, meaning a send() + // on a socket with only connecting pipes would block + int delay_attach_on_connect; // If true, session reads all the pending messages from the pipe and // sends them to the network when socket is closed. diff --git a/src/session_base.cpp b/src/session_base.cpp index 8a879584..12900df5 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -229,20 +229,28 @@ void zmq::session_base_t::clean_pipes () void zmq::session_base_t::terminated (pipe_t *pipe_) { - // Drop the reference to the deallocated pipe. - zmq_assert (pipe == pipe_); - pipe = NULL; + // Drop the reference to the deallocated pipe if required. + zmq_assert (pipe == pipe_ || incomplete_pipes.size () > 0); - // If we are waiting for pending messages to be sent, at this point - // we are sure that there will be no more messages and we can proceed - // with termination safely. - if (pending) + if (pipe == pipe_) + // If this is our current pipe, remove it + pipe = NULL; + else + // Remove the pipe from the detached pipes set + incomplete_pipes.erase (pipe_); + + // If we are waiting for pending messages to be sent, at this point + // we are sure that there will be no more messages and we can proceed + // with termination safely. + if (pending && !pipe && incomplete_pipes.size () == 0) proceed_with_term (); } void zmq::session_base_t::read_activated (pipe_t *pipe_) { - zmq_assert (pipe == pipe_); + // Skip activating if we're detaching this pipe + if (incomplete_pipes.size () > 0 && pipe_ != pipe) + return; if (likely (engine != NULL)) engine->activate_out (); @@ -252,7 +260,9 @@ void zmq::session_base_t::read_activated (pipe_t *pipe_) void zmq::session_base_t::write_activated (pipe_t *pipe_) { - zmq_assert (pipe == pipe_); + // Skip activating if we're detaching this pipe + if (incomplete_pipes.size () > 0 && pipe_ != pipe) + return; if (engine) engine->activate_in (); @@ -395,6 +405,16 @@ void zmq::session_base_t::detached () return; } + // For delayed connect situations, terminate the pipe + // and reestablish later on + if (pipe && options.delay_attach_on_connect == 1 + && addr->protocol != "pgm" && addr->protocol != "epgm") { + pipe->hiccup (); + pipe->terminate (false); + incomplete_pipes.insert (pipe); + pipe = NULL; + } + reset (); // Reconnect. diff --git a/src/session_base.hpp b/src/session_base.hpp index 8244cb59..7b9f3fc6 100644 --- a/src/session_base.hpp +++ b/src/session_base.hpp @@ -103,6 +103,9 @@ namespace zmq // Pipe connecting the session to its socket. zmq::pipe_t *pipe; + + // This set is added to with pipes we are disconnecting, but haven't yet completed + std::set incomplete_pipes; // This flag is true if the remainder of the message being processed // is still in the in pipe. diff --git a/src/socket_base.cpp b/src/socket_base.cpp index eaddced6..60c7ceb9 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -530,27 +530,29 @@ int zmq::socket_base_t::connect (const char *addr_) options, paddr); errno_assert (session); - // Create a bi-directional pipe. - object_t *parents [2] = {this, session}; - pipe_t *pipes [2] = {NULL, NULL}; - int hwms [2] = {options.sndhwm, options.rcvhwm}; - bool delays [2] = {options.delay_on_disconnect, options.delay_on_close}; - rc = pipepair (parents, pipes, hwms, delays); - errno_assert (rc == 0); - // PGM does not support subscription forwarding; ask for all data to be // sent to this pipe. bool icanhasall = false; if (protocol == "pgm" || protocol == "epgm") icanhasall = true; - // Attach local end of the pipe to the socket object. - attach_pipe (pipes [0], icanhasall); + if (options.delay_attach_on_connect != 1 || icanhasall) { + // Create a bi-directional pipe. + object_t *parents [2] = {this, session}; + pipe_t *pipes [2] = {NULL, NULL}; + int hwms [2] = {options.sndhwm, options.rcvhwm}; + bool delays [2] = {options.delay_on_disconnect, options.delay_on_close}; + rc = pipepair (parents, pipes, hwms, delays); + errno_assert (rc == 0); - // Attach remote end of the pipe to the session object later on. - session->attach_pipe (pipes [1]); + // Attach local end of the pipe to the socket object. + attach_pipe (pipes [0], icanhasall); - // Save last endpoint URI + // Attach remote end of the pipe to the session object later on. + session->attach_pipe (pipes [1]); + } + + // Save last endpoint URI paddr->to_string (options.last_endpoint); add_endpoint (addr_, (own_t *) session); @@ -968,7 +970,11 @@ void zmq::socket_base_t::write_activated (pipe_t *pipe_) void zmq::socket_base_t::hiccuped (pipe_t *pipe_) { - xhiccuped (pipe_); + if (options.delay_attach_on_connect == 1) + pipe_->terminate (false); + else + // Notify derived sockets of the hiccup + xhiccuped (pipe_); } void zmq::socket_base_t::terminated (pipe_t *pipe_) diff --git a/tests/Makefile.am b/tests/Makefile.am index aa2c529a..024867d2 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -13,6 +13,7 @@ noinst_PROGRAMS = test_pair_inproc \ test_invalid_rep \ test_msg_flags \ test_connect_resolve \ + test_connect_delay \ test_last_endpoint \ test_term_endpoint \ test_monitor @@ -34,6 +35,7 @@ test_sub_forward_SOURCES = test_sub_forward.cpp test_invalid_rep_SOURCES = test_invalid_rep.cpp test_msg_flags_SOURCES = test_msg_flags.cpp test_connect_resolve_SOURCES = test_connect_resolve.cpp +test_connect_delay_SOURCES = test_connect_delay.cpp test_last_endpoint_SOURCES = test_last_endpoint.cpp test_term_endpoint_SOURCES = test_term_endpoint.cpp test_monitor_SOURCES = test_monitor.cpp From bc9ae715c33fff8dbe3f4c8ecce46c691ad82f41 Mon Sep 17 00:00:00 2001 From: Ian Barber Date: Tue, 12 Jun 2012 17:56:39 +0100 Subject: [PATCH 3/3] Add asserts and rename pipe set Rename the pipeset to terminating_pipes, as suggested by Martin H. Adds asserts to test the pipe is contained in the terminating set where appropriate. --- src/session_base.cpp | 16 ++++++++++------ src/session_base.hpp | 2 +- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/src/session_base.cpp b/src/session_base.cpp index 12900df5..d4e9855c 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -230,27 +230,29 @@ void zmq::session_base_t::clean_pipes () void zmq::session_base_t::terminated (pipe_t *pipe_) { // Drop the reference to the deallocated pipe if required. - zmq_assert (pipe == pipe_ || incomplete_pipes.size () > 0); + zmq_assert (pipe == pipe_ || terminating_pipes.count (pipe_) == 1); if (pipe == pipe_) // If this is our current pipe, remove it pipe = NULL; else // Remove the pipe from the detached pipes set - incomplete_pipes.erase (pipe_); + terminating_pipes.erase (pipe_); // If we are waiting for pending messages to be sent, at this point // we are sure that there will be no more messages and we can proceed // with termination safely. - if (pending && !pipe && incomplete_pipes.size () == 0) + if (pending && !pipe && terminating_pipes.size () == 0) proceed_with_term (); } void zmq::session_base_t::read_activated (pipe_t *pipe_) { // Skip activating if we're detaching this pipe - if (incomplete_pipes.size () > 0 && pipe_ != pipe) + if (pipe != pipe_) { + zmq_assert (terminating_pipes.count (pipe_) == 1); return; + } if (likely (engine != NULL)) engine->activate_out (); @@ -261,8 +263,10 @@ void zmq::session_base_t::read_activated (pipe_t *pipe_) void zmq::session_base_t::write_activated (pipe_t *pipe_) { // Skip activating if we're detaching this pipe - if (incomplete_pipes.size () > 0 && pipe_ != pipe) + if (pipe != pipe_) { + zmq_assert (terminating_pipes.count (pipe_) == 1); return; + } if (engine) engine->activate_in (); @@ -411,7 +415,7 @@ void zmq::session_base_t::detached () && addr->protocol != "pgm" && addr->protocol != "epgm") { pipe->hiccup (); pipe->terminate (false); - incomplete_pipes.insert (pipe); + terminating_pipes.insert (pipe); pipe = NULL; } diff --git a/src/session_base.hpp b/src/session_base.hpp index 7b9f3fc6..fa765ba3 100644 --- a/src/session_base.hpp +++ b/src/session_base.hpp @@ -105,7 +105,7 @@ namespace zmq zmq::pipe_t *pipe; // This set is added to with pipes we are disconnecting, but haven't yet completed - std::set incomplete_pipes; + std::set terminating_pipes; // This flag is true if the remainder of the message being processed // is still in the in pipe.