0
0
mirror of https://github.com/zeromq/libzmq.git synced 2025-01-14 01:37:56 +08:00

problem: zeromq connects peer before handshake is completed

Solution: delay connecting the peer pipe until the handshake is completed
This commit is contained in:
Doron Somech 2020-05-13 17:32:06 +03:00
parent 18cacf2ec1
commit e7f0090b16
13 changed files with 47 additions and 14 deletions

View File

@ -50,6 +50,10 @@ struct i_engine
virtual ~i_engine () ZMQ_DEFAULT;
// Indicate if the engine has an handshake stage.
// If engine has handshake stage, engine must call session.engine_ready when the handshake is complete.
virtual bool has_handshake_stage () = 0;
// Plug the engine to the session.
virtual void plug (zmq::io_thread_t *io_thread_,
class session_base_t *session_) = 0;

View File

@ -28,6 +28,8 @@ class norm_engine_t ZMQ_FINAL : public io_object_t, public i_engine
int init (const char *network_, bool send, bool recv);
void shutdown ();
bool has_handshake_stage () ZMQ_FINAL { return false; };
// i_engine interface implementation.
// Plug the engine to the session.
void plug (zmq::io_thread_t *io_thread_,

View File

@ -55,6 +55,7 @@ class pgm_receiver_t ZMQ_FINAL : public io_object_t, public i_engine
int init (bool udp_encapsulation_, const char *network_);
// i_engine interface implementation.
bool has_handshake_stage () { return false; };
void plug (zmq::io_thread_t *io_thread_, zmq::session_base_t *session_);
void terminate ();
bool restart_input ();

View File

@ -54,6 +54,7 @@ class pgm_sender_t ZMQ_FINAL : public io_object_t, public i_engine
int init (bool udp_encapsulation_, const char *network_);
// i_engine interface implementation.
bool has_handshake_stage () { return false; };
void plug (zmq::io_thread_t *io_thread_, zmq::session_base_t *session_);
void terminate ();
bool restart_input ();

View File

@ -67,7 +67,7 @@ zmq::raw_engine_t::raw_engine_t (
fd_t fd_,
const options_t &options_,
const endpoint_uri_pair_t &endpoint_uri_pair_) :
stream_engine_base_t (fd_, options_, endpoint_uri_pair_)
stream_engine_base_t (fd_, options_, endpoint_uri_pair_, false)
{
}

View File

@ -407,7 +407,18 @@ bool zmq::session_base_t::zap_enabled () const
void zmq::session_base_t::process_attach (i_engine *engine_)
{
zmq_assert (engine_ != NULL);
zmq_assert (!_engine);
_engine = engine_;
if (!engine_->has_handshake_stage ())
engine_ready ();
// Plug in the engine.
_engine->plug (_io_thread, this);
}
void zmq::session_base_t::engine_ready ()
{
// Create the pipe if it does not exist yet.
if (!_pipe && !is_terminating ()) {
object_t *parents[2] = {this, _socket};
@ -430,17 +441,12 @@ void zmq::session_base_t::process_attach (i_engine *engine_)
// The endpoints strings are not set on bind, set them here so that
// events can use them.
pipes[0]->set_endpoint_pair (engine_->get_endpoint ());
pipes[1]->set_endpoint_pair (engine_->get_endpoint ());
pipes[0]->set_endpoint_pair (_engine->get_endpoint ());
pipes[1]->set_endpoint_pair (_engine->get_endpoint ());
// Ask socket to plug into the remote end of the pipe.
send_bind (_socket, pipes[1]);
}
// Plug in the engine.
zmq_assert (!_engine);
_engine = engine_;
_engine->plug (_io_thread, this);
}
void zmq::session_base_t::engine_error (bool handshaked_,

View File

@ -63,6 +63,7 @@ class session_base_t : public own_t, public io_object_t, public i_pipe_events
void flush ();
void rollback ();
void engine_error (bool handshaked_, zmq::i_engine::error_reason_t reason_);
void engine_ready ();
// i_pipe_events interface implementation.
void read_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;

View File

@ -102,7 +102,8 @@ static std::string get_peer_address (zmq::fd_t s_)
zmq::stream_engine_base_t::stream_engine_base_t (
fd_t fd_,
const options_t &options_,
const endpoint_uri_pair_t &endpoint_uri_pair_) :
const endpoint_uri_pair_t &endpoint_uri_pair_,
bool has_handshake_stage_) :
_options (options_),
_inpos (NULL),
_insize (0),
@ -128,7 +129,8 @@ zmq::stream_engine_base_t::stream_engine_base_t (
_handshaking (true),
_io_error (false),
_session (NULL),
_socket (NULL)
_socket (NULL),
_has_handshake_stage (has_handshake_stage_)
{
const int rc = _tx_msg.init ();
errno_assert (rc == 0);
@ -252,6 +254,9 @@ bool zmq::stream_engine_base_t::in_event_internal ()
// Handshaking was successful.
// Switch into the normal message flow.
_handshaking = false;
if (_mechanism == NULL && _has_handshake_stage)
_session->engine_ready ();
} else
return false;
}
@ -520,6 +525,9 @@ void zmq::stream_engine_base_t::mechanism_ready ()
_has_heartbeat_timer = true;
}
if (_has_handshake_stage)
_session->engine_ready ();
bool flush_session = false;
if (_options.recv_routing_id) {

View File

@ -57,10 +57,12 @@ class stream_engine_base_t : public io_object_t, public i_engine
public:
stream_engine_base_t (fd_t fd_,
const options_t &options_,
const endpoint_uri_pair_t &endpoint_uri_pair_);
const endpoint_uri_pair_t &endpoint_uri_pair_,
bool has_handshake_stage_);
~stream_engine_base_t () ZMQ_OVERRIDE;
// i_engine interface implementation.
bool has_handshake_stage () ZMQ_FINAL { return _has_handshake_stage; };
void plug (zmq::io_thread_t *io_thread_,
zmq::session_base_t *session_) ZMQ_FINAL;
void terminate () ZMQ_FINAL;
@ -189,9 +191,13 @@ class stream_engine_base_t : public io_object_t, public i_engine
// The session this engine is attached to.
zmq::session_base_t *_session;
// Socket
// Socket
zmq::socket_base_t *_socket;
// Indicate if engine has an handshake stage, if it does, engine must call session.engine_ready
// when handshake is completed.
bool _has_handshake_stage;
ZMQ_NON_COPYABLE_NOR_MOVABLE (stream_engine_base_t)
};
}

