mirror of
https://github.com/zeromq/libzmq.git
synced 2025-03-10 16:06:09 +00:00
forwarder and streamer devices handle multi-part messages correctly
This commit is contained in:
parent
43f2c6ff5b
commit
3cb84b5cea
@ -21,6 +21,7 @@
|
||||
|
||||
#include "forwarder.hpp"
|
||||
#include "socket_base.hpp"
|
||||
#include "likely.hpp"
|
||||
#include "err.hpp"
|
||||
|
||||
int zmq::forwarder (socket_base_t *insocket_, socket_base_t *outsocket_)
|
||||
@ -29,16 +30,26 @@ int zmq::forwarder (socket_base_t *insocket_, socket_base_t *outsocket_)
|
||||
int rc = zmq_msg_init (&msg);
|
||||
errno_assert (rc == 0);
|
||||
|
||||
int64_t more;
|
||||
size_t more_sz = sizeof (more);
|
||||
|
||||
while (true) {
|
||||
rc = insocket_->recv (&msg, 0);
|
||||
if (rc < 0) {
|
||||
if (unlikely (rc < 0)) {
|
||||
if (errno == ETERM)
|
||||
return -1;
|
||||
errno_assert (false);
|
||||
}
|
||||
|
||||
rc = outsocket_->send (&msg, 0);
|
||||
if (rc < 0) {
|
||||
rc = insocket_->getsockopt (ZMQ_RCVMORE, &more, &more_sz);
|
||||
if (unlikely (rc < 0)) {
|
||||
if (errno == ETERM)
|
||||
return -1;
|
||||
errno_assert (false);
|
||||
}
|
||||
|
||||
rc = outsocket_->send (&msg, more ? ZMQ_SNDMORE : 0);
|
||||
if (unlikely (rc < 0)) {
|
||||
if (errno == ETERM)
|
||||
return -1;
|
||||
errno_assert (false);
|
||||
|
@ -23,6 +23,7 @@
|
||||
|
||||
#include "queue.hpp"
|
||||
#include "socket_base.hpp"
|
||||
#include "likely.hpp"
|
||||
#include "err.hpp"
|
||||
|
||||
int zmq::queue (class socket_base_t *insocket_,
|
||||
@ -49,7 +50,7 @@ int zmq::queue (class socket_base_t *insocket_,
|
||||
|
||||
// Wait while there are either requests or replies to process.
|
||||
rc = zmq_poll (&items [0], 2, -1);
|
||||
if (rc < 0) {
|
||||
if (unlikely (rc < 0)) {
|
||||
if (errno == ETERM)
|
||||
return -1;
|
||||
errno_assert (false);
|
||||
@ -65,7 +66,7 @@ int zmq::queue (class socket_base_t *insocket_,
|
||||
while (true) {
|
||||
|
||||
rc = insocket_->recv (&msg, 0);
|
||||
if (rc < 0) {
|
||||
if (unlikely (rc < 0)) {
|
||||
if (errno == ETERM)
|
||||
return -1;
|
||||
errno_assert (false);
|
||||
@ -73,14 +74,14 @@ int zmq::queue (class socket_base_t *insocket_,
|
||||
|
||||
moresz = sizeof (more);
|
||||
rc = insocket_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
|
||||
if (rc < 0) {
|
||||
if (unlikely (rc < 0)) {
|
||||
if (errno == ETERM)
|
||||
return -1;
|
||||
errno_assert (false);
|
||||
}
|
||||
|
||||
rc = outsocket_->send (&msg, more ? ZMQ_SNDMORE : 0);
|
||||
if (rc < 0) {
|
||||
if (unlikely (rc < 0)) {
|
||||
if (errno == ETERM)
|
||||
return -1;
|
||||
errno_assert (false);
|
||||
@ -96,7 +97,7 @@ int zmq::queue (class socket_base_t *insocket_,
|
||||
while (true) {
|
||||
|
||||
rc = outsocket_->recv (&msg, 0);
|
||||
if (rc < 0) {
|
||||
if (unlikely (rc < 0)) {
|
||||
if (errno == ETERM)
|
||||
return -1;
|
||||
errno_assert (false);
|
||||
@ -104,14 +105,14 @@ int zmq::queue (class socket_base_t *insocket_,
|
||||
|
||||
moresz = sizeof (more);
|
||||
rc = outsocket_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
|
||||
if (rc < 0) {
|
||||
if (unlikely (rc < 0)) {
|
||||
if (errno == ETERM)
|
||||
return -1;
|
||||
errno_assert (false);
|
||||
}
|
||||
|
||||
rc = insocket_->send (&msg, more ? ZMQ_SNDMORE : 0);
|
||||
if (rc < 0) {
|
||||
if (unlikely (rc < 0)) {
|
||||
if (errno == ETERM)
|
||||
return -1;
|
||||
errno_assert (false);
|
||||
|
@ -21,6 +21,7 @@
|
||||
|
||||
#include "streamer.hpp"
|
||||
#include "socket_base.hpp"
|
||||
#include "likely.hpp"
|
||||
#include "err.hpp"
|
||||
|
||||
int zmq::streamer (socket_base_t *insocket_, socket_base_t *outsocket_)
|
||||
@ -29,16 +30,26 @@ int zmq::streamer (socket_base_t *insocket_, socket_base_t *outsocket_)
|
||||
int rc = zmq_msg_init (&msg);
|
||||
errno_assert (rc == 0);
|
||||
|
||||
int64_t more;
|
||||
size_t more_sz = sizeof (more);
|
||||
|
||||
while (true) {
|
||||
rc = insocket_->recv (&msg, 0);
|
||||
if (rc < 0) {
|
||||
if (unlikely (rc < 0)) {
|
||||
if (errno == ETERM)
|
||||
return -1;
|
||||
errno_assert (false);
|
||||
}
|
||||
|
||||
rc = insocket_->getsockopt (ZMQ_RCVMORE, &more, &more_sz);
|
||||
if (unlikely (rc < 0)) {
|
||||
if (errno == ETERM)
|
||||
return -1;
|
||||
errno_assert (false);
|
||||
}
|
||||
|
||||
rc = outsocket_->send (&msg, 0);
|
||||
if (rc < 0) {
|
||||
rc = outsocket_->send (&msg, more ? ZMQ_SNDMORE : 0);
|
||||
if (unlikely (rc < 0)) {
|
||||
if (errno == ETERM)
|
||||
return -1;
|
||||
errno_assert (false);
|
||||
|
Loading…
x
Reference in New Issue
Block a user