0
0
mirror of https://github.com/zeromq/libzmq.git synced 2025-01-15 10:18:01 +08:00

Backport zmq_proxy_steerable and fix error in proxy function prototype.

This commit is contained in:
Cosmo Harrigan 2014-03-17 17:55:59 -07:00
parent 1ac53ed1f7
commit e0f718bbd2
4 changed files with 71 additions and 6 deletions

View File

@ -392,6 +392,7 @@ ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout);
/* Built-in message proxy (3-way) */
ZMQ_EXPORT int zmq_proxy (void *frontend, void *backend, void *capture);
ZMQ_EXPORT int zmq_proxy_steerable (void *frontend, void *backend, void *capture, void *control);
/* Encode a binary key as printable text using ZMQ RFC 32 */
ZMQ_EXPORT char *zmq_z85_encode (char *dest, uint8_t *data, size_t size);

View File

@ -57,7 +57,8 @@
int zmq::proxy (
class socket_base_t *frontend_,
class socket_base_t *backend_,
class socket_base_t *capture_)
class socket_base_t *capture_,
class socket_base_t *control_)
{
msg_t msg;
int rc = msg.init ();
@ -71,14 +72,63 @@ int zmq::proxy (
size_t moresz;
zmq_pollitem_t items [] = {
{ frontend_, 0, ZMQ_POLLIN, 0 },
{ backend_, 0, ZMQ_POLLIN, 0 }
{ backend_, 0, ZMQ_POLLIN, 0 },
{ control_, 0, ZMQ_POLLIN, 0 }
};
while (true) {
int qt_poll_items = (control_ ? 3 : 2);
// Proxy can be in these three states
enum {
active,
paused,
terminated
} state = active;
while (state != terminated) {
// Wait while there are either requests or replies to process.
rc = zmq_poll (&items [0], 2, -1);
rc = zmq_poll (&items [0], qt_poll_items, -1);
if (unlikely (rc < 0))
return -1;
// Process a control command if any
if (control_ && items [2].revents & ZMQ_POLLIN) {
rc = control_->recv (&msg, 0);
if (unlikely (rc < 0))
return -1;
moresz = sizeof more;
rc = control_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
if (unlikely (rc < 0) || more)
return -1;
// Copy message to capture socket if any
if (capture_) {
msg_t ctrl;
int rc = ctrl.init ();
if (unlikely (rc < 0))
return -1;
rc = ctrl.copy (msg);
if (unlikely (rc < 0))
return -1;
rc = capture_->send (&ctrl, more? ZMQ_SNDMORE: 0);
if (unlikely (rc < 0))
return -1;
}
if (msg.size () == 5 && memcmp (msg.data (), "PAUSE", 5) == 0)
state = paused;
else
if (msg.size () == 6 && memcmp (msg.data (), "RESUME", 6) == 0)
state = active;
else
if (msg.size () == 9 && memcmp (msg.data (), "TERMINATE", 9) == 0)
state = terminated;
else {
// This is an API error, we should assert
puts ("E: invalid command sent to proxy");
zmq_assert (false);
}
}
// Process a request
if (items [0].revents & ZMQ_POLLIN) {
while (true) {

View File

@ -25,7 +25,8 @@ namespace zmq
int proxy (
class socket_base_t *frontend_,
class socket_base_t *backend_,
class socket_base_t *control_);
class socket_base_t *capture_,
class socket_base_t *control_ = NULL); // backward compatibility without this argument);
}
#endif

View File

@ -1016,7 +1016,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
// The proxy functionality
int zmq_proxy (void *frontend_, void *backend_, void *control_)
int zmq_proxy (void *frontend_, void *backend_, void *capture_)
{
if (!frontend_ || !backend_) {
errno = EFAULT;
@ -1025,6 +1025,19 @@ int zmq_proxy (void *frontend_, void *backend_, void *control_)
return zmq::proxy (
(zmq::socket_base_t*) frontend_,
(zmq::socket_base_t*) backend_,
(zmq::socket_base_t*) capture_);
}
int zmq_proxy_steerable (void *frontend_, void *backend_, void *capture_, void *control_)
{
if (!frontend_ || !backend_) {
errno = EFAULT;
return -1;
}
return zmq::proxy (
(zmq::socket_base_t*) frontend_,
(zmq::socket_base_t*) backend_,
(zmq::socket_base_t*) capture_,
(zmq::socket_base_t*) control_);
}