diff --git a/src/xrep.cpp b/src/xrep.cpp index ea19e56d..336d4e54 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -189,14 +189,18 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) return 0; } - // Get next message part. - pipe_t *pipe; - int rc = fq.recvpipe (msg_, flags_, &pipe); - if (rc != 0) - return -1; + pipe_t *pipe = NULL; + while (true) { + + // Get next message part. + int rc = fq.recvpipe (msg_, flags_, &pipe); + if (rc != 0) + return -1; + + // If identity is received, change the key assigned to the pipe. + if (likely (!(msg_->flags () & msg_t::identity))) + break; - // If identity is received, change the key assigned to the pipe. - if (unlikely (msg_->flags () & msg_t::identity)) { zmq_assert (!more_in); // Empty identity means we can preserve the auto-generated identity. @@ -219,11 +223,6 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) } zmq_assert (it != outpipes.end ()); } - - // After processing the identity, try to get the next message. - rc = fq.recvpipe (msg_, flags_, &pipe); - if (rc != 0) - return -1; } // If we are in the middle of reading a message, just return the next part. @@ -234,7 +233,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) // We are at the beginning of a new message. Move the message part we // have to the prefetched and return the ID of the peer instead. - rc = prefetched_msg.move (*msg_); + int rc = prefetched_msg.move (*msg_); errno_assert (rc == 0); prefetched = true; rc = msg_->close (); @@ -260,9 +259,17 @@ int zmq::xrep_t::rollback (void) bool zmq::xrep_t::xhas_in () { + // We may already have a message pre-fetched. if (prefetched) return true; - return fq.has_in (); + + // Try to read the next message to the pre-fetch buffer. + int rc = xrecv (&prefetched_msg, ZMQ_DONTWAIT); + if (rc != 0 && errno == EAGAIN) + return false; + zmq_assert (rc == 0); + prefetched = true; + return true; } bool zmq::xrep_t::xhas_out ()