problem: router doesn't know when peer disconnected

ZMQ_ROUTER_NOTIFY doesn't have a context and doesn't play nice with protocols. with ZMQ_DISCONNECT_MSG we can set it to a protocol message, like DISCONNECT in majordomo. Router will send it when a peer is disconnected. Another advantage of ZMQ_DISCONNECT_MSG is that it also works on inproc.

Together with ZMQ_HEARTBEAT it allows to build very reliable protocols, and much simpler as well.
This commit is contained in:
Doron Somech 2020-04-17 13:20:57 +03:00
parent 4c1d720a47
commit 81444136d5
18 changed files with 224 additions and 9 deletions

View File

@ -1042,7 +1042,8 @@ test_apps += tests/test_poller \
tests/test_peer \ tests/test_peer \
tests/test_reconnect_options \ tests/test_reconnect_options \
tests/test_msg_init \ tests/test_msg_init \
tests/test_hello_msg tests/test_hello_msg \
tests/test_disconnect_msg
tests_test_poller_SOURCES = tests/test_poller.cpp tests_test_poller_SOURCES = tests/test_poller.cpp
tests_test_poller_LDADD = ${TESTUTIL_LIBS} src/libzmq.la tests_test_poller_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
@ -1099,6 +1100,10 @@ tests_test_msg_init_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
tests_test_hello_msg_SOURCES = tests/test_hello_msg.cpp tests_test_hello_msg_SOURCES = tests/test_hello_msg.cpp
tests_test_hello_msg_LDADD = ${TESTUTIL_LIBS} src/libzmq.la tests_test_hello_msg_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
tests_test_hello_msg_CPPFLAGS = ${TESTUTIL_CPPFLAGS} tests_test_hello_msg_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
tests_test_disconnect_msg_SOURCES = tests/test_disconnect_msg.cpp
tests_test_disconnect_msg_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
tests_test_disconnect_msg_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
endif endif
if ENABLE_STATIC if ENABLE_STATIC

View File

@ -228,6 +228,18 @@ Option value size:: 32 or 41
Default value:: NULL Default value:: NULL
Applicable socket types:: all, when using TCP transport Applicable socket types:: all, when using TCP transport
ZMQ_DISCONNECT_MSG: set a disconnect message that the socket will generate when accepted peer disconnect
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
When set, the socket will generate a disconnect message when accepted peer has been disconnected.
You may set this on ROUTER, SERVER and PEER sockets.
The combination with ZMQ_HEARTBEAT_IVL is powerful and simplify protocols, when heartbeat recognize a connection drop it
will generate a disconnect message that can match the protocol of the application.
[horizontal]
Option value type:: binary data
Option value unit:: N/A
Default value:: NULL
Applicable socket types:: ZMQ_ROUTER, ZMQ_SERVER and ZMQ_PEER
ZMQ_GSSAPI_PLAINTEXT: Disable GSSAPI encryption ZMQ_GSSAPI_PLAINTEXT: Disable GSSAPI encryption
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

View File

@ -679,6 +679,7 @@ ZMQ_EXPORT void zmq_threadclose (void *thread_);
#define ZMQ_ONLY_FIRST_SUBSCRIBE 108 #define ZMQ_ONLY_FIRST_SUBSCRIBE 108
#define ZMQ_RECONNECT_STOP 109 #define ZMQ_RECONNECT_STOP 109
#define ZMQ_HELLO_MSG 110 #define ZMQ_HELLO_MSG 110
#define ZMQ_DISCONNECT_MSG 111
/* DRAFT ZMQ_RECONNECT_STOP options */ /* DRAFT ZMQ_RECONNECT_STOP options */
#define ZMQ_RECONNECT_STOP_CONN_REFUSED 0x1 #define ZMQ_RECONNECT_STOP_CONN_REFUSED 0x1

View File

