Problem: process_term and close duplicated across subclasses of stream_connecter_base_t

Solution: pull up
This commit is contained in:
Simon Giesecke 2019-01-31 09:39:35 -05:00
parent 531df586d0
commit a09099a615
8 changed files with 39 additions and 98 deletions

View File

@ -62,23 +62,6 @@ zmq::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_,
zmq_assert (_addr->protocol == protocol_name::ipc); zmq_assert (_addr->protocol == protocol_name::ipc);
} }
void zmq::ipc_connecter_t::process_term (int linger_)
{
if (_reconnect_timer_started) {
cancel_timer (reconnect_timer_id);
_reconnect_timer_started = false;
}
if (_handle) {
rm_handle ();
}
if (_s != retired_fd)
close ();
own_t::process_term (linger_);
}
void zmq::ipc_connecter_t::in_event () void zmq::ipc_connecter_t::in_event ()
{ {
// We are not polling for incoming data, so we are actually called // We are not polling for incoming data, so we are actually called
@ -180,16 +163,6 @@ int zmq::ipc_connecter_t::open ()
return -1; return -1;
} }
int zmq::ipc_connecter_t::close ()
{
zmq_assert (_s != retired_fd);
int rc = ::close (_s);
errno_assert (rc == 0);
_socket->event_closed (_endpoint, _s);
_s = retired_fd;
return 0;
}
zmq::fd_t zmq::ipc_connecter_t::connect () zmq::fd_t zmq::ipc_connecter_t::connect ()
{ {
// Following code should handle both Berkeley-derived socket // Following code should handle both Berkeley-derived socket

View File

@ -56,9 +56,6 @@ class ipc_connecter_t : public stream_connecter_base_t
reconnect_timer_id = 1 reconnect_timer_id = 1
}; };
// Handlers for incoming commands.
void process_term (int linger_);
// Handlers for I/O events. // Handlers for I/O events.
void in_event (); void in_event ();
void out_event (); void out_event ();
@ -72,9 +69,6 @@ class ipc_connecter_t : public stream_connecter_base_t
// EAGAIN errno if async connect was launched. // EAGAIN errno if async connect was launched.
int open (); int open ();
// Close the connecting socket.
int close ();
// Get the file descriptor of newly created connection. Returns // Get the file descriptor of newly created connection. Returns
// retired_fd if the connection was unsuccessful. // retired_fd if the connection was unsuccessful.
fd_t connect (); fd_t connect ();

View File

@ -72,6 +72,23 @@ void zmq::stream_connecter_base_t::process_plug ()
start_connecting (); start_connecting ();
} }
void zmq::stream_connecter_base_t::process_term (int linger_)
{
if (_reconnect_timer_started) {
cancel_timer (reconnect_timer_id);
_reconnect_timer_started = false;
}
if (_handle) {
rm_handle ();
}
if (_s != retired_fd)
close ();
own_t::process_term (linger_);
}
void zmq::stream_connecter_base_t::add_reconnect_timer () void zmq::stream_connecter_base_t::add_reconnect_timer ()
{ {
if (options.reconnect_ivl != -1) { if (options.reconnect_ivl != -1) {
@ -103,3 +120,17 @@ void zmq::stream_connecter_base_t::rm_handle ()
rm_fd (_handle); rm_fd (_handle);
_handle = static_cast<handle_t> (NULL); _handle = static_cast<handle_t> (NULL);
} }
void zmq::stream_connecter_base_t::close ()
{
zmq_assert (_s != retired_fd);
#ifdef ZMQ_HAVE_WINDOWS
const int rc = closesocket (_s);
wsa_assert (rc != SOCKET_ERROR);
#else
const int rc = ::close (_s);
errno_assert (rc == 0);
#endif
_socket->event_closed (_endpoint, _s);
_s = retired_fd;
}

View File

@ -54,9 +54,6 @@ class stream_connecter_base_t : public own_t, public io_object_t
~stream_connecter_base_t (); ~stream_connecter_base_t ();
private: private:
// Handlers for incoming commands.
void process_plug ();
// Internal function to return a reconnect backoff delay. // Internal function to return a reconnect backoff delay.
// Will modify the current_reconnect_ivl used for next call // Will modify the current_reconnect_ivl used for next call
// Returns the currently used interval // Returns the currently used interval
@ -72,12 +69,19 @@ class stream_connecter_base_t : public own_t, public io_object_t
reconnect_timer_id = 1 reconnect_timer_id = 1
}; };
// Handlers for incoming commands.
void process_plug ();
void process_term (int linger_);
// Internal function to add a reconnect timer // Internal function to add a reconnect timer
void add_reconnect_timer (); void add_reconnect_timer ();
// Removes the handle from the poller. // Removes the handle from the poller.
void rm_handle (); void rm_handle ();
// Close the connecting socket.
void close ();
// Address to connect to. Owned by session_base_t. // Address to connect to. Owned by session_base_t.
// It is non-const since some parts may change during opening. // It is non-const since some parts may change during opening.
address_t *const _addr; address_t *const _addr;

