Merge pull request #381 from hurtonm/connecter_cleanups

Connecter cleanups
This commit is contained in:
Pieter Hintjens 2012-06-14 23:09:59 -07:00
commit 7753379eab
4 changed files with 44 additions and 36 deletions

View File

@ -42,13 +42,14 @@
zmq::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_, zmq::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_,
class session_base_t *session_, const options_t &options_, class session_base_t *session_, const options_t &options_,
const address_t *addr_, bool wait_) : const address_t *addr_, bool delayed_start_) :
own_t (io_thread_, options_), own_t (io_thread_, options_),
io_object_t (io_thread_), io_object_t (io_thread_),
addr (addr_), addr (addr_),
s (retired_fd), s (retired_fd),
handle_valid (false), handle_valid (false),
wait (wait_), delayed_start (delayed_start_),
timer_started (false),
session (session_), session (session_),
current_reconnect_ivl(options.reconnect_ivl) current_reconnect_ivl(options.reconnect_ivl)
{ {
@ -59,24 +60,24 @@ zmq::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_,
zmq::ipc_connecter_t::~ipc_connecter_t () zmq::ipc_connecter_t::~ipc_connecter_t ()
{ {
zmq_assert (!wait); zmq_assert (!timer_started);
zmq_assert (!handle_valid); zmq_assert (!handle_valid);
zmq_assert (s == retired_fd); zmq_assert (s == retired_fd);
} }
void zmq::ipc_connecter_t::process_plug () void zmq::ipc_connecter_t::process_plug ()
{ {
if (wait) if (delayed_start)
add_reconnect_timer(); add_reconnect_timer ();
else else
start_connecting (); start_connecting ();
} }
void zmq::ipc_connecter_t::process_term (int linger_) void zmq::ipc_connecter_t::process_term (int linger_)
{ {
if (wait) { if (timer_started) {
cancel_timer (reconnect_timer_id); cancel_timer (reconnect_timer_id);
wait = false; timer_started = false;
} }
if (handle_valid) { if (handle_valid) {
@ -107,7 +108,6 @@ void zmq::ipc_connecter_t::out_event ()
// Handle the error condition by attempt to reconnect. // Handle the error condition by attempt to reconnect.
if (fd == retired_fd) { if (fd == retired_fd) {
close (); close ();
wait = true;
add_reconnect_timer(); add_reconnect_timer();
return; return;
} }
@ -128,7 +128,7 @@ void zmq::ipc_connecter_t::out_event ()
void zmq::ipc_connecter_t::timer_event (int id_) void zmq::ipc_connecter_t::timer_event (int id_)
{ {
zmq_assert (id_ == reconnect_timer_id); zmq_assert (id_ == reconnect_timer_id);
wait = false; timer_started = false;
start_connecting (); start_connecting ();
} }
@ -142,7 +142,6 @@ void zmq::ipc_connecter_t::start_connecting ()
handle = add_fd (s); handle = add_fd (s);
handle_valid = true; handle_valid = true;
out_event (); out_event ();
return;
} }
// Connection establishment may be delayed. Poll for its completion. // Connection establishment may be delayed. Poll for its completion.
@ -151,13 +150,14 @@ void zmq::ipc_connecter_t::start_connecting ()
handle_valid = true; handle_valid = true;
set_pollout (handle); set_pollout (handle);
session->monitor_event (ZMQ_EVENT_CONNECT_DELAYED, endpoint.c_str(), zmq_errno()); session->monitor_event (ZMQ_EVENT_CONNECT_DELAYED, endpoint.c_str(), zmq_errno());
return;
} }
// Handle any other error condition by eventual reconnect. // Handle any other error condition by eventual reconnect.
else {
if (s != retired_fd)
close (); close ();
wait = true; add_reconnect_timer ();
add_reconnect_timer(); }
} }
void zmq::ipc_connecter_t::add_reconnect_timer() void zmq::ipc_connecter_t::add_reconnect_timer()
@ -165,6 +165,7 @@ void zmq::ipc_connecter_t::add_reconnect_timer()
int rc_ivl = get_new_reconnect_ivl(); int rc_ivl = get_new_reconnect_ivl();
add_timer (rc_ivl, reconnect_timer_id); add_timer (rc_ivl, reconnect_timer_id);
session->monitor_event (ZMQ_EVENT_CONNECT_RETRIED, endpoint.c_str(), rc_ivl); session->monitor_event (ZMQ_EVENT_CONNECT_RETRIED, endpoint.c_str(), rc_ivl);
timer_started = true;
} }
int zmq::ipc_connecter_t::get_new_reconnect_ivl () int zmq::ipc_connecter_t::get_new_reconnect_ivl ()

View File

