From 4a0bde8130be6f709c4d81c355a99fe2e61e1b14 Mon Sep 17 00:00:00 2001 From: KIU Shueng Chuan Date: Tue, 4 Aug 2015 20:19:01 +0800 Subject: [PATCH 1/4] rename timer_started to reconnect_timer_started --- src/tcp_connecter.cpp | 12 ++++++------ src/tcp_connecter.hpp | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index b328d487..8623b259 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -67,7 +67,7 @@ zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_, s (retired_fd), handle_valid (false), delayed_start (delayed_start_), - timer_started (false), + reconnect_timer_started (false), session (session_), current_reconnect_ivl (options.reconnect_ivl) { @@ -79,7 +79,7 @@ zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_, zmq::tcp_connecter_t::~tcp_connecter_t () { - zmq_assert (!timer_started); + zmq_assert (!reconnect_timer_started); zmq_assert (!handle_valid); zmq_assert (s == retired_fd); } @@ -94,9 +94,9 @@ void zmq::tcp_connecter_t::process_plug () void zmq::tcp_connecter_t::process_term (int linger_) { - if (timer_started) { + if (reconnect_timer_started) { cancel_timer (reconnect_timer_id); - timer_started = false; + reconnect_timer_started = false; } if (handle_valid) { @@ -154,7 +154,7 @@ void zmq::tcp_connecter_t::out_event () void zmq::tcp_connecter_t::timer_event (int id_) { zmq_assert (id_ == reconnect_timer_id); - timer_started = false; + reconnect_timer_started = false; start_connecting (); } @@ -192,7 +192,7 @@ void zmq::tcp_connecter_t::add_reconnect_timer () const int interval = get_new_reconnect_ivl (); add_timer (interval, reconnect_timer_id); socket->event_connect_retried (endpoint, interval); - timer_started = true; + reconnect_timer_started = true; } int zmq::tcp_connecter_t::get_new_reconnect_ivl () diff --git a/src/tcp_connecter.hpp b/src/tcp_connecter.hpp index 340c7f56..6a284ed4 100644 --- a/src/tcp_connecter.hpp +++ b/src/tcp_connecter.hpp @@ -108,7 +108,7 @@ namespace zmq const bool delayed_start; // True iff a timer has been started. - bool timer_started; + bool reconnect_timer_started; // Reference to the session we belong to. zmq::session_base_t *session; From c0ca2be6425f70bb0b7a04962077a9003ec2252a Mon Sep 17 00:00:00 2001 From: KIU Shueng Chuan Date: Tue, 4 Aug 2015 20:47:07 +0800 Subject: [PATCH 2/4] add ZMQ_CONNECT_TIMEOUT socket option --- include/zmq.h | 1 + src/options.cpp | 15 +++++++++++++++ src/options.hpp | 5 +++++ 3 files changed, 21 insertions(+) diff --git a/include/zmq.h b/include/zmq.h index aaf3d55e..fd86f15e 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -320,6 +320,7 @@ ZMQ_EXPORT uint32_t zmq_msg_get_routing_id(zmq_msg_t *msg); #define ZMQ_HEARTBEAT_TTL 76 #define ZMQ_HEARTBEAT_TIMEOUT 77 #define ZMQ_XPUB_VERBOSE_UNSUBSCRIBE 78 +#define ZMQ_CONNECT_TIMEOUT 79 /* Message options */ #define ZMQ_MORE 1 diff --git a/src/options.cpp b/src/options.cpp index 6d4d000d..40ba22a9 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -46,6 +46,7 @@ zmq::options_t::options_t () : tos (0), type (-1), linger (-1), + connect_timeout (0), reconnect_ivl (100), reconnect_ivl_max (0), backlog (100), @@ -158,6 +159,13 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, } break; + case ZMQ_CONNECT_TIMEOUT: + if (is_int && value >= 0) { + connect_timeout = value; + return 0; + } + break; + case ZMQ_RECONNECT_IVL: if (is_int && value >= -1) { reconnect_ivl = value; @@ -653,6 +661,13 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) } break; + case ZMQ_CONNECT_TIMEOUT: + if (is_int) { + *value = connect_timeout; + return 0; + } + break; + case ZMQ_RECONNECT_IVL: if (is_int) { *value = reconnect_ivl; diff --git a/src/options.hpp b/src/options.hpp index bdaea966..e0b1d2a0 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -92,6 +92,11 @@ namespace zmq // Linger time, in milliseconds. int linger; + // Maximum interval in milliseconds beyond which userspace will + // timeout connect(). + // Default 0 (unused) + int connect_timeout; + // Minimum interval between attempts to reconnect, in milliseconds. // Default 100ms int reconnect_ivl; From eeb697b5acacf89ced182c834a14bade154c1bdd Mon Sep 17 00:00:00 2001 From: KIU Shueng Chuan Date: Tue, 4 Aug 2015 20:47:31 +0800 Subject: [PATCH 3/4] add connect timeout logic --- src/tcp_connecter.cpp | 40 +++++++++++++++++++++++++++++++++++++--- src/tcp_connecter.hpp | 6 +++++- 2 files changed, 42 insertions(+), 4 deletions(-) diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index 8623b259..2d29af6f 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -67,6 +67,7 @@ zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_, s (retired_fd), handle_valid (false), delayed_start (delayed_start_), + connect_timer_started (false), reconnect_timer_started (false), session (session_), current_reconnect_ivl (options.reconnect_ivl) @@ -79,6 +80,7 @@ zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_, zmq::tcp_connecter_t::~tcp_connecter_t () { + zmq_assert (!connect_timer_started); zmq_assert (!reconnect_timer_started); zmq_assert (!handle_valid); zmq_assert (s == retired_fd); @@ -94,6 +96,11 @@ void zmq::tcp_connecter_t::process_plug () void zmq::tcp_connecter_t::process_term (int linger_) { + if (connect_timer_started) { + cancel_timer (connect_timer_id); + connect_timer_started = false; + } + if (reconnect_timer_started) { cancel_timer (reconnect_timer_id); reconnect_timer_started = false; @@ -120,6 +127,11 @@ void zmq::tcp_connecter_t::in_event () void zmq::tcp_connecter_t::out_event () { + if (connect_timer_started) { + cancel_timer (connect_timer_id); + connect_timer_started = false; + } + rm_fd (handle); handle_valid = false; @@ -153,9 +165,20 @@ void zmq::tcp_connecter_t::out_event () void zmq::tcp_connecter_t::timer_event (int id_) { - zmq_assert (id_ == reconnect_timer_id); - reconnect_timer_started = false; - start_connecting (); + zmq_assert (id_ == reconnect_timer_id || id_ == connect_timer_id); + if (id_ == connect_timer_id) { + connect_timer_started = false; + + rm_fd (handle); + handle_valid = false; + + close (); + add_reconnect_timer (); + } + else if (id_ == reconnect_timer_id) { + reconnect_timer_started = false; + start_connecting (); + } } void zmq::tcp_connecter_t::start_connecting () @@ -177,6 +200,9 @@ void zmq::tcp_connecter_t::start_connecting () handle_valid = true; set_pollout (handle); socket->event_connect_delayed (endpoint, zmq_errno()); + + // add userspace connect timeout + add_connect_timer (); } // Handle any other error condition by eventual reconnect. @@ -187,6 +213,14 @@ void zmq::tcp_connecter_t::start_connecting () } } +void zmq::tcp_connecter_t::add_connect_timer () +{ + if (options.connect_timeout > 0) { + add_timer (options.connect_timeout, connect_timer_id); + connect_timer_started = true; + } +} + void zmq::tcp_connecter_t::add_reconnect_timer () { const int interval = get_new_reconnect_ivl (); diff --git a/src/tcp_connecter.hpp b/src/tcp_connecter.hpp index 6a284ed4..267627d0 100644 --- a/src/tcp_connecter.hpp +++ b/src/tcp_connecter.hpp @@ -57,7 +57,7 @@ namespace zmq private: // ID of the timer used to delay the reconnection. - enum {reconnect_timer_id = 1}; + enum {reconnect_timer_id = 1, connect_timer_id}; // Handlers for incoming commands. void process_plug (); @@ -71,6 +71,9 @@ namespace zmq // Internal function to start the actual connection establishment. void start_connecting (); + // Internal function to add a connect timer + void add_connect_timer(); + // Internal function to add a reconnect timer void add_reconnect_timer(); @@ -108,6 +111,7 @@ namespace zmq const bool delayed_start; // True iff a timer has been started. + bool connect_timer_started; bool reconnect_timer_started; // Reference to the session we belong to. From c9971e08a0b611fe1b6b6162a2a430ae53d3a3dd Mon Sep 17 00:00:00 2001 From: KIU Shueng Chuan Date: Tue, 4 Aug 2015 22:14:50 +0800 Subject: [PATCH 4/4] write man pages --- doc/zmq_getsockopt.txt | 14 ++++++++++++++ doc/zmq_setsockopt.txt | 14 ++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/doc/zmq_getsockopt.txt b/doc/zmq_getsockopt.txt index 243cb2d8..4e0cfb9d 100644 --- a/doc/zmq_getsockopt.txt +++ b/doc/zmq_getsockopt.txt @@ -63,6 +63,20 @@ Default value:: 100 Applicable socket types:: all, only for connection-oriented transports +ZMQ_CONNECT_TIMEOUT: Retrieve connect() timeout +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Retrieves how long to wait before timing-out a connect() system call. +The connect() system call normally takes a long time before it returns a +time out error. Setting this option allows the library to time out the call +at an earlier interval. + +[horizontal] +Option value type:: int +Option value unit:: milliseconds +Default value:: 0 (disabled) +Applicable socket types:: all, when using TCP transports. + + ZMQ_CURVE_PUBLICKEY: Retrieve current CURVE public key ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index 13918956..07c2a2ef 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -109,6 +109,20 @@ Default value:: 0 (false) Applicable socket types:: ZMQ_PULL, ZMQ_PUSH, ZMQ_SUB, ZMQ_PUB, ZMQ_DEALER +ZMQ_CONNECT_TIMEOUT: Set connect() timeout +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Sets how long to wait before timing-out a connect() system call. +The connect() system call normally takes a long time before it returns a +time out error. Setting this option allows the library to time out the call +at an earlier interval. + +[horizontal] +Option value type:: int +Option value unit:: milliseconds +Default value:: 0 (disabled) +Applicable socket types:: all, when using TCP transports. + + ZMQ_CURVE_PUBLICKEY: Set CURVE public key ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Sets the socket's long term public key. You must set this on CURVE client