rebase reconnect-redux on master (#3960)

* add option to stop reconnecting on failed handshake
This commit is contained in:
Bill Torpey
2020-06-26 18:41:44 -04:00
committed by GitHub
parent c7aef56048
commit c04f6581e0
14 changed files with 131 additions and 10 deletions

0
doc/zmq_getsockopt.txt Executable file → Normal file
View File

13
doc/zmq_setsockopt.txt Executable file → Normal file
View File

@@ -735,11 +735,20 @@ The 'ZMQ_RECONNECT_STOP_CONN_REFUSED' option will stop reconnection when 0MQ
receives the ECONNREFUSED return code from the connect. This indicates that receives the ECONNREFUSED return code from the connect. This indicates that
there is no code bound to the specified endpoint. there is no code bound to the specified endpoint.
The 'ZMQ_RECONNECT_STOP_HANDSHAKE_FAILED' option will stop reconnection if
the 0MQ handshake fails. This can be used to detect and/or prevent errant
connection attempts to non-0MQ sockets. Note that when specifying this option
you may also want to set `ZMQ_HANDSHAKE_IVL` -- the default handshake interval
is 30000 (30 seconds), which is typically too large.
NOTE: in DRAFT state, not yet available in stable releases.
[horizontal] [horizontal]
Option value type:: int Option value type:: int
Option value unit:: ZMQ_RECONNECT_STOP_CONN_REFUSED Option value unit:: 0, ZMQ_RECONNECT_STOP_CONN_REFUSED, ZMQ_RECONNECT_STOP_HANDSHAKE_FAILED, ZMQ_RECONNECT_STOP_CONN_REFUSED | ZMQ_RECONNECT_STOP_HANDSHAKE_FAILED
Default value:: 0 Default value:: 0
Applicable socket types:: all, only for connection-oriented transports Applicable socket types:: all, only for connection-oriented transports (ZMQ_HANDSHAKE_IVL is
not applicable for ZMQ_STREAM sockets)
ZMQ_RECOVERY_IVL: Set multicast recovery interval ZMQ_RECOVERY_IVL: Set multicast recovery interval

View File

@@ -684,6 +684,7 @@ ZMQ_EXPORT void zmq_threadclose (void *thread_);
/* DRAFT ZMQ_RECONNECT_STOP options */ /* DRAFT ZMQ_RECONNECT_STOP options */
#define ZMQ_RECONNECT_STOP_CONN_REFUSED 0x1 #define ZMQ_RECONNECT_STOP_CONN_REFUSED 0x1
#define ZMQ_RECONNECT_STOP_HANDSHAKE_FAILED 0x2
/* DRAFT Context options */ /* DRAFT Context options */
#define ZMQ_ZERO_COPY_RECV 10 #define ZMQ_ZERO_COPY_RECV 10

View File

@@ -1252,6 +1252,7 @@ int zmq::options_t::getsockopt (int option_,
return 0; return 0;
} }
break; break;
case ZMQ_IN_BATCH_SIZE: case ZMQ_IN_BATCH_SIZE:
if (is_int) { if (is_int) {
*value = in_batch_size; *value = in_batch_size;

View File

@@ -481,7 +481,7 @@ void zmq::session_base_t::engine_error (bool handshaked_,
reconnect (); reconnect ();
break; break;
} }
/* FALLTHROUGH */
case i_engine::protocol_error: case i_engine::protocol_error:
if (_pending) { if (_pending) {
if (_pipe) if (_pipe)

0
src/stream_connecter_base.cpp Executable file → Normal file
View File

0
src/stream_connecter_base.hpp Executable file → Normal file
View File

View File

@@ -582,6 +582,11 @@ void zmq::stream_engine_base_t::mechanism_ready ()
alloc_assert (_metadata); alloc_assert (_metadata);
} }
if (_has_handshake_timer) {
cancel_timer (handshake_timer_id);
_has_handshake_timer = false;
}
_socket->event_handshake_succeeded (_endpoint_uri_pair, 0); _socket->event_handshake_succeeded (_endpoint_uri_pair, 0);
} }
@@ -689,6 +694,13 @@ void zmq::stream_engine_base_t::error (error_reason_t reason_)
|| _mechanism->status () == mechanism_t::handshaking)) { || _mechanism->status () == mechanism_t::handshaking)) {
const int err = errno; const int err = errno;
_socket->event_handshake_failed_no_detail (_endpoint_uri_pair, err); _socket->event_handshake_failed_no_detail (_endpoint_uri_pair, err);
// special case: connecting to non-ZMTP process which immediately drops connection,
// or which never responds with greeting, should be treated as a protocol error
// (i.e. stop reconnect)
if ( ( (reason_ == connection_error) || (reason_ == timeout_error) )
&& (_options.reconnect_stop & ZMQ_RECONNECT_STOP_HANDSHAKE_FAILED)) {
reason_ = protocol_error;
}
} }
_socket->event_disconnected (_endpoint_uri_pair, _s); _socket->event_disconnected (_endpoint_uri_pair, _s);

