mirror of
https://github.com/zeromq/libzmq.git
synced 2025-03-09 23:36:04 +00:00
Merge pull request #2518 from bjovke/master
Reworked zmq::proxy() for improved performance.
This commit is contained in:
commit
545cacf5e2
312
src/proxy.cpp
312
src/proxy.cpp
@ -52,7 +52,31 @@
|
||||
#include "socket_base.hpp"
|
||||
#include "err.hpp"
|
||||
|
||||
int capture(
|
||||
#ifdef ZMQ_HAVE_POLLER
|
||||
|
||||
#include "socket_poller.hpp"
|
||||
|
||||
// Macros for repetitive code.
|
||||
|
||||
// PROXY_CLEANUP() must not be used before these variables are initialized.
|
||||
#define PROXY_CLEANUP()\
|
||||
delete poller_all;\
|
||||
delete poller_in;\
|
||||
delete poller_control;\
|
||||
delete poller_receive_blocked;\
|
||||
delete poller_send_blocked;\
|
||||
delete poller_both_blocked;\
|
||||
|
||||
|
||||
#define CHECK_RC_EXIT_ON_FAILURE()\
|
||||
if (rc < 0) {\
|
||||
PROXY_CLEANUP();\
|
||||
return close_and_return (&msg, -1);\
|
||||
}
|
||||
|
||||
#endif // ZMQ_HAVE_POLLER
|
||||
|
||||
int capture (
|
||||
class zmq::socket_base_t *capture_,
|
||||
zmq::msg_t& msg_,
|
||||
int more_ = 0)
|
||||
@ -66,14 +90,14 @@ int capture(
|
||||
rc = ctrl.copy (msg_);
|
||||
if (unlikely (rc < 0))
|
||||
return -1;
|
||||
rc = capture_->send (&ctrl, more_? ZMQ_SNDMORE: 0);
|
||||
rc = capture_->send (&ctrl, more_ ? ZMQ_SNDMORE : 0);
|
||||
if (unlikely (rc < 0))
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int forward(
|
||||
int forward (
|
||||
class zmq::socket_base_t *from_,
|
||||
class zmq::socket_base_t *to_,
|
||||
class zmq::socket_base_t *capture_,
|
||||
@ -92,11 +116,11 @@ int forward(
|
||||
return -1;
|
||||
|
||||
// Copy message to capture socket if any
|
||||
rc = capture(capture_, msg_, more);
|
||||
rc = capture (capture_, msg_, more);
|
||||
if (unlikely (rc < 0))
|
||||
return -1;
|
||||
|
||||
rc = to_->send (&msg_, more? ZMQ_SNDMORE: 0);
|
||||
rc = to_->send (&msg_, more ? ZMQ_SNDMORE : 0);
|
||||
if (unlikely (rc < 0))
|
||||
return -1;
|
||||
if (more == 0)
|
||||
@ -105,6 +129,258 @@ int forward(
|
||||
return 0;
|
||||
}
|
||||
|
||||
#ifdef ZMQ_HAVE_POLLER
|
||||
|
||||
int zmq::proxy (
|
||||
class socket_base_t *frontend_,
|
||||
class socket_base_t *backend_,
|
||||
class socket_base_t *capture_,
|
||||
class socket_base_t *control_)
|
||||
{
|
||||
msg_t msg;
|
||||
int rc = msg.init ();
|
||||
if (rc != 0)
|
||||
return -1;
|
||||
|
||||
// The algorithm below assumes ratio of requests and replies processed
|
||||
// under full load to be 1:1.
|
||||
|
||||
int more;
|
||||
size_t moresz = sizeof (more);
|
||||
|
||||
// Proxy can be in these three states
|
||||
enum {
|
||||
active,
|
||||
paused,
|
||||
terminated
|
||||
} state = active;
|
||||
|
||||
bool frontend_equal_to_backend;
|
||||
bool frontend_in = false;
|
||||
bool frontend_out = false;
|
||||
bool backend_in = false;
|
||||
bool backend_out = false;
|
||||
bool control_in = false;
|
||||
zmq::socket_poller_t::event_t events [3];
|
||||
|
||||
// Don't allocate these pollers from stack because they will take more than 900 kB of stack!
|
||||
// On Windows this blows up default stack of 1 MB and aborts the program.
|
||||
// I wanted to use std::shared_ptr here as the best solution but that requires C++11...
|
||||
zmq::socket_poller_t *poller_all = new (std::nothrow) zmq::socket_poller_t; // Poll for everything.
|
||||
zmq::socket_poller_t *poller_in = new (std::nothrow) zmq::socket_poller_t; // Poll only 'ZMQ_POLLIN' on all sockets. Initial blocking poll in loop.
|
||||
zmq::socket_poller_t *poller_control = new (std::nothrow) zmq::socket_poller_t; // Poll only for 'ZMQ_POLLIN' on 'control_', when proxy is paused.
|
||||
zmq::socket_poller_t *poller_receive_blocked = new (std::nothrow) zmq::socket_poller_t; // All except 'ZMQ_POLLIN' on 'frontend_'.
|
||||
|
||||
// If frontend_==backend_ 'poller_send_blocked' and 'poller_receive_blocked' are the same, 'ZMQ_POLLIN' is ignored.
|
||||
// In that case 'poller_send_blocked' is not used. We need only 'poller_receive_blocked'.
|
||||
// We also don't need 'poller_both_blocked', no need to initialize it.
|
||||
// We save some RAM and time for initialization.
|
||||
zmq::socket_poller_t *poller_send_blocked = NULL; // All except 'ZMQ_POLLIN' on 'backend_'.
|
||||
zmq::socket_poller_t *poller_both_blocked = NULL; // All except 'ZMQ_POLLIN' on both 'frontend_' and 'backend_'.
|
||||
|
||||
if (frontend_ != backend_) {
|
||||
poller_send_blocked = new (std::nothrow) zmq::socket_poller_t; // All except 'ZMQ_POLLIN' on 'backend_'.
|
||||
poller_both_blocked = new (std::nothrow) zmq::socket_poller_t; // All except 'ZMQ_POLLIN' on both 'frontend_' and 'backend_'.
|
||||
frontend_equal_to_backend = false;
|
||||
} else
|
||||
frontend_equal_to_backend = true;
|
||||
|
||||
if (poller_all == NULL || poller_in == NULL || poller_control == NULL || poller_receive_blocked == NULL
|
||||
|| ((poller_send_blocked == NULL || poller_both_blocked == NULL) && !frontend_equal_to_backend)) {
|
||||
PROXY_CLEANUP ();
|
||||
return close_and_return (&msg, -1);
|
||||
}
|
||||
|
||||
zmq::socket_poller_t *poller_wait = poller_in; // Poller for blocking wait, initially all 'ZMQ_POLLIN'.
|
||||
|
||||
// Register 'frontend_' and 'backend_' with pollers.
|
||||
rc = poller_all->add (frontend_, NULL, ZMQ_POLLIN | ZMQ_POLLOUT); // Everything.
|
||||
CHECK_RC_EXIT_ON_FAILURE ();
|
||||
rc = poller_in->add (frontend_, NULL, ZMQ_POLLIN); // All 'ZMQ_POLLIN's.
|
||||
CHECK_RC_EXIT_ON_FAILURE ();
|
||||
|
||||
if (frontend_equal_to_backend) {
|
||||
// If frontend_==backend_ 'poller_send_blocked' and 'poller_receive_blocked' are the same,
|
||||
// so we don't need 'poller_send_blocked'. We need only 'poller_receive_blocked'.
|
||||
// We also don't need 'poller_both_blocked', no need to initialize it.
|
||||
rc = poller_receive_blocked->add (frontend_, NULL, ZMQ_POLLOUT);
|
||||
CHECK_RC_EXIT_ON_FAILURE ();
|
||||
} else {
|
||||
rc = poller_all->add (backend_, NULL, ZMQ_POLLIN | ZMQ_POLLOUT); // Everything.
|
||||
CHECK_RC_EXIT_ON_FAILURE ();
|
||||
rc = poller_in->add (backend_, NULL, ZMQ_POLLIN); // All 'ZMQ_POLLIN's.
|
||||
CHECK_RC_EXIT_ON_FAILURE ();
|
||||
rc = poller_both_blocked->add (frontend_, NULL, ZMQ_POLLOUT); // Waiting only for 'ZMQ_POLLOUT'.
|
||||
CHECK_RC_EXIT_ON_FAILURE ();
|
||||
rc = poller_both_blocked->add (backend_, NULL, ZMQ_POLLOUT); // Waiting only for 'ZMQ_POLLOUT'.
|
||||
CHECK_RC_EXIT_ON_FAILURE ();
|
||||
rc = poller_send_blocked->add (backend_, NULL, ZMQ_POLLOUT); // All except 'ZMQ_POLLIN' on 'backend_'.
|
||||
CHECK_RC_EXIT_ON_FAILURE ();
|
||||
rc = poller_send_blocked->add (frontend_, NULL, ZMQ_POLLIN | ZMQ_POLLOUT); // All except 'ZMQ_POLLIN' on 'backend_'.
|
||||
CHECK_RC_EXIT_ON_FAILURE ();
|
||||
rc = poller_receive_blocked->add (frontend_, NULL, ZMQ_POLLOUT); // All except 'ZMQ_POLLIN' on 'frontend_'.
|
||||
CHECK_RC_EXIT_ON_FAILURE ();
|
||||
rc = poller_receive_blocked->add (backend_, NULL, ZMQ_POLLIN | ZMQ_POLLOUT); // All except 'ZMQ_POLLIN' on 'frontend_'.
|
||||
CHECK_RC_EXIT_ON_FAILURE ();
|
||||
}
|
||||
|
||||
// Register 'control_' with pollers.
|
||||
if (control_ != NULL) {
|
||||
rc = poller_all->add (control_, NULL, ZMQ_POLLIN);
|
||||
CHECK_RC_EXIT_ON_FAILURE ();
|
||||
rc = poller_in->add (control_, NULL, ZMQ_POLLIN);
|
||||
CHECK_RC_EXIT_ON_FAILURE ();
|
||||
rc = poller_control->add (control_, NULL, ZMQ_POLLIN); // When proxy is paused we wait only for ZMQ_POLLIN on 'control_' socket.
|
||||
CHECK_RC_EXIT_ON_FAILURE ();
|
||||
rc = poller_receive_blocked->add (control_, NULL, ZMQ_POLLIN);
|
||||
CHECK_RC_EXIT_ON_FAILURE ();
|
||||
if (!frontend_equal_to_backend) {
|
||||
rc = poller_send_blocked->add (control_, NULL, ZMQ_POLLIN);
|
||||
CHECK_RC_EXIT_ON_FAILURE ();
|
||||
rc = poller_both_blocked->add (control_, NULL, ZMQ_POLLIN);
|
||||
CHECK_RC_EXIT_ON_FAILURE ();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
int i;
|
||||
bool request_processed, reply_processed;
|
||||
|
||||
|
||||
while (state != terminated) {
|
||||
|
||||
// Blocking wait initially only for 'ZMQ_POLLIN' - 'poller_wait' points to 'poller_in'.
|
||||
// If one of receiving end's queue is full ('ZMQ_POLLOUT' not available),
|
||||
// 'poller_wait' is pointed to 'poller_receive_blocked', 'poller_send_blocked' or 'poller_both_blocked'.
|
||||
rc = poller_wait->wait (events, 3, -1);
|
||||
if (rc < 0 && errno == ETIMEDOUT)
|
||||
rc = 0;
|
||||
CHECK_RC_EXIT_ON_FAILURE ();
|
||||
|
||||
// Some of events waited for by 'poller_wait' have arrived, now poll for everything without blocking.
|
||||
rc = poller_all->wait (events, 3, 0);
|
||||
if (rc < 0 && errno == ETIMEDOUT)
|
||||
rc = 0;
|
||||
CHECK_RC_EXIT_ON_FAILURE ();
|
||||
|
||||
// Process events.
|
||||
for (i = 0; i < rc; i++) {
|
||||
if (events [i].socket == frontend_) {
|
||||
frontend_in = (events [i].events & ZMQ_POLLIN) != 0;
|
||||
frontend_out = (events [i].events & ZMQ_POLLOUT) != 0;
|
||||
} else
|
||||
// This 'if' needs to be after check for 'frontend_' in order never
|
||||
// to be reached in case frontend_==backend_, so we ensure backend_in=false in that case.
|
||||
if (events [i].socket == backend_) {
|
||||
backend_in = (events [i].events & ZMQ_POLLIN) != 0;
|
||||
backend_out = (events [i].events & ZMQ_POLLOUT) != 0;
|
||||
} else
|
||||
if (events [i].socket == control_)
|
||||
control_in = (events [i].events & ZMQ_POLLIN) != 0;
|
||||
}
|
||||
|
||||
|
||||
// Process a control command if any.
|
||||
if (control_in) {
|
||||
rc = control_->recv (&msg, 0);
|
||||
CHECK_RC_EXIT_ON_FAILURE ();
|
||||
rc = control_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
|
||||
if (unlikely (rc < 0) || more) {
|
||||
PROXY_CLEANUP ();
|
||||
return close_and_return (&msg, -1);
|
||||
}
|
||||
|
||||
// Copy message to capture socket if any.
|
||||
rc = capture (capture_, msg);
|
||||
CHECK_RC_EXIT_ON_FAILURE ();
|
||||
|
||||
if (msg.size () == 5 && memcmp (msg.data (), "PAUSE", 5) == 0) {
|
||||
state = paused;
|
||||
poller_wait = poller_control;
|
||||
} else
|
||||
if (msg.size () == 6 && memcmp (msg.data (), "RESUME", 6) == 0) {
|
||||
state = active;
|
||||
poller_wait = poller_in;
|
||||
} else
|
||||
if (msg.size () == 9 && memcmp (msg.data (), "TERMINATE", 9) == 0)
|
||||
state = terminated;
|
||||
else {
|
||||
// This is an API error, we assert
|
||||
puts ("E: invalid command sent to proxy");
|
||||
zmq_assert (false);
|
||||
}
|
||||
control_in = false;
|
||||
}
|
||||
|
||||
if (state == active) {
|
||||
|
||||
// Process a request, 'ZMQ_POLLIN' on 'frontend_' and 'ZMQ_POLLOUT' on 'backend_'.
|
||||
// In case of frontend_==backend_ there's no 'ZMQ_POLLOUT' event.
|
||||
if (frontend_in && (backend_out || frontend_equal_to_backend)) {
|
||||
rc = forward (frontend_, backend_, capture_, msg);
|
||||
CHECK_RC_EXIT_ON_FAILURE ();
|
||||
request_processed = true;
|
||||
frontend_in = backend_out = false;
|
||||
} else request_processed = false;
|
||||
|
||||
// Process a reply, 'ZMQ_POLLIN' on 'backend_' and 'ZMQ_POLLOUT' on 'frontend_'.
|
||||
// If 'frontend_' and 'backend_' are the same this is not needed because previous processing
|
||||
// covers all of the cases. 'backend_in' is always false if frontend_==backend_ due to
|
||||
// design in 'for' event processing loop.
|
||||
if (backend_in && frontend_out) {
|
||||
rc = forward (backend_, frontend_, capture_, msg);
|
||||
CHECK_RC_EXIT_ON_FAILURE ();
|
||||
reply_processed = true;
|
||||
backend_in = frontend_out = false;
|
||||
} else reply_processed = false;
|
||||
|
||||
if (request_processed || reply_processed) {
|
||||
// If request/reply is processed that means we had at least one 'ZMQ_POLLOUT' event.
|
||||
// Enable corresponding 'ZMQ_POLLIN' for blocking wait if any was disabled.
|
||||
if (poller_wait != poller_in) {
|
||||
if (request_processed) { // 'frontend_' -> 'backend_'
|
||||
if (poller_wait == poller_both_blocked)
|
||||
poller_wait = poller_send_blocked;
|
||||
else
|
||||
if (poller_wait == poller_receive_blocked)
|
||||
poller_wait = poller_in;
|
||||
}
|
||||
if (reply_processed) { // 'backend_' -> 'frontend_'
|
||||
if (poller_wait == poller_both_blocked)
|
||||
poller_wait = poller_receive_blocked;
|
||||
else
|
||||
if (poller_wait == poller_send_blocked)
|
||||
poller_wait = poller_in;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// No requests have been processed, there were no 'ZMQ_POLLOUT' events.
|
||||
// That means that receiving end queue(s) is/are full.
|
||||
// Disable receiving 'ZMQ_POLLIN' for sockets for which there's no 'ZMQ_POLLOUT'.
|
||||
if (frontend_in) {
|
||||
if (poller_wait == poller_send_blocked)
|
||||
poller_wait = poller_both_blocked;
|
||||
else
|
||||
poller_wait = poller_receive_blocked;
|
||||
}
|
||||
if (backend_in) {
|
||||
// Will never be reached if frontend_==backend_, 'backend_in' will
|
||||
// always be false due to design in 'for' event processing loop.
|
||||
if (poller_wait == poller_receive_blocked)
|
||||
poller_wait = poller_both_blocked;
|
||||
else
|
||||
poller_wait = poller_send_blocked;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
PROXY_CLEANUP ();
|
||||
return close_and_return (&msg, 0);
|
||||
}
|
||||
|
||||
#else // ZMQ_HAVE_POLLER
|
||||
|
||||
int zmq::proxy (
|
||||
class socket_base_t *frontend_,
|
||||
class socket_base_t *backend_,
|
||||
@ -174,21 +450,21 @@ int zmq::proxy (
|
||||
if (msg.size () == 5 && memcmp (msg.data (), "PAUSE", 5) == 0)
|
||||
state = paused;
|
||||
else
|
||||
if (msg.size () == 6 && memcmp (msg.data (), "RESUME", 6) == 0)
|
||||
state = active;
|
||||
else
|
||||
if (msg.size () == 9 && memcmp (msg.data (), "TERMINATE", 9) == 0)
|
||||
state = terminated;
|
||||
else {
|
||||
// This is an API error, so we assert
|
||||
puts ("E: invalid command sent to proxy");
|
||||
zmq_assert (false);
|
||||
}
|
||||
if (msg.size () == 6 && memcmp (msg.data (), "RESUME", 6) == 0)
|
||||
state = active;
|
||||
else
|
||||
if (msg.size () == 9 && memcmp (msg.data (), "TERMINATE", 9) == 0)
|
||||
state = terminated;
|
||||
else {
|
||||
// This is an API error, so we assert
|
||||
puts ("E: invalid command sent to proxy");
|
||||
zmq_assert (false);
|
||||
}
|
||||
}
|
||||
// Process a request
|
||||
if (state == active
|
||||
&& items [0].revents & ZMQ_POLLIN
|
||||
&& (frontend_ == backend_ || itemsout [1].revents & ZMQ_POLLOUT)) {
|
||||
&& items [0].revents & ZMQ_POLLIN
|
||||
&& (frontend_ == backend_ || itemsout [1].revents & ZMQ_POLLOUT)) {
|
||||
rc = forward (frontend_, backend_, capture_, msg);
|
||||
if (unlikely (rc < 0))
|
||||
return close_and_return (&msg, -1);
|
||||
@ -206,3 +482,5 @@ int zmq::proxy (
|
||||
|
||||
return close_and_return (&msg, 0);
|
||||
}
|
||||
|
||||
#endif // ZMQ_HAVE_POLLER
|
||||
|
@ -46,9 +46,18 @@ zmq::socket_poller_t::socket_poller_t () :
|
||||
#endif
|
||||
{
|
||||
#if defined ZMQ_POLL_BASED_ON_SELECT
|
||||
#if defined ZMQ_HAVE_WINDOWS
|
||||
// On Windows fd_set contains array of SOCKETs, each 4 bytes.
|
||||
// For large fd_sets memset() could be expensive and it is unnecessary.
|
||||
// It is enough to set fd_count to 0, exactly what FD_ZERO() macro does.
|
||||
FD_ZERO (&pollset_in);
|
||||
FD_ZERO (&pollset_out);
|
||||
FD_ZERO (&pollset_err);
|
||||
#else
|
||||
memset(&pollset_in, 0, sizeof(pollset_in));
|
||||
memset(&pollset_out, 0, sizeof(pollset_in));
|
||||
memset(&pollset_err, 0, sizeof(pollset_in));
|
||||
memset(&pollset_out, 0, sizeof(pollset_out));
|
||||
memset(&pollset_err, 0, sizeof(pollset_err));
|
||||
#endif
|
||||
#endif
|
||||
}
|
||||
|
||||
@ -585,10 +594,14 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev
|
||||
|
||||
// Wait for events. Ignore interrupts if there's infinite timeout.
|
||||
while (true) {
|
||||
memcpy (&inset, &pollset_in, sizeof (fd_set));
|
||||
memcpy (&outset, &pollset_out, sizeof (fd_set));
|
||||
memcpy (&errset, &pollset_err, sizeof (fd_set));
|
||||
#if defined ZMQ_HAVE_WINDOWS
|
||||
// On Windows we don't need to copy the whole fd_set.
|
||||
// SOCKETS are continuous from the beginning of fd_array in fd_set.
|
||||
// We just need to copy fd_count elements of fd_array.
|
||||
// We gain huge memcpy() improvement if number of used SOCKETs is much lower than FD_SETSIZE.
|
||||
memcpy (&inset, &pollset_in, sizeof (pollset_in.fd_count) + sizeof(*(pollset_in.fd_array)) * pollset_in.fd_count);
|
||||
memcpy (&outset, &pollset_out, sizeof (pollset_out.fd_count) + sizeof (*(pollset_out.fd_array)) * pollset_out.fd_count);
|
||||
memcpy (&errset, &pollset_err, sizeof (pollset_err.fd_count) + sizeof (*(pollset_err.fd_array)) * pollset_err.fd_count);
|
||||
int rc = select (0, &inset, &outset, &errset, ptimeout);
|
||||
if (unlikely (rc == SOCKET_ERROR)) {
|
||||
errno = zmq::wsa_error_to_errno (WSAGetLastError ());
|
||||
@ -596,6 +609,9 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev
|
||||
return -1;
|
||||
}
|
||||
#else
|
||||
memcpy (&inset, &pollset_in, sizeof (fd_set));
|
||||
memcpy (&outset, &pollset_out, sizeof (fd_set));
|
||||
memcpy (&errset, &pollset_err, sizeof (fd_set));
|
||||
int rc = select (maxfd + 1, &inset, &outset, &errset, ptimeout);
|
||||
if (unlikely (rc == -1)) {
|
||||
errno_assert (errno == EINTR || errno == EBADF);
|
||||
|
Loading…
x
Reference in New Issue
Block a user