From 03ebd39d1ffaa4332f96ebab5b3d3b8e280ddaec Mon Sep 17 00:00:00 2001 From: Doron Somech Date: Wed, 13 May 2020 17:32:06 +0300 Subject: [PATCH] problem: zeromq connects peer before handshake is completed Solution: delay connecting the peer pipe until the handshake is completed (cherry picked from commit e7f0090b161ce6344f6bd35009816a925c070b09) Conflicts: src/i_engine.hpp src/norm_engine.hpp src/pgm_receiver.hpp src/pgm_sender.hpp src/raw_engine.cpp src/session_base.cpp src/session_base.hpp src/stream_engine_base.cpp src/stream_engine_base.hpp src/udp_engine.hpp src/ws_engine.cpp src/zmtp_engine.cpp tests/test_mock_pub_sub.cpp --- src/i_engine.hpp | 4 ++++ src/ipc_connecter.cpp | 2 +- src/ipc_listener.cpp | 2 +- src/norm_engine.hpp | 2 ++ src/pgm_receiver.hpp | 1 + src/pgm_sender.hpp | 1 + src/session_base.cpp | 16 +++++++++++----- src/session_base.hpp | 1 + src/socks_connecter.cpp | 2 +- src/stream_engine.cpp | 12 ++++++++++-- src/stream_engine.hpp | 8 +++++++- src/tcp_connecter.cpp | 2 +- src/tcp_listener.cpp | 2 +- src/tipc_connecter.cpp | 2 +- src/tipc_listener.cpp | 2 +- 15 files changed, 44 insertions(+), 15 deletions(-) diff --git a/src/i_engine.hpp b/src/i_engine.hpp index 7a61e8e9..09eb0ec8 100644 --- a/src/i_engine.hpp +++ b/src/i_engine.hpp @@ -41,6 +41,10 @@ namespace zmq { virtual ~i_engine () {} + // Indicate if the engine has an handshake stage. + // If engine has handshake stage, engine must call session.engine_ready when the handshake is complete. + virtual bool has_handshake_stage () = 0; + // Plug the engine to the session. virtual void plug (zmq::io_thread_t *io_thread_, class session_base_t *session_) = 0; diff --git a/src/ipc_connecter.cpp b/src/ipc_connecter.cpp index 34abb568..fbe53c6e 100644 --- a/src/ipc_connecter.cpp +++ b/src/ipc_connecter.cpp @@ -123,7 +123,7 @@ void zmq::ipc_connecter_t::out_event () } // Create the engine object for this connection. stream_engine_t *engine = new (std::nothrow) - stream_engine_t (fd, options, endpoint); + stream_engine_t (fd, options, endpoint, !options.raw_sock); alloc_assert (engine); // Attach the engine to the corresponding session object. diff --git a/src/ipc_listener.cpp b/src/ipc_listener.cpp index 5c2a028f..9c973971 100644 --- a/src/ipc_listener.cpp +++ b/src/ipc_listener.cpp @@ -102,7 +102,7 @@ void zmq::ipc_listener_t::in_event () // Create the engine object for this connection. stream_engine_t *engine = new (std::nothrow) - stream_engine_t (fd, options, endpoint); + stream_engine_t (fd, options, endpoint, !options.raw_sock); alloc_assert (engine); // Choose I/O thread to run connecter in. Given that we are already diff --git a/src/norm_engine.hpp b/src/norm_engine.hpp index 72542e19..04d3e3a7 100644 --- a/src/norm_engine.hpp +++ b/src/norm_engine.hpp @@ -26,6 +26,8 @@ namespace zmq // create NORM instance, session, etc int init(const char* network_, bool send, bool recv); void shutdown(); + + bool has_handshake_stage () { return false; }; // i_engine interface implementation. // Plug the engine to the session. diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index 4594ab46..dadace5f 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -64,6 +64,7 @@ namespace zmq int init (bool udp_encapsulation_, const char *network_); // i_engine interface implementation. + bool has_handshake_stage () { return false; }; void plug (zmq::io_thread_t *io_thread_, zmq::session_base_t *session_); void terminate (); diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index bed05f75..c83e28ed 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -63,6 +63,7 @@ namespace zmq int init (bool udp_encapsulation_, const char *network_); // i_engine interface implementation. + bool has_handshake_stage () { return false; }; void plug (zmq::io_thread_t *io_thread_, zmq::session_base_t *session_); void terminate (); diff --git a/src/session_base.cpp b/src/session_base.cpp index e8f90764..460cafe4 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -352,7 +352,18 @@ bool zmq::session_base_t::zap_enabled () void zmq::session_base_t::process_attach (i_engine *engine_) { zmq_assert (engine_ != NULL); + zmq_assert (!engine); + engine = engine_; + if (!engine_->has_handshake_stage ()) + engine_ready (); + + // Plug in the engine. + engine->plug (io_thread, this); +} + +void zmq::session_base_t::engine_ready () +{ // Create the pipe if it does not exist yet. if (!pipe && !is_terminating ()) { object_t *parents [2] = {this, socket}; @@ -381,11 +392,6 @@ void zmq::session_base_t::process_attach (i_engine *engine_) // Ask socket to plug into the remote end of the pipe. send_bind (socket, pipes [1]); } - - // Plug in the engine. - zmq_assert (!engine); - engine = engine_; - engine->plug (io_thread, this); } void zmq::session_base_t::engine_error ( diff --git a/src/session_base.hpp b/src/session_base.hpp index 8730c271..ff4a899b 100644 --- a/src/session_base.hpp +++ b/src/session_base.hpp @@ -67,6 +67,7 @@ namespace zmq virtual void reset (); void flush (); void engine_error (zmq::stream_engine_t::error_reason_t reason); + void engine_ready (); // i_pipe_events interface implementation. void read_activated (zmq::pipe_t *pipe_); diff --git a/src/socks_connecter.cpp b/src/socks_connecter.cpp index a3b70436..a4c764c3 100644 --- a/src/socks_connecter.cpp +++ b/src/socks_connecter.cpp @@ -152,7 +152,7 @@ void zmq::socks_connecter_t::in_event () // Create the engine object for this connection. stream_engine_t *engine = new (std::nothrow) - stream_engine_t (s, options, endpoint); + stream_engine_t (s, options, endpoint, !options.raw_sock); alloc_assert (engine); // Attach the engine to the corresponding session object. diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index e31bb0df..f5ec6dfb 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -72,7 +72,8 @@ #include "wire.hpp" zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, - const std::string &endpoint_) : + const std::string &endpoint_, + bool has_handshake_stage_) : s (fd_), inpos (NULL), insize (0), @@ -85,6 +86,7 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, greeting_size (v2_greeting_size), greeting_bytes_read (0), session (NULL), + _has_handshake_stage (has_handshake_stage_), options (options_), endpoint (endpoint_), plugged (false), @@ -272,9 +274,12 @@ void zmq::stream_engine_t::in_event () zmq_assert (!io_error); // If still handshaking, receive and process the greeting message. - if (unlikely (handshaking)) + if (unlikely (handshaking)) { if (!handshake ()) return; + else if (mechanism == NULL && _has_handshake_stage) + session->engine_ready (); + } zmq_assert (decoder); @@ -800,6 +805,9 @@ void zmq::stream_engine_t::zap_msg_available () void zmq::stream_engine_t::mechanism_ready () { + if (_has_handshake_stage) + session->engine_ready (); + if (options.recv_identity) { msg_t identity; mechanism->peer_identity (&identity); diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp index e9635f6f..718c6598 100644 --- a/src/stream_engine.hpp +++ b/src/stream_engine.hpp @@ -70,10 +70,12 @@ namespace zmq }; stream_engine_t (fd_t fd_, const options_t &options_, - const std::string &endpoint); + const std::string &endpoint, + bool has_handshake_stage_); ~stream_engine_t (); // i_engine interface implementation. + bool has_handshake_stage () { return _has_handshake_stage; }; void plug (zmq::io_thread_t *io_thread_, zmq::session_base_t *session_); void terminate (); @@ -170,6 +172,10 @@ namespace zmq // The session this engine is attached to. zmq::session_base_t *session; + // Indicate if engine has an handshake stage, if it does, engine must call session.engine_ready + // when handshake is completed. + bool _has_handshake_stage; + options_t options; // String representation of endpoint diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index 21ba15f6..5cd2d470 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -139,7 +139,7 @@ void zmq::tcp_connecter_t::out_event () // Create the engine object for this connection. stream_engine_t *engine = new (std::nothrow) - stream_engine_t (fd, options, endpoint); + stream_engine_t (fd, options, endpoint, !options.raw_sock); alloc_assert (engine); // Attach the engine to the corresponding session object. diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp index a16fb550..e83c9282 100644 --- a/src/tcp_listener.cpp +++ b/src/tcp_listener.cpp @@ -106,7 +106,7 @@ void zmq::tcp_listener_t::in_event () // Create the engine object for this connection. stream_engine_t *engine = new (std::nothrow) - stream_engine_t (fd, options, endpoint); + stream_engine_t (fd, options, endpoint, !options.raw_sock); alloc_assert (engine); // Choose I/O thread to run connecter in. Given that we are already diff --git a/src/tipc_connecter.cpp b/src/tipc_connecter.cpp index 11b53c50..4c6f7b17 100644 --- a/src/tipc_connecter.cpp +++ b/src/tipc_connecter.cpp @@ -121,7 +121,7 @@ void zmq::tipc_connecter_t::out_event () return; } // Create the engine object for this connection. - stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options, endpoint); + stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options, endpoint, !options.raw_sock); alloc_assert (engine); // Attach the engine to the corresponding session object. diff --git a/src/tipc_listener.cpp b/src/tipc_listener.cpp index fb8df6c3..7003630d 100644 --- a/src/tipc_listener.cpp +++ b/src/tipc_listener.cpp @@ -89,7 +89,7 @@ void zmq::tipc_listener_t::in_event () } // Create the engine object for this connection. - stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options, endpoint); + stream_engine_t *engine = new (std::nothrow) stream_engine_t (fd, options, endpoint, !options.raw_sock); alloc_assert (engine); // Choose I/O thread to run connecter in. Given that we are already