From 7e73587741c6d0fa89afc93477051c283f5e9edd Mon Sep 17 00:00:00 2001 From: Simon Giesecke Date: Thu, 31 Jan 2019 09:52:21 -0500 Subject: [PATCH] Problem: duplication in *_event methods across subclasses of stream_connecter_base_t Solution: pull up common code, introduce new create_engine function in base class --- src/ipc_connecter.cpp | 27 +-------------------------- src/ipc_connecter.hpp | 2 -- src/stream_connecter_base.cpp | 31 +++++++++++++++++++++++++++++++ src/stream_connecter_base.hpp | 7 +++++++ src/tcp_connecter.cpp | 30 ++++++------------------------ src/tcp_connecter.hpp | 1 - src/tipc_connecter.cpp | 27 +-------------------------- src/tipc_connecter.hpp | 2 -- 8 files changed, 46 insertions(+), 81 deletions(-) diff --git a/src/ipc_connecter.cpp b/src/ipc_connecter.cpp index e61c1167..8d3804f2 100644 --- a/src/ipc_connecter.cpp +++ b/src/ipc_connecter.cpp @@ -62,14 +62,6 @@ zmq::ipc_connecter_t::ipc_connecter_t (class io_thread_t *io_thread_, zmq_assert (_addr->protocol == protocol_name::ipc); } -void zmq::ipc_connecter_t::in_event () -{ - // We are not polling for incoming data, so we are actually called - // because of error here. However, we can get error on out event as well - // on some platforms, so we'll simply handle both events in the same way. - out_event (); -} - void zmq::ipc_connecter_t::out_event () { fd_t fd = connect (); @@ -81,25 +73,8 @@ void zmq::ipc_connecter_t::out_event () add_reconnect_timer (); return; } - // Create the engine object for this connection. - stream_engine_t *engine = - new (std::nothrow) stream_engine_t (fd, options, _endpoint); - alloc_assert (engine); - // Attach the engine to the corresponding session object. - send_attach (_session, engine); - - // Shut the connecter down. - terminate (); - - _socket->event_connected (_endpoint, fd); -} - -void zmq::ipc_connecter_t::timer_event (int id_) -{ - zmq_assert (id_ == reconnect_timer_id); - _reconnect_timer_started = false; - start_connecting (); + create_engine (fd); } void zmq::ipc_connecter_t::start_connecting () diff --git a/src/ipc_connecter.hpp b/src/ipc_connecter.hpp index 968dfbc1..ea1edaa9 100644 --- a/src/ipc_connecter.hpp +++ b/src/ipc_connecter.hpp @@ -57,9 +57,7 @@ class ipc_connecter_t : public stream_connecter_base_t }; // Handlers for I/O events. - void in_event (); void out_event (); - void timer_event (int id_); // Internal function to start the actual connection establishment. void start_connecting (); diff --git a/src/stream_connecter_base.cpp b/src/stream_connecter_base.cpp index 6ce42e8b..ae070ed5 100644 --- a/src/stream_connecter_base.cpp +++ b/src/stream_connecter_base.cpp @@ -134,3 +134,34 @@ void zmq::stream_connecter_base_t::close () _socket->event_closed (_endpoint, _s); _s = retired_fd; } + +void zmq::stream_connecter_base_t::in_event () +{ + // We are not polling for incoming data, so we are actually called + // because of error here. However, we can get error on out event as well + // on some platforms, so we'll simply handle both events in the same way. + out_event (); +} + +void zmq::stream_connecter_base_t::create_engine (fd_t fd) +{ + // Create the engine object for this connection. + stream_engine_t *engine = + new (std::nothrow) stream_engine_t (fd, options, _endpoint); + alloc_assert (engine); + + // Attach the engine to the corresponding session object. + send_attach (_session, engine); + + // Shut the connecter down. + terminate (); + + _socket->event_connected (_endpoint, fd); +} + +void zmq::stream_connecter_base_t::timer_event (int id_) +{ + zmq_assert (id_ == reconnect_timer_id); + _reconnect_timer_started = false; + start_connecting (); +} diff --git a/src/stream_connecter_base.hpp b/src/stream_connecter_base.hpp index bc76ac21..960815e0 100644 --- a/src/stream_connecter_base.hpp +++ b/src/stream_connecter_base.hpp @@ -73,6 +73,13 @@ class stream_connecter_base_t : public own_t, public io_object_t void process_plug (); void process_term (int linger_); + // Handlers for I/O events. + void in_event (); + void timer_event (int id_); + + // Internal function to create the engine after connection was established. + void create_engine (fd_t fd); + // Internal function to add a reconnect timer void add_reconnect_timer (); diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index da36d25c..99e172aa 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -90,14 +90,6 @@ void zmq::tcp_connecter_t::process_term (int linger_) stream_connecter_base_t::process_term (linger_); } -void zmq::tcp_connecter_t::in_event () -{ - // We are not polling for incoming data, so we are actually called - // because of error here. However, we can get error on out event as well - // on some platforms, so we'll simply handle both events in the same way. - out_event (); -} - void zmq::tcp_connecter_t::out_event () { if (_connect_timer_started) { @@ -105,6 +97,9 @@ void zmq::tcp_connecter_t::out_event () _connect_timer_started = false; } + // TODO this is still very similar to (t)ipc_connecter_t, maybe the + // differences can be factored out + rm_handle (); const fd_t fd = connect (); @@ -116,18 +111,7 @@ void zmq::tcp_connecter_t::out_event () return; } - // Create the engine object for this connection. - stream_engine_t *engine = - new (std::nothrow) stream_engine_t (fd, options, _endpoint); - alloc_assert (engine); - - // Attach the engine to the corresponding session object. - send_attach (_session, engine); - - // Shut the connecter down. - terminate (); - - _socket->event_connected (_endpoint, fd); + create_engine (fd); } void zmq::tcp_connecter_t::timer_event (int id_) @@ -138,10 +122,8 @@ void zmq::tcp_connecter_t::timer_event (int id_) rm_handle (); close (); add_reconnect_timer (); - } else if (id_ == reconnect_timer_id) { - _reconnect_timer_started = false; - start_connecting (); - } + } else + stream_connecter_base_t::timer_event (id_); } void zmq::tcp_connecter_t::start_connecting () diff --git a/src/tcp_connecter.hpp b/src/tcp_connecter.hpp index 3bc59a32..e0d1b2f9 100644 --- a/src/tcp_connecter.hpp +++ b/src/tcp_connecter.hpp @@ -59,7 +59,6 @@ class tcp_connecter_t : public stream_connecter_base_t void process_term (int linger_); // Handlers for I/O events. - void in_event (); void out_event (); void timer_event (int id_); diff --git a/src/tipc_connecter.cpp b/src/tipc_connecter.cpp index 6f92e5d5..399fbf5c 100644 --- a/src/tipc_connecter.cpp +++ b/src/tipc_connecter.cpp @@ -64,14 +64,6 @@ zmq::tipc_connecter_t::tipc_connecter_t (class io_thread_t *io_thread_, zmq_assert (_addr->protocol == "tipc"); } -void zmq::tipc_connecter_t::in_event () -{ - // We are not polling for incoming data, so we are actually called - // because of error here. However, we can get error on out event as well - // on some platforms, so we'll simply handle both events in the same way. - out_event (); -} - void zmq::tipc_connecter_t::out_event () { fd_t fd = connect (); @@ -83,25 +75,8 @@ void zmq::tipc_connecter_t::out_event () add_reconnect_timer (); return; } - // Create the engine object for this connection. - stream_engine_t *engine = - new (std::nothrow) stream_engine_t (fd, options, _endpoint); - alloc_assert (engine); - // Attach the engine to the corresponding session object. - send_attach (_session, engine); - - // Shut the connecter down. - terminate (); - - _socket->event_connected (_endpoint, fd); -} - -void zmq::tipc_connecter_t::timer_event (int id_) -{ - zmq_assert (id_ == reconnect_timer_id); - _reconnect_timer_started = false; - start_connecting (); + create_engine (fd); } void zmq::tipc_connecter_t::start_connecting () diff --git a/src/tipc_connecter.hpp b/src/tipc_connecter.hpp index 7cc0b4b3..42cd958c 100644 --- a/src/tipc_connecter.hpp +++ b/src/tipc_connecter.hpp @@ -58,9 +58,7 @@ class tipc_connecter_t : public stream_connecter_base_t }; // Handlers for I/O events. - void in_event (); void out_event (); - void timer_event (int id_); // Internal function to start the actual connection establishment. void start_connecting ();