diff --git a/src/xreq.cpp b/src/xreq.cpp index 91317f79..59d3f394 100644 --- a/src/xreq.cpp +++ b/src/xreq.cpp @@ -24,7 +24,8 @@ #include "msg.hpp" zmq::xreq_t::xreq_t (class ctx_t *parent_, uint32_t tid_) : - socket_base_t (parent_, tid_) + socket_base_t (parent_, tid_), + prefetched (false) { options.type = ZMQ_XREQ; @@ -36,10 +37,13 @@ zmq::xreq_t::xreq_t (class ctx_t *parent_, uint32_t tid_) : options.send_identity = true; options.recv_identity = true; + + prefetched_msg.init (); } zmq::xreq_t::~xreq_t () { + prefetched_msg.close (); } void zmq::xreq_t::xattach_pipe (pipe_t *pipe_) @@ -56,6 +60,14 @@ int zmq::xreq_t::xsend (msg_t *msg_, int flags_) int zmq::xreq_t::xrecv (msg_t *msg_, int flags_) { + // If there is a prefetched message, return it. + if (prefetched) { + int rc = msg_->move (prefetched_msg); + errno_assert (rc == 0); + prefetched = false; + return 0; + } + // XREQ socket doesn't use identities. We can safely drop it and while (true) { int rc = fq.recv (msg_, flags_); @@ -69,7 +81,17 @@ int zmq::xreq_t::xrecv (msg_t *msg_, int flags_) bool zmq::xreq_t::xhas_in () { - return fq.has_in (); + // We may already have a message pre-fetched. + if (prefetched) + return true; + + // Try to read the next message to the pre-fetch buffer. + int rc = xrecv (&prefetched_msg, ZMQ_DONTWAIT); + if (rc != 0 && errno == EAGAIN) + return false; + zmq_assert (rc == 0); + prefetched = true; + return true; } bool zmq::xreq_t::xhas_out () diff --git a/src/xreq.hpp b/src/xreq.hpp index 4c94cadf..897c197c 100644 --- a/src/xreq.hpp +++ b/src/xreq.hpp @@ -62,6 +62,12 @@ namespace zmq fq_t fq; lb_t lb; + // Have we prefetched a message. + bool prefetched; + + // Holds the prefetched message. + msg_t prefetched_msg; + xreq_t (const xreq_t&); const xreq_t &operator = (const xreq_t&); };