From 04c37982b152945731188f8a149de99a72aef29c Mon Sep 17 00:00:00 2001 From: Chengye Ke Date: Sat, 15 May 2021 06:05:56 +0800 Subject: [PATCH] Support so_busy_poll (#4188) * Support so_busy_poll. --- CMakeLists.txt | 1 + builds/cmake/platform.hpp.in | 1 + doc/zmq_setsockopt.txt | 15 ++++++++++ include/zmq.h | 1 + src/options.cpp | 15 +++++++++- src/options.hpp | 3 ++ src/tcp.cpp | 18 +++++++++++ src/tcp.hpp | 2 ++ src/zmq_draft.h | 1 + tests/CMakeLists.txt | 3 ++ tests/test_busy_poll.cpp | 58 ++++++++++++++++++++++++++++++++++++ 11 files changed, 117 insertions(+), 1 deletion(-) create mode 100644 tests/test_busy_poll.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 4b71c303..b3467b1a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -555,6 +555,7 @@ else() check_cxx_symbol_exists(if_nametoindex net/if.h HAVE_IF_NAMETOINDEX) check_cxx_symbol_exists(SO_PEERCRED sys/socket.h ZMQ_HAVE_SO_PEERCRED) check_cxx_symbol_exists(LOCAL_PEERCRED sys/socket.h ZMQ_HAVE_LOCAL_PEERCRED) + check_cxx_symbol_exists(SO_BUSY_POLL sys/socket.h ZMQ_HAVE_BUSY_POLL) endif() if(NOT MINGW) diff --git a/builds/cmake/platform.hpp.in b/builds/cmake/platform.hpp.in index f7119df1..50bb8a96 100644 --- a/builds/cmake/platform.hpp.in +++ b/builds/cmake/platform.hpp.in @@ -35,6 +35,7 @@ #cmakedefine ZMQ_HAVE_SO_PEERCRED #cmakedefine ZMQ_HAVE_LOCAL_PEERCRED +#cmakedefine ZMQ_HAVE_BUSY_POLL #cmakedefine ZMQ_HAVE_O_CLOEXEC diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index a010d3d8..6d893766 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -87,6 +87,21 @@ Default value:: not set Applicable socket types:: all, when using TCP or UDP transports. +ZMQ_BUSY_POLL: This removes delays caused by the interrupt and the resultant context switch. +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Busy polling helps reduce latency in the network receive path by allowing socket layer code +to poll the receive queue of a network device, and disabling network interrupts. This removes +delays caused by the interrupt and the resultant context switch. However, it also increases +CPU utilization. Busy polling also prevents the CPU from sleeping, which can incur additional +power consumption. + +[horizontal] +Option value type:: int +Option value unit:: 0,1 +Default value:: 0 +Applicable socket types:: all + + ZMQ_CONNECT_RID: Assign the next outbound connection id ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ This option name is now deprecated. Use ZMQ_CONNECT_ROUTING_ID instead. diff --git a/include/zmq.h b/include/zmq.h index 0dad49bc..419ef26b 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -683,6 +683,7 @@ ZMQ_EXPORT void zmq_threadclose (void *thread_); #define ZMQ_HELLO_MSG 110 #define ZMQ_DISCONNECT_MSG 111 #define ZMQ_PRIORITY 112 +#define ZMQ_BUSY_POLL 113 /* DRAFT ZMQ_RECONNECT_STOP options */ #define ZMQ_RECONNECT_STOP_CONN_REFUSED 0x1 diff --git a/src/options.cpp b/src/options.cpp index 1b7d2175..ddeda56e 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -254,7 +254,8 @@ zmq::options_t::options_t () : hello_msg (), can_send_hello_msg (false), disconnect_msg (), - can_recv_disconnect_msg (false) + can_recv_disconnect_msg (false), + busy_poll (0) { memset (curve_public_key, 0, CURVE_KEYSIZE); memset (curve_secret_key, 0, CURVE_KEYSIZE); @@ -802,6 +803,12 @@ int zmq::options_t::setsockopt (int option_, } break; + case ZMQ_BUSY_POLL: + if (is_int) { + busy_poll = value; + return 0; + } + break; #ifdef ZMQ_HAVE_WSS case ZMQ_WSS_KEY_PEM: // TODO: check if valid certificate @@ -1285,6 +1292,12 @@ int zmq::options_t::getsockopt (int option_, return 0; } break; + + case ZMQ_BUSY_POLL: + if (is_int) { + *value = busy_poll; + } + break; #endif diff --git a/src/options.hpp b/src/options.hpp index 6aa8c321..6f8d37d3 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -308,6 +308,9 @@ struct options_t // Disconnect msg std::vector disconnect_msg; bool can_recv_disconnect_msg; + + // This option removes several delays caused by scheduling, interrupts and context switching. + int busy_poll; }; inline bool get_effective_conflate_option (const options_t &options) diff --git a/src/tcp.cpp b/src/tcp.cpp index c37bed02..e75c482b 100644 --- a/src/tcp.cpp +++ b/src/tcp.cpp @@ -341,6 +341,21 @@ void zmq::tcp_tune_loopback_fast_path (const fd_t socket_) #endif } +void zmq::tune_tcp_busy_poll (fd_t socket_, int busy_poll_) +{ +#if defined(ZMQ_HAVE_BUSY_POLL) + if (busy_poll_ > 0) { + const int rc = + setsockopt (socket_, SOL_SOCKET, SO_BUSY_POLL, + reinterpret_cast (&busy_poll_), sizeof (int)); + assert_success_or_recoverable (socket_, rc); + } +#else + LIBZMQ_UNUSED (socket_); + LIBZMQ_UNUSED (busy_poll_); +#endif +} + zmq::fd_t zmq::tcp_open_socket (const char *address_, const zmq::options_t &options_, bool local_, @@ -398,6 +413,9 @@ zmq::fd_t zmq::tcp_open_socket (const char *address_, if (options_.rcvbuf >= 0) set_tcp_receive_buffer (s, options_.rcvbuf); + // This option removes several delays caused by scheduling, interrupts and context switching. + if (options_.busy_poll) + tune_tcp_busy_poll (s, options_.busy_poll); return s; setsockopt_error: diff --git a/src/tcp.hpp b/src/tcp.hpp index fc355a63..0b5e9759 100644 --- a/src/tcp.hpp +++ b/src/tcp.hpp @@ -68,6 +68,8 @@ int tcp_read (fd_t s_, void *data_, size_t size_); void tcp_tune_loopback_fast_path (fd_t socket_); +void tune_tcp_busy_poll (fd_t socket_, int busy_poll_); + // Resolves the given address_ string, opens a socket and sets socket options // according to the passed options_. On success, returns the socket // descriptor and assigns the resolved address to out_tcp_addr_. In case of diff --git a/src/zmq_draft.h b/src/zmq_draft.h index 9e4c1825..4111b77c 100644 --- a/src/zmq_draft.h +++ b/src/zmq_draft.h @@ -69,6 +69,7 @@ #define ZMQ_HELLO_MSG 110 #define ZMQ_DISCONNECT_MSG 111 #define ZMQ_PRIORITY 112 +#define ZMQ_BUSY_POLL 113 /* DRAFT ZMQ_RECONNECT_STOP options */ #define ZMQ_RECONNECT_STOP_CONN_REFUSED 0x1 diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 08056143..fa7496be 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -162,6 +162,9 @@ if(ENABLE_DRAFTS) test_hello_msg test_disconnect_msg ) + if(ZMQ_HAVE_BUSY_POLL) + list(APPEND tests test_busy_poll) + endif() endif() if(ZMQ_HAVE_WS) diff --git a/tests/test_busy_poll.cpp b/tests/test_busy_poll.cpp new file mode 100644 index 00000000..e0405542 --- /dev/null +++ b/tests/test_busy_poll.cpp @@ -0,0 +1,58 @@ +/* + Copyright (c) 2007-2021 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "testutil.hpp" +#include "testutil_unity.hpp" + +SETUP_TEARDOWN_TESTCONTEXT + +void test_busy_poll () +{ + // Create a socket + void *socket = test_context_socket (ZMQ_DEALER); + + // set socket ZMQ_BUSY_POLL options + int busy_poll = 1; + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (socket, ZMQ_BUSY_POLL, &busy_poll, sizeof (int))); + + // bind socket + TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (socket, "tcp://127.0.0.1:*")); + + // Clean up. + test_context_socket_close (socket); +} + +int main () +{ + setup_test_environment (); + UNITY_BEGIN (); + RUN_TEST (test_busy_poll); + return UNITY_END (); +}