Merge pull request #57 from hintjens/master

Backported fixes for LIBZMQ-465
This commit is contained in:
Pieter Hintjens 2012-11-13 04:40:49 -08:00
commit 69fa792cae
11 changed files with 51 additions and 35 deletions

View File

@ -57,11 +57,6 @@ void zmq::decoder_t::set_msg_sink (i_msg_sink *msg_sink_)
msg_sink = msg_sink_; msg_sink = msg_sink_;
} }
bool zmq::decoder_t::stalled () const
{
return next == &decoder_t::message_ready;
}
bool zmq::decoder_t::one_byte_size_ready () bool zmq::decoder_t::one_byte_size_ready ()
{ {
// First byte of size is read. If it is 0xff read 8-byte size. // First byte of size is read. If it is 0xff read 8-byte size.

View File

@ -143,6 +143,28 @@ namespace zmq
} }
} }
// Returns true if the decoder has been fed all required data
// but cannot proceed with the next decoding step.
// False is returned if the decoder has encountered an error.
bool stalled ()
{
while (!to_read) {
if (!(static_cast <T*> (this)->*next) ()) {
if (unlikely (!(static_cast <T*> (this)->next)))
return false;
return true;
}
}
return false;
}
inline bool message_ready_size (size_t msg_sz)
{
zmq_assert (false);
return false;
}
protected: protected:
// Prototype of state machine action. Action should return false if // Prototype of state machine action. Action should return false if
@ -166,13 +188,13 @@ namespace zmq
next = NULL; next = NULL;
} }
private:
// Next step. If set to NULL, it means that associated data stream // Next step. If set to NULL, it means that associated data stream
// is dead. Note that there can be still data in the process in such // is dead. Note that there can be still data in the process in such
// case. // case.
step_t next; step_t next;
private:
// Where to store the read data. // Where to store the read data.
unsigned char *read_pos; unsigned char *read_pos;
@ -199,10 +221,6 @@ namespace zmq
// Set the receiver of decoded messages. // Set the receiver of decoded messages.
void set_msg_sink (i_msg_sink *msg_sink_); void set_msg_sink (i_msg_sink *msg_sink_);
// Returns true if there is a decoded message
// waiting to be delivered to the session.
bool stalled () const;
private: private:
bool one_byte_size_ready (); bool one_byte_size_ready ();

View File

@ -140,9 +140,10 @@ void zmq::epoll_t::loop ()
// Wait for events. // Wait for events.
int n = epoll_wait (epoll_fd, &ev_buf [0], max_io_events, int n = epoll_wait (epoll_fd, &ev_buf [0], max_io_events,
timeout ? timeout : -1); timeout ? timeout : -1);
if (n == -1 && errno == EINTR) if (n == -1) {
errno_assert (errno == EINTR);
continue; continue;
errno_assert (n != -1); }
for (int i = 0; i < n; i ++) { for (int i = 0; i < n; i ++) {
poll_entry_t *pe = ((poll_entry_t*) ev_buf [i].data.ptr); poll_entry_t *pe = ((poll_entry_t*) ev_buf [i].data.ptr);

View File

@ -40,7 +40,7 @@ namespace zmq
virtual size_t process_buffer (unsigned char *data_, size_t size_) = 0; virtual size_t process_buffer (unsigned char *data_, size_t size_) = 0;
virtual bool stalled () const = 0; virtual bool stalled () = 0;
}; };

View File

