problem: zeromq connects peer before handshake is completed

Solution: delay connecting the peer pipe until the handshake is completed
(cherry picked from commit e7f0090b161ce6344f6bd35009816a925c070b09)

Conflicts:
	src/i_engine.hpp
	src/norm_engine.hpp
	src/pgm_receiver.hpp
	src/pgm_sender.hpp
	src/raw_engine.cpp
	src/session_base.cpp
	src/session_base.hpp
	src/stream_engine_base.cpp
	src/stream_engine_base.hpp
	src/udp_engine.hpp
	src/ws_engine.cpp
	src/zmtp_engine.cpp
	tests/test_mock_pub_sub.cpp
This commit is contained in:
Doron Somech 2020-05-13 17:32:06 +03:00 committed by Luca Boccassi
parent 28625e3479
commit 87c0447521
12 changed files with 43 additions and 12 deletions

View File

@ -31,6 +31,10 @@ namespace zmq
{
virtual ~i_engine () {}
// Indicate if the engine has an handshake stage.
// If engine has handshake stage, engine must call session.engine_ready when the handshake is complete.
virtual bool has_handshake_stage () = 0;
// Plug the engine to the session.
virtual void plug (zmq::io_thread_t *io_thread_,
class session_base_t *session_) = 0;

View File

@ -113,7 +113,7 @@ void zmq::ipc_connecter_t::out_event ()
}
// Create the engine object for this connection.
stream_engine_t *engine = new (std::nothrow)
stream_engine_t (fd, options, endpoint);
stream_engine_t (fd, options, endpoint, !options.raw_sock);
alloc_assert (engine);
// Attach the engine to the corresponding session object.

View File

@ -81,7 +81,7 @@ void zmq::ipc_listener_t::in_event ()
// Create the engine object for this connection.
stream_engine_t *engine = new (std::nothrow)
stream_engine_t (fd, options, endpoint);
stream_engine_t (fd, options, endpoint, !options.raw_sock);
alloc_assert (engine);
// Choose I/O thread to run connecter in. Given that we are already

View File

@ -54,6 +54,7 @@ namespace zmq
int init (bool udp_encapsulation_, const char *network_);
// i_engine interface implementation.
bool has_handshake_stage () { return false; };
void plug (zmq::io_thread_t *io_thread_,
zmq::session_base_t *session_);
void terminate ();

View File

@ -53,6 +53,7 @@ namespace zmq
int init (bool udp_encapsulation_, const char *network_);
// i_engine interface implementation.
bool has_handshake_stage () { return false; };
void plug (zmq::io_thread_t *io_thread_,
zmq::session_base_t *session_);
void terminate ();

View File

@ -335,7 +335,18 @@ bool zmq::session_base_t::zap_enabled ()
void zmq::session_base_t::process_attach (i_engine *engine_)
{
zmq_assert (engine_ != NULL);
zmq_assert (!engine);
engine = engine_;
if (!engine_->has_handshake_stage ())
engine_ready ();
// Plug in the engine.
engine->plug (io_thread, this);
}
void zmq::session_base_t::engine_ready ()
{
// Create the pipe if it does not exist yet.
if (!pipe && !is_terminating ()) {
object_t *parents [2] = {this, socket};
@ -364,11 +375,6 @@ void zmq::session_base_t::process_attach (i_engine *engine_)
// Ask socket to plug into the remote end of the pipe.
send_bind (socket, pipes [1]);
}
// Plug in the engine.
zmq_assert (!engine);
engine = engine_;
engine->plug (io_thread, this);
}
void zmq::session_base_t::detach ()

View File

@ -56,6 +56,7 @@ namespace zmq
virtual void reset ();
void flush ();
void detach ();
void engine_ready ();
// i_pipe_events interface implementation.
void read_activated (zmq::pipe_t *pipe_);

View File

@ -54,7 +54,8 @@
#include "wire.hpp"
zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_,
const std::string &endpoint_) :
const std::string &endpoint_,
bool has_handshake_stage_) :
s (fd_),
inpos (NULL),
insize (0),
@ -66,6 +67,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_,
greeting_size (v2_greeting_size),
greeting_bytes_read (0),
session (NULL),
_has_handshake_stage (has_handshake_stage_),
options (options_),
endpoint (endpoint_),
plugged (false),
@ -192,9 +194,12 @@ void zmq::stream_engine_t::in_event ()
assert (!io_error);
// If still handshaking, receive and process the greeting message.
if (unlikely (handshaking))
if (unlikely (handshaking)) {
if (!handshake ())
return;
else if (mechanism == NULL && _has_handshake_stage)
session->engine_ready ();
}
zmq_assert (decoder);
@ -667,6 +672,9 @@ void zmq::stream_engine_t::zap_msg_available ()
void zmq::stream_engine_t::mechanism_ready ()
{
if (_has_handshake_stage)
session->engine_ready ();
if (options.recv_identity) {
msg_t identity;
mechanism->peer_identity (&identity);

View File

@ -53,10 +53,12 @@ namespace zmq
public:
stream_engine_t (fd_t fd_, const options_t &options_,
const std::string &endpoint);
const std::string &endpoint,
bool has_handshake_stage_);
~stream_engine_t ();
// i_engine interface implementation.
bool has_handshake_stage () { return _has_handshake_stage; };
void plug (zmq::io_thread_t *io_thread_,
zmq::session_base_t *session_);
void terminate ();
@ -156,6 +158,10 @@ namespace zmq
// The session this engine is attached to.
zmq::session_base_t *session;
// Indicate if engine has an handshake stage, if it does, engine must call session.engine_ready
// when handshake is completed.
bool _has_handshake_stage;
options_t options;
// String representation of endpoint

View File

@ -127,7 +127,7 @@ void zmq::tcp_connecter_t::out_event ()
// Create the engine object for this connection.
stream_engine_t *engine = new (std::nothrow)
stream_engine_t (fd, options, endpoint);
stream_engine_t (fd, options, endpoint, !options.raw_sock);
alloc_assert (engine);
// Attach the engine to the corresponding session object.

View File

@ -92,7 +92,7 @@ void zmq::tcp_listener_t::in_event ()
// Create the engine object for this connection.
stream_engine_t *engine = new (std::nothrow)
stream_engine_t (fd, options, endpoint);
stream_engine_t (fd, options, endpoint, !options.raw_sock);
alloc_assert (engine);
// Choose I/O thread to run connecter in. Given that we are already

View File

@ -140,8 +140,12 @@ expect_bounce_fail (void *server, void *client)
// Send message from server to client to test other direction
rc = zmq_send (server, content, 32, ZMQ_SNDMORE);
if (rc == -1 && zmq_errno () == EAGAIN)
return;
assert (rc == 32);
rc = zmq_send (server, content, 32, 0);
if (rc == -1 && zmq_errno () == EAGAIN)
return;
assert (rc == 32);
// Receive message at client side (should not succeed)