add connect timeout logic

This commit is contained in:
KIU Shueng Chuan 2015-08-04 20:47:31 +08:00
parent c0ca2be642
commit eeb697b5ac
2 changed files with 42 additions and 4 deletions

View File

@ -67,6 +67,7 @@ zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_,
s (retired_fd), s (retired_fd),
handle_valid (false), handle_valid (false),
delayed_start (delayed_start_), delayed_start (delayed_start_),
connect_timer_started (false),
reconnect_timer_started (false), reconnect_timer_started (false),
session (session_), session (session_),
current_reconnect_ivl (options.reconnect_ivl) current_reconnect_ivl (options.reconnect_ivl)
@ -79,6 +80,7 @@ zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_,
zmq::tcp_connecter_t::~tcp_connecter_t () zmq::tcp_connecter_t::~tcp_connecter_t ()
{ {
zmq_assert (!connect_timer_started);
zmq_assert (!reconnect_timer_started); zmq_assert (!reconnect_timer_started);
zmq_assert (!handle_valid); zmq_assert (!handle_valid);
zmq_assert (s == retired_fd); zmq_assert (s == retired_fd);
@ -94,6 +96,11 @@ void zmq::tcp_connecter_t::process_plug ()
void zmq::tcp_connecter_t::process_term (int linger_) void zmq::tcp_connecter_t::process_term (int linger_)
{ {
if (connect_timer_started) {
cancel_timer (connect_timer_id);
connect_timer_started = false;
}
if (reconnect_timer_started) { if (reconnect_timer_started) {
cancel_timer (reconnect_timer_id); cancel_timer (reconnect_timer_id);
reconnect_timer_started = false; reconnect_timer_started = false;
@ -120,6 +127,11 @@ void zmq::tcp_connecter_t::in_event ()
void zmq::tcp_connecter_t::out_event () void zmq::tcp_connecter_t::out_event ()
{ {
if (connect_timer_started) {
cancel_timer (connect_timer_id);
connect_timer_started = false;
}
rm_fd (handle); rm_fd (handle);
handle_valid = false; handle_valid = false;
@ -153,9 +165,20 @@ void zmq::tcp_connecter_t::out_event ()
void zmq::tcp_connecter_t::timer_event (int id_) void zmq::tcp_connecter_t::timer_event (int id_)
{ {
zmq_assert (id_ == reconnect_timer_id); zmq_assert (id_ == reconnect_timer_id || id_ == connect_timer_id);
reconnect_timer_started = false; if (id_ == connect_timer_id) {
start_connecting (); connect_timer_started = false;
rm_fd (handle);
handle_valid = false;
close ();
add_reconnect_timer ();
}
else if (id_ == reconnect_timer_id) {
reconnect_timer_started = false;
start_connecting ();
}
} }
void zmq::tcp_connecter_t::start_connecting () void zmq::tcp_connecter_t::start_connecting ()
@ -177,6 +200,9 @@ void zmq::tcp_connecter_t::start_connecting ()
handle_valid = true; handle_valid = true;
set_pollout (handle); set_pollout (handle);
socket->event_connect_delayed (endpoint, zmq_errno()); socket->event_connect_delayed (endpoint, zmq_errno());
// add userspace connect timeout
add_connect_timer ();
} }
// Handle any other error condition by eventual reconnect. // Handle any other error condition by eventual reconnect.
@ -187,6 +213,14 @@ void zmq::tcp_connecter_t::start_connecting ()
} }
} }
void zmq::tcp_connecter_t::add_connect_timer ()
{
if (options.connect_timeout > 0) {
add_timer (options.connect_timeout, connect_timer_id);
connect_timer_started = true;
}
}
void zmq::tcp_connecter_t::add_reconnect_timer () void zmq::tcp_connecter_t::add_reconnect_timer ()
{ {
const int interval = get_new_reconnect_ivl (); const int interval = get_new_reconnect_ivl ();

View File

@ -57,7 +57,7 @@ namespace zmq
private: private:
// ID of the timer used to delay the reconnection. // ID of the timer used to delay the reconnection.
enum {reconnect_timer_id = 1}; enum {reconnect_timer_id = 1, connect_timer_id};
// Handlers for incoming commands. // Handlers for incoming commands.
void process_plug (); void process_plug ();
@ -71,6 +71,9 @@ namespace zmq
// Internal function to start the actual connection establishment. // Internal function to start the actual connection establishment.
void start_connecting (); void start_connecting ();
// Internal function to add a connect timer
void add_connect_timer();
// Internal function to add a reconnect timer // Internal function to add a reconnect timer
void add_reconnect_timer(); void add_reconnect_timer();
@ -108,6 +111,7 @@ namespace zmq
const bool delayed_start; const bool delayed_start;
// True iff a timer has been started. // True iff a timer has been started.
bool connect_timer_started;
bool reconnect_timer_started; bool reconnect_timer_started;
// Reference to the session we belong to. // Reference to the session we belong to.