diff --git a/src/decoder.cpp b/src/decoder.cpp index 9e93b736..d57265a9 100644 --- a/src/decoder.cpp +++ b/src/decoder.cpp @@ -23,6 +23,7 @@ #include "decoder.hpp" #include "session_base.hpp" +#include "likely.hpp" #include "wire.hpp" #include "err.hpp" @@ -136,8 +137,14 @@ bool zmq::decoder_t::message_ready () { // Message is completely read. Push it further and start reading // new message. (in_progress is a 0-byte message after this point.) - if (!session || !session->write (&in_progress)) + if (unlikely (!session)) return false; + int rc = session->write (&in_progress); + if (unlikely (rc != 0)) { + if (errno != EAGAIN) + decoding_error (); + return false; + } next_step (tmpbuf, 1, &decoder_t::one_byte_size_ready); return true; diff --git a/src/decoder.hpp b/src/decoder.hpp index 01021c41..de63a09f 100644 --- a/src/decoder.hpp +++ b/src/decoder.hpp @@ -164,10 +164,18 @@ namespace zmq private: + // Where to store the read data. unsigned char *read_pos; + + // How much data to read before taking next step. size_t to_read; + + // Next step. If set to NULL, it means that associated data stream + // is dead. Note that there can be still data in the process in such + // case. step_t next; + // The duffer for data to decode. size_t bufsize; unsigned char *buf; diff --git a/src/encoder.cpp b/src/encoder.cpp index 6d093846..8689e45d 100644 --- a/src/encoder.cpp +++ b/src/encoder.cpp @@ -20,6 +20,7 @@ #include "encoder.hpp" #include "session_base.hpp" +#include "likely.hpp" #include "wire.hpp" zmq::encoder_t::encoder_t (size_t bufsize_) : @@ -62,7 +63,14 @@ bool zmq::encoder_t::message_ready () // Note that new state is set only if write is successful. That way // unsuccessful write will cause retry on the next state machine // invocation. - if (!session || !session->read (&in_progress)) { + if (unlikely (!session)) { + rc = in_progress.init (); + errno_assert (rc == 0); + return false; + } + rc = session->read (&in_progress); + if (unlikely (rc != 0)) { + errno_assert (errno == EAGAIN); rc = in_progress.init (); errno_assert (rc == 0); return false; diff --git a/src/encoder.hpp b/src/encoder.hpp index f7e3cbcf..949cbdce 100644 --- a/src/encoder.hpp +++ b/src/encoder.hpp @@ -142,11 +142,20 @@ namespace zmq private: + // Where to get the data to write from. unsigned char *write_pos; + + // How much data to write before next step should be executed. size_t to_write; + + // Next step. If set to NULL, it means that associated data stream + // is dead. step_t next; + + // If true, first byte of the message is being written. bool beginning; + // The buffer for encoded data. size_t bufsize; unsigned char *buf; diff --git a/src/req.cpp b/src/req.cpp index 323e0586..04a19fbc 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -158,3 +158,23 @@ zmq::req_session_t::~req_session_t () { } +int zmq::req_session_t::write (msg_t *msg_) +{ + if (state == request_id) { + if (msg_->flags () == msg_t::label && msg_->size () == 4) { + state = body; + return xreq_session_t::write (msg_); + } + } + else { + if (msg_->flags () == msg_t::more) + return xreq_session_t::write (msg_); + if (msg_->flags () == 0) { + state = request_id; + return xreq_session_t::write (msg_); + } + } + errno = EFAULT; + return -1; +} + diff --git a/src/req.hpp b/src/req.hpp index 2c2cbc48..0207a4f0 100644 --- a/src/req.hpp +++ b/src/req.hpp @@ -67,8 +67,16 @@ namespace zmq const char *protocol_, const char *address_); ~req_session_t (); + // Overloads of the functions from session_base_t. + int write (msg_t *msg_); + private: + enum { + request_id, + body + } state; + req_session_t (const req_session_t&); const req_session_t &operator = (const req_session_t&); }; diff --git a/src/session_base.cpp b/src/session_base.cpp index 7d4c5ab5..32dcd4f7 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -148,28 +148,28 @@ void zmq::session_base_t::attach_pipe (pipe_t *pipe_) pipe->set_event_sink (this); } -bool zmq::session_base_t::read (msg_t *msg_) +int zmq::session_base_t::read (msg_t *msg_) { - if (!pipe) - return false; - - if (!pipe->read (msg_)) - return false; + if (!pipe || !pipe->read (msg_)) { + errno = EAGAIN; + return -1; + } incomplete_in = msg_->flags () & (msg_t::more | msg_t::label) ? true : false; - return true; + return 0; } -bool zmq::session_base_t::write (msg_t *msg_) +int zmq::session_base_t::write (msg_t *msg_) { if (pipe && pipe->write (msg_)) { int rc = msg_->init (); errno_assert (rc == 0); - return true; + return 0; } - return false; + errno = EAGAIN; + return -1; } void zmq::session_base_t::flush () diff --git a/src/session_base.hpp b/src/session_base.hpp index 175a11d1..e388d429 100644 --- a/src/session_base.hpp +++ b/src/session_base.hpp @@ -48,8 +48,8 @@ namespace zmq void attach_pipe (class pipe_t *pipe_); // Following functions are the interface exposed towards the engine. - bool read (msg_t *msg_); - bool write (msg_t *msg_); + virtual int read (msg_t *msg_); + virtual int write (msg_t *msg_); void flush (); void detach ();