@ -41,11 +41,11 @@ namespace zmq
{ {
public: public:
// If 'delay' is true connecter first waits for a while, then starts // If 'delayed_start' is true connecter first waits for a while,
// connection process. // then starts connection process.
ipc_connecter_t (zmq::io_thread_t *io_thread_, ipc_connecter_t (zmq::io_thread_t *io_thread_,
zmq::session_base_t *session_, const options_t &options_, zmq::session_base_t *session_, const options_t &options_,
const address_t *addr_, bool delay_); const address_t *addr_, bool delayed_start_);
~ipc_connecter_t (); ~ipc_connecter_t ();
private: private:
@ -99,7 +99,10 @@ namespace zmq
bool handle_valid; bool handle_valid;
// If true, connecter is waiting a while before trying to connect. // If true, connecter is waiting a while before trying to connect.
bool wait; const bool delayed_start;
// True iff a timer has been started.
bool timer_started;
// Reference to the session we belong to. // Reference to the session we belong to.
zmq::session_base_t *session; zmq::session_base_t *session;

View File

@ -52,13 +52,14 @@
zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_, zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_,
class session_base_t *session_, const options_t &options_, class session_base_t *session_, const options_t &options_,
const address_t *addr_, bool wait_) : const address_t *addr_, bool delayed_start_) :
own_t (io_thread_, options_), own_t (io_thread_, options_),
io_object_t (io_thread_), io_object_t (io_thread_),
addr (addr_), addr (addr_),
s (retired_fd), s (retired_fd),
handle_valid (false), handle_valid (false),
wait (wait_), delayed_start (delayed_start_),
timer_started (false),
session (session_), session (session_),
current_reconnect_ivl(options.reconnect_ivl) current_reconnect_ivl(options.reconnect_ivl)
{ {
@ -69,24 +70,24 @@ zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_,
zmq::tcp_connecter_t::~tcp_connecter_t () zmq::tcp_connecter_t::~tcp_connecter_t ()
{ {
zmq_assert (!wait); zmq_assert (!timer_started);
zmq_assert (!handle_valid); zmq_assert (!handle_valid);
zmq_assert (s == retired_fd); zmq_assert (s == retired_fd);
} }
void zmq::tcp_connecter_t::process_plug () void zmq::tcp_connecter_t::process_plug ()
{ {
if (wait) if (delayed_start)
add_reconnect_timer(); add_reconnect_timer ();
else else
start_connecting (); start_connecting ();
} }
void zmq::tcp_connecter_t::process_term (int linger_) void zmq::tcp_connecter_t::process_term (int linger_)
{ {
if (wait) { if (timer_started) {
cancel_timer (reconnect_timer_id); cancel_timer (reconnect_timer_id);
wait = false; timer_started = false;
} }
if (handle_valid) { if (handle_valid) {
@ -117,7 +118,6 @@ void zmq::tcp_connecter_t::out_event ()
// Handle the error condition by attempt to reconnect. // Handle the error condition by attempt to reconnect.
if (fd == retired_fd) { if (fd == retired_fd) {
close (); close ();
wait = true;
add_reconnect_timer(); add_reconnect_timer();
return; return;
} }
@ -141,7 +141,7 @@ void zmq::tcp_connecter_t::out_event ()
void zmq::tcp_connecter_t::timer_event (int id_) void zmq::tcp_connecter_t::timer_event (int id_)
{ {
zmq_assert (id_ == reconnect_timer_id); zmq_assert (id_ == reconnect_timer_id);
wait = false; timer_started = false;
start_connecting (); start_connecting ();
} }
@ -155,7 +155,6 @@ void zmq::tcp_connecter_t::start_connecting ()
handle = add_fd (s); handle = add_fd (s);
handle_valid = true; handle_valid = true;
out_event (); out_event ();
return;
} }
// Connection establishment may be delayed. Poll for its completion. // Connection establishment may be delayed. Poll for its completion.
@ -164,13 +163,14 @@ void zmq::tcp_connecter_t::start_connecting ()
handle_valid = true; handle_valid = true;
set_pollout (handle); set_pollout (handle);
session->monitor_event (ZMQ_EVENT_CONNECT_DELAYED, endpoint.c_str(), zmq_errno()); session->monitor_event (ZMQ_EVENT_CONNECT_DELAYED, endpoint.c_str(), zmq_errno());
return;
} }
// Handle any other error condition by eventual reconnect. // Handle any other error condition by eventual reconnect.
else {
if (s != retired_fd)
close (); close ();
wait = true; add_reconnect_timer ();
add_reconnect_timer(); }
} }
void zmq::tcp_connecter_t::add_reconnect_timer() void zmq::tcp_connecter_t::add_reconnect_timer()
@ -178,6 +178,7 @@ void zmq::tcp_connecter_t::add_reconnect_timer()
int rc_ivl = get_new_reconnect_ivl(); int rc_ivl = get_new_reconnect_ivl();
add_timer (rc_ivl, reconnect_timer_id); add_timer (rc_ivl, reconnect_timer_id);
session->monitor_event (ZMQ_EVENT_CONNECT_RETRIED, endpoint.c_str(), rc_ivl); session->monitor_event (ZMQ_EVENT_CONNECT_RETRIED, endpoint.c_str(), rc_ivl);
timer_started = true;
} }
int zmq::tcp_connecter_t::get_new_reconnect_ivl () int zmq::tcp_connecter_t::get_new_reconnect_ivl ()

View File

@ -39,11 +39,11 @@ namespace zmq
{ {
public: public:
// If 'delay' is true connecter first waits for a while, then starts // If 'delayed_start' is true connecter first waits for a while,
// connection process. // then starts connection process.
tcp_connecter_t (zmq::io_thread_t *io_thread_, tcp_connecter_t (zmq::io_thread_t *io_thread_,
zmq::session_base_t *session_, const options_t &options_, zmq::session_base_t *session_, const options_t &options_,
const address_t *addr_, bool delay_); const address_t *addr_, bool delayed_start_);
~tcp_connecter_t (); ~tcp_connecter_t ();
private: private:
@ -97,7 +97,10 @@ namespace zmq
bool handle_valid; bool handle_valid;
// If true, connecter is waiting a while before trying to connect. // If true, connecter is waiting a while before trying to connect.
bool wait; const bool delayed_start;
// True iff a timer has been started.
bool timer_started;
// Reference to the session we belong to. // Reference to the session we belong to.
zmq::session_base_t *session; zmq::session_base_t *session;