From cfcab66c71e43c0068706772ea04a60eb1afa972 Mon Sep 17 00:00:00 2001 From: jean-airoldie <25088801+jean-airoldie@users.noreply.github.com> Date: Thu, 27 Jun 2019 00:34:56 -0400 Subject: [PATCH] Problem: {in,out}_batch_size must be configured at compiled time Solution: Added a socket option to configure them at runtime. --- doc/zmq_getsockopt.txt | 35 +++++++++++++++++++++++++++++++++++ doc/zmq_setsockopt.txt | 34 ++++++++++++++++++++++++++++++++++ include/zmq.h | 2 ++ src/config.hpp | 12 ------------ src/norm_engine.cpp | 10 +++++++--- src/norm_engine.hpp | 4 +++- src/options.cpp | 31 +++++++++++++++++++++++++++++++ src/options.hpp | 11 +++++++++++ src/pgm_socket.cpp | 6 +++--- src/stream_engine.cpp | 33 +++++++++++++++++---------------- src/zmq_draft.h | 2 ++ 11 files changed, 145 insertions(+), 35 deletions(-) diff --git a/doc/zmq_getsockopt.txt b/doc/zmq_getsockopt.txt index 09e950af..f48e5c4f 100644 --- a/doc/zmq_getsockopt.txt +++ b/doc/zmq_getsockopt.txt @@ -922,6 +922,41 @@ Default value:: 0 Applicable socket types:: ZMQ_ROUTER +ZMQ_IN_BATCH_SIZE: Maximal receive batch size +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Sets the maximal amount of messages that can be received in a single +'recv' system call. This can be used to improved throughtput at the expense of +latency and vice-versa. + +Cannot be zero. + +NOTE: in DRAFT state, not yet available in stable releases. + +[horizontal] +Option value type:: int +Option value unit:: messages +Default value:: 8192 +Applicable socket types:: All + + +ZMQ_OUT_BATCH_SIZE: Maximal send batch size +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Sets the maximal amount of messages that can be sent in a single +'send' system call. This can be used to improved throughtput at the expense of +latency and vice-versa. + +Cannot be zero. + +NOTE: in DRAFT state, not yet available in stable releases. + +[horizontal] +Option value type:: int +Option value unit:: messages +Default value:: 8192 +Applicable socket types:: All + + + RETURN VALUE ------------ The _zmq_getsockopt()_ function shall return zero if successful. Otherwise it diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index 2b1e6707..8311a391 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -1362,6 +1362,40 @@ Default value:: 0 Applicable socket types:: ZMQ_ROUTER +ZMQ_IN_BATCH_SIZE: Maximal receive batch size +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Sets the maximal amount of messages that can be received in a single +'recv' system call. This can be used to improved throughtput at the expense of +latency and vice-versa. + +Cannot be zero. + +NOTE: in DRAFT state, not yet available in stable releases. + +[horizontal] +Option value type:: int +Option value unit:: messages +Default value:: 8192 +Applicable socket types:: All + + +ZMQ_OUT_BATCH_SIZE: Maximal send batch size +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Sets the maximal amount of messages that can be sent in a single +'send' system call. This can be used to improved throughtput at the expense of +latency and vice-versa. + +Cannot be zero. + +NOTE: in DRAFT state, not yet available in stable releases. + +[horizontal] +Option value type:: int +Option value unit:: messages +Default value:: 8192 +Applicable socket types:: All + + RETURN VALUE ------------ The _zmq_setsockopt()_ function shall return zero if successful. Otherwise it diff --git a/include/zmq.h b/include/zmq.h index 8c1b6854..f5713f80 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -658,6 +658,8 @@ ZMQ_EXPORT void zmq_threadclose (void *thread_); #define ZMQ_XPUB_MANUAL_LAST_VALUE 98 #define ZMQ_SOCKS_USERNAME 99 #define ZMQ_SOCKS_PASSWORD 100 +#define ZMQ_IN_BATCH_SIZE 101 +#define ZMQ_OUT_BATCH_SIZE 102 /* DRAFT Context options */ #define ZMQ_ZERO_COPY_RECV 10 diff --git a/src/config.hpp b/src/config.hpp index 8a10c803..c355c6d7 100644 --- a/src/config.hpp +++ b/src/config.hpp @@ -52,18 +52,6 @@ enum // real-time behaviour (less latency peaks). inbound_poll_rate = 100, - // Maximal batching size for engines with receiving functionality. - // So, if there are 10 messages that fit into the batch size, all of - // them may be read by a single 'recv' system call, thus avoiding - // unnecessary network stack traversals. - in_batch_size = 8192, - - // Maximal batching size for engines with sending functionality. - // So, if there are 10 messages that fit into the batch size, all of - // them may be written by a single 'send' system call, thus avoiding - // unnecessary network stack traversals. - out_batch_size = 8192, - // Maximal delta between high and low watermark. max_wm_delta = 1024, diff --git a/src/norm_engine.cpp b/src/norm_engine.cpp index 0b467763..f2166fdc 100644 --- a/src/norm_engine.cpp +++ b/src/norm_engine.cpp @@ -407,7 +407,8 @@ void zmq::norm_engine_t::recv_data (NormObjectHandle object) if (NULL == rxState) { // This is a new stream, so create rxState with zmq decoder, etc rxState = new (std::nothrow) - NormRxStreamState (object, options.maxmsgsize, options.zero_copy); + NormRxStreamState (object, options.maxmsgsize, options.zero_copy, + options.in_batch_size); errno_assert (rxState); if (!rxState->Init ()) { @@ -548,10 +549,14 @@ void zmq::norm_engine_t::recv_data (NormObjectHandle object) } // end zmq::norm_engine_t::recv_data() zmq::norm_engine_t::NormRxStreamState::NormRxStreamState ( - NormObjectHandle normStream, int64_t maxMsgSize, bool zeroCopy) : + NormObjectHandle normStream, + int64_t maxMsgSize, + bool zeroCopy, + int inBatchSize) : norm_stream (normStream), max_msg_size (maxMsgSize), zero_copy (zeroCopy), + in_batch_size (inBatchSize), in_sync (false), rx_ready (false), zmq_decoder (NULL), @@ -583,7 +588,6 @@ bool zmq::norm_engine_t::NormRxStreamState::Init () skip_norm_sync = false; if (NULL != zmq_decoder) delete zmq_decoder; - // Note "in_batch_size" comes from config.h zmq_decoder = new (std::nothrow) v2_decoder_t (in_batch_size, max_msg_size, zero_copy); alloc_assert (zmq_decoder); diff --git a/src/norm_engine.hpp b/src/norm_engine.hpp index f4596bd2..46adcaf9 100644 --- a/src/norm_engine.hpp +++ b/src/norm_engine.hpp @@ -71,7 +71,8 @@ class norm_engine_t : public io_object_t, public i_engine public: NormRxStreamState (NormObjectHandle normStream, int64_t maxMsgSize, - bool zeroCopy); + bool zeroCopy, + int inBatchSize); ~NormRxStreamState (); NormObjectHandle GetStreamHandle () const { return norm_stream; } @@ -136,6 +137,7 @@ class norm_engine_t : public io_object_t, public i_engine NormObjectHandle norm_stream; int64_t max_msg_size; bool zero_copy; + int in_batch_size; bool in_sync; bool rx_ready; v2_decoder_t *zmq_decoder; diff --git a/src/options.cpp b/src/options.cpp index 705cf888..c95b63a9 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -243,6 +243,8 @@ zmq::options_t::options_t () : zap_enforce_domain (false), loopback_fastpath (false), multicast_loop (true), + in_batch_size (8192), + out_batch_size (8192), zero_copy (true), router_notify (0), monitor_event_version (1) @@ -768,6 +770,22 @@ int zmq::options_t::setsockopt (int option_, return do_setsockopt_int_as_bool_relaxed (optval_, optvallen_, &multicast_loop); +#ifdef ZMQ_BUILD_DRAFT_API + case ZMQ_IN_BATCH_SIZE: + if (is_int && value > 0) { + in_batch_size = value; + return 0; + } + break; + + case ZMQ_OUT_BATCH_SIZE: + if (is_int && value > 0) { + out_batch_size = value; + return 0; + } + break; +#endif + default: #if defined(ZMQ_ACT_MILITANT) // There are valid scenarios for probing with unknown socket option @@ -1184,6 +1202,19 @@ int zmq::options_t::getsockopt (int option_, return 0; } break; + case ZMQ_IN_BATCH_SIZE: + if (is_int) { + *value = in_batch_size; + return 0; + } + break; + + case ZMQ_OUT_BATCH_SIZE: + if (is_int) { + *value = out_batch_size; + return 0; + } + break; #endif diff --git a/src/options.hpp b/src/options.hpp index e15d642d..8c87d8f5 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -264,6 +264,17 @@ struct options_t // Loop sent multicast packets to local sockets bool multicast_loop; + // Maximal batching size for engines with receiving functionality. + // So, if there are 10 messages that fit into the batch size, all of + // them may be read by a single 'recv' system call, thus avoiding + // unnecessary network stack traversals. + int in_batch_size; + // Maximal batching size for engines with sending functionality. + // So, if there are 10 messages that fit into the batch size, all of + // them may be written by a single 'send' system call, thus avoiding + // unnecessary network stack traversals. + int out_batch_size; + // Use zero copy strategy for storing message content when decoding. bool zero_copy; diff --git a/src/pgm_socket.cpp b/src/pgm_socket.cpp index 33575708..e3399db1 100644 --- a/src/pgm_socket.cpp +++ b/src/pgm_socket.cpp @@ -346,10 +346,10 @@ int zmq::pgm_socket_t::init (bool udp_encapsulation_, const char *network_) // For receiver transport preallocate pgm_msgv array. if (receiver) { - zmq_assert (in_batch_size > 0); + zmq_assert (options.in_batch_size > 0); size_t max_tsdu_size = get_max_tsdu_size (); - pgm_msgv_len = (int) in_batch_size / max_tsdu_size; - if ((int) in_batch_size % max_tsdu_size) + pgm_msgv_len = (int) options.in_batch_size / max_tsdu_size; + if ((int) options.in_batch_size % max_tsdu_size) pgm_msgv_len++; zmq_assert (pgm_msgv_len); diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 32dd9811..63a278f1 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -207,10 +207,10 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_, if (_options.raw_socket) { // no handshaking for raw sock, instantiate raw encoder and decoders - _encoder = new (std::nothrow) raw_encoder_t (out_batch_size); + _encoder = new (std::nothrow) raw_encoder_t (_options.out_batch_size); alloc_assert (_encoder); - _decoder = new (std::nothrow) raw_decoder_t (in_batch_size); + _decoder = new (std::nothrow) raw_decoder_t (_options.in_batch_size); alloc_assert (_decoder); // disable handshaking for raw socket @@ -399,12 +399,13 @@ void zmq::stream_engine_t::out_event () _outpos = NULL; _outsize = _encoder->encode (&_outpos, 0); - while (_outsize < static_cast (out_batch_size)) { + while (_outsize < static_cast (_options.out_batch_size)) { if ((this->*_next_msg) (&_tx_msg) == -1) break; _encoder->load_msg (&_tx_msg); unsigned char *bufptr = _outpos + _outsize; - size_t n = _encoder->encode (&bufptr, out_batch_size - _outsize); + size_t n = + _encoder->encode (&bufptr, _options.out_batch_size - _outsize); zmq_assert (n > 0); if (_outpos == NULL) _outpos = bufptr; @@ -664,11 +665,11 @@ bool zmq::stream_engine_t::handshake_v1_0_unversioned () return false; } - _encoder = new (std::nothrow) v1_encoder_t (out_batch_size); + _encoder = new (std::nothrow) v1_encoder_t (_options.out_batch_size); alloc_assert (_encoder); - _decoder = - new (std::nothrow) v1_decoder_t (in_batch_size, _options.maxmsgsize); + _decoder = new (std::nothrow) + v1_decoder_t (_options.in_batch_size, _options.maxmsgsize); alloc_assert (_decoder); // We have already sent the message header. @@ -716,11 +717,11 @@ bool zmq::stream_engine_t::handshake_v1_0 () return false; } - _encoder = new (std::nothrow) v1_encoder_t (out_batch_size); + _encoder = new (std::nothrow) v1_encoder_t (_options.out_batch_size); alloc_assert (_encoder); - _decoder = - new (std::nothrow) v1_decoder_t (in_batch_size, _options.maxmsgsize); + _decoder = new (std::nothrow) + v1_decoder_t (_options.in_batch_size, _options.maxmsgsize); alloc_assert (_decoder); return true; @@ -734,11 +735,11 @@ bool zmq::stream_engine_t::handshake_v2_0 () return false; } - _encoder = new (std::nothrow) v2_encoder_t (out_batch_size); + _encoder = new (std::nothrow) v2_encoder_t (_options.out_batch_size); alloc_assert (_encoder); - _decoder = new (std::nothrow) - v2_decoder_t (in_batch_size, _options.maxmsgsize, _options.zero_copy); + _decoder = new (std::nothrow) v2_decoder_t ( + _options.in_batch_size, _options.maxmsgsize, _options.zero_copy); alloc_assert (_decoder); return true; @@ -746,11 +747,11 @@ bool zmq::stream_engine_t::handshake_v2_0 () bool zmq::stream_engine_t::handshake_v3_0 () { - _encoder = new (std::nothrow) v2_encoder_t (out_batch_size); + _encoder = new (std::nothrow) v2_encoder_t (_options.out_batch_size); alloc_assert (_encoder); - _decoder = new (std::nothrow) - v2_decoder_t (in_batch_size, _options.maxmsgsize, _options.zero_copy); + _decoder = new (std::nothrow) v2_decoder_t ( + _options.in_batch_size, _options.maxmsgsize, _options.zero_copy); alloc_assert (_decoder); if (_options.mechanism == ZMQ_NULL diff --git a/src/zmq_draft.h b/src/zmq_draft.h index 6faa5029..64c8f2de 100644 --- a/src/zmq_draft.h +++ b/src/zmq_draft.h @@ -55,6 +55,8 @@ #define ZMQ_XPUB_MANUAL_LAST_VALUE 98 #define ZMQ_SOCKS_USERNAME 99 #define ZMQ_SOCKS_PASSWORD 100 +#define ZMQ_IN_BATCH_SIZE 101 +#define ZMQ_OUT_BATCH_SIZE 102 /* DRAFT Context options */ #define ZMQ_ZERO_COPY_RECV 10