problem: ZMQ_HEARTBEAT is not useful without sending an hello message

When using ZMQ_HEARTBEAT one still needs to implement application-level heartbeat in order to know when to send a hello message.
For example, with the majordomo protocol, the worker needs to send a READY message when connecting to a broker. If the connection to the broker drops, and the heartbeat recognizes it the worker won't know about it and won't send the READY msg.
To solve that, the majordomo worker still has to implement heartbeat. With this new option, whenever the connection drops and reconnects the hello message will be sent, greatly simplify the majordomo protocol, as now READY and HEARTBEAT can be handled by zeromq.
This commit is contained in:
Doron Somech
2020-04-17 09:50:59 +03:00
parent 718ad8ab96
commit 93da6763b0
18 changed files with 260 additions and 2 deletions

View File

@@ -88,9 +88,21 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
case ZMQ_SCATTER:
case ZMQ_DGRAM:
case ZMQ_PEER:
#ifdef ZMQ_BUILD_DRAFT_API
if (options_.can_send_hello_msg && options_.hello_msg.size () > 0)
s = new (std::nothrow) hello_msg_session_t (
io_thread_, active_, socket_, options_, addr_);
else
s = new (std::nothrow) session_base_t (
io_thread_, active_, socket_, options_, addr_);
break;
#else
s = new (std::nothrow)
session_base_t (io_thread_, active_, socket_, options_, addr_);
break;
#endif
default:
errno = EINVAL;
return NULL;
@@ -807,3 +819,39 @@ void zmq::session_base_t::start_connecting_udp (io_thread_t * /*io_thread_*/)
send_attach (this, engine);
}
zmq::hello_msg_session_t::hello_msg_session_t (io_thread_t *io_thread_,
bool connect_,
socket_base_t *socket_,
const options_t &options_,
address_t *addr_) :
session_base_t (io_thread_, connect_, socket_, options_, addr_),
_new_pipe (true)
{
}
zmq::hello_msg_session_t::~hello_msg_session_t ()
{
}
int zmq::hello_msg_session_t::pull_msg (msg_t *msg_)
{
if (_new_pipe) {
_new_pipe = false;
const int rc =
msg_->init_buffer (&options.hello_msg[0], options.hello_msg.size ());
errno_assert (rc == 0);
return 0;
}
return session_base_t::pull_msg (msg_);
}
void zmq::hello_msg_session_t::reset ()
{
session_base_t::reset ();
_new_pipe = true;
}