View File

@ -22,6 +22,8 @@ class udp_engine_t ZMQ_FINAL : public io_object_t, public i_engine
int init (address_t *address_, bool send_, bool recv_);
bool has_handshake_stage () ZMQ_FINAL { return false; };
// i_engine interface implementation.
// Plug the engine to the session.
void plug (zmq::io_thread_t *io_thread_, class session_base_t *session_);

View File

@ -115,7 +115,7 @@ zmq::ws_engine_t::ws_engine_t (fd_t fd_,
const endpoint_uri_pair_t &endpoint_uri_pair_,
const ws_address_t &address_,
bool client_) :
stream_engine_base_t (fd_, options_, endpoint_uri_pair_),
stream_engine_base_t (fd_, options_, endpoint_uri_pair_, true),
_client (client_),
_address (address_),
_client_handshake_state (client_handshake_initial),

View File

@ -67,7 +67,7 @@ zmq::zmtp_engine_t::zmtp_engine_t (
fd_t fd_,
const options_t &options_,
const endpoint_uri_pair_t &endpoint_uri_pair_) :
stream_engine_base_t (fd_, options_, endpoint_uri_pair_),
stream_engine_base_t (fd_, options_, endpoint_uri_pair_, true),
_greeting_size (v2_greeting_size),
_greeting_bytes_read (0),
_subscription_required (false),

View File

@ -157,6 +157,8 @@ static void test_mock_pub_sub (bool sub_command_, bool mock_pub_)
rc = zmq_setsockopt (server, ZMQ_SUBSCRIBE, "A", 1);
TEST_ASSERT_EQUAL_INT (0, rc);
// SUB binds, let its state machine run
// Because zeromq attach the pipe after the handshake, we need more time here before we can run the state-machine
msleep (1);
zmq_recv (server, buffer, 16, ZMQ_DONTWAIT);
if (sub_command_) {