diff --git a/src/i_engine.hpp b/src/i_engine.hpp index 5e610468..dfbdd265 100644 --- a/src/i_engine.hpp +++ b/src/i_engine.hpp @@ -50,7 +50,10 @@ struct i_engine // This method is called by the session to signalise that more // messages can be written to the pipe. - virtual void restart_input () = 0; + // Returns false if the engine was deleted due to an error. + // TODO it is probably better to change the design such that the engine + // does not delete itself + virtual bool restart_input () = 0; // This method is called by the session to signalise that there // are messages to send available. diff --git a/src/norm_engine.cpp b/src/norm_engine.cpp index bd9810af..724feb7d 100644 --- a/src/norm_engine.cpp +++ b/src/norm_engine.cpp @@ -382,7 +382,7 @@ void zmq::norm_engine_t::in_event () } } // zmq::norm_engine_t::in_event() -void zmq::norm_engine_t::restart_input () +bool zmq::norm_engine_t::restart_input () { // TBD - should we check/assert that zmq_input_ready was false??? zmq_input_ready = true; @@ -390,6 +390,7 @@ void zmq::norm_engine_t::restart_input () if (!msg_ready_list.IsEmpty ()) recv_data (NORM_OBJECT_INVALID); + return true; } // end zmq::norm_engine_t::restart_input() void zmq::norm_engine_t::recv_data (NormObjectHandle object) diff --git a/src/norm_engine.hpp b/src/norm_engine.hpp index 5b241038..733f24fc 100644 --- a/src/norm_engine.hpp +++ b/src/norm_engine.hpp @@ -39,7 +39,7 @@ class norm_engine_t : public io_object_t, public i_engine // This method is called by the session to signalise that more // messages can be written to the pipe. - virtual void restart_input (); + virtual bool restart_input (); // This method is called by the session to signalise that there // are messages to send available. diff --git a/src/pgm_receiver.cpp b/src/pgm_receiver.cpp index cbbaf16d..3dcef21d 100644 --- a/src/pgm_receiver.cpp +++ b/src/pgm_receiver.cpp @@ -116,7 +116,7 @@ void zmq::pgm_receiver_t::restart_output () drop_subscriptions (); } -void zmq::pgm_receiver_t::restart_input () +bool zmq::pgm_receiver_t::restart_input () { zmq_assert (session != NULL); zmq_assert (active_tsi != NULL); @@ -135,7 +135,7 @@ void zmq::pgm_receiver_t::restart_input () // HWM reached; we will try later. if (errno == EAGAIN) { session->flush (); - return; + return true; } // Data error. Delete message decoder, mark the // peer as not joined and drop remaining data. @@ -151,6 +151,8 @@ void zmq::pgm_receiver_t::restart_input () active_tsi = NULL; in_event (); + + return true; } const char *zmq::pgm_receiver_t::get_endpoint () const diff --git a/src/pgm_receiver.hpp b/src/pgm_receiver.hpp index d377d8fd..d5eca74d 100644 --- a/src/pgm_receiver.hpp +++ b/src/pgm_receiver.hpp @@ -57,7 +57,7 @@ class pgm_receiver_t : public io_object_t, public i_engine // i_engine interface implementation. void plug (zmq::io_thread_t *io_thread_, zmq::session_base_t *session_); void terminate (); - void restart_input (); + bool restart_input (); void restart_output (); void zap_msg_available () {} const char *get_endpoint () const; diff --git a/src/pgm_sender.cpp b/src/pgm_sender.cpp index 1fc71521..52561d0f 100644 --- a/src/pgm_sender.cpp +++ b/src/pgm_sender.cpp @@ -137,9 +137,10 @@ void zmq::pgm_sender_t::restart_output () out_event (); } -void zmq::pgm_sender_t::restart_input () +bool zmq::pgm_sender_t::restart_input () { zmq_assert (false); + return true; } const char *zmq::pgm_sender_t::get_endpoint () const diff --git a/src/pgm_sender.hpp b/src/pgm_sender.hpp index dfc79425..1eb0b1f1 100644 --- a/src/pgm_sender.hpp +++ b/src/pgm_sender.hpp @@ -56,7 +56,7 @@ class pgm_sender_t : public io_object_t, public i_engine // i_engine interface implementation. void plug (zmq::io_thread_t *io_thread_, zmq::session_base_t *session_); void terminate (); - void restart_input (); + bool restart_input (); void restart_output (); void zap_msg_available () {} const char *get_endpoint () const; diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 9dc96696..2d0bb2f7 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -441,7 +441,7 @@ void zmq::stream_engine_t::restart_output () out_event (); } -void zmq::stream_engine_t::restart_input () +bool zmq::stream_engine_t::restart_input () { zmq_assert (_input_stopped); zmq_assert (_session != NULL); @@ -451,9 +451,11 @@ void zmq::stream_engine_t::restart_input () if (rc == -1) { if (errno == EAGAIN) _session->flush (); - else + else { error (protocol_error); - return; + return false; + } + return true; } while (_insize > 0) { @@ -471,10 +473,14 @@ void zmq::stream_engine_t::restart_input () if (rc == -1 && errno == EAGAIN) _session->flush (); - else if (_io_error) + else if (_io_error) { error (connection_error); - else if (rc == -1) + return false; + } else if (rc == -1) { error (protocol_error); + return false; + } + else { _input_stopped = false; set_pollin (_handle); @@ -483,6 +489,8 @@ void zmq::stream_engine_t::restart_input () // Speculative read. in_event (); } + + return true; } bool zmq::stream_engine_t::handshake () @@ -814,7 +822,8 @@ void zmq::stream_engine_t::zap_msg_available () return; } if (_input_stopped) - restart_input (); + if (!restart_input ()) + return; if (_output_stopped) restart_output (); } diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp index e714fbd9..6604afb6 100644 --- a/src/stream_engine.hpp +++ b/src/stream_engine.hpp @@ -76,7 +76,7 @@ class stream_engine_t : public io_object_t, public i_engine // i_engine interface implementation. void plug (zmq::io_thread_t *io_thread_, zmq::session_base_t *session_); void terminate (); - void restart_input (); + bool restart_input (); void restart_output (); void zap_msg_available (); const char *get_endpoint () const; diff --git a/src/udp_engine.cpp b/src/udp_engine.cpp index a6cbf293..e070ccb5 100644 --- a/src/udp_engine.cpp +++ b/src/udp_engine.cpp @@ -537,11 +537,12 @@ void zmq::udp_engine_t::in_event () _session->flush (); } -void zmq::udp_engine_t::restart_input () +bool zmq::udp_engine_t::restart_input () { - if (!_recv_enabled) - return; + if (_recv_enabled) { + set_pollin (_handle); + in_event (); + } - set_pollin (_handle); - in_event (); + return true; } diff --git a/src/udp_engine.hpp b/src/udp_engine.hpp index b21f1316..9bf5a533 100644 --- a/src/udp_engine.hpp +++ b/src/udp_engine.hpp @@ -32,7 +32,7 @@ class udp_engine_t : public io_object_t, public i_engine // This method is called by the session to signalise that more // messages can be written to the pipe. - void restart_input (); + bool restart_input (); // This method is called by the session to signalise that there // are messages to send available.