diff --git a/doc/zmq_getsockopt.txt b/doc/zmq_getsockopt.txt old mode 100755 new mode 100644 diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt old mode 100755 new mode 100644 index 20e86ad9..3e88059b --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -735,11 +735,20 @@ The 'ZMQ_RECONNECT_STOP_CONN_REFUSED' option will stop reconnection when 0MQ receives the ECONNREFUSED return code from the connect. This indicates that there is no code bound to the specified endpoint. +The 'ZMQ_RECONNECT_STOP_HANDSHAKE_FAILED' option will stop reconnection if +the 0MQ handshake fails. This can be used to detect and/or prevent errant +connection attempts to non-0MQ sockets. Note that when specifying this option +you may also want to set `ZMQ_HANDSHAKE_IVL` -- the default handshake interval +is 30000 (30 seconds), which is typically too large. + +NOTE: in DRAFT state, not yet available in stable releases. + [horizontal] Option value type:: int -Option value unit:: ZMQ_RECONNECT_STOP_CONN_REFUSED +Option value unit:: 0, ZMQ_RECONNECT_STOP_CONN_REFUSED, ZMQ_RECONNECT_STOP_HANDSHAKE_FAILED, ZMQ_RECONNECT_STOP_CONN_REFUSED | ZMQ_RECONNECT_STOP_HANDSHAKE_FAILED Default value:: 0 -Applicable socket types:: all, only for connection-oriented transports +Applicable socket types:: all, only for connection-oriented transports (ZMQ_HANDSHAKE_IVL is +not applicable for ZMQ_STREAM sockets) ZMQ_RECOVERY_IVL: Set multicast recovery interval diff --git a/include/zmq.h b/include/zmq.h index df403c8f..21e67eb8 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -684,6 +684,7 @@ ZMQ_EXPORT void zmq_threadclose (void *thread_); /* DRAFT ZMQ_RECONNECT_STOP options */ #define ZMQ_RECONNECT_STOP_CONN_REFUSED 0x1 +#define ZMQ_RECONNECT_STOP_HANDSHAKE_FAILED 0x2 /* DRAFT Context options */ #define ZMQ_ZERO_COPY_RECV 10 diff --git a/src/options.cpp b/src/options.cpp index b257fe14..a5c61f24 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -1252,6 +1252,7 @@ int zmq::options_t::getsockopt (int option_, return 0; } break; + case ZMQ_IN_BATCH_SIZE: if (is_int) { *value = in_batch_size; diff --git a/src/session_base.cpp b/src/session_base.cpp index e8cd6e45..b7da4ccc 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -481,7 +481,7 @@ void zmq::session_base_t::engine_error (bool handshaked_, reconnect (); break; } - /* FALLTHROUGH */ + case i_engine::protocol_error: if (_pending) { if (_pipe) diff --git a/src/stream_connecter_base.cpp b/src/stream_connecter_base.cpp old mode 100755 new mode 100644 diff --git a/src/stream_connecter_base.hpp b/src/stream_connecter_base.hpp old mode 100755 new mode 100644 diff --git a/src/stream_engine_base.cpp b/src/stream_engine_base.cpp index 9cddacac..31f41896 100644 --- a/src/stream_engine_base.cpp +++ b/src/stream_engine_base.cpp @@ -582,6 +582,11 @@ void zmq::stream_engine_base_t::mechanism_ready () alloc_assert (_metadata); } + if (_has_handshake_timer) { + cancel_timer (handshake_timer_id); + _has_handshake_timer = false; + } + _socket->event_handshake_succeeded (_endpoint_uri_pair, 0); } @@ -689,6 +694,13 @@ void zmq::stream_engine_base_t::error (error_reason_t reason_) || _mechanism->status () == mechanism_t::handshaking)) { const int err = errno; _socket->event_handshake_failed_no_detail (_endpoint_uri_pair, err); + // special case: connecting to non-ZMTP process which immediately drops connection, + // or which never responds with greeting, should be treated as a protocol error + // (i.e. stop reconnect) + if ( ( (reason_ == connection_error) || (reason_ == timeout_error) ) + && (_options.reconnect_stop & ZMQ_RECONNECT_STOP_HANDSHAKE_FAILED)) { + reason_ = protocol_error; + } } _socket->event_disconnected (_endpoint_uri_pair, _s); diff --git a/src/zmq_draft.h b/src/zmq_draft.h index 63fd508a..dc072447 100644 --- a/src/zmq_draft.h +++ b/src/zmq_draft.h @@ -71,6 +71,7 @@ /* DRAFT ZMQ_RECONNECT_STOP options */ #define ZMQ_RECONNECT_STOP_CONN_REFUSED 0x1 +#define ZMQ_RECONNECT_STOP_HANDSHAKE_FAILED 0x2 /* DRAFT Context options */ #define ZMQ_ZERO_COPY_RECV 10 diff --git a/src/zmtp_engine.cpp b/src/zmtp_engine.cpp index b722919b..0bde8660 100644 --- a/src/zmtp_engine.cpp +++ b/src/zmtp_engine.cpp @@ -138,11 +138,6 @@ bool zmq::zmtp_engine_t::handshake () if (_outsize == 0) set_pollout (); - if (_has_handshake_timer) { - cancel_timer (handshake_timer_id); - _has_handshake_timer = false; - } - return true; } diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt old mode 100755 new mode 100644 index cbd2ecc8..09c555d4 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -72,6 +72,7 @@ set(tests test_monitor test_socket_null test_reconnect_ivl + test_reconnect_options test_mock_pub_sub) if(NOT WIN32) @@ -155,7 +156,6 @@ if(ENABLE_DRAFTS) test_router_notify test_xpub_manual_last_value test_peer - test_reconnect_options test_msg_init test_channel test_hello_msg diff --git a/tests/test_reconnect_options.cpp b/tests/test_reconnect_options.cpp index 03122d7b..57eafaae 100644 --- a/tests/test_reconnect_options.cpp +++ b/tests/test_reconnect_options.cpp @@ -156,6 +156,7 @@ void reconnect_success () } +#ifdef ZMQ_BUILD_DRAFT_API // test stopping reconnect on connection refused void reconnect_stop_on_refused () { @@ -215,6 +216,61 @@ void reconnect_stop_on_refused () // TODO why does this use zero_linger? test_context_socket_close_zero_linger (sub_mon); } +#endif + +#ifdef ZMQ_BUILD_DRAFT_API +// test stopping reconnect on connection refused +void reconnect_stop_on_handshake_failed () +{ + char bind_address[MAX_SOCKET_STRING]; + size_t addr_length = sizeof (bind_address); + void* dummy = test_context_socket (ZMQ_STREAM); + TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (dummy, "tcp://127.0.0.1:0")); + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_getsockopt (dummy, ZMQ_LAST_ENDPOINT, bind_address, &addr_length)); + + // setup sub socket + void *sub = test_context_socket (ZMQ_SUB); + // Monitor all events on sub + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_socket_monitor (sub, "inproc://monitor-sub", ZMQ_EVENT_ALL)); + // Create socket for collecting monitor events + void *sub_mon = test_context_socket (ZMQ_PAIR); + // Connect so they'll get events + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub_mon, "inproc://monitor-sub")); + // set handshake interval (i.e., timeout) to a more reasonable value + int handshakeInterval = 1000; + TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_HANDSHAKE_IVL, + &handshakeInterval, + sizeof (handshakeInterval))); + // set option to stop reconnecting on failed handshake + int stopReconnectOnError = ZMQ_RECONNECT_STOP_HANDSHAKE_FAILED; + TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_RECONNECT_STOP, + &stopReconnectOnError, + sizeof (stopReconnectOnError))); + // connect to dummy stream socket above + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, bind_address)); + + #if 1 + // ZMQ_EVENT_DISCONNECTED should be last event, because of ZMQ_RECONNECT_STOP set above + expect_monitor_event (sub_mon, ZMQ_EVENT_CONNECT_DELAYED); + expect_monitor_event (sub_mon, ZMQ_EVENT_CONNECTED); + expect_monitor_event (sub_mon, ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL); + expect_monitor_event (sub_mon, ZMQ_EVENT_DISCONNECTED); + #else + print_events(sub_mon, 2 * 1000, 1000); + #endif + + // Close sub + // TODO why does this use zero_linger? + test_context_socket_close_zero_linger (sub); + test_context_socket_close_zero_linger (dummy); + + // Close monitor + // TODO why does this use zero_linger? + test_context_socket_close_zero_linger (sub_mon); +} +#endif void setUp () { @@ -234,7 +290,9 @@ int main (void) RUN_TEST (reconnect_default); RUN_TEST (reconnect_success); + #ifdef ZMQ_BUILD_DRAFT_API RUN_TEST (reconnect_stop_on_refused); - + RUN_TEST (reconnect_stop_on_handshake_failed); + #endif return UNITY_END (); } diff --git a/tests/testutil_monitoring.cpp b/tests/testutil_monitoring.cpp index 3de2d3c7..cb4393e8 100644 --- a/tests/testutil_monitoring.cpp +++ b/tests/testutil_monitoring.cpp @@ -343,3 +343,43 @@ void expect_monitor_event_v2 (void *monitor_, free (remote_address); TEST_ASSERT_FALSE_MESSAGE (failed, buf); } + + +const char* get_zmqEventName(uint64_t event) +{ + switch(event) { + case ZMQ_EVENT_CONNECTED : return "CONNECTED"; + case ZMQ_EVENT_CONNECT_DELAYED : return "CONNECT_DELAYED"; + case ZMQ_EVENT_CONNECT_RETRIED : return "CONNECT_RETRIED"; + case ZMQ_EVENT_LISTENING : return "LISTENING"; + case ZMQ_EVENT_BIND_FAILED : return "BIND_FAILED"; + case ZMQ_EVENT_ACCEPTED : return "ACCEPTED"; + case ZMQ_EVENT_ACCEPT_FAILED : return "ACCEPT_FAILED"; + case ZMQ_EVENT_CLOSED : return "CLOSED"; + case ZMQ_EVENT_CLOSE_FAILED : return "CLOSE_FAILED"; + case ZMQ_EVENT_DISCONNECTED : return "DISCONNECTED"; + case ZMQ_EVENT_MONITOR_STOPPED : return "MONITOR_STOPPED"; + case ZMQ_EVENT_HANDSHAKE_FAILED_NO_DETAIL : return "HANDSHAKE_FAILED_NO_DETAIL"; + case ZMQ_EVENT_HANDSHAKE_SUCCEEDED : return "HANDSHAKE_SUCCEEDED"; + case ZMQ_EVENT_HANDSHAKE_FAILED_PROTOCOL : return "HANDSHAKE_FAILED_PROTOCOL"; + case ZMQ_EVENT_HANDSHAKE_FAILED_AUTH : return "HANDSHAKE_FAILED_AUTH"; + default : return "UNKNOWN"; + } +} + +void print_events(void* socket, int timeout, int limit) +{ + // print events received + int value; + char *event_address; + int event = get_monitor_event_with_timeout (socket, &value, &event_address, + timeout); + int i = 0;; + while ((event != -1) && (++i < limit)) { + const char* eventName = get_zmqEventName(event); + printf("Got event: %s\n", eventName); + event = get_monitor_event_with_timeout (socket, &value, &event_address, + timeout); + } + +} diff --git a/tests/testutil_monitoring.hpp b/tests/testutil_monitoring.hpp index 3fc1acef..2e4ac79e 100644 --- a/tests/testutil_monitoring.hpp +++ b/tests/testutil_monitoring.hpp @@ -76,4 +76,8 @@ void expect_monitor_event_v2 (void *monitor_, const char *expected_local_address_ = NULL, const char *expected_remote_address_ = NULL); + +const char* get_zmqEventName(uint64_t event); +void print_events(void* socket, int timeout, int limit); + #endif