From 3e55545ab0a369a2cbf5922a99e73b1e50d88a8a Mon Sep 17 00:00:00 2001 From: bjovke Date: Mon, 10 Apr 2017 11:30:52 +0200 Subject: [PATCH 1/3] Code reformatting of proxy.cpp --- src/proxy.cpp | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/src/proxy.cpp b/src/proxy.cpp index 2ca98ef4..6496214e 100644 --- a/src/proxy.cpp +++ b/src/proxy.cpp @@ -52,7 +52,7 @@ #include "socket_base.hpp" #include "err.hpp" -int capture( +int capture ( class zmq::socket_base_t *capture_, zmq::msg_t& msg_, int more_ = 0) @@ -66,14 +66,14 @@ int capture( rc = ctrl.copy (msg_); if (unlikely (rc < 0)) return -1; - rc = capture_->send (&ctrl, more_? ZMQ_SNDMORE: 0); + rc = capture_->send (&ctrl, more_ ? ZMQ_SNDMORE : 0); if (unlikely (rc < 0)) return -1; } return 0; } -int forward( +int forward ( class zmq::socket_base_t *from_, class zmq::socket_base_t *to_, class zmq::socket_base_t *capture_, @@ -92,11 +92,11 @@ int forward( return -1; // Copy message to capture socket if any - rc = capture(capture_, msg_, more); + rc = capture (capture_, msg_, more); if (unlikely (rc < 0)) return -1; - rc = to_->send (&msg_, more? ZMQ_SNDMORE: 0); + rc = to_->send (&msg_, more ? 
ZMQ_SNDMORE : 0); if (unlikely (rc < 0)) return -1; if (more == 0) @@ -174,21 +174,21 @@ int zmq::proxy ( if (msg.size () == 5 && memcmp (msg.data (), "PAUSE", 5) == 0) state = paused; else - if (msg.size () == 6 && memcmp (msg.data (), "RESUME", 6) == 0) - state = active; - else - if (msg.size () == 9 && memcmp (msg.data (), "TERMINATE", 9) == 0) - state = terminated; - else { - // This is an API error, so we assert - puts ("E: invalid command sent to proxy"); - zmq_assert (false); - } + if (msg.size () == 6 && memcmp (msg.data (), "RESUME", 6) == 0) + state = active; + else + if (msg.size () == 9 && memcmp (msg.data (), "TERMINATE", 9) == 0) + state = terminated; + else { + // This is an API error, so we assert + puts ("E: invalid command sent to proxy"); + zmq_assert (false); + } } // Process a request if (state == active - && items [0].revents & ZMQ_POLLIN - && (frontend_ == backend_ || itemsout [1].revents & ZMQ_POLLOUT)) { + && items [0].revents & ZMQ_POLLIN + && (frontend_ == backend_ || itemsout [1].revents & ZMQ_POLLOUT)) { rc = forward (frontend_, backend_, capture_, msg); if (unlikely (rc < 0)) return close_and_return (&msg, -1); From a7977a5e84b27d817d186ee3606aeb9628ac1283 Mon Sep 17 00:00:00 2001 From: bjovke Date: Mon, 10 Apr 2017 11:34:24 +0200 Subject: [PATCH 2/3] Reworked zmq::proxy() for improved performance. --- src/proxy.cpp | 278 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 278 insertions(+) diff --git a/src/proxy.cpp b/src/proxy.cpp index 6496214e..227a2970 100644 --- a/src/proxy.cpp +++ b/src/proxy.cpp @@ -52,6 +52,30 @@ #include "socket_base.hpp" #include "err.hpp" +#ifdef ZMQ_HAVE_POLLER + +#include "socket_poller.hpp" + +// Macros for repetitive code. + +// PROXY_CLEANUP() must not be used before these variables are initialized. 
+#define PROXY_CLEANUP()\ + delete poller_all;\ + delete poller_in;\ + delete poller_control;\ + delete poller_receive_blocked;\ + delete poller_send_blocked;\ + delete poller_both_blocked;\ + + +#define CHECK_RC_EXIT_ON_FAILURE()\ + if (rc < 0) {\ + PROXY_CLEANUP();\ + return close_and_return (&msg, -1);\ + } + +#endif // ZMQ_HAVE_POLLER + int capture ( class zmq::socket_base_t *capture_, zmq::msg_t& msg_, @@ -105,6 +129,258 @@ int forward ( return 0; } +#ifdef ZMQ_HAVE_POLLER + +int zmq::proxy ( + class socket_base_t *frontend_, + class socket_base_t *backend_, + class socket_base_t *capture_, + class socket_base_t *control_) +{ + msg_t msg; + int rc = msg.init (); + if (rc != 0) + return -1; + + // The algorithm below assumes ratio of requests and replies processed + // under full load to be 1:1. + + int more; + size_t moresz = sizeof (more); + + // Proxy can be in these three states + enum { + active, + paused, + terminated + } state = active; + + bool frontend_equal_to_backend; + bool frontend_in = false; + bool frontend_out = false; + bool backend_in = false; + bool backend_out = false; + bool control_in = false; + zmq::socket_poller_t::event_t events [3]; + + // Don't allocate these pollers from stack because they will take more than 900 kB of stack! + // On Windows this blows up default stack of 1 MB and aborts the program. + // I wanted to use std::shared_ptr here as the best solution but that requires C++11... + zmq::socket_poller_t *poller_all = new (std::nothrow) zmq::socket_poller_t; // Poll for everything. + zmq::socket_poller_t *poller_in = new (std::nothrow) zmq::socket_poller_t; // Poll only 'ZMQ_POLLIN' on all sockets. Initial blocking poll in loop. + zmq::socket_poller_t *poller_control = new (std::nothrow) zmq::socket_poller_t; // Poll only for 'ZMQ_POLLIN' on 'control_', when proxy is paused. + zmq::socket_poller_t *poller_receive_blocked = new (std::nothrow) zmq::socket_poller_t; // All except 'ZMQ_POLLIN' on 'frontend_'. 
+ + // If frontend_==backend_ 'poller_send_blocked' and 'poller_receive_blocked' are the same, 'ZMQ_POLLIN' is ignored. + // In that case 'poller_send_blocked' is not used. We need only 'poller_receive_blocked'. + // We also don't need 'poller_both_blocked', no need to initialize it. + // We save some RAM and time for initialization. + zmq::socket_poller_t *poller_send_blocked = NULL; // All except 'ZMQ_POLLIN' on 'backend_'. + zmq::socket_poller_t *poller_both_blocked = NULL; // All except 'ZMQ_POLLIN' on both 'frontend_' and 'backend_'. + + if (frontend_ != backend_) { + poller_send_blocked = new (std::nothrow) zmq::socket_poller_t; // All except 'ZMQ_POLLIN' on 'backend_'. + poller_both_blocked = new (std::nothrow) zmq::socket_poller_t; // All except 'ZMQ_POLLIN' on both 'frontend_' and 'backend_'. + frontend_equal_to_backend = false; + } else + frontend_equal_to_backend = true; + + if (poller_all == NULL || poller_in == NULL || poller_control == NULL || poller_receive_blocked == NULL + || ((poller_send_blocked == NULL || poller_both_blocked == NULL) && !frontend_equal_to_backend)) { + PROXY_CLEANUP (); + return close_and_return (&msg, -1); + } + + zmq::socket_poller_t *poller_wait = poller_in; // Poller for blocking wait, initially all 'ZMQ_POLLIN'. + + // Register 'frontend_' and 'backend_' with pollers. + rc = poller_all->add (frontend_, NULL, ZMQ_POLLIN | ZMQ_POLLOUT); // Everything. + CHECK_RC_EXIT_ON_FAILURE (); + rc = poller_in->add (frontend_, NULL, ZMQ_POLLIN); // All 'ZMQ_POLLIN's. + CHECK_RC_EXIT_ON_FAILURE (); + + if (frontend_equal_to_backend) { + // If frontend_==backend_ 'poller_send_blocked' and 'poller_receive_blocked' are the same, + // so we don't need 'poller_send_blocked'. We need only 'poller_receive_blocked'. + // We also don't need 'poller_both_blocked', no need to initialize it. 
+ rc = poller_receive_blocked->add (frontend_, NULL, ZMQ_POLLOUT); + CHECK_RC_EXIT_ON_FAILURE (); + } else { + rc = poller_all->add (backend_, NULL, ZMQ_POLLIN | ZMQ_POLLOUT); // Everything. + CHECK_RC_EXIT_ON_FAILURE (); + rc = poller_in->add (backend_, NULL, ZMQ_POLLIN); // All 'ZMQ_POLLIN's. + CHECK_RC_EXIT_ON_FAILURE (); + rc = poller_both_blocked->add (frontend_, NULL, ZMQ_POLLOUT); // Waiting only for 'ZMQ_POLLOUT'. + CHECK_RC_EXIT_ON_FAILURE (); + rc = poller_both_blocked->add (backend_, NULL, ZMQ_POLLOUT); // Waiting only for 'ZMQ_POLLOUT'. + CHECK_RC_EXIT_ON_FAILURE (); + rc = poller_send_blocked->add (backend_, NULL, ZMQ_POLLOUT); // All except 'ZMQ_POLLIN' on 'backend_'. + CHECK_RC_EXIT_ON_FAILURE (); + rc = poller_send_blocked->add (frontend_, NULL, ZMQ_POLLIN | ZMQ_POLLOUT); // All except 'ZMQ_POLLIN' on 'backend_'. + CHECK_RC_EXIT_ON_FAILURE (); + rc = poller_receive_blocked->add (frontend_, NULL, ZMQ_POLLOUT); // All except 'ZMQ_POLLIN' on 'frontend_'. + CHECK_RC_EXIT_ON_FAILURE (); + rc = poller_receive_blocked->add (backend_, NULL, ZMQ_POLLIN | ZMQ_POLLOUT); // All except 'ZMQ_POLLIN' on 'frontend_'. + CHECK_RC_EXIT_ON_FAILURE (); + } + + // Register 'control_' with pollers. + if (control_ != NULL) { + rc = poller_all->add (control_, NULL, ZMQ_POLLIN); + CHECK_RC_EXIT_ON_FAILURE (); + rc = poller_in->add (control_, NULL, ZMQ_POLLIN); + CHECK_RC_EXIT_ON_FAILURE (); + rc = poller_control->add (control_, NULL, ZMQ_POLLIN); // When proxy is paused we wait only for ZMQ_POLLIN on 'control_' socket. 
+ CHECK_RC_EXIT_ON_FAILURE (); + rc = poller_receive_blocked->add (control_, NULL, ZMQ_POLLIN); + CHECK_RC_EXIT_ON_FAILURE (); + if (!frontend_equal_to_backend) { + rc = poller_send_blocked->add (control_, NULL, ZMQ_POLLIN); + CHECK_RC_EXIT_ON_FAILURE (); + rc = poller_both_blocked->add (control_, NULL, ZMQ_POLLIN); + CHECK_RC_EXIT_ON_FAILURE (); + } + } + + + int i; + bool request_processed, reply_processed; + + + while (state != terminated) { + + // Blocking wait initially only for 'ZMQ_POLLIN' - 'poller_wait' points to 'poller_in'. + // If the queue of one of the receiving ends is full ('ZMQ_POLLOUT' not available), + // 'poller_wait' is pointed to 'poller_receive_blocked', 'poller_send_blocked' or 'poller_both_blocked'. + rc = poller_wait->wait (events, 3, -1); + if (rc < 0 && errno == ETIMEDOUT) + rc = 0; + CHECK_RC_EXIT_ON_FAILURE (); + + // Some of the events waited for by 'poller_wait' have arrived; now poll for everything without blocking. + rc = poller_all->wait (events, 3, 0); + if (rc < 0 && errno == ETIMEDOUT) + rc = 0; + CHECK_RC_EXIT_ON_FAILURE (); + + // Process events. + for (i = 0; i < rc; i++) { + if (events [i].socket == frontend_) { + frontend_in = (events [i].events & ZMQ_POLLIN) != 0; + frontend_out = (events [i].events & ZMQ_POLLOUT) != 0; + } else + // This 'if' must come after the check for 'frontend_' so that it is never + // reached when frontend_==backend_; this keeps backend_in=false in that case. + if (events [i].socket == backend_) { + backend_in = (events [i].events & ZMQ_POLLIN) != 0; + backend_out = (events [i].events & ZMQ_POLLOUT) != 0; + } else + if (events [i].socket == control_) + control_in = (events [i].events & ZMQ_POLLIN) != 0; + } + + + // Process a control command if any. 
+ if (control_in) { + rc = control_->recv (&msg, 0); + CHECK_RC_EXIT_ON_FAILURE (); + rc = control_->getsockopt (ZMQ_RCVMORE, &more, &moresz); + if (unlikely (rc < 0) || more) { + PROXY_CLEANUP (); + return close_and_return (&msg, -1); + } + + // Copy message to capture socket if any. + rc = capture (capture_, msg); + CHECK_RC_EXIT_ON_FAILURE (); + + if (msg.size () == 5 && memcmp (msg.data (), "PAUSE", 5) == 0) { + state = paused; + poller_wait = poller_control; + } else + if (msg.size () == 6 && memcmp (msg.data (), "RESUME", 6) == 0) { + state = active; + poller_wait = poller_in; + } else + if (msg.size () == 9 && memcmp (msg.data (), "TERMINATE", 9) == 0) + state = terminated; + else { + // This is an API error, we assert + puts ("E: invalid command sent to proxy"); + zmq_assert (false); + } + control_in = false; + } + + if (state == active) { + + // Process a request, 'ZMQ_POLLIN' on 'frontend_' and 'ZMQ_POLLOUT' on 'backend_'. + // In case of frontend_==backend_ there's no 'ZMQ_POLLOUT' event. + if (frontend_in && (backend_out || frontend_equal_to_backend)) { + rc = forward (frontend_, backend_, capture_, msg); + CHECK_RC_EXIT_ON_FAILURE (); + request_processed = true; + frontend_in = backend_out = false; + } else request_processed = false; + + // Process a reply, 'ZMQ_POLLIN' on 'backend_' and 'ZMQ_POLLOUT' on 'frontend_'. + // If 'frontend_' and 'backend_' are the same this is not needed because previous processing + // covers all of the cases. 'backend_in' is always false if frontend_==backend_ due to + // design in 'for' event processing loop. + if (backend_in && frontend_out) { + rc = forward (backend_, frontend_, capture_, msg); + CHECK_RC_EXIT_ON_FAILURE (); + reply_processed = true; + backend_in = frontend_out = false; + } else reply_processed = false; + + if (request_processed || reply_processed) { + // If request/reply is processed that means we had at least one 'ZMQ_POLLOUT' event. 
+ // Enable corresponding 'ZMQ_POLLIN' for blocking wait if any was disabled. + if (poller_wait != poller_in) { + if (request_processed) { // 'frontend_' -> 'backend_' + if (poller_wait == poller_both_blocked) + poller_wait = poller_send_blocked; + else + if (poller_wait == poller_receive_blocked) + poller_wait = poller_in; + } + if (reply_processed) { // 'backend_' -> 'frontend_' + if (poller_wait == poller_both_blocked) + poller_wait = poller_receive_blocked; + else + if (poller_wait == poller_send_blocked) + poller_wait = poller_in; + } + } + } else { + // Nothing was forwarded in this iteration: for any side with pending input the matching 'ZMQ_POLLOUT' was missing. + // That means that the receiving end queue(s) is/are full. + // Stop waiting for 'ZMQ_POLLIN' on sockets whose receiving side reports no 'ZMQ_POLLOUT'. + if (frontend_in) { + if (poller_wait == poller_send_blocked) + poller_wait = poller_both_blocked; + else + poller_wait = poller_receive_blocked; + } + if (backend_in) { + // Will never be reached if frontend_==backend_, 'backend_in' will + // always be false due to design in 'for' event processing loop. + if (poller_wait == poller_receive_blocked) + poller_wait = poller_both_blocked; + else + poller_wait = poller_send_blocked; + } + } + + } + } + PROXY_CLEANUP (); + return close_and_return (&msg, 0); +} + +#else // ZMQ_HAVE_POLLER + int zmq::proxy ( class socket_base_t *frontend_, class socket_base_t *backend_, @@ -206,3 +482,5 @@ int zmq::proxy ( return close_and_return (&msg, 0); } + +#endif // ZMQ_HAVE_POLLER From 26520fe15261795aeee592cd19fc0d9f9be6efc1 Mon Sep 17 00:00:00 2001 From: bjovke Date: Mon, 10 Apr 2017 11:35:08 +0200 Subject: [PATCH 3/3] zmq::socket_poller_t speed improvement for constructor and wait() function. 
--- src/socket_poller.cpp | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/src/socket_poller.cpp b/src/socket_poller.cpp index 124ff059..e46c141b 100644 --- a/src/socket_poller.cpp +++ b/src/socket_poller.cpp @@ -46,9 +46,18 @@ zmq::socket_poller_t::socket_poller_t () : #endif { #if defined ZMQ_POLL_BASED_ON_SELECT +#if defined ZMQ_HAVE_WINDOWS + // On Windows fd_set contains fd_count followed by an array of SOCKETs (pointer-sized: 4 bytes on Win32, 8 bytes on Win64). + // For large fd_sets memset() could be expensive and it is unnecessary. + // It is enough to set fd_count to 0, exactly what FD_ZERO() macro does. + FD_ZERO (&pollset_in); + FD_ZERO (&pollset_out); + FD_ZERO (&pollset_err); +#else memset(&pollset_in, 0, sizeof(pollset_in)); - memset(&pollset_out, 0, sizeof(pollset_in)); - memset(&pollset_err, 0, sizeof(pollset_in)); + memset(&pollset_out, 0, sizeof(pollset_out)); + memset(&pollset_err, 0, sizeof(pollset_err)); +#endif #endif } @@ -585,10 +594,14 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev // Wait for events. Ignore interrupts if there's infinite timeout. while (true) { - memcpy (&inset, &pollset_in, sizeof (fd_set)); - memcpy (&outset, &pollset_out, sizeof (fd_set)); - memcpy (&errset, &pollset_err, sizeof (fd_set)); #if defined ZMQ_HAVE_WINDOWS + // On Windows we don't need to copy the whole fd_set. + // SOCKETs are stored contiguously from the beginning of fd_array in fd_set. + // We just need to copy everything up to the end of the first fd_count elements of fd_array; the length is taken as a pointer difference so that any padding between fd_count and fd_array (8-byte SOCKETs on Win64) is included. + // We gain huge memcpy() improvement if number of used SOCKETs is much lower than FD_SETSIZE. 
+ memcpy (&inset, &pollset_in, static_cast<size_t> (reinterpret_cast<const char *> (pollset_in.fd_array + pollset_in.fd_count) - reinterpret_cast<const char *> (&pollset_in))); + memcpy (&outset, &pollset_out, static_cast<size_t> (reinterpret_cast<const char *> (pollset_out.fd_array + pollset_out.fd_count) - reinterpret_cast<const char *> (&pollset_out))); + memcpy (&errset, &pollset_err, static_cast<size_t> (reinterpret_cast<const char *> (pollset_err.fd_array + pollset_err.fd_count) - reinterpret_cast<const char *> (&pollset_err))); int rc = select (0, &inset, &outset, &errset, ptimeout); if (unlikely (rc == SOCKET_ERROR)) { errno = zmq::wsa_error_to_errno (WSAGetLastError ()); @@ -596,6 +609,9 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev return -1; } #else + memcpy (&inset, &pollset_in, sizeof (fd_set)); + memcpy (&outset, &pollset_out, sizeof (fd_set)); + memcpy (&errset, &pollset_err, sizeof (fd_set)); int rc = select (maxfd + 1, &inset, &outset, &errset, ptimeout); if (unlikely (rc == -1)) { errno_assert (errno == EINTR || errno == EBADF);