diff --git a/src/ipc_connecter.cpp b/src/ipc_connecter.cpp index 99fafef7..f848d124 100644 --- a/src/ipc_connecter.cpp +++ b/src/ipc_connecter.cpp @@ -62,14 +62,6 @@ zmq::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_, zmq_assert (_addr->protocol == protocol_name::ipc); } -void zmq::ipc_connecter_t::process_plug () -{ - if (_delayed_start) - add_reconnect_timer (); - else - start_connecting (); -} - void zmq::ipc_connecter_t::process_term (int linger_) { if (_reconnect_timer_started) { @@ -145,6 +137,10 @@ void zmq::ipc_connecter_t::start_connecting () _handle = add_fd (_s); set_pollout (_handle); _socket->event_connect_delayed (_endpoint, zmq_errno ()); + + // TODO, tcp_connecter_t adds a connect timer in this case; maybe this + // should be done here as well (and then this could be pulled up to + // stream_connecter_base_t). } // Handle any other error condition by eventual reconnect. @@ -155,35 +151,6 @@ void zmq::ipc_connecter_t::start_connecting () } } -void zmq::ipc_connecter_t::add_reconnect_timer () -{ - if (options.reconnect_ivl != -1) { - int rc_ivl = get_new_reconnect_ivl (); - add_timer (rc_ivl, reconnect_timer_id); - _socket->event_connect_retried (_endpoint, rc_ivl); - _reconnect_timer_started = true; - } -} - -int zmq::ipc_connecter_t::get_new_reconnect_ivl () -{ - // The new interval is the current interval + random value. - int this_interval = - _current_reconnect_ivl + (generate_random () % options.reconnect_ivl); - - // Only change the current reconnect interval if the maximum reconnect - // interval was set and if it's larger than the reconnect interval. - if (options.reconnect_ivl_max > 0 - && options.reconnect_ivl_max > options.reconnect_ivl) { - // Calculate the next interval - _current_reconnect_ivl = _current_reconnect_ivl * 2; - if (_current_reconnect_ivl >= options.reconnect_ivl_max) { - _current_reconnect_ivl = options.reconnect_ivl_max; - } - } - return this_interval; -} - int zmq::ipc_connecter_t::open () { zmq_assert (_s == retired_fd); diff --git a/src/ipc_connecter.hpp b/src/ipc_connecter.hpp index 5f975bd7..8f951c10 100644 --- a/src/ipc_connecter.hpp +++ b/src/ipc_connecter.hpp @@ -57,7 +57,6 @@ class ipc_connecter_t : public stream_connecter_base_t }; // Handlers for incoming commands. - void process_plug (); void process_term (int linger_); // Handlers for I/O events. @@ -68,14 +67,6 @@ class ipc_connecter_t : public stream_connecter_base_t // Internal function to start the actual connection establishment. void start_connecting (); - // Internal function to add a reconnect timer - void add_reconnect_timer (); - - // Internal function to return a reconnect backoff delay. - // Will modify the current_reconnect_ivl used for next call - // Returns the currently used interval - int get_new_reconnect_ivl (); - // Open IPC connecting socket. Returns -1 in case of error, // 0 if connect was successful immediately. Returns -1 with // EAGAIN errno if async connect was launched. diff --git a/src/stream_connecter_base.cpp b/src/stream_connecter_base.cpp index 525ba89b..6f582427 100644 --- a/src/stream_connecter_base.cpp +++ b/src/stream_connecter_base.cpp @@ -31,6 +31,7 @@ #include "stream_connecter_base.hpp" #include "session_base.hpp" #include "address.hpp" +#include "random.hpp" zmq::stream_connecter_base_t::stream_connecter_base_t ( zmq::io_thread_t *io_thread_, @@ -62,3 +63,37 @@ zmq::stream_connecter_base_t::~stream_connecter_base_t () zmq_assert (!_handle); zmq_assert (_s == retired_fd); } + +void zmq::stream_connecter_base_t::process_plug () +{ + if (_delayed_start) + add_reconnect_timer (); + else + start_connecting (); +} + +void zmq::stream_connecter_base_t::add_reconnect_timer () +{ + if (options.reconnect_ivl != -1) { + const int interval = get_new_reconnect_ivl (); + add_timer (interval, reconnect_timer_id); + _socket->event_connect_retried (_endpoint, interval); + _reconnect_timer_started = true; + } +} + +int zmq::stream_connecter_base_t::get_new_reconnect_ivl () +{ + // The new interval is the current interval + random value. + const int interval = + _current_reconnect_ivl + generate_random () % options.reconnect_ivl; + + // Only change the current reconnect interval if the maximum reconnect + // interval was set and if it's larger than the reconnect interval. + if (options.reconnect_ivl_max > 0 + && options.reconnect_ivl_max > options.reconnect_ivl) + // Calculate the next interval + _current_reconnect_ivl = + std::min (_current_reconnect_ivl * 2, options.reconnect_ivl_max); + return interval; +} diff --git a/src/stream_connecter_base.hpp b/src/stream_connecter_base.hpp index 9de99b90..30a022fa 100644 --- a/src/stream_connecter_base.hpp +++ b/src/stream_connecter_base.hpp @@ -53,8 +53,28 @@ class stream_connecter_base_t : public own_t, public io_object_t ~stream_connecter_base_t (); + private: + // Handlers for incoming commands. + void process_plug (); + // Internal function to return a reconnect backoff delay. + // Will modify the current_reconnect_ivl used for next call + // Returns the currently used interval + int get_new_reconnect_ivl (); + + virtual void start_connecting () = 0; + + // TODO check if some members can be made private protected: + // ID of the timer used to delay the reconnection. + enum + { + reconnect_timer_id = 1 + }; + + // Internal function to add a reconnect timer + void add_reconnect_timer (); + // Address to connect to. Owned by session_base_t. // It is non-const since some parts may change during opening. address_t *const _addr; diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index 029ecf5b..20d772ec 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -35,7 +35,6 @@ #include "tcp_connecter.hpp" #include "stream_engine.hpp" #include "io_thread.hpp" -#include "random.hpp" #include "err.hpp" #include "ip.hpp" #include "tcp.hpp" @@ -81,14 +80,6 @@ zmq::tcp_connecter_t::~tcp_connecter_t () zmq_assert (!_connect_timer_started); } -void zmq::tcp_connecter_t::process_plug () -{ - if (_delayed_start) - add_reconnect_timer (); - else - start_connecting (); -} - void zmq::tcp_connecter_t::process_term (int linger_) { if (_connect_timer_started) { @@ -208,32 +199,6 @@ void zmq::tcp_connecter_t::add_connect_timer () } } -void zmq::tcp_connecter_t::add_reconnect_timer () -{ - if (options.reconnect_ivl != -1) { - const int interval = get_new_reconnect_ivl (); - add_timer (interval, reconnect_timer_id); - _socket->event_connect_retried (_endpoint, interval); - _reconnect_timer_started = true; - } -} - -int zmq::tcp_connecter_t::get_new_reconnect_ivl () -{ - // The new interval is the current interval + random value. - const int interval = - _current_reconnect_ivl + generate_random () % options.reconnect_ivl; - - // Only change the current reconnect interval if the maximum reconnect - // interval was set and if it's larger than the reconnect interval. - if (options.reconnect_ivl_max > 0 - && options.reconnect_ivl_max > options.reconnect_ivl) - // Calculate the next interval - _current_reconnect_ivl = - std::min (_current_reconnect_ivl * 2, options.reconnect_ivl_max); - return interval; -} - int zmq::tcp_connecter_t::open () { zmq_assert (_s == retired_fd); diff --git a/src/tcp_connecter.hpp b/src/tcp_connecter.hpp index 11013356..3cf5a7b3 100644 --- a/src/tcp_connecter.hpp +++ b/src/tcp_connecter.hpp @@ -49,15 +49,13 @@ class tcp_connecter_t : public stream_connecter_base_t ~tcp_connecter_t (); private: - // ID of the timer used to delay the reconnection. + // ID of the timer used to check the connect timeout, must be different from stream_connecter_base_t::reconnect_timer_id. enum { - reconnect_timer_id = 1, - connect_timer_id + connect_timer_id = 2 }; // Handlers for incoming commands. - void process_plug (); void process_term (int linger_); // Handlers for I/O events. @@ -74,14 +72,6 @@ class tcp_connecter_t : public stream_connecter_base_t // Internal function to add a connect timer void add_connect_timer (); - // Internal function to add a reconnect timer - void add_reconnect_timer (); - - // Internal function to return a reconnect backoff delay. - // Will modify the current_reconnect_ivl used for next call - // Returns the currently used interval - int get_new_reconnect_ivl (); - // Open TCP connecting socket. Returns -1 in case of error, // 0 if connect was successful immediately. Returns -1 with // EAGAIN errno if async connect was launched. diff --git a/src/tipc_connecter.cpp b/src/tipc_connecter.cpp index 0f20a878..2b82e938 100644 --- a/src/tipc_connecter.cpp +++ b/src/tipc_connecter.cpp @@ -64,14 +64,6 @@ zmq::tipc_connecter_t::tipc_connecter_t (class io_thread_t *io_thread_, zmq_assert (_addr->protocol == "tipc"); } -void zmq::tipc_connecter_t::process_plug () -{ - if (_delayed_start) - add_reconnect_timer (); - else - start_connecting (); -} - void zmq::tipc_connecter_t::process_term (int linger_) { if (_reconnect_timer_started) { @@ -157,35 +149,6 @@ void zmq::tipc_connecter_t::start_connecting () } } -void zmq::tipc_connecter_t::add_reconnect_timer () -{ - if (options.reconnect_ivl != -1) { - int rc_ivl = get_new_reconnect_ivl (); - add_timer (rc_ivl, reconnect_timer_id); - _socket->event_connect_retried (_endpoint, rc_ivl); - _reconnect_timer_started = true; - } -} - -int zmq::tipc_connecter_t::get_new_reconnect_ivl () -{ - // The new interval is the current interval + random value. - int this_interval = - _current_reconnect_ivl + (generate_random () % options.reconnect_ivl); - - // Only change the current reconnect interval if the maximum reconnect - // interval was set and if it's larger than the reconnect interval. - if (options.reconnect_ivl_max > 0 - && options.reconnect_ivl_max > options.reconnect_ivl) { - // Calculate the next interval - _current_reconnect_ivl = _current_reconnect_ivl * 2; - if (_current_reconnect_ivl >= options.reconnect_ivl_max) { - _current_reconnect_ivl = options.reconnect_ivl_max; - } - } - return this_interval; -} - int zmq::tipc_connecter_t::open () { zmq_assert (_s == retired_fd); diff --git a/src/tipc_connecter.hpp b/src/tipc_connecter.hpp index bff14193..c6218c5d 100644 --- a/src/tipc_connecter.hpp +++ b/src/tipc_connecter.hpp @@ -58,7 +58,6 @@ class tipc_connecter_t : public stream_connecter_base_t }; // Handlers for incoming commands. - void process_plug (); void process_term (int linger_); // Handlers for I/O events. @@ -69,9 +68,6 @@ class tipc_connecter_t : public stream_connecter_base_t // Internal function to start the actual connection establishment. void start_connecting (); - // Internal function to add a reconnect timer - void add_reconnect_timer (); - // Close the connecting socket. void close (); @@ -79,11 +75,6 @@ class tipc_connecter_t : public stream_connecter_base_t // retired_fd if the connection was unsuccessful. fd_t connect (); - // Internal function to return a reconnect backoff delay. - // Will modify the current_reconnect_ivl used for next call - // Returns the currently used interval - int get_new_reconnect_ivl (); - // Open IPC connecting socket. Returns -1 in case of error, // 0 if connect was successful immediately. Returns -1 with // EAGAIN errno if async connect was launched.