View File

@ -87,19 +87,7 @@ void zmq::tcp_connecter_t::process_term (int linger_)
_connect_timer_started = false; _connect_timer_started = false;
} }
if (_reconnect_timer_started) { stream_connecter_base_t::process_term (linger_);
cancel_timer (reconnect_timer_id);
_reconnect_timer_started = false;
}
if (_handle) {
rm_handle ();
}
if (_s != retired_fd)
close ();
own_t::process_term (linger_);
} }
void zmq::tcp_connecter_t::in_event () void zmq::tcp_connecter_t::in_event ()
@ -373,17 +361,3 @@ bool zmq::tcp_connecter_t::tune_socket (const fd_t fd_)
| tune_tcp_maxrt (fd_, options.tcp_maxrt); | tune_tcp_maxrt (fd_, options.tcp_maxrt);
return rc == 0; return rc == 0;
} }
void zmq::tcp_connecter_t::close ()
{
zmq_assert (_s != retired_fd);
#ifdef ZMQ_HAVE_WINDOWS
const int rc = closesocket (_s);
wsa_assert (rc != SOCKET_ERROR);
#else
const int rc = ::close (_s);
errno_assert (rc == 0);
#endif
_socket->event_closed (_endpoint, _s);
_s = retired_fd;
}

View File

@ -74,9 +74,6 @@ class tcp_connecter_t : public stream_connecter_base_t
// EAGAIN errno if async connect was launched. // EAGAIN errno if async connect was launched.
int open (); int open ();
// Close the connecting socket.
void close ();
// Get the file descriptor of newly created connection. Returns // Get the file descriptor of newly created connection. Returns
// retired_fd if the connection was unsuccessful. // retired_fd if the connection was unsuccessful.
fd_t connect (); fd_t connect ();

View File

@ -64,23 +64,6 @@ zmq::tipc_connecter_t::tipc_connecter_t (class io_thread_t *io_thread_,
zmq_assert (_addr->protocol == "tipc"); zmq_assert (_addr->protocol == "tipc");
} }
void zmq::tipc_connecter_t::process_term (int linger_)
{
if (_reconnect_timer_started) {
cancel_timer (reconnect_timer_id);
_reconnect_timer_started = false;
}
if (_handle) {
rm_handle ();
}
if (_s != retired_fd)
close ();
own_t::process_term (linger_);
}
void zmq::tipc_connecter_t::in_event () void zmq::tipc_connecter_t::in_event ()
{ {
// We are not polling for incoming data, so we are actually called // We are not polling for incoming data, so we are actually called
@ -185,15 +168,6 @@ int zmq::tipc_connecter_t::open ()
return -1; return -1;
} }
void zmq::tipc_connecter_t::close ()
{
zmq_assert (_s != retired_fd);
int rc = ::close (_s);
errno_assert (rc == 0);
_socket->event_closed (_endpoint, _s);
_s = retired_fd;
}
zmq::fd_t zmq::tipc_connecter_t::connect () zmq::fd_t zmq::tipc_connecter_t::connect ()
{ {
// Following code should handle both Berkeley-derived socket // Following code should handle both Berkeley-derived socket

View File

@ -57,9 +57,6 @@ class tipc_connecter_t : public stream_connecter_base_t
reconnect_timer_id = 1 reconnect_timer_id = 1
}; };
// Handlers for incoming commands.
void process_term (int linger_);
// Handlers for I/O events. // Handlers for I/O events.
void in_event (); void in_event ();
void out_event (); void out_event ();
@ -68,9 +65,6 @@ class tipc_connecter_t : public stream_connecter_base_t
// Internal function to start the actual connection establishment. // Internal function to start the actual connection establishment.
void start_connecting (); void start_connecting ();
// Close the connecting socket.
void close ();
// Get the file descriptor of newly created connection. Returns // Get the file descriptor of newly created connection. Returns
// retired_fd if the connection was unsuccessful. // retired_fd if the connection was unsuccessful.
fd_t connect (); fd_t connect ();