@ -828,6 +828,13 @@ void zmq::ctx_t::connect_inproc_sockets (
pending_connection_.bind_pipe->set_hwms (-1, -1); pending_connection_.bind_pipe->set_hwms (-1, -1);
} }
#ifdef ZMQ_BUILD_DRAFT_API
if (bind_options_.can_recv_disconnect_msg
&& !bind_options_.disconnect_msg.empty ())
pending_connection_.connect_pipe->set_disconnect_msg (
bind_options_.disconnect_msg);
#endif
if (side_ == bind_side) { if (side_ == bind_side) {
command_t cmd; command_t cmd;
cmd.type = command_t::bind; cmd.type = command_t::bind;

View File

@ -251,7 +251,9 @@ zmq::options_t::options_t () :
monitor_event_version (1), monitor_event_version (1),
wss_trust_system (false), wss_trust_system (false),
hello_msg (), hello_msg (),
can_send_hello_msg (false) can_send_hello_msg (false),
disconnect_msg (),
can_recv_disconnect_msg (false)
{ {
memset (curve_public_key, 0, CURVE_KEYSIZE); memset (curve_public_key, 0, CURVE_KEYSIZE);
memset (curve_secret_key, 0, CURVE_KEYSIZE); memset (curve_secret_key, 0, CURVE_KEYSIZE);
@ -825,6 +827,16 @@ int zmq::options_t::setsockopt (int option_,
hello_msg = std::vector<unsigned char> (); hello_msg = std::vector<unsigned char> ();
} }
return 0;
case ZMQ_DISCONNECT_MSG:
if (optvallen_ > 0) {
unsigned char *bytes = (unsigned char *) optval_;
disconnect_msg =
std::vector<unsigned char> (bytes, bytes + optvallen_);
} else {
disconnect_msg = std::vector<unsigned char> ();
}
return 0; return 0;

View File

@ -301,6 +301,10 @@ struct options_t
// Hello msg // Hello msg
std::vector<unsigned char> hello_msg; std::vector<unsigned char> hello_msg;
bool can_send_hello_msg; bool can_send_hello_msg;
// Disconnect msg
std::vector<unsigned char> disconnect_msg;
bool can_recv_disconnect_msg;
}; };
inline bool get_effective_conflate_option (const options_t &options) inline bool get_effective_conflate_option (const options_t &options)

View File

@ -41,6 +41,7 @@ zmq::peer_t::peer_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
{ {
options.type = ZMQ_PEER; options.type = ZMQ_PEER;
options.can_send_hello_msg = true; options.can_send_hello_msg = true;
options.can_recv_disconnect_msg = true;
} }
uint32_t zmq::peer_t::connect_peer (const char *endpoint_uri_) uint32_t zmq::peer_t::connect_peer (const char *endpoint_uri_)

View File

@ -124,10 +124,12 @@ zmq::pipe_t::pipe_t (object_t *parent_,
_server_socket_routing_id (0), _server_socket_routing_id (0),
_conflate (conflate_) _conflate (conflate_)
{ {
_disconnect_msg.init ();
} }
zmq::pipe_t::~pipe_t () zmq::pipe_t::~pipe_t ()
{ {
_disconnect_msg.close ();
} }
void zmq::pipe_t::set_peer (pipe_t *peer_) void zmq::pipe_t::set_peer (pipe_t *peer_)
@ -591,3 +593,24 @@ void zmq::pipe_t::process_pipe_peer_stats (uint64_t queue_count_,
send_pipe_stats_publish (socket_base_, queue_count_, send_pipe_stats_publish (socket_base_, queue_count_,
_msgs_written - _peers_msgs_read, endpoint_pair_); _msgs_written - _peers_msgs_read, endpoint_pair_);
} }
void zmq::pipe_t::send_disconnect_msg ()
{
if (_disconnect_msg.size () > 0) {
// Rollback any incomplete message in the pipe, and push the disconnect message.
rollback ();
_out_pipe->write (_disconnect_msg, false);
flush ();
_disconnect_msg.init ();
}
}
void zmq::pipe_t::set_disconnect_msg (
const std::vector<unsigned char> &disconnect_)
{
_disconnect_msg.close ();
const int rc =
_disconnect_msg.init_buffer (&disconnect_[0], disconnect_.size ());
errno_assert (rc == 0);
}

View File

@ -38,10 +38,10 @@
#include "blob.hpp" #include "blob.hpp"
#include "options.hpp" #include "options.hpp"
#include "endpoint.hpp" #include "endpoint.hpp"
#include "msg.hpp"
namespace zmq namespace zmq
{ {
class msg_t;
class pipe_t; class pipe_t;
// Create a pipepair for bi-directional transfer of messages. // Create a pipepair for bi-directional transfer of messages.
@ -147,6 +147,9 @@ class pipe_t ZMQ_FINAL : public object_t,
void send_stats_to_peer (own_t *socket_base_); void send_stats_to_peer (own_t *socket_base_);
void send_disconnect_msg ();
void set_disconnect_msg (const std::vector<unsigned char> &disconnect_);
private: private:
// Type of the underlying lock-free pipe. // Type of the underlying lock-free pipe.
typedef ypipe_base_t<msg_t> upipe_t; typedef ypipe_base_t<msg_t> upipe_t;
@ -257,6 +260,9 @@ class pipe_t ZMQ_FINAL : public object_t,
// The endpoints of this pipe. // The endpoints of this pipe.
endpoint_uri_pair_t _endpoint_pair; endpoint_uri_pair_t _endpoint_pair;
// Disconnect msg
msg_t _disconnect_msg;
ZMQ_NON_COPYABLE_NOR_MOVABLE (pipe_t) ZMQ_NON_COPYABLE_NOR_MOVABLE (pipe_t)
}; };

View File

@ -56,6 +56,7 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
options.recv_routing_id = true; options.recv_routing_id = true;
options.raw_socket = false; options.raw_socket = false;
options.can_send_hello_msg = true; options.can_send_hello_msg = true;
options.can_recv_disconnect_msg = true;
_prefetched_id.init (); _prefetched_id.init ();
_prefetched_msg.init (); _prefetched_msg.init ();

View File

@ -42,6 +42,7 @@ zmq::server_t::server_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
{ {
options.type = ZMQ_SERVER; options.type = ZMQ_SERVER;
options.can_send_hello_msg = true; options.can_send_hello_msg = true;
options.can_recv_disconnect_msg = true;
} }
zmq::server_t::~server_t () zmq::server_t::~server_t ()

View File

@ -442,15 +442,26 @@ void zmq::session_base_t::process_attach (i_engine *engine_)
_engine->plug (_io_thread, this); _engine->plug (_io_thread, this);
} }
void zmq::session_base_t::engine_error (zmq::i_engine::error_reason_t reason_) void zmq::session_base_t::engine_error (bool handshaked_,
zmq::i_engine::error_reason_t reason_)
{ {
// Engine is dead. Let's forget about it. // Engine is dead. Let's forget about it.
_engine = NULL; _engine = NULL;
// Remove any half-done messages from the pipes. // Remove any half-done messages from the pipes.
if (_pipe) if (_pipe) {
clean_pipes (); clean_pipes ();
#ifdef ZMQ_BUILD_DRAFT_API
// Only send disconnect message if socket was accepted and handshake was completed
if (!_active && handshaked_ && options.can_recv_disconnect_msg
&& !options.disconnect_msg.empty ()) {
_pipe->set_disconnect_msg (options.disconnect_msg);
_pipe->send_disconnect_msg ();
}
#endif
}
zmq_assert (reason_ == i_engine::connection_error zmq_assert (reason_ == i_engine::connection_error
|| reason_ == i_engine::timeout_error || reason_ == i_engine::timeout_error
|| reason_ == i_engine::protocol_error); || reason_ == i_engine::protocol_error);

View File

@ -62,7 +62,7 @@ class session_base_t : public own_t, public io_object_t, public i_pipe_events
virtual void reset (); virtual void reset ();
void flush (); void flush ();
void rollback (); void rollback ();
void engine_error (zmq::i_engine::error_reason_t reason_); void engine_error (bool handshaked_, zmq::i_engine::error_reason_t reason_);
// i_pipe_events interface implementation. // i_pipe_events interface implementation.
void read_activated (zmq::pipe_t *pipe_) ZMQ_FINAL; void read_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;

View File

@ -122,8 +122,10 @@ int zmq::socket_base_t::inprocs_t::erase_pipes (
return -1; return -1;
} }
for (map_t::iterator it = range.first; it != range.second; ++it) for (map_t::iterator it = range.first; it != range.second; ++it) {
it->second->send_disconnect_msg ();
it->second->terminate (true); it->second->terminate (true);
}
_inprocs.erase (range.first, range.second); _inprocs.erase (range.first, range.second);
return 0; return 0;
} }
@ -864,6 +866,10 @@ int zmq::socket_base_t::connect_internal (const char *endpoint_uri_)
&& peer.options.hello_msg.size () > 0) { && peer.options.hello_msg.size () > 0) {
send_hello_msg (new_pipes[1], peer.options); send_hello_msg (new_pipes[1], peer.options);
} }
if (peer.options.can_recv_disconnect_msg
&& peer.options.disconnect_msg.size () > 0)
new_pipes[0]->set_disconnect_msg (peer.options.disconnect_msg);
#endif #endif
// Attach remote end of the pipe to the peer socket. Note that peer's // Attach remote end of the pipe to the peer socket. Note that peer's
@ -1530,6 +1536,8 @@ void zmq::socket_base_t::process_term (int linger_)
// Ask all attached pipes to terminate. // Ask all attached pipes to terminate.
for (pipes_t::size_type i = 0, size = _pipes.size (); i != size; ++i) { for (pipes_t::size_type i = 0, size = _pipes.size (); i != size; ++i) {
// Only inprocs might have a disconnect message set
_pipes[i]->send_disconnect_msg ();
_pipes[i]->terminate (false); _pipes[i]->terminate (false);
} }
register_term_acks (static_cast<int> (_pipes.size ())); register_term_acks (static_cast<int> (_pipes.size ()));

