From eeb697b5acacf89ced182c834a14bade154c1bdd Mon Sep 17 00:00:00 2001 From: KIU Shueng Chuan Date: Tue, 4 Aug 2015 20:47:31 +0800 Subject: [PATCH] add connect timeout logic --- src/tcp_connecter.cpp | 40 +++++++++++++++++++++++++++++++++++++--- src/tcp_connecter.hpp | 6 +++++- 2 files changed, 42 insertions(+), 4 deletions(-) diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index 8623b259..2d29af6f 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -67,6 +67,7 @@ zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_, s (retired_fd), handle_valid (false), delayed_start (delayed_start_), + connect_timer_started (false), reconnect_timer_started (false), session (session_), current_reconnect_ivl (options.reconnect_ivl) @@ -79,6 +80,7 @@ zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_, zmq::tcp_connecter_t::~tcp_connecter_t () { + zmq_assert (!connect_timer_started); zmq_assert (!reconnect_timer_started); zmq_assert (!handle_valid); zmq_assert (s == retired_fd); @@ -94,6 +96,11 @@ void zmq::tcp_connecter_t::process_plug () void zmq::tcp_connecter_t::process_term (int linger_) { + if (connect_timer_started) { + cancel_timer (connect_timer_id); + connect_timer_started = false; + } + if (reconnect_timer_started) { cancel_timer (reconnect_timer_id); reconnect_timer_started = false; @@ -120,6 +127,11 @@ void zmq::tcp_connecter_t::in_event () void zmq::tcp_connecter_t::out_event () { + if (connect_timer_started) { + cancel_timer (connect_timer_id); + connect_timer_started = false; + } + rm_fd (handle); handle_valid = false; @@ -153,9 +165,20 @@ void zmq::tcp_connecter_t::out_event () void zmq::tcp_connecter_t::timer_event (int id_) { - zmq_assert (id_ == reconnect_timer_id); - reconnect_timer_started = false; - start_connecting (); + zmq_assert (id_ == reconnect_timer_id || id_ == connect_timer_id); + if (id_ == connect_timer_id) { + connect_timer_started = false; + + rm_fd (handle); + handle_valid = false; + + close (); + add_reconnect_timer (); + } + else if (id_ == reconnect_timer_id) { + reconnect_timer_started = false; + start_connecting (); + } } void zmq::tcp_connecter_t::start_connecting () @@ -177,6 +200,9 @@ void zmq::tcp_connecter_t::start_connecting () handle_valid = true; set_pollout (handle); socket->event_connect_delayed (endpoint, zmq_errno()); + + // add userspace connect timeout + add_connect_timer (); } // Handle any other error condition by eventual reconnect. @@ -187,6 +213,14 @@ void zmq::tcp_connecter_t::start_connecting () } } +void zmq::tcp_connecter_t::add_connect_timer () +{ + if (options.connect_timeout > 0) { + add_timer (options.connect_timeout, connect_timer_id); + connect_timer_started = true; + } +} + void zmq::tcp_connecter_t::add_reconnect_timer () { const int interval = get_new_reconnect_ivl (); diff --git a/src/tcp_connecter.hpp b/src/tcp_connecter.hpp index 6a284ed4..267627d0 100644 --- a/src/tcp_connecter.hpp +++ b/src/tcp_connecter.hpp @@ -57,7 +57,7 @@ namespace zmq private: // ID of the timer used to delay the reconnection. - enum {reconnect_timer_id = 1}; + enum {reconnect_timer_id = 1, connect_timer_id}; // Handlers for incoming commands. void process_plug (); @@ -71,6 +71,9 @@ namespace zmq // Internal function to start the actual connection establishment. void start_connecting (); + // Internal function to add a connect timer + void add_connect_timer(); + // Internal function to add a reconnect timer void add_reconnect_timer(); @@ -108,6 +111,7 @@ namespace zmq const bool delayed_start; // True iff a timer has been started. + bool connect_timer_started; bool reconnect_timer_started; // Reference to the session we belong to.