View File

@@ -71,6 +71,7 @@
/* DRAFT ZMQ_RECONNECT_STOP options */ /* DRAFT ZMQ_RECONNECT_STOP options */
#define ZMQ_RECONNECT_STOP_CONN_REFUSED 0x1 #define ZMQ_RECONNECT_STOP_CONN_REFUSED 0x1
#define ZMQ_RECONNECT_STOP_HANDSHAKE_FAILED 0x2
/* DRAFT Context options */ /* DRAFT Context options */
#define ZMQ_ZERO_COPY_RECV 10 #define ZMQ_ZERO_COPY_RECV 10

View File

@@ -138,11 +138,6 @@ bool zmq::zmtp_engine_t::handshake ()
if (_outsize == 0) if (_outsize == 0)
set_pollout (); set_pollout ();
if (_has_handshake_timer) {
cancel_timer (handshake_timer_id);
_has_handshake_timer = false;
}
return true; return true;
} }

2
tests/CMakeLists.txt Executable file → Normal file
View File

@@ -72,6 +72,7 @@ set(tests
test_monitor test_monitor
test_socket_null test_socket_null
test_reconnect_ivl test_reconnect_ivl
test_reconnect_options
test_mock_pub_sub) test_mock_pub_sub)
if(NOT WIN32) if(NOT WIN32)
@@ -155,7 +156,6 @@ if(ENABLE_DRAFTS)
test_router_notify test_router_notify
test_xpub_manual_last_value test_xpub_manual_last_value
test_peer test_peer
test_reconnect_options
test_msg_init test_msg_init
test_channel test_channel
test_hello_msg test_hello_msg

View File

@@ -156,6 +156,7 @@ void reconnect_success ()
} }
#ifdef ZMQ_BUILD_DRAFT_API
// test stopping reconnect on connection refused // test stopping reconnect on connection refused
void reconnect_stop_on_refused () void reconnect_stop_on_refused ()
{ {
@@ -215,6 +216,61 @@ void reconnect_stop_on_refused ()
// TODO why does this use zero_linger? // TODO why does this use zero_linger?
test_context_socket_close_zero_linger (sub_mon); test_context_socket_close_zero_linger (sub_mon);
} }
#endif
#ifdef ZMQ_BUILD_DRAFT_API
// test stopping reconnect on connection refused
void reconnect_stop_on_handshake_failed ()
{
char bind_address[MAX_SOCKET_STRING];
size_t addr_length = sizeof (bind_address);
void* dummy = test_context_socket (ZMQ_STREAM);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (dummy, "tcp://127.0.0.1:0"));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (dummy, ZMQ_LAST_ENDPOINT, bind_address, &addr_length));
// setup sub socket
void *sub = test_context_socket (ZMQ_SUB);
// Monitor all events on sub
TEST_ASSERT_SUCCESS_ERRNO (
zmq_socket_monitor (sub, "inproc://monitor-sub", ZMQ_EVENT_ALL));
// Create socket for collecting monitor events
void *sub_mon = test_context_socket (ZMQ_PAIR);
// Connect so they'll get events
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub_mon, "inproc://monitor-sub"));
// set handshake interval (i.e., timeout) to a more reasonable value
int handshakeInterval = 1000;
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_HANDSHAKE_IVL,
&handshakeInterval,
sizeof (handshakeInterval)));
// set option to stop reconnecting on failed handshake
int stopReconnectOnError = ZMQ_RECONNECT_STOP_HANDSHAKE_FAILED;
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_RECONNECT_STOP,
&stopReconnectOnError,
sizeof (stopReconnectOnError)));
// connect to dummy stream socket above
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, bind_address));
#if 1
// ZMQ_EVENT_DISCONNECTED should be last event, because of ZMQ_RECONNECT_STOP set above
expect_monitor_event (sub_mon, ZMQ_EVENT_CONNECT_DELAYED);
expect_monitor_event (sub_mon, ZMQ_EVENT_CONNECTED);
expect_monitor_event (sub_mon, ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL);
expect_monitor_event (sub_mon, ZMQ_EVENT_DISCONNECTED);
#else
print_events(sub_mon, 2 * 1000, 1000);
#endif
// Close sub
// TODO why does this use zero_linger?
test_context_socket_close_zero_linger (sub);
test_context_socket_close_zero_linger (dummy);
// Close monitor
// TODO why does this use zero_linger?
test_context_socket_close_zero_linger (sub_mon);
}
#endif
void setUp () void setUp ()
{ {
@@ -234,7 +290,9 @@ int main (void)
RUN_TEST (reconnect_default); RUN_TEST (reconnect_default);
RUN_TEST (reconnect_success); RUN_TEST (reconnect_success);
#ifdef ZMQ_BUILD_DRAFT_API
RUN_TEST (reconnect_stop_on_refused); RUN_TEST (reconnect_stop_on_refused);
RUN_TEST (reconnect_stop_on_handshake_failed);
#endif
return UNITY_END (); return UNITY_END ();
} }