View File

@ -685,7 +685,11 @@ void zmq::stream_engine_base_t::error (error_reason_t reason_)
_socket->event_disconnected (_endpoint_uri_pair, _s); _socket->event_disconnected (_endpoint_uri_pair, _s);
_session->flush (); _session->flush ();
_session->engine_error (reason_); _session->engine_error (
!_handshaking
&& (_mechanism == NULL
|| _mechanism->status () != mechanism_t::handshaking),
reason_);
unplug (); unplug ();
delete this; delete this;
} }

View File

@ -344,7 +344,7 @@ int zmq::udp_engine_t::add_membership (fd_t s_, const udp_address_t *addr_)
void zmq::udp_engine_t::error (error_reason_t reason_) void zmq::udp_engine_t::error (error_reason_t reason_)
{ {
zmq_assert (_session); zmq_assert (_session);
_session->engine_error (reason_); _session->engine_error (false, reason_);
terminate (); terminate ();
} }

View File

@ -66,6 +66,7 @@
#define ZMQ_ONLY_FIRST_SUBSCRIBE 108 #define ZMQ_ONLY_FIRST_SUBSCRIBE 108
#define ZMQ_RECONNECT_STOP 109 #define ZMQ_RECONNECT_STOP 109
#define ZMQ_HELLO_MSG 110 #define ZMQ_HELLO_MSG 110
#define ZMQ_DISCONNECT_MSG 111
/* DRAFT ZMQ_RECONNECT_STOP options */ /* DRAFT ZMQ_RECONNECT_STOP options */
#define ZMQ_RECONNECT_STOP_CONN_REFUSED 0x1 #define ZMQ_RECONNECT_STOP_CONN_REFUSED 0x1

