mirror of
https://github.com/zeromq/libzmq.git
synced 2024-12-29 00:32:34 +08:00
factorize code in proxy
This commit is contained in:
parent
0e94ddf377
commit
5bc6737039
128
src/proxy.cpp
128
src/proxy.cpp
@ -53,6 +53,58 @@
|
||||
// zmq.h must be included *after* poll.h for AIX to build properly
|
||||
#include "../include/zmq.h"
|
||||
|
||||
int capture(
|
||||
class zmq::socket_base_t *capture_,
|
||||
zmq::msg_t& msg_,
|
||||
int more_ = 0)
|
||||
{
|
||||
// Copy message to capture socket if any
|
||||
if (capture_) {
|
||||
zmq::msg_t ctrl;
|
||||
int rc = ctrl.init ();
|
||||
if (unlikely (rc < 0))
|
||||
return -1;
|
||||
rc = ctrl.copy (msg_);
|
||||
if (unlikely (rc < 0))
|
||||
return -1;
|
||||
rc = capture_->send (&ctrl, more_? ZMQ_SNDMORE: 0);
|
||||
if (unlikely (rc < 0))
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int forward(
|
||||
class zmq::socket_base_t *from_,
|
||||
class zmq::socket_base_t *to_,
|
||||
class zmq::socket_base_t *capture_,
|
||||
zmq::msg_t& msg_)
|
||||
{
|
||||
int more;
|
||||
size_t moresz;
|
||||
while (true) {
|
||||
int rc = from_->recv (&msg_, 0);
|
||||
if (unlikely (rc < 0))
|
||||
return -1;
|
||||
|
||||
moresz = sizeof more;
|
||||
rc = from_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
|
||||
if (unlikely (rc < 0))
|
||||
return -1;
|
||||
|
||||
// Copy message to capture socket if any
|
||||
rc = capture(capture_, msg_, more);
|
||||
if (unlikely (rc < 0))
|
||||
return -1;
|
||||
|
||||
rc = to_->send (&msg_, more? ZMQ_SNDMORE: 0);
|
||||
if (unlikely (rc < 0))
|
||||
return -1;
|
||||
if (more == 0)
|
||||
break;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int zmq::proxy (
|
||||
class socket_base_t *frontend_,
|
||||
@ -102,18 +154,10 @@ int zmq::proxy (
|
||||
return -1;
|
||||
|
||||
// Copy message to capture socket if any
|
||||
if (capture_) {
|
||||
msg_t ctrl;
|
||||
rc = ctrl.init ();
|
||||
if (unlikely (rc < 0))
|
||||
return -1;
|
||||
rc = ctrl.copy (msg);
|
||||
if (unlikely (rc < 0))
|
||||
return -1;
|
||||
rc = capture_->send (&ctrl, 0);
|
||||
if (unlikely (rc < 0))
|
||||
return -1;
|
||||
}
|
||||
rc = capture(capture_, msg);
|
||||
if (unlikely (rc < 0))
|
||||
return -1;
|
||||
|
||||
if (msg.size () == 5 && memcmp (msg.data (), "PAUSE", 5) == 0)
|
||||
state = paused;
|
||||
else
|
||||
@ -131,68 +175,16 @@ int zmq::proxy (
|
||||
// Process a request
|
||||
if (state == active
|
||||
&& items [0].revents & ZMQ_POLLIN) {
|
||||
while (true) {
|
||||
rc = frontend_->recv (&msg, 0);
|
||||
if (unlikely (rc < 0))
|
||||
return -1;
|
||||
|
||||
moresz = sizeof more;
|
||||
rc = frontend_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
|
||||
if (unlikely (rc < 0))
|
||||
return -1;
|
||||
|
||||
// Copy message to capture socket if any
|
||||
if (capture_) {
|
||||
msg_t ctrl;
|
||||
rc = ctrl.init ();
|
||||
if (unlikely (rc < 0))
|
||||
return -1;
|
||||
rc = ctrl.copy (msg);
|
||||
if (unlikely (rc < 0))
|
||||
return -1;
|
||||
rc = capture_->send (&ctrl, more? ZMQ_SNDMORE: 0);
|
||||
if (unlikely (rc < 0))
|
||||
return -1;
|
||||
}
|
||||
rc = backend_->send (&msg, more? ZMQ_SNDMORE: 0);
|
||||
if (unlikely (rc < 0))
|
||||
return -1;
|
||||
if (more == 0)
|
||||
break;
|
||||
}
|
||||
rc = forward(frontend_, backend_, capture_,msg);
|
||||
if (unlikely (rc < 0))
|
||||
return -1;
|
||||
}
|
||||
// Process a reply
|
||||
if (state == active
|
||||
&& items [1].revents & ZMQ_POLLIN) {
|
||||
while (true) {
|
||||
rc = backend_->recv (&msg, 0);
|
||||
rc = forward(backend_, frontend_, capture_,msg);
|
||||
if (unlikely (rc < 0))
|
||||
return -1;
|
||||
|
||||
moresz = sizeof more;
|
||||
rc = backend_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
|
||||
if (unlikely (rc < 0))
|
||||
return -1;
|
||||
|
||||
// Copy message to capture socket if any
|
||||
if (capture_) {
|
||||
msg_t ctrl;
|
||||
rc = ctrl.init ();
|
||||
if (unlikely (rc < 0))
|
||||
return -1;
|
||||
rc = ctrl.copy (msg);
|
||||
if (unlikely (rc < 0))
|
||||
return -1;
|
||||
rc = capture_->send (&ctrl, more? ZMQ_SNDMORE: 0);
|
||||
if (unlikely (rc < 0))
|
||||
return -1;
|
||||
}
|
||||
rc = frontend_->send (&msg, more? ZMQ_SNDMORE: 0);
|
||||
if (unlikely (rc < 0))
|
||||
return -1;
|
||||
if (more == 0)
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
|
Loading…
x
Reference in New Issue
Block a user