@ -163,9 +163,10 @@ void zmq::kqueue_t::loop ()
timespec ts = {timeout / 1000, (timeout % 1000) * 1000000}; timespec ts = {timeout / 1000, (timeout % 1000) * 1000000};
int n = kevent (kqueue_fd, NULL, 0, &ev_buf [0], max_io_events, int n = kevent (kqueue_fd, NULL, 0, &ev_buf [0], max_io_events,
timeout ? &ts: NULL); timeout ? &ts: NULL);
if (n == -1 && errno == EINTR) if (n == -1) {
errno_assert (errno == EINTR);
continue; continue;
errno_assert (n != -1); }
for (int i = 0; i < n; i ++) { for (int i = 0; i < n; i ++) {
poll_entry_t *pe = (poll_entry_t*) ev_buf [i].udata; poll_entry_t *pe = (poll_entry_t*) ev_buf [i].udata;

View File

@ -125,10 +125,10 @@ void zmq::poll_t::loop ()
// Wait for events. // Wait for events.
int rc = poll (&pollset [0], pollset.size (), timeout ? timeout : -1); int rc = poll (&pollset [0], pollset.size (), timeout ? timeout : -1);
if (rc == -1 && errno == EINTR) if (rc == -1) {
errno_assert (errno == EINTR);
continue; continue;
errno_assert (rc != -1); }
// If there are no events (i.e. it's a timeout) there's no point // If there are no events (i.e. it's a timeout) there's no point
// in checking the pollset. // in checking the pollset.

View File

@ -168,9 +168,10 @@ void zmq::select_t::loop ()
#else #else
int rc = select (maxfd + 1, &readfds, &writefds, &exceptfds, int rc = select (maxfd + 1, &readfds, &writefds, &exceptfds,
timeout ? &tv : NULL); timeout ? &tv : NULL);
if (rc == -1 && errno == EINTR) if (rc == -1) {
errno_assert (errno == EINTR);
continue; continue;
errno_assert (rc != -1); }
#endif #endif
// If there are no events (i.e. it's a timeout) there's no point // If there are no events (i.e. it's a timeout) there's no point

View File

@ -50,10 +50,10 @@
zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, const std::string &endpoint_) : zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, const std::string &endpoint_) :
s (fd_), s (fd_),
io_enabled (false),
inpos (NULL), inpos (NULL),
insize (0), insize (0),
decoder (NULL), decoder (NULL),
input_error (false),
outpos (NULL), outpos (NULL),
outsize (0), outsize (0),
encoder (NULL), encoder (NULL),
@ -132,6 +132,7 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
// Connect to I/O threads poller object. // Connect to I/O threads poller object.
io_object_t::plug (io_thread_); io_object_t::plug (io_thread_);
handle = add_fd (s); handle = add_fd (s);
io_enabled = true;
// Send the 'length' and 'flags' fields of the identity message. // Send the 'length' and 'flags' fields of the identity message.
// The 'length' field is encoded in the long format. // The 'length' field is encoded in the long format.
@ -153,7 +154,10 @@ void zmq::stream_engine_t::unplug ()
plugged = false; plugged = false;
// Cancel all fd subscriptions. // Cancel all fd subscriptions.
rm_fd (handle); if (io_enabled) {
rm_fd (handle);
io_enabled = false;
}
// Disconnect from I/O threads poller object. // Disconnect from I/O threads poller object.
io_object_t::unplug (); io_object_t::unplug ();
@ -225,9 +229,10 @@ void zmq::stream_engine_t::in_event ()
// waiting for input events and postpone the termination // waiting for input events and postpone the termination
// until after the session has accepted the message. // until after the session has accepted the message.
if (disconnection) { if (disconnection) {
input_error = true; if (decoder->stalled ()) {
if (decoder->stalled ()) rm_fd (handle);
reset_pollin (handle); io_enabled = false;
}
else else
error (); error ();
} }
@ -294,7 +299,7 @@ void zmq::stream_engine_t::activate_out ()
void zmq::stream_engine_t::activate_in () void zmq::stream_engine_t::activate_in ()
{ {
if (input_error) { if (unlikely (!io_enabled)) {
// There was an input error but the engine could not // There was an input error but the engine could not
// be terminated (due to the stalled decoder). // be terminated (due to the stalled decoder).
// Flush the pending message and terminate the engine now. // Flush the pending message and terminate the engine now.

View File

@ -96,12 +96,14 @@ namespace zmq
// Preamble (10 bytes) + version (1 byte) + socket type (1 byte). // Preamble (10 bytes) + version (1 byte) + socket type (1 byte).
const static size_t greeting_size = 12; const static size_t greeting_size = 12;
// True iff we are registered with an I/O poller.
bool io_enabled;
handle_t handle; handle_t handle;
unsigned char *inpos; unsigned char *inpos;
size_t insize; size_t insize;
i_decoder *decoder; i_decoder *decoder;
bool input_error;
unsigned char *outpos; unsigned char *outpos;
size_t outsize; size_t outsize;

View File

@ -58,11 +58,6 @@ void zmq::v1_decoder_t::set_msg_sink (i_msg_sink *msg_sink_)
msg_sink = msg_sink_; msg_sink = msg_sink_;
} }
bool zmq::v1_decoder_t::stalled () const
{
return next == &v1_decoder_t::message_ready;
}
bool zmq::v1_decoder_t::flags_ready () bool zmq::v1_decoder_t::flags_ready ()
{ {
msg_flags = 0; msg_flags = 0;

View File

@ -44,8 +44,6 @@ namespace zmq
// i_decoder interface. // i_decoder interface.
virtual void set_msg_sink (i_msg_sink *msg_sink_); virtual void set_msg_sink (i_msg_sink *msg_sink_);
virtual bool stalled () const;
private: private:
bool flags_ready (); bool flags_ready ();