View File

@@ -343,3 +343,43 @@ void expect_monitor_event_v2 (void *monitor_,
free (remote_address); free (remote_address);
TEST_ASSERT_FALSE_MESSAGE (failed, buf); TEST_ASSERT_FALSE_MESSAGE (failed, buf);
} }
const char* get_zmqEventName(uint64_t event)
{
switch(event) {
case ZMQ_EVENT_CONNECTED : return "CONNECTED";
case ZMQ_EVENT_CONNECT_DELAYED : return "CONNECT_DELAYED";
case ZMQ_EVENT_CONNECT_RETRIED : return "CONNECT_RETRIED";
case ZMQ_EVENT_LISTENING : return "LISTENING";
case ZMQ_EVENT_BIND_FAILED : return "BIND_FAILED";
case ZMQ_EVENT_ACCEPTED : return "ACCEPTED";
case ZMQ_EVENT_ACCEPT_FAILED : return "ACCEPT_FAILED";
case ZMQ_EVENT_CLOSED : return "CLOSED";
case ZMQ_EVENT_CLOSE_FAILED : return "CLOSE_FAILED";
case ZMQ_EVENT_DISCONNECTED : return "DISCONNECTED";
case ZMQ_EVENT_MONITOR_STOPPED : return "MONITOR_STOPPED";
case ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL : return "HANDSHAKE_FAILED_NO_DETAIL";
case ZMQ_EVENT_HANDSHAKE_SUCCEEDED : return "HANDSHAKE_SUCCEEDED";
case ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL : return "HANDSHAKE_FAILED_PROTOCOL";
case ZMQ_EVENT_HANDSHAKE_FAILED_AUTH : return "HANDSHAKE_FAILED_AUTH";
default : return "UNKNOWN";
}
}
void print_events(void* socket, int timeout, int limit)
{
// print events received
int value;
char *event_address;
int event = get_monitor_event_with_timeout (socket, &value, &event_address,
timeout);
int i = 0;;
while ((event != -1) && (++i < limit)) {
const char* eventName = get_zmqEventName(event);
printf("Got event: %s\n", eventName);
event = get_monitor_event_with_timeout (socket, &value, &event_address,
timeout);
}
}

View File

@@ -76,4 +76,8 @@ void expect_monitor_event_v2 (void *monitor_,
const char *expected_local_address_ = NULL, const char *expected_local_address_ = NULL,
const char *expected_remote_address_ = NULL); const char *expected_remote_address_ = NULL);
const char* get_zmqEventName(uint64_t event);
void print_events(void* socket, int timeout, int limit);
#endif #endif