diff --git a/.gitignore b/.gitignore index 3f4931ed..27cdde0f 100644 --- a/.gitignore +++ b/.gitignore @@ -168,6 +168,7 @@ remote_lat remote_thr inproc_lat inproc_thr +proxy_thr benchmark_radix_tree !local_lat/ !local_thr/ diff --git a/CMakeLists.txt b/CMakeLists.txt index a631d50f..0b53acf0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1292,7 +1292,8 @@ if(BUILD_SHARED) local_thr remote_thr inproc_lat - inproc_thr) + inproc_thr + proxy_thr) if(NOT CMAKE_BUILD_TYPE STREQUAL "Debug") # Why? option(WITH_PERF_TOOL "Build with perf-tools" ON) diff --git a/Makefile.am b/Makefile.am index 4dcfff6e..ca15828d 100644 --- a/Makefile.am +++ b/Makefile.am @@ -341,7 +341,8 @@ noinst_PROGRAMS = \ perf/local_thr \ perf/remote_thr \ perf/inproc_lat \ - perf/inproc_thr + perf/inproc_thr \ + perf/proxy_thr perf_local_lat_LDADD = src/libzmq.la perf_local_lat_SOURCES = perf/local_lat.cpp @@ -361,6 +362,9 @@ perf_inproc_lat_SOURCES = perf/inproc_lat.cpp perf_inproc_thr_LDADD = src/libzmq.la perf_inproc_thr_SOURCES = perf/inproc_thr.cpp +perf_proxy_thr_LDADD = src/libzmq.la +perf_proxy_thr_SOURCES = perf/proxy_thr.cpp + if ENABLE_STATIC noinst_PROGRAMS += \ perf/benchmark_radix_tree diff --git a/RELICENSE/emtr.md b/RELICENSE/emtr.md new file mode 100644 index 00000000..602f3de9 --- /dev/null +++ b/RELICENSE/emtr.md @@ -0,0 +1,15 @@ +# Permission to Relicense under MPLv2 or any other OSI approved license chosen by the current ZeroMQ BDFL + +This is a statement by Claudio Biagi that grants permission to relicense +its copyrights in the libzmq C++ library (ZeroMQ) under the Mozilla Public +License v2 (MPLv2) or any other Open Source Initiative approved license +chosen by the current ZeroMQ BDFL (Benevolent Dictator for Life). + +A portion of the commits made by the Github handle "emtr", with +commit author "Claudio Biagi ", are copyright +of Claudio Biagi. This document hereby grants the libzmq project +team to relicense libzmq, including all past, present and future +contributions of the author listed above. + +Claudio Biagi +2019/03/07 diff --git a/perf/proxy_thr.cpp b/perf/proxy_thr.cpp new file mode 100644 index 00000000..e8f924f2 --- /dev/null +++ b/perf/proxy_thr.cpp @@ -0,0 +1,401 @@ +/* + Copyright (c) 2007-2017 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ +#include "../include/zmq.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "platform.hpp" + +#if defined ZMQ_HAVE_WINDOWS +#include +#include +#else +#include +#include +#endif + + +/* + Asynchronous proxy benchmark using ZMQ_XPUB_NODROP. + + Topology: + + XPUB SUB + | | + +-----> XSUB -> XPUB -----/ + | ^^^^^^^^^^^^ + XPUB ZMQ proxy + + All connections use "inproc" transport. The two XPUB sockets start + flooding the proxy. The throughput is computed using the bytes received + in the SUB socket. +*/ + + +#define HWM 10000 + +#ifndef ARRAY_SIZE +#define ARRAY_SIZE(x) (sizeof (x) / sizeof (*x)) +#endif + +#define TEST_ASSERT_SUCCESS_ERRNO(expr) \ + test_assert_success_message_errno_helper (expr, NULL, #expr) + + +static uint64_t message_count = 0; +static size_t message_size = 0; + + +typedef struct +{ + void *context; + int thread_idx; + const char *frontend_endpoint[4]; + const char *backend_endpoint[4]; + const char *control_endpoint; +} proxy_hwm_cfg_t; + + +int test_assert_success_message_errno_helper (int rc_, + const char *msg_, + const char *expr_) +{ + if (rc_ == -1) { + char buffer[512]; + buffer[sizeof (buffer) - 1] = + 0; // to ensure defined behavior with VC++ <= 2013 + printf ("%s failed%s%s%s, errno = %i (%s)", expr_, + msg_ ? " (additional info: " : "", msg_ ? msg_ : "", + msg_ ? ")" : "", zmq_errno (), zmq_strerror (zmq_errno ())); + exit (1); + } + return rc_; +} + +static void set_hwm (void *skt) +{ + int hwm = HWM; + + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (skt, ZMQ_SNDHWM, &hwm, sizeof (hwm))); + + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (skt, ZMQ_RCVHWM, &hwm, sizeof (hwm))); +} + +static void publisher_thread_main (void *pvoid) +{ + const proxy_hwm_cfg_t *cfg = (proxy_hwm_cfg_t *) pvoid; + const int idx = cfg->thread_idx; + int optval; + int rc; + + void *pubsocket = zmq_socket (cfg->context, ZMQ_XPUB); + assert (pubsocket); + + set_hwm (pubsocket); + + optval = 1; + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (pubsocket, ZMQ_XPUB_NODROP, &optval, sizeof (optval))); + + optval = 1; + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (pubsocket, ZMQ_SNDTIMEO, &optval, sizeof (optval))); + + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_connect (pubsocket, cfg->frontend_endpoint[idx])); + + // Wait before starting TX operations till 1 subscriber has subscribed + // (in this test there's 1 subscriber only) + char buffer[32] = {}; + rc = TEST_ASSERT_SUCCESS_ERRNO ( + zmq_recv (pubsocket, buffer, sizeof (buffer), 0)); + if (rc != 1) { + printf ("invalid response length: expected 1, received %d", rc); + exit (1); + } + if (buffer[0] != 1) { + printf ("invalid response value: expected 1, received %d", + (int) buffer[0]); + exit (1); + } + + zmq_msg_t msg_orig; + rc = zmq_msg_init_size (&msg_orig, message_size); + assert (rc == 0); + memset (zmq_msg_data (&msg_orig), 'A', zmq_msg_size (&msg_orig)); + + uint64_t send_count = 0; + while (send_count < message_count) { + zmq_msg_t msg; + zmq_msg_init (&msg); + rc = zmq_msg_copy (&msg, &msg_orig); + assert (rc == 0); + + // Send the message to the socket + rc = zmq_msg_send (&msg, pubsocket, 0); + if (rc != -1) { + send_count++; + } else { + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg)); + } + } + + zmq_close (pubsocket); + printf ("publisher thread ended\n"); +} + +static void subscriber_thread_main (void *pvoid) +{ + const proxy_hwm_cfg_t *cfg = (proxy_hwm_cfg_t *) pvoid; + const int idx = cfg->thread_idx; + + void *subsocket = zmq_socket (cfg->context, ZMQ_SUB); + assert (subsocket); + + set_hwm (subsocket); + + TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (subsocket, ZMQ_SUBSCRIBE, 0, 0)); + + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_connect (subsocket, cfg->backend_endpoint[idx])); + + // Receive message_count messages + uint64_t rxsuccess = 0; + bool success = true; + while (success) { + zmq_msg_t msg; + int rc = zmq_msg_init (&msg); + assert (rc == 0); + + rc = zmq_msg_recv (&msg, subsocket, 0); + if (rc != -1) { + TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg)); + rxsuccess++; + } + + if (rxsuccess == message_count) + break; + } + + // Cleanup + + zmq_close (subsocket); + printf ("subscriber thread ended\n"); +} + +static void proxy_thread_main (void *pvoid) +{ + const proxy_hwm_cfg_t *cfg = (proxy_hwm_cfg_t *) pvoid; + int rc; + + // FRONTEND SUB + + void *frontend_xsub = zmq_socket ( + cfg->context, + ZMQ_XSUB); // the frontend is the one exposed to internal threads (INPROC) + assert (frontend_xsub); + + set_hwm (frontend_xsub); + + // Bind FRONTEND + for (unsigned int i = 0; i < ARRAY_SIZE (cfg->frontend_endpoint); i++) { + const char *ep = cfg->frontend_endpoint[i]; + if (ep != NULL) { + assert (strlen (ep) > 5); + rc = zmq_bind (frontend_xsub, ep); + assert (rc == 0); + } + } + + // BACKEND PUB + + void *backend_xpub = zmq_socket ( + cfg->context, + ZMQ_XPUB); // the backend is the one exposed to the external world (TCP) + assert (backend_xpub); + + int optval = 1; + rc = + zmq_setsockopt (backend_xpub, ZMQ_XPUB_NODROP, &optval, sizeof (optval)); + assert (rc == 0); + + set_hwm (backend_xpub); + + // Bind BACKEND + for (unsigned int i = 0; i < ARRAY_SIZE (cfg->backend_endpoint); i++) { + const char *ep = cfg->backend_endpoint[i]; + if (ep != NULL) { + assert (strlen (ep) > 5); + rc = zmq_bind (backend_xpub, ep); + assert (rc == 0); + } + } + + // CONTROL REP + + void *control_rep = zmq_socket ( + cfg->context, + ZMQ_REP); // This one is used by the proxy to receive&reply to commands + assert (control_rep); + + // Bind CONTROL + rc = zmq_bind (control_rep, cfg->control_endpoint); + assert (rc == 0); + + // Start proxying! + + zmq_proxy_steerable (frontend_xsub, backend_xpub, NULL, control_rep); + + zmq_close (frontend_xsub); + zmq_close (backend_xpub); + zmq_close (control_rep); + printf ("proxy thread ended\n"); +} + +void terminate_proxy (const proxy_hwm_cfg_t *cfg) +{ + // CONTROL REQ + + void *control_req = zmq_socket ( + cfg->context, + ZMQ_REQ); // This one can be used to send command to the proxy + assert (control_req); + + // Connect CONTROL-REQ: a socket to which send commands + int rc = zmq_connect (control_req, cfg->control_endpoint); + assert (rc == 0); + + // Ask the proxy to exit: the subscriber has received all messages + + rc = zmq_send (control_req, "TERMINATE", 9, 0); + assert (rc == 9); + + zmq_close (control_req); +} + +// The main thread simply starts some publishers, a proxy, +// and a subscriber. Finish when all packets are received. + +int main (int argc, char *argv[]) +{ + if (argc != 3) { + printf ("usage: inproc_thr \n"); + return 1; + } + + message_size = atoi (argv[1]); + message_count = atoi (argv[2]); + printf ("message size: %d [B]\n", (int) message_size); + printf ("message count: %d\n", (int) message_count); + + void *context = zmq_ctx_new (); + assert (context); + + int rv = zmq_ctx_set (context, ZMQ_IO_THREADS, 4); + assert (rv == 0); + + // START ALL SECONDARY THREADS + + const char *pub1 = "inproc://perf_pub1"; + const char *pub2 = "inproc://perf_pub2"; + const char *sub1 = "inproc://perf_backend"; + + proxy_hwm_cfg_t cfg_global = {}; + cfg_global.context = context; + cfg_global.frontend_endpoint[0] = pub1; + cfg_global.frontend_endpoint[1] = pub2; + cfg_global.backend_endpoint[0] = sub1; + cfg_global.control_endpoint = "inproc://ctrl"; + + // Proxy + proxy_hwm_cfg_t cfg_proxy = cfg_global; + void *proxy = zmq_threadstart (&proxy_thread_main, (void *) &cfg_proxy); + assert (proxy != 0); + + // Subscriber 1 + proxy_hwm_cfg_t cfg_sub1 = cfg_global; + cfg_sub1.thread_idx = 0; + void *subscriber = + zmq_threadstart (&subscriber_thread_main, (void *) &cfg_sub1); + assert (subscriber != 0); + + // Start measuring + void *watch = zmq_stopwatch_start (); + + // Publisher 1 + proxy_hwm_cfg_t cfg_pub1 = cfg_global; + cfg_pub1.thread_idx = 0; + void *publisher1 = + zmq_threadstart (&publisher_thread_main, (void *) &cfg_pub1); + assert (publisher1 != 0); + + // Publisher 2 + proxy_hwm_cfg_t cfg_pub2 = cfg_global; + cfg_pub2.thread_idx = 1; + void *publisher2 = + zmq_threadstart (&publisher_thread_main, (void *) &cfg_pub2); + assert (publisher2 != 0); + + // Wait for all packets to be received + zmq_threadclose (subscriber); + + // Stop measuring + unsigned long elapsed = zmq_stopwatch_stop (watch); + if (elapsed == 0) + elapsed = 1; + + unsigned long throughput = + (unsigned long) ((double) message_count / (double) elapsed * 1000000); + double megabits = (double) (throughput * message_size * 8) / 1000000; + + printf ("mean throughput: %d [msg/s]\n", (int) throughput); + printf ("mean throughput: %.3f [Mb/s]\n", (double) megabits); + + // Wait for the end of publishers... + zmq_threadclose (publisher1); + zmq_threadclose (publisher2); + + // ... then close the proxy + terminate_proxy (&cfg_proxy); + zmq_threadclose (proxy); + + int rc = zmq_ctx_term (context); + assert (rc == 0); + + return 0; +} diff --git a/src/config.hpp b/src/config.hpp index af228a0f..8a10c803 100644 --- a/src/config.hpp +++ b/src/config.hpp @@ -70,6 +70,11 @@ enum // Maximum number of events the I/O thread can process in one go. max_io_events = 256, + // Maximal batch size of packets forwarded by a ZMQ proxy. + // Increasing this value improves throughput at the expense of + // latency and fairness. + proxy_burst_size = 1000, + // Maximal delay to process command in API thread (in CPU ticks). // 3,000,000 ticks equals to 1 - 2 milliseconds on current CPUs. // Note that delay is only applied when there is continuous stream of diff --git a/src/proxy.cpp b/src/proxy.cpp index bac93b65..6c2f0127 100644 --- a/src/proxy.cpp +++ b/src/proxy.cpp @@ -116,40 +116,49 @@ int forward (class zmq::socket_base_t *from_, class zmq::socket_base_t *capture_, zmq::msg_t *msg_) { - int more; - size_t moresz; - size_t complete_msg_size = 0; - while (true) { - int rc = from_->recv (msg_, 0); - if (unlikely (rc < 0)) - return -1; + // Forward a burst of messages + for (unsigned int i = 0; i < zmq::proxy_burst_size; i++) { + int more; + size_t moresz; + size_t complete_msg_size = 0; - complete_msg_size += msg_->size (); + // Forward all the parts of one message + while (true) { + int rc = from_->recv (msg_, ZMQ_DONTWAIT); + if (rc < 0) { + if (likely (errno == EAGAIN && i > 0)) + return 0; // End of burst + else + return -1; + } - moresz = sizeof more; - rc = from_->getsockopt (ZMQ_RCVMORE, &more, &moresz); - if (unlikely (rc < 0)) - return -1; + complete_msg_size += msg_->size (); - // Copy message to capture socket if any - rc = capture (capture_, msg_, more); - if (unlikely (rc < 0)) - return -1; + moresz = sizeof more; + rc = from_->getsockopt (ZMQ_RCVMORE, &more, &moresz); + if (unlikely (rc < 0)) + return -1; - rc = to_->send (msg_, more ? ZMQ_SNDMORE : 0); - if (unlikely (rc < 0)) - return -1; + // Copy message to capture socket if any + rc = capture (capture_, msg_, more); + if (unlikely (rc < 0)) + return -1; - if (more == 0) - break; + rc = to_->send (msg_, more ? ZMQ_SNDMORE : 0); + if (unlikely (rc < 0)) + return -1; + + if (more == 0) + break; + } + + // A multipart message counts as 1 packet: + from_stats_->msg_in++; + from_stats_->bytes_in += complete_msg_size; + to_stats_->msg_out++; + to_stats_->bytes_out += complete_msg_size; } - // A multipart message counts as 1 packet: - from_stats_->msg_in++; - from_stats_->bytes_in += complete_msg_size; - to_stats_->msg_out++; - to_stats_->bytes_out += complete_msg_size; - return 0; } diff --git a/tests/test_proxy.cpp b/tests/test_proxy.cpp index 6498cdb0..9414b1b6 100644 --- a/tests/test_proxy.cpp +++ b/tests/test_proxy.cpp @@ -103,9 +103,9 @@ static void client_task (void *db_) rc = zmq_connect (control, "inproc://control"); assert (rc == 0); - char content[CONTENT_SIZE_MAX]; + char content[CONTENT_SIZE_MAX] = {}; // Set random routing id to make tracing easier - char routing_id[ROUTING_ID_SIZE]; + char routing_id[ROUTING_ID_SIZE] = {}; sprintf (routing_id, "%04X-%04X", rand () % 0xFFFF, rand () % 0xFFFF); rc = zmq_setsockopt (client, ZMQ_ROUTING_ID, routing_id, @@ -291,8 +291,10 @@ static void server_worker (void *ctx_) rc = zmq_connect (control, "inproc://control"); assert (rc == 0); - char content[CONTENT_SIZE_MAX]; // bigger than what we need to check that - char routing_id[ROUTING_ID_SIZE_MAX]; // the size received is the size sent + char content[CONTENT_SIZE_MAX] = + {}; // bigger than what we need to check that + char routing_id[ROUTING_ID_SIZE_MAX] = + {}; // the size received is the size sent bool run = true; bool keep_sending = true;