From 0c0a351fa5ffed3b66206eac6b77cef2fb316151 Mon Sep 17 00:00:00 2001 From: Martin Hurton Date: Sat, 10 Nov 2012 23:05:10 +0100 Subject: [PATCH 1/3] Backported fix for ZMQ-465 --- src/decoder.cpp | 5 ----- src/decoder.hpp | 31 +++++++++++++++++++++++++------ src/i_decoder.hpp | 2 +- src/stream_engine.cpp | 17 +++++++++++------ src/stream_engine.hpp | 4 +++- src/v1_decoder.cpp | 5 ----- src/v1_decoder.hpp | 2 -- 7 files changed, 40 insertions(+), 26 deletions(-) diff --git a/src/decoder.cpp b/src/decoder.cpp index ee9c3abe..c718ae12 100644 --- a/src/decoder.cpp +++ b/src/decoder.cpp @@ -57,11 +57,6 @@ void zmq::decoder_t::set_msg_sink (i_msg_sink *msg_sink_) msg_sink = msg_sink_; } -bool zmq::decoder_t::stalled () const -{ - return next == &decoder_t::message_ready; -} - bool zmq::decoder_t::one_byte_size_ready () { // First byte of size is read. If it is 0xff read 8-byte size. diff --git a/src/decoder.hpp b/src/decoder.hpp index baf67cfa..479cceea 100644 --- a/src/decoder.hpp +++ b/src/decoder.hpp @@ -143,6 +143,29 @@ namespace zmq } } + // Returns true if the decoder has been fed all required data + // but cannot proceed with the next decoding step. + // False is returned if the decoder has encountered an error. + bool stalled () + { + while (!to_read) { + if (!(static_cast (this)->*next) ()) { + if (unlikely (!(static_cast (this)->next))) + return false; + return true; + } + } + + return false; + } + + inline bool message_ready_size (size_t msg_sz) + { + zmq_assert (false); + return false; + } + +>>>>>>> c543b2c... Resolve LIBZMQ-465 protected: // Prototype of state machine action. Action should return false if @@ -166,13 +189,13 @@ namespace zmq next = NULL; } + private: + // Next step. If set to NULL, it means that associated data stream // is dead. Note that there can be still data in the process in such // case. step_t next; - private: - // Where to store the read data. unsigned char *read_pos; @@ -199,10 +222,6 @@ namespace zmq // Set the receiver of decoded messages. void set_msg_sink (i_msg_sink *msg_sink_); - // Returns true if there is a decoded message - // waiting to be delivered to the session. - bool stalled () const; - private: bool one_byte_size_ready (); diff --git a/src/i_decoder.hpp b/src/i_decoder.hpp index 2555f128..99eddeb0 100644 --- a/src/i_decoder.hpp +++ b/src/i_decoder.hpp @@ -40,7 +40,7 @@ namespace zmq virtual size_t process_buffer (unsigned char *data_, size_t size_) = 0; - virtual bool stalled () const = 0; + virtual bool stalled () = 0; }; diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 5c2a1211..1616b2a0 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -50,10 +50,10 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, const std::string &endpoint_) : s (fd_), + io_enabled (false), inpos (NULL), insize (0), decoder (NULL), - input_error (false), outpos (NULL), outsize (0), encoder (NULL), @@ -132,6 +132,7 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_, // Connect to I/O threads poller object. io_object_t::plug (io_thread_); handle = add_fd (s); + io_enabled = true; // Send the 'length' and 'flags' fields of the identity message. // The 'length' field is encoded in the long format. @@ -153,7 +154,10 @@ void zmq::stream_engine_t::unplug () plugged = false; // Cancel all fd subscriptions. - rm_fd (handle); + if (io_enabled) { + rm_fd (handle); + io_enabled = false; + } // Disconnect from I/O threads poller object. io_object_t::unplug (); @@ -225,9 +229,10 @@ void zmq::stream_engine_t::in_event () // waiting for input events and postpone the termination // until after the session has accepted the message. if (disconnection) { - input_error = true; - if (decoder->stalled ()) - reset_pollin (handle); + if (decoder->stalled ()) { + rm_fd (handle); + io_enabled = false; + } else error (); } @@ -294,7 +299,7 @@ void zmq::stream_engine_t::activate_out () void zmq::stream_engine_t::activate_in () { - if (input_error) { + if (unlikely (!io_enabled)) { // There was an input error but the engine could not // be terminated (due to the stalled decoder). // Flush the pending message and terminate the engine now. diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp index f24c1fff..2d83995c 100644 --- a/src/stream_engine.hpp +++ b/src/stream_engine.hpp @@ -96,12 +96,14 @@ namespace zmq // Preamble (10 bytes) + version (1 byte) + socket type (1 byte). const static size_t greeting_size = 12; + // True iff we are registered with an I/O poller. + bool io_enabled; + handle_t handle; unsigned char *inpos; size_t insize; i_decoder *decoder; - bool input_error; unsigned char *outpos; size_t outsize; diff --git a/src/v1_decoder.cpp b/src/v1_decoder.cpp index 8ce87f96..f22f96e3 100644 --- a/src/v1_decoder.cpp +++ b/src/v1_decoder.cpp @@ -58,11 +58,6 @@ void zmq::v1_decoder_t::set_msg_sink (i_msg_sink *msg_sink_) msg_sink = msg_sink_; } -bool zmq::v1_decoder_t::stalled () const -{ - return next == &v1_decoder_t::message_ready; -} - bool zmq::v1_decoder_t::flags_ready () { msg_flags = 0; diff --git a/src/v1_decoder.hpp b/src/v1_decoder.hpp index 8336367e..d42e2f16 100644 --- a/src/v1_decoder.hpp +++ b/src/v1_decoder.hpp @@ -44,8 +44,6 @@ namespace zmq // i_decoder interface. virtual void set_msg_sink (i_msg_sink *msg_sink_); - virtual bool stalled () const; - private: bool flags_ready (); From 50b6da0c1cc992293603915aa62769e739a41b3f Mon Sep 17 00:00:00 2001 From: Martin Hurton Date: Tue, 13 Nov 2012 13:06:29 +0100 Subject: [PATCH 2/3] Minor code cleanup --- src/epoll.cpp | 5 +++-- src/kqueue.cpp | 5 +++-- src/poll.cpp | 6 +++--- src/select.cpp | 5 +++-- 4 files changed, 12 insertions(+), 9 deletions(-) diff --git a/src/epoll.cpp b/src/epoll.cpp index a62345d4..3855ddfd 100644 --- a/src/epoll.cpp +++ b/src/epoll.cpp @@ -140,9 +140,10 @@ void zmq::epoll_t::loop () // Wait for events. int n = epoll_wait (epoll_fd, &ev_buf [0], max_io_events, timeout ? timeout : -1); - if (n == -1 && errno == EINTR) + if (n == -1) { + errno_assert (errno == EINTR); continue; - errno_assert (n != -1); + } for (int i = 0; i < n; i ++) { poll_entry_t *pe = ((poll_entry_t*) ev_buf [i].data.ptr); diff --git a/src/kqueue.cpp b/src/kqueue.cpp index 0b07faba..5fbd01c3 100644 --- a/src/kqueue.cpp +++ b/src/kqueue.cpp @@ -163,9 +163,10 @@ void zmq::kqueue_t::loop () timespec ts = {timeout / 1000, (timeout % 1000) * 1000000}; int n = kevent (kqueue_fd, NULL, 0, &ev_buf [0], max_io_events, timeout ? &ts: NULL); - if (n == -1 && errno == EINTR) + if (n == -1) { + errno_assert (errno == EINTR); continue; - errno_assert (n != -1); + } for (int i = 0; i < n; i ++) { poll_entry_t *pe = (poll_entry_t*) ev_buf [i].udata; diff --git a/src/poll.cpp b/src/poll.cpp index de7e0da3..e41f8fb4 100644 --- a/src/poll.cpp +++ b/src/poll.cpp @@ -125,10 +125,10 @@ void zmq::poll_t::loop () // Wait for events. int rc = poll (&pollset [0], pollset.size (), timeout ? timeout : -1); - if (rc == -1 && errno == EINTR) + if (rc == -1) { + errno_assert (errno == EINTR); continue; - errno_assert (rc != -1); - + } // If there are no events (i.e. it's a timeout) there's no point // in checking the pollset. diff --git a/src/select.cpp b/src/select.cpp index 56b87ae6..d1792f01 100644 --- a/src/select.cpp +++ b/src/select.cpp @@ -168,9 +168,10 @@ void zmq::select_t::loop () #else int rc = select (maxfd + 1, &readfds, &writefds, &exceptfds, timeout ? &tv : NULL); - if (rc == -1 && errno == EINTR) + if (rc == -1) { + errno_assert (errno == EINTR); continue; - errno_assert (rc != -1); + } #endif // If there are no events (i.e. it's a timeout) there's no point From 30738e1123550a511060d5b871218894e1a9cb33 Mon Sep 17 00:00:00 2001 From: Pieter Hintjens Date: Tue, 13 Nov 2012 21:39:59 +0900 Subject: [PATCH 3/3] Backported fix for ZMQ-465 --- src/decoder.hpp | 1 - 1 file changed, 1 deletion(-) diff --git a/src/decoder.hpp b/src/decoder.hpp index 479cceea..ec5410f0 100644 --- a/src/decoder.hpp +++ b/src/decoder.hpp @@ -165,7 +165,6 @@ namespace zmq return false; } ->>>>>>> c543b2c... Resolve LIBZMQ-465 protected: // Prototype of state machine action. Action should return false if