diff --git a/src/stream.cpp b/src/stream.cpp index 68ab18d2..0cb7e272 100644 --- a/src/stream.cpp +++ b/src/stream.cpp @@ -28,7 +28,6 @@ zmq::stream_t::stream_t (class ctx_t *parent_, uint32_t tid_, int sid_) : socket_base_t (parent_, tid_, sid_), prefetched (false), identity_sent (false), - more_in (false), current_out (NULL), more_out (false), next_peer_id (generate_random ()) @@ -97,8 +96,6 @@ int zmq::stream_t::xsend (msg_t *msg_) // TODO: The connections should be killed instead. if (msg_->flags () & msg_t::more) { - more_out = true; - // Find the pipe associated with the identity stored in the prefix. // If there's no such pipe return an error blob_t identity ((unsigned char*) msg_->data (), msg_->size ()); @@ -109,18 +106,19 @@ int zmq::stream_t::xsend (msg_t *msg_) if (!current_out->check_write ()) { it->second.active = false; current_out = NULL; - more_out = false; errno = EAGAIN; return -1; } } else { - more_out = false; errno = EHOSTUNREACH; return -1; } } + // Expect one more message frame. + more_out = true; + int rc = msg_->close (); errno_assert (rc == 0); rc = msg_->init (); @@ -131,8 +129,8 @@ int zmq::stream_t::xsend (msg_t *msg_) // Ignore the MORE flag msg_->reset_flags (msg_t::more); - // Check whether this is the last part of the message. - more_out = msg_->flags () & msg_t::more ? true : false; + // This is the last part of the message. + more_out = false; // Push the message into the pipe. If there's no out pipe, just drop it. if (current_out) { @@ -148,13 +146,9 @@ int zmq::stream_t::xsend (msg_t *msg_) return 0; } bool ok = current_out->write (msg_); - if (unlikely (!ok)) - current_out = NULL; - else - if (!more_out) { + if (likely (ok)) current_out->flush (); - current_out = NULL; - } + current_out = NULL; } else { int rc = msg_->close (); @@ -181,46 +175,34 @@ int zmq::stream_t::xrecv (msg_t *msg_) errno_assert (rc == 0); prefetched = false; } - more_in = msg_->flags () & msg_t::more ? true : false; return 0; } pipe_t *pipe = NULL; - int rc = fq.recvpipe (msg_, &pipe); + int rc = fq.recvpipe (&prefetched_msg, &pipe); if (rc != 0) return -1; zmq_assert (pipe != NULL); + zmq_assert ((prefetched_msg.flags () & msg_t::more) == 0); - // If we are in the middle of reading a message, just return the next part. - if (more_in) - more_in = msg_->flags () & msg_t::more ? true : false; - else { - // We are at the beginning of a message. - // Keep the message part we have in the prefetch buffer - // and return the ID of the peer instead. - rc = prefetched_msg.move (*msg_); - errno_assert (rc == 0); - prefetched = true; + // We have received a frame with TCP data. + // Rather than sendig this frame, we keep it in prefetched + // buffer and send a frame with peer's ID. + blob_t identity = pipe->get_identity (); + rc = msg_->init_size (identity.size ()); + errno_assert (rc == 0); + memcpy (msg_->data (), identity.data (), identity.size ()); + msg_->set_flags (msg_t::more); - blob_t identity = pipe->get_identity (); - rc = msg_->init_size (identity.size ()); - errno_assert (rc == 0); - memcpy (msg_->data (), identity.data (), identity.size ()); - msg_->set_flags (msg_t::more); - identity_sent = true; - } + prefetched = true; + identity_sent = true; return 0; } bool zmq::stream_t::xhas_in () { - // If we are in the middle of reading the messages, there are - // definitely more parts available. - if (more_in) - return true; - // We may already have a message pre-fetched. if (prefetched) return true; @@ -233,6 +215,7 @@ bool zmq::stream_t::xhas_in () return false; zmq_assert (pipe != NULL); + zmq_assert ((prefetched_msg.flags () & msg_t::more) == 0); blob_t identity = pipe->get_identity (); rc = prefetched_id.init_size (identity.size ()); diff --git a/src/stream.hpp b/src/stream.hpp index 73e8ca21..7c8def8a 100644 --- a/src/stream.hpp +++ b/src/stream.hpp @@ -68,9 +68,6 @@ namespace zmq // Holds the prefetched message. msg_t prefetched_msg; - // If true, more incoming message parts are expected. - bool more_in; - struct outpipe_t { zmq::pipe_t *pipe;