View File

@ -0,0 +1,118 @@
/*
Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "testutil.hpp"
#include "testutil_unity.hpp"
SETUP_TEARDOWN_TESTCONTEXT
void test (const char *address)
{
// Create a server
void *server = test_context_socket (ZMQ_SERVER);
// set server socket options
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (server, ZMQ_DISCONNECT_MSG, "D", 1));
// bind server
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (server, address));
// Create a client
void *client = test_context_socket (ZMQ_CLIENT);
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (client, ZMQ_HELLO_MSG, "H", 1));
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (client, address));
// Receive the hello message from client
recv_string_expect_success (server, "H", 0);
// Kill the client
test_context_socket_close (client);
// Receive the disconnect message
recv_string_expect_success (server, "D", 0);
// Clean up.
test_context_socket_close (server);
}
void test_tcp ()
{
test ("tcp://127.0.0.1:5569");
}
void test_inproc ()
{
test ("inproc://disconnect-msg");
}
void test_inproc_disconnect ()
{
const char *address = "inproc://disconnect-msg";
// Create a server
void *server = test_context_socket (ZMQ_SERVER);
// set server socket options
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (server, ZMQ_DISCONNECT_MSG, "D", 1));
// bind server
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (server, address));
// Create a client
void *client = test_context_socket (ZMQ_CLIENT);
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (client, ZMQ_HELLO_MSG, "H", 1));
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (client, address));
// Receive the hello message from client
recv_string_expect_success (server, "H", 0);
// disconnect the client
TEST_ASSERT_SUCCESS_ERRNO (zmq_disconnect (client, address));
// Receive the disconnect message
recv_string_expect_success (server, "D", 0);
// Clean up.
test_context_socket_close (client);
test_context_socket_close (server);
}
int main ()
{
setup_test_environment ();
UNITY_BEGIN ();
RUN_TEST (test_tcp);
RUN_TEST (test_inproc);
RUN_TEST (test_inproc_disconnect);
return UNITY_END ();
}