From 87c04475211eb28b62031c053fc972ca2b47d190 Mon Sep 17 00:00:00 2001 From: Doron Somech Date: Wed, 13 May 2020 17:32:06 +0300 Subject: [PATCH] problem: zeromq connects peer before handshake is completed Solution: delay connecting the peer pipe until the handshake is completed (cherry picked from commit e7f0090b161ce6344f6bd35009816a925c070b09) Conflicts: src/i_engine.hpp src/norm_engine.hpp src/pgm_receiver.hpp src/pgm_sender.hpp src/raw_engine.cpp src/session_base.cpp src/session_base.hpp src/stream_engine_base.cpp src/stream_engine_base.hpp src/udp_engine.hpp src/ws_engine.cpp src/zmtp_engine.cpp tests/test_mock_pub_sub.cpp --- src/i_engine.hpp | 4 ++++ src/ipc_connecter.cpp | 2 +- src/ipc_listener.cpp | 2 +- src/pgm_receiver.hpp | 1 + src/pgm_sender.hpp | 1 + src/session_base.cpp | 16 +++++++++++----- src/session_base.hpp | 1 + src/stream_engine.cpp | 12 ++++++++++-- src/stream_engine.hpp | 8 +++++++- src/tcp_connecter.cpp | 2 +- src/tcp_listener.cpp | 2 +- tests/testutil.hpp | 4 ++++ 12 files changed, 43 insertions(+), 12 deletions(-) diff --git a/src/i_engine.hpp b/src/i_engine.hpp index 39266c41..cd22ea48 100644 --- a/src/i_engine.hpp +++ b/src/i_engine.hpp @@ -31,6 +31,10 @@ namespace zmq { virtual ~i_engine () {} + // Indicate if the engine has an handshake stage. + // If engine has handshake stage, engine must call session.engine_ready when the handshake is complete. + virtual bool has_handshake_stage () = 0; + // Plug the engine to the session. virtual void plug (zmq::io_thread_t *io_thread_, class session_base_t *session_) = 0; diff --git a/src/ipc_connecter.cpp b/src/ipc_connecter.cpp index 239d2be9..64cf571d 100644 --- a/src/ipc_connecter.cpp +++ b/src/ipc_connecter.cpp @@ -113,7 +113,7 @@ void zmq::ipc_connecter_t::out_event () } // Create the engine object for this connection. stream_engine_t *engine = new (std::nothrow) - stream_engine_t (fd, options, endpoint); + stream_engine_t (fd, options, endpoint, !options.raw_sock); alloc_assert (engine); // Attach the engine to the corresponding session object. diff --git a/src/ipc_listener.cpp b/src/ipc_listener.cpp index 742d61dd..0388d0c0 100644 --- a/src/ipc_listener.cpp +++ b/src/ipc_listener.cpp @@ -81,7 +81,7 @@ void zmq::ipc_listener_t::in_event () // Create the engine object for this connection. stream_engine_t *engine = new (std::nothrow) - stream_engine_t (fd, options, endpoint); + stream_engine_t (fd, options, endpoint, !options.raw_sock); alloc_assert (engine); // Choose I/O thread to run connecter in. Given that we are already diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index 1f5d2d4e..5eabf7f2 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -54,6 +54,7 @@ namespace zmq int init (bool udp_encapsulation_, const char *network_); // i_engine interface implementation. + bool has_handshake_stage () { return false; }; void plug (zmq::io_thread_t *io_thread_, zmq::session_base_t *session_); void terminate (); diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index 045cd474..00d8ffbe 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -53,6 +53,7 @@ namespace zmq int init (bool udp_encapsulation_, const char *network_); // i_engine interface implementation. + bool has_handshake_stage () { return false; }; void plug (zmq::io_thread_t *io_thread_, zmq::session_base_t *session_); void terminate (); diff --git a/src/session_base.cpp b/src/session_base.cpp index 412204bb..3b763a47 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -335,7 +335,18 @@ bool zmq::session_base_t::zap_enabled () void zmq::session_base_t::process_attach (i_engine *engine_) { zmq_assert (engine_ != NULL); + zmq_assert (!engine); + engine = engine_; + if (!engine_->has_handshake_stage ()) + engine_ready (); + + // Plug in the engine. + engine->plug (io_thread, this); +} + +void zmq::session_base_t::engine_ready () +{ // Create the pipe if it does not exist yet. if (!pipe && !is_terminating ()) { object_t *parents [2] = {this, socket}; @@ -364,11 +375,6 @@ void zmq::session_base_t::process_attach (i_engine *engine_) // Ask socket to plug into the remote end of the pipe. send_bind (socket, pipes [1]); } - - // Plug in the engine. - zmq_assert (!engine); - engine = engine_; - engine->plug (io_thread, this); } void zmq::session_base_t::detach () diff --git a/src/session_base.hpp b/src/session_base.hpp index 63e16bd9..d5587c43 100644 --- a/src/session_base.hpp +++ b/src/session_base.hpp @@ -56,6 +56,7 @@ namespace zmq virtual void reset (); void flush (); void detach (); + void engine_ready (); // i_pipe_events interface implementation. void read_activated (zmq::pipe_t *pipe_); diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 2bbcf87a..28499f6c 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -54,7 +54,8 @@ #include "wire.hpp" zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, - const std::string &endpoint_) : + const std::string &endpoint_, + bool has_handshake_stage_) : s (fd_), inpos (NULL), insize (0), @@ -66,6 +67,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, greeting_size (v2_greeting_size), greeting_bytes_read (0), session (NULL), + _has_handshake_stage (has_handshake_stage_), options (options_), endpoint (endpoint_), plugged (false), @@ -192,9 +194,12 @@ void zmq::stream_engine_t::in_event () assert (!io_error); // If still handshaking, receive and process the greeting message. - if (unlikely (handshaking)) + if (unlikely (handshaking)) { if (!handshake ()) return; + else if (mechanism == NULL && _has_handshake_stage) + session->engine_ready (); + } zmq_assert (decoder); @@ -667,6 +672,9 @@ void zmq::stream_engine_t::zap_msg_available () void zmq::stream_engine_t::mechanism_ready () { + if (_has_handshake_stage) + session->engine_ready (); + if (options.recv_identity) { msg_t identity; mechanism->peer_identity (&identity); diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp index 631d1cbb..4465e57c 100644 --- a/src/stream_engine.hpp +++ b/src/stream_engine.hpp @@ -53,10 +53,12 @@ namespace zmq public: stream_engine_t (fd_t fd_, const options_t &options_, - const std::string &endpoint); + const std::string &endpoint, + bool has_handshake_stage_); ~stream_engine_t (); // i_engine interface implementation. + bool has_handshake_stage () { return _has_handshake_stage; }; void plug (zmq::io_thread_t *io_thread_, zmq::session_base_t *session_); void terminate (); @@ -156,6 +158,10 @@ namespace zmq // The session this engine is attached to. zmq::session_base_t *session; + // Indicate if engine has an handshake stage, if it does, engine must call session.engine_ready + // when handshake is completed. + bool _has_handshake_stage; + options_t options; // String representation of endpoint diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index 878ba4ff..58c476cc 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -127,7 +127,7 @@ void zmq::tcp_connecter_t::out_event () // Create the engine object for this connection. stream_engine_t *engine = new (std::nothrow) - stream_engine_t (fd, options, endpoint); + stream_engine_t (fd, options, endpoint, !options.raw_sock); alloc_assert (engine); // Attach the engine to the corresponding session object. diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp index b41d0b3c..5a78983e 100644 --- a/src/tcp_listener.cpp +++ b/src/tcp_listener.cpp @@ -92,7 +92,7 @@ void zmq::tcp_listener_t::in_event () // Create the engine object for this connection. stream_engine_t *engine = new (std::nothrow) - stream_engine_t (fd, options, endpoint); + stream_engine_t (fd, options, endpoint, !options.raw_sock); alloc_assert (engine); // Choose I/O thread to run connecter in. Given that we are already diff --git a/tests/testutil.hpp b/tests/testutil.hpp index 0c1ce78d..c7ed87dd 100644 --- a/tests/testutil.hpp +++ b/tests/testutil.hpp @@ -140,8 +140,12 @@ expect_bounce_fail (void *server, void *client) // Send message from server to client to test other direction rc = zmq_send (server, content, 32, ZMQ_SNDMORE); + if (rc == -1 && zmq_errno () == EAGAIN) + return; assert (rc == 32); rc = zmq_send (server, content, 32, 0); + if (rc == -1 && zmq_errno () == EAGAIN) + return; assert (rc == 32); // Receive message at client side (should not succeed)