diff --git a/src/decoder.cpp b/src/decoder.cpp index ee9c3abe..c718ae12 100644 --- a/src/decoder.cpp +++ b/src/decoder.cpp @@ -57,11 +57,6 @@ void zmq::decoder_t::set_msg_sink (i_msg_sink *msg_sink_) msg_sink = msg_sink_; } -bool zmq::decoder_t::stalled () const -{ - return next == &decoder_t::message_ready; -} - bool zmq::decoder_t::one_byte_size_ready () { // First byte of size is read. If it is 0xff read 8-byte size. diff --git a/src/decoder.hpp b/src/decoder.hpp index 3860af2d..ec5410f0 100644 --- a/src/decoder.hpp +++ b/src/decoder.hpp @@ -143,6 +143,22 @@ namespace zmq } } + // Returns true if the decoder has been fed all required data + // but cannot proceed with the next decoding step. + // False is returned if the decoder has encountered an error. + bool stalled () + { + while (!to_read) { + if (!(static_cast (this)->*next) ()) { + if (unlikely (!(static_cast (this)->next))) + return false; + return true; + } + } + + return false; + } + inline bool message_ready_size (size_t msg_sz) { zmq_assert (false); @@ -172,13 +188,13 @@ namespace zmq next = NULL; } + private: + // Next step. If set to NULL, it means that associated data stream // is dead. Note that there can be still data in the process in such // case. step_t next; - private: - // Where to store the read data. unsigned char *read_pos; @@ -205,10 +221,6 @@ namespace zmq // Set the receiver of decoded messages. void set_msg_sink (i_msg_sink *msg_sink_); - // Returns true if there is a decoded message - // waiting to be delivered to the session. - bool stalled () const; - private: bool one_byte_size_ready (); diff --git a/src/i_decoder.hpp b/src/i_decoder.hpp index ab4b96fc..789df6de 100644 --- a/src/i_decoder.hpp +++ b/src/i_decoder.hpp @@ -40,7 +40,7 @@ namespace zmq virtual size_t process_buffer (unsigned char *data_, size_t size_) = 0; - virtual bool stalled () const = 0; + virtual bool stalled () = 0; virtual bool message_ready_size (size_t msg_sz) = 0; }; diff --git a/src/raw_decoder.cpp b/src/raw_decoder.cpp index d35fd222..98ac0f2c 100644 --- a/src/raw_decoder.cpp +++ b/src/raw_decoder.cpp @@ -53,7 +53,7 @@ void zmq::raw_decoder_t::set_msg_sink (i_msg_sink *msg_sink_) msg_sink = msg_sink_; } -bool zmq::raw_decoder_t::stalled () const +bool zmq::raw_decoder_t::stalled () { return false; } diff --git a/src/raw_decoder.hpp b/src/raw_decoder.hpp index 0dfbb732..98dbd226 100644 --- a/src/raw_decoder.hpp +++ b/src/raw_decoder.hpp @@ -45,7 +45,7 @@ namespace zmq // i_decoder interface. virtual void set_msg_sink (i_msg_sink *msg_sink_); - virtual bool stalled () const; + virtual bool stalled (); virtual bool message_ready_size (size_t msg_sz); diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 413b87ec..57381d5e 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -52,10 +52,10 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, const std::string &endpoint_) : s (fd_), + io_enabled (false), inpos (NULL), insize (0), decoder (NULL), - input_error (false), outpos (NULL), outsize (0), encoder (NULL), @@ -134,6 +134,7 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_, // Connect to I/O threads poller object. io_object_t::plug (io_thread_); handle = add_fd (s); + io_enabled = true; if (options.raw_sock) { // no handshaking for raw sock, instantiate raw encoder and decoders @@ -169,7 +170,10 @@ void zmq::stream_engine_t::unplug () plugged = false; // Cancel all fd subscriptions. - rm_fd (handle); + if (io_enabled) { + rm_fd (handle); + io_enabled = false; + } // Disconnect from I/O threads poller object. io_object_t::unplug (); @@ -250,9 +254,10 @@ void zmq::stream_engine_t::in_event () // waiting for input events and postpone the termination // until after the session has accepted the message. if (disconnection) { - input_error = true; - if (decoder->stalled ()) - reset_pollin (handle); + if (decoder->stalled ()) { + rm_fd (handle); + io_enabled = false; + } else error (); } @@ -319,7 +324,7 @@ void zmq::stream_engine_t::activate_out () void zmq::stream_engine_t::activate_in () { - if (input_error) { + if (unlikely (!io_enabled)) { // There was an input error but the engine could not // be terminated (due to the stalled decoder). // Flush the pending message and terminate the engine now. diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp index f24c1fff..2d83995c 100644 --- a/src/stream_engine.hpp +++ b/src/stream_engine.hpp @@ -96,12 +96,14 @@ namespace zmq // Preamble (10 bytes) + version (1 byte) + socket type (1 byte). const static size_t greeting_size = 12; + // True iff we are registered with an I/O poller. + bool io_enabled; + handle_t handle; unsigned char *inpos; size_t insize; i_decoder *decoder; - bool input_error; unsigned char *outpos; size_t outsize; diff --git a/src/v1_decoder.cpp b/src/v1_decoder.cpp index 8ce87f96..f22f96e3 100644 --- a/src/v1_decoder.cpp +++ b/src/v1_decoder.cpp @@ -58,11 +58,6 @@ void zmq::v1_decoder_t::set_msg_sink (i_msg_sink *msg_sink_) msg_sink = msg_sink_; } -bool zmq::v1_decoder_t::stalled () const -{ - return next == &v1_decoder_t::message_ready; -} - bool zmq::v1_decoder_t::flags_ready () { msg_flags = 0; diff --git a/src/v1_decoder.hpp b/src/v1_decoder.hpp index 8336367e..d42e2f16 100644 --- a/src/v1_decoder.hpp +++ b/src/v1_decoder.hpp @@ -44,8 +44,6 @@ namespace zmq // i_decoder interface. virtual void set_msg_sink (i_msg_sink *msg_sink_); - virtual bool stalled () const; - private: bool flags_ready ();