diff --git a/Makefile.am b/Makefile.am index fc38631e..f1f9972e 100644 --- a/Makefile.am +++ b/Makefile.am @@ -888,7 +888,8 @@ test_apps += tests/test_poller \ tests/test_radio_dish \ tests/test_scatter_gather \ tests/test_dgram \ - tests/test_app_meta + tests/test_app_meta \ + tests/test_router_notify tests_test_poller_SOURCES = tests/test_poller.cpp tests_test_poller_LDADD = src/libzmq.la ${UNITY_LIBS} @@ -918,6 +919,10 @@ tests_test_dgram_LDADD = src/libzmq.la tests_test_app_meta_SOURCES = tests/test_app_meta.cpp tests_test_app_meta_LDADD = src/libzmq.la ${UNITY_LIBS} tests_test_app_meta_CPPFLAGS = ${UNITY_CPPFLAGS} + +tests_test_router_notify_SOURCES = tests/test_router_notify.cpp +tests_test_router_notify_LDADD = src/libzmq.la ${UNITY_LIBS} +tests_test_router_notify_CPPFLAGS = ${UNITY_CPPFLAGS} endif if ENABLE_STATIC diff --git a/doc/zmq_getsockopt.txt b/doc/zmq_getsockopt.txt index eb16ef0e..de188ec8 100644 --- a/doc/zmq_getsockopt.txt +++ b/doc/zmq_getsockopt.txt @@ -904,6 +904,23 @@ Option value unit:: 0, 1 Default value:: 1 Applicable socket types:: ZMQ_RADIO, when using UDP multicast transport + +ZMQ_ROUTER_NOTIFY: Retrieve router socket notification settings +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Retrieve the current notification settings of a router socket. The returned +value is a bitmask composed of ZMQ_NOTIFY_CONNECT and ZMQ_NOTIFY_DISCONNECT +flags, meaning connect and disconnect notifications are enabled, respectively. +A value of '0' means the notifications are off. + +NOTE: in DRAFT state, not yet available in stable releases. + +[horizontal] +Option value type:: int +Option value unit:: 0, ZMQ_NOTIFY_CONNECT, ZMQ_NOTIFY_DISCONNECT, ZMQ_NOTIFY_CONNECT|ZMQ_NOTIFY_DISCONNECT +Default value:: 0 +Applicable socket types:: ZMQ_ROUTER + + RETURN VALUE ------------ The _zmq_getsockopt()_ function shall return zero if successful. Otherwise it diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index af71f47a..b866b41b 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -1289,6 +1289,24 @@ Option value unit:: 0, 1 Default value:: 1 Applicable socket types:: ZMQ_RADIO, when using UDP multicast transport + +ZMQ_ROUTER_NOTIFY: Send connect and disconnect notifications +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Enable connect and disconnect notifications on a ROUTER socket. +When enabled, the socket delivers a zero-length message (with routing-id +as first frame) when a peer connects or disconnects. It's possible +to notify both events for a peer by OR-ing the flag values. This option +only applies to stream oriented (tcp, ipc) transports. + +NOTE: in DRAFT state, not yet available in stable releases. + +[horizontal] +Option value type:: int +Option value unit:: 0, ZMQ_NOTIFY_CONNECT, ZMQ_NOTIFY_DISCONNECT, ZMQ_NOTIFY_CONNECT|ZMQ_NOTIFY_DISCONNECT +Default value:: 0 +Applicable socket types:: ZMQ_ROUTER + + RETURN VALUE ------------ The _zmq_setsockopt()_ function shall return zero if successful. Otherwise it diff --git a/include/zmq.h b/include/zmq.h index c0094bad..3a785dcf 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -618,6 +618,8 @@ ZMQ_EXPORT void zmq_threadclose (void *thread_); #define ZMQ_LOOPBACK_FASTPATH 94 #define ZMQ_METADATA 95 #define ZMQ_MULTICAST_LOOP 96 +#define ZMQ_ROUTER_NOTIFY 97 + /* DRAFT 0MQ socket events and monitoring */ /* Unspecified system errors during handshake. Event value is an errno. */ @@ -678,6 +680,10 @@ ZMQ_EXPORT const char *zmq_msg_group (zmq_msg_t *msg); #define ZMQ_MSG_PROPERTY_USER_ID "User-Id" #define ZMQ_MSG_PROPERTY_PEER_ADDRESS "Peer-Address" +/* Router notify options */ +#define ZMQ_NOTIFY_CONNECT 1 +#define ZMQ_NOTIFY_DISCONNECT 2 + /******************************************************************************/ /* Poller polling on sockets,fd and thread-safe sockets */ /******************************************************************************/ diff --git a/src/options.cpp b/src/options.cpp index d8d40324..aad90cff 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -243,7 +243,8 @@ zmq::options_t::options_t () : zap_enforce_domain (false), loopback_fastpath (false), multicast_loop (true), - zero_copy (true) + zero_copy (true), + router_notify (0) { memset (curve_public_key, 0, CURVE_KEYSIZE); memset (curve_secret_key, 0, CURVE_KEYSIZE); @@ -1149,6 +1150,16 @@ int zmq::options_t::getsockopt (int option_, } break; +#ifdef ZMQ_BUILD_DRAFT_API + case ZMQ_ROUTER_NOTIFY: + if (is_int) { + *value = router_notify; + return 0; + } + break; +#endif + + default: #if defined(ZMQ_ACT_MILITANT) malformed = false; diff --git a/src/options.hpp b/src/options.hpp index 8cf08c72..bc61d85f 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -261,6 +261,9 @@ struct options_t // Use zero copy strategy for storing message content when decoding. bool zero_copy; + // Router socket ZMQ_NOTIFY_CONNECT/ZMQ_NOTIFY_DISCONNECT notifications + int router_notify; + // Application metadata std::map app_metadata; }; diff --git a/src/router.cpp b/src/router.cpp index a97096cf..fec65ff7 100644 --- a/src/router.cpp +++ b/src/router.cpp @@ -139,6 +139,16 @@ int zmq::router_t::xsetsockopt (int option_, } break; +#ifdef ZMQ_BUILD_DRAFT_API + case ZMQ_ROUTER_NOTIFY: + if (is_int && value >= 0 + && value <= (ZMQ_NOTIFY_CONNECT | ZMQ_NOTIFY_DISCONNECT)) { + options.router_notify = value; + return 0; + } + break; +#endif + default: return routing_socket_base_t::xsetsockopt (option_, optval_, optvallen_); diff --git a/src/session_base.cpp b/src/session_base.cpp index c924a814..364ee37d 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -217,6 +217,12 @@ void zmq::session_base_t::flush () _pipe->flush (); } +void zmq::session_base_t::rollback () +{ + if (_pipe) + _pipe->rollback (); +} + void zmq::session_base_t::clean_pipes () { zmq_assert (_pipe != NULL); diff --git a/src/session_base.hpp b/src/session_base.hpp index b503adc8..74e31810 100644 --- a/src/session_base.hpp +++ b/src/session_base.hpp @@ -60,6 +60,7 @@ class session_base_t : public own_t, public io_object_t, public i_pipe_events // Following functions are the interface exposed towards the engine. virtual void reset (); void flush (); + void rollback (); void engine_error (zmq::stream_engine_t::error_reason_t reason_); // i_pipe_events interface implementation. diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 09fbdebe..59a68a9b 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -897,6 +897,8 @@ void zmq::stream_engine_t::mechanism_ready () _has_heartbeat_timer = true; } + bool flush_session = false; + if (_options.recv_routing_id) { msg_t routing_id; _mechanism->peer_routing_id (&routing_id); @@ -908,9 +910,26 @@ void zmq::stream_engine_t::mechanism_ready () return; } errno_assert (rc == 0); - _session->flush (); + flush_session = true; } + if (_options.router_notify & ZMQ_NOTIFY_CONNECT) { + msg_t connect_notification; + connect_notification.init (); + const int rc = _session->push_msg (&connect_notification); + if (rc == -1 && errno == EAGAIN) { + // If the write is failing at this stage with + // an EAGAIN the pipe must be being shut down, + // so we can just bail out of the notification. + return; + } + errno_assert (rc == 0); + flush_session = true; + } + + if (flush_session) + _session->flush (); + _next_msg = &stream_engine_t::pull_and_encode; _process_msg = &stream_engine_t::write_credential; @@ -1038,6 +1057,18 @@ void zmq::stream_engine_t::error (error_reason_t reason_) terminator.close (); } zmq_assert (_session); + + if ((_options.router_notify & ZMQ_NOTIFY_DISCONNECT) && !_handshaking) { + // For router sockets with disconnect notification, rollback + // any incomplete message in the pipe, and push the disconnect + // notification message. + _session->rollback (); + + msg_t disconnect_notification; + disconnect_notification.init (); + _session->push_msg (&disconnect_notification); + } + #ifdef ZMQ_BUILD_DRAFT_API // protocol errors have been signaled already at the point where they occurred if (reason_ != protocol_error diff --git a/src/zmq_draft.h b/src/zmq_draft.h index 36cd54f4..156ec85b 100644 --- a/src/zmq_draft.h +++ b/src/zmq_draft.h @@ -55,6 +55,7 @@ unsigned long zmq_stopwatch_intermediate (void *watch_); #define ZMQ_LOOPBACK_FASTPATH 94 #define ZMQ_METADATA 95 #define ZMQ_MULTICAST_LOOP 96 +#define ZMQ_ROUTER_NOTIFY 97 /* DRAFT 0MQ socket events and monitoring */ /* Unspecified system errors during handshake. Event value is an errno. */ @@ -115,6 +116,10 @@ const char *zmq_msg_group (zmq_msg_t *msg_); #define ZMQ_MSG_PROPERTY_USER_ID "User-Id" #define ZMQ_MSG_PROPERTY_PEER_ADDRESS "Peer-Address" +/* Router notify options */ +#define ZMQ_NOTIFY_CONNECT 1 +#define ZMQ_NOTIFY_DISCONNECT 2 + /******************************************************************************/ /* Poller polling on sockets,fd and thread-safe sockets */ /******************************************************************************/ diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 591da07f..9a053f00 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -136,6 +136,7 @@ IF (ENABLE_DRAFTS) test_scatter_gather test_dgram test_app_meta + test_router_notify ) ENDIF (ENABLE_DRAFTS) diff --git a/tests/test_router_notify.cpp b/tests/test_router_notify.cpp new file mode 100644 index 00000000..ba086542 --- /dev/null +++ b/tests/test_router_notify.cpp @@ -0,0 +1,328 @@ +/* + Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "testutil.hpp" +#include "testutil_unity.hpp" + +#include + +void setUp () +{ + setup_test_context (); +} + +void tearDown () +{ + teardown_test_context (); +} + +void test_sockopt_router_notify () +{ + void *router = test_context_socket (ZMQ_ROUTER); + int opt_notify; + + int opt_notify_read; + size_t opt_notify_read_size = sizeof (opt_notify_read); + + + // default value is off when socket is constructed + TEST_ASSERT_SUCCESS_ERRNO (zmq_getsockopt ( + router, ZMQ_ROUTER_NOTIFY, &opt_notify_read, &opt_notify_read_size)); + + TEST_ASSERT_EQUAL (0, opt_notify_read); + + + // valid value - Connect + opt_notify = ZMQ_NOTIFY_CONNECT; + TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt ( + router, ZMQ_ROUTER_NOTIFY, &opt_notify, sizeof (opt_notify))); + + TEST_ASSERT_SUCCESS_ERRNO (zmq_getsockopt ( + router, ZMQ_ROUTER_NOTIFY, &opt_notify_read, &opt_notify_read_size)); + + TEST_ASSERT_EQUAL (opt_notify, opt_notify_read); + + + // valid value - Disconnect + opt_notify = ZMQ_NOTIFY_DISCONNECT; + TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt ( + router, ZMQ_ROUTER_NOTIFY, &opt_notify, sizeof (opt_notify))); + + TEST_ASSERT_SUCCESS_ERRNO (zmq_getsockopt ( + router, ZMQ_ROUTER_NOTIFY, &opt_notify_read, &opt_notify_read_size)); + + TEST_ASSERT_EQUAL (opt_notify, opt_notify_read); + + + // valid value - Off + opt_notify = 0; + TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt ( + router, ZMQ_ROUTER_NOTIFY, &opt_notify, sizeof (opt_notify))); + + TEST_ASSERT_SUCCESS_ERRNO (zmq_getsockopt ( + router, ZMQ_ROUTER_NOTIFY, &opt_notify_read, &opt_notify_read_size)); + + TEST_ASSERT_EQUAL (opt_notify, opt_notify_read); + + + // valid value - Both + opt_notify = ZMQ_NOTIFY_CONNECT | ZMQ_NOTIFY_DISCONNECT; + TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt ( + router, ZMQ_ROUTER_NOTIFY, &opt_notify, sizeof (opt_notify))); + + TEST_ASSERT_SUCCESS_ERRNO (zmq_getsockopt ( + router, ZMQ_ROUTER_NOTIFY, &opt_notify_read, &opt_notify_read_size)); + + TEST_ASSERT_EQUAL (opt_notify, opt_notify_read); + + + // value boundary + opt_notify = -1; + TEST_ASSERT_FAILURE_ERRNO ( + EINVAL, zmq_setsockopt (router, ZMQ_ROUTER_NOTIFY, &opt_notify, + sizeof (opt_notify))); + + opt_notify = (ZMQ_NOTIFY_CONNECT | ZMQ_NOTIFY_DISCONNECT) + 1; + TEST_ASSERT_FAILURE_ERRNO ( + EINVAL, zmq_setsockopt (router, ZMQ_ROUTER_NOTIFY, &opt_notify, + sizeof (opt_notify))); + + // failures don't update the value + TEST_ASSERT_SUCCESS_ERRNO (zmq_getsockopt ( + router, ZMQ_ROUTER_NOTIFY, &opt_notify_read, &opt_notify_read_size)); + + TEST_ASSERT_EQUAL (ZMQ_NOTIFY_CONNECT | ZMQ_NOTIFY_DISCONNECT, + opt_notify_read); + + + test_context_socket_close (router); + + + // check a non-router socket type + void *dealer = test_context_socket (ZMQ_DEALER); + + // setsockopt fails for non-router sockets + opt_notify = ZMQ_NOTIFY_CONNECT; + TEST_ASSERT_FAILURE_ERRNO ( + EINVAL, zmq_setsockopt (dealer, ZMQ_ROUTER_NOTIFY, &opt_notify, + sizeof (opt_notify))); + + // getsockopts returns off for any non-router socket + TEST_ASSERT_SUCCESS_ERRNO (zmq_getsockopt ( + dealer, ZMQ_ROUTER_NOTIFY, &opt_notify_read, &opt_notify_read_size)); + + TEST_ASSERT_EQUAL (0, opt_notify_read); + + + test_context_socket_close (dealer); +} + + +void test_router_notify_helper (int opt_notify) +{ + void *router = test_context_socket (ZMQ_ROUTER); + int opt_more; + size_t opt_more_length = sizeof (opt_more); + int opt_events; + size_t opt_events_length = sizeof (opt_events); + + + // valid values + TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt ( + router, ZMQ_ROUTER_NOTIFY, &opt_notify, sizeof (opt_notify))); + + TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (router, ENDPOINT_0)); + + void *dealer = test_context_socket (ZMQ_DEALER); + const char *dealer_routing_id = "X"; + + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (dealer, ZMQ_ROUTING_ID, dealer_routing_id, 1)); + + // dealer connects + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (dealer, ENDPOINT_0)); + + // connection notification msg + if (opt_notify & ZMQ_NOTIFY_CONNECT) { + // routing-id only message of the connect + recv_string_expect_success (router, dealer_routing_id, + 0); // 1st part: routing-id + recv_string_expect_success (router, "", 0); // 2nd part: empty + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_getsockopt (router, ZMQ_RCVMORE, &opt_more, &opt_more_length)); + TEST_ASSERT_EQUAL (0, opt_more); + } + + // test message from the dealer + send_string_expect_success (dealer, "Hello", 0); + recv_string_expect_success (router, dealer_routing_id, 0); + recv_string_expect_success (router, "Hello", 0); + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_getsockopt (router, ZMQ_RCVMORE, &opt_more, &opt_more_length)); + TEST_ASSERT_EQUAL (0, opt_more); + + // dealer disconnects + TEST_ASSERT_SUCCESS_ERRNO (zmq_disconnect (dealer, ENDPOINT_0)); + + // need one more process_commands() (???) + msleep (SETTLE_TIME); + zmq_getsockopt (dealer, ZMQ_EVENTS, &opt_events, &opt_events_length); + + // connection notification msg + if (opt_notify & ZMQ_NOTIFY_DISCONNECT) { + // routing-id only message of the connect + recv_string_expect_success (router, dealer_routing_id, + 0); // 1st part: routing-id + recv_string_expect_success (router, "", 0); // 2nd part: empty + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_getsockopt (router, ZMQ_RCVMORE, &opt_more, &opt_more_length)); + TEST_ASSERT_EQUAL (0, opt_more); + } + + test_context_socket_close (dealer); + test_context_socket_close (router); +} + + +void test_router_notify_connect () +{ + test_router_notify_helper (ZMQ_NOTIFY_CONNECT); +} + + +void test_router_notify_disconnect () +{ + test_router_notify_helper (ZMQ_NOTIFY_DISCONNECT); +} + + +void test_router_notify_both () +{ + test_router_notify_helper (ZMQ_NOTIFY_CONNECT | ZMQ_NOTIFY_DISCONNECT); +} + + +void test_handshake_fail () +{ + // setup router socket + void *router = test_context_socket (ZMQ_ROUTER); + int opt_timeout = 200; + int opt_notify = ZMQ_NOTIFY_CONNECT | ZMQ_NOTIFY_DISCONNECT; + + // valid values + TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt ( + router, ZMQ_ROUTER_NOTIFY, &opt_notify, sizeof (opt_notify))); + + TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt ( + router, ZMQ_RCVTIMEO, &opt_timeout, sizeof (opt_timeout))); + + TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (router, ENDPOINT_0)); + + // send something on raw tcp + void *stream = test_context_socket (ZMQ_STREAM); + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (stream, ENDPOINT_0)); + + send_string_expect_success (stream, "not-a-handshake", 0); + + TEST_ASSERT_SUCCESS_ERRNO (zmq_disconnect (stream, ENDPOINT_0)); + test_context_socket_close (stream); + + // no notification delivered + char buffer[255]; + TEST_ASSERT_FAILURE_ERRNO (EAGAIN, + zmq_recv (router, buffer, sizeof (buffer), 0)); + + test_context_socket_close (router); +} + + +void test_error_during_multipart () +{ + /* + * If the disconnect occurs in the middle of the multipart + * message, the socket should not add the notification at the + * end of the incomplete message. It must discard the incomplete + * message, and delivert the notification as a new message. + */ + + char long_str[128] = {0}; + memset (long_str, '*', sizeof (long_str) - 1); + + // setup router + void *router = test_context_socket (ZMQ_ROUTER); + + int opt_notify = ZMQ_NOTIFY_DISCONNECT; + TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt ( + router, ZMQ_ROUTER_NOTIFY, &opt_notify, sizeof (opt_notify))); + + int64_t opt_maxmsgsize = 64; // the handshake fails if this is too small + TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt ( + router, ZMQ_MAXMSGSIZE, &opt_maxmsgsize, sizeof (opt_maxmsgsize))); + + TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (router, ENDPOINT_0)); + + + // setup dealer + void *dealer = test_context_socket (ZMQ_DEALER); + const char *dealer_routing_id = "X"; + + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (dealer, ZMQ_ROUTING_ID, dealer_routing_id, 1)); + + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (dealer, ENDPOINT_0)); + + + // send multipart message, the 2nd part causes a disconnect. + send_string_expect_success (dealer, "Hello2", ZMQ_SNDMORE); + send_string_expect_success (dealer, long_str, 0); + + // disconnect notification + recv_string_expect_success (router, dealer_routing_id, 0); + recv_string_expect_success (router, "", 0); + + + test_context_socket_close (dealer); + test_context_socket_close (router); +} + + +int main (void) +{ + setup_test_environment (); + + UNITY_BEGIN (); + RUN_TEST (test_sockopt_router_notify); + RUN_TEST (test_router_notify_connect); + RUN_TEST (test_router_notify_disconnect); + RUN_TEST (test_router_notify_both); + RUN_TEST (test_handshake_fail); + RUN_TEST (test_error_during_multipart); + + return UNITY_END (); +}