diff --git a/doc/zmq_ctx_get.txt b/doc/zmq_ctx_get.txt index 128b3cae..74428475 100644 --- a/doc/zmq_ctx_get.txt +++ b/doc/zmq_ctx_get.txt @@ -39,6 +39,12 @@ The 'ZMQ_MAX_MSGSZ' argument returns the maximum size of a message allowed for this context. Default value is INT_MAX. +ZMQ_ZERO_COPY_RCV: Get message decoding strategy +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_ZERO_COPY_RCV' argument return whether message decoder uses a zero copy +strategy when receiving messages. Default value is 1. + + ZMQ_SOCKET_LIMIT: Get largest configurable number of sockets ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ The 'ZMQ_SOCKET_LIMIT' argument returns the largest number of sockets that diff --git a/doc/zmq_ctx_set.txt b/doc/zmq_ctx_set.txt index 7f9345cd..f461b8e2 100644 --- a/doc/zmq_ctx_set.txt +++ b/doc/zmq_ctx_set.txt @@ -127,6 +127,18 @@ Default value:: INT_MAX Maximum value:: INT_MAX +ZMQ_ZERO_COPY_RCV: Specify message decoding strategy +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_ZERO_COPY_RCV' argument specifies whether the message decoder should +use a zero copy strategy when receiving messages. The zero copy strategy can +lead to increased memory usage in some cases. This option allows you to use the +older copying strategy. You can query the value of this option with +linkzmq:zmq_ctx_get[3] using the 'ZMQ_ZERO_COPY_RECV' option. + +[horizontal] +Default value:: 1 + + ZMQ_MAX_SOCKETS: Set maximum number of sockets ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ The 'ZMQ_MAX_SOCKETS' argument sets the maximum number of sockets allowed diff --git a/include/zmq.h b/include/zmq.h index 84208864..5791b9a7 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -628,6 +628,7 @@ ZMQ_EXPORT void zmq_threadclose (void *thread); #define ZMQ_THREAD_AFFINITY_CPU_ADD 7 #define ZMQ_THREAD_AFFINITY_CPU_REMOVE 8 #define ZMQ_THREAD_NAME_PREFIX 9 +#define ZMQ_ZERO_COPY_RECV 10 /* DRAFT Socket methods. */ ZMQ_EXPORT int zmq_join (void *s, const char *group); diff --git a/src/ctx.cpp b/src/ctx.cpp index 26065ba9..eb272930 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -76,7 +76,8 @@ zmq::ctx_t::ctx_t () : max_msgsz (INT_MAX), io_thread_count (ZMQ_IO_THREADS_DFLT), blocky (true), - ipv6 (false) + ipv6 (false), + zero_copy (true) { #ifdef HAVE_FORK pid = getpid (); @@ -245,6 +246,9 @@ int zmq::ctx_t::set (int option_, int optval_) } else if (option_ == ZMQ_MAX_MSGSZ && optval_ >= 0) { scoped_lock_t locker (opt_sync); max_msgsz = optval_ < INT_MAX ? optval_ : INT_MAX; + } else if (option_ == ZMQ_ZERO_COPY_RECV && optval_ >= 0) { + scoped_lock_t locker (opt_sync); + zero_copy = (optval_ != 0); } else { rc = thread_ctx_t::set (option_, optval_); } @@ -268,7 +272,9 @@ int zmq::ctx_t::get (int option_) rc = max_msgsz; else if (option_ == ZMQ_MSG_T_SIZE) rc = sizeof (zmq_msg_t); - else { + else if (option_ == ZMQ_ZERO_COPY_RECV) { + rc = zero_copy; + } else { errno = EINVAL; rc = -1; } diff --git a/src/ctx.hpp b/src/ctx.hpp index f7aafe9c..c44b7809 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -234,6 +234,9 @@ class ctx_t : public thread_ctx_t // Is IPv6 enabled on this context? bool ipv6; + // Should we use zero copy message decoding in this context? + bool zero_copy; + ctx_t (const ctx_t &); const ctx_t &operator= (const ctx_t &); diff --git a/src/norm_engine.cpp b/src/norm_engine.cpp index bb8d873a..bd9810af 100644 --- a/src/norm_engine.cpp +++ b/src/norm_engine.cpp @@ -405,8 +405,8 @@ void zmq::norm_engine_t::recv_data (NormObjectHandle object) (NormRxStreamState *) NormObjectGetUserData (object); if (NULL == rxState) { // This is a new stream, so create rxState with zmq decoder, etc - rxState = - new (std::nothrow) NormRxStreamState (object, options.maxmsgsize); + rxState = new (std::nothrow) + NormRxStreamState (object, options.maxmsgsize, options.zero_copy); errno_assert (rxState); if (!rxState->Init ()) { @@ -547,9 +547,10 @@ void zmq::norm_engine_t::recv_data (NormObjectHandle object) } // end zmq::norm_engine_t::recv_data() zmq::norm_engine_t::NormRxStreamState::NormRxStreamState ( - NormObjectHandle normStream, int64_t maxMsgSize) : + NormObjectHandle normStream, int64_t maxMsgSize, bool zeroCopy) : norm_stream (normStream), max_msg_size (maxMsgSize), + zero_copy (zeroCopy), in_sync (false), rx_ready (false), zmq_decoder (NULL), @@ -582,7 +583,8 @@ bool zmq::norm_engine_t::NormRxStreamState::Init () if (NULL != zmq_decoder) delete zmq_decoder; // Note "in_batch_size" comes from config.h - zmq_decoder = new (std::nothrow) v2_decoder_t (in_batch_size, max_msg_size); + zmq_decoder = + new (std::nothrow) v2_decoder_t (in_batch_size, max_msg_size, zero_copy); alloc_assert (zmq_decoder); if (NULL != zmq_decoder) { buffer_count = 0; diff --git a/src/norm_engine.hpp b/src/norm_engine.hpp index f932e373..c3ce9296 100644 --- a/src/norm_engine.hpp +++ b/src/norm_engine.hpp @@ -68,7 +68,9 @@ class norm_engine_t : public io_object_t, public i_engine class NormRxStreamState { public: - NormRxStreamState (NormObjectHandle normStream, int64_t maxMsgSize); + NormRxStreamState (NormObjectHandle normStream, + int64_t maxMsgSize, + bool zeroCopy); ~NormRxStreamState (); NormObjectHandle GetStreamHandle () const { return norm_stream; } @@ -132,6 +134,7 @@ class norm_engine_t : public io_object_t, public i_engine private: NormObjectHandle norm_stream; int64_t max_msg_size; + bool zero_copy; bool in_sync; bool rx_ready; v2_decoder_t *zmq_decoder; diff --git a/src/options.cpp b/src/options.cpp index 643ca6ce..60f85ca7 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -91,7 +91,8 @@ zmq::options_t::options_t () : heartbeat_timeout (-1), use_fd (-1), zap_enforce_domain (false), - loopback_fastpath (false) + loopback_fastpath (false), + zero_copy (true) { memset (curve_public_key, 0, CURVE_KEYSIZE); memset (curve_secret_key, 0, CURVE_KEYSIZE); diff --git a/src/options.hpp b/src/options.hpp index 14a6b815..bd9ebdd5 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -251,6 +251,9 @@ struct options_t // Use of loopback fastpath. bool loopback_fastpath; + + // Use zero copy strategy for storing message content when decoding. + bool zero_copy; }; } diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 11962091..b28a8e3a 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -206,6 +206,7 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, options.socket_id = sid_; options.ipv6 = (parent_->get (ZMQ_IPV6) != 0); options.linger.store (parent_->get (ZMQ_BLOCKY) ? -1 : 0); + options.zero_copy = parent_->get (ZMQ_ZERO_COPY_RECV); if (thread_safe) { mailbox = new (std::nothrow) mailbox_safe_t (&sync); diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index d594f2c6..02e8564b 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -636,15 +636,15 @@ bool zmq::stream_engine_t::handshake () encoder = new (std::nothrow) v2_encoder_t (out_batch_size); alloc_assert (encoder); - decoder = - new (std::nothrow) v2_decoder_t (in_batch_size, options.maxmsgsize); + decoder = new (std::nothrow) + v2_decoder_t (in_batch_size, options.maxmsgsize, options.zero_copy); alloc_assert (decoder); } else { encoder = new (std::nothrow) v2_encoder_t (out_batch_size); alloc_assert (encoder); - decoder = - new (std::nothrow) v2_decoder_t (in_batch_size, options.maxmsgsize); + decoder = new (std::nothrow) + v2_decoder_t (in_batch_size, options.maxmsgsize, options.zero_copy); alloc_assert (decoder); if (options.mechanism == ZMQ_NULL diff --git a/src/v2_decoder.cpp b/src/v2_decoder.cpp index 6d3e503f..839be9ab 100644 --- a/src/v2_decoder.cpp +++ b/src/v2_decoder.cpp @@ -38,10 +38,13 @@ #include "wire.hpp" #include "err.hpp" -zmq::v2_decoder_t::v2_decoder_t (size_t bufsize_, int64_t maxmsgsize_) : +zmq::v2_decoder_t::v2_decoder_t (size_t bufsize_, + int64_t maxmsgsize_, + bool zero_copy_) : shared_message_memory_allocator (bufsize_), decoder_base_t (this), msg_flags (0), + zero_copy (zero_copy_), maxmsgsize (maxmsgsize_) { int rc = in_progress.init (); @@ -111,8 +114,9 @@ int zmq::v2_decoder_t::size_ready (uint64_t msg_size, // the current message can exceed the current buffer. We have to copy the buffer // data into a new message and complete it in the next receive. - if (unlikely ((unsigned char *) read_pos + msg_size - > (data () + size ()))) { + if (unlikely ( + !zero_copy + || ((unsigned char *) read_pos + msg_size > (data () + size ())))) { // a new message has started, but the size would exceed the pre-allocated arena // this happens every time when a message does not fit completely into the buffer rc = in_progress.init_size (static_cast (msg_size)); diff --git a/src/v2_decoder.hpp b/src/v2_decoder.hpp index 3c47204e..30a4dd2b 100644 --- a/src/v2_decoder.hpp +++ b/src/v2_decoder.hpp @@ -44,7 +44,7 @@ class v2_decoder_t : public decoder_base_t { public: - v2_decoder_t (size_t bufsize_, int64_t maxmsgsize_); + v2_decoder_t (size_t bufsize_, int64_t maxmsgsize_, bool zero_copy_); virtual ~v2_decoder_t (); // i_decoder interface. @@ -62,6 +62,7 @@ class v2_decoder_t : unsigned char msg_flags; msg_t in_progress; + const bool zero_copy; const int64_t maxmsgsize; v2_decoder_t (const v2_decoder_t &); diff --git a/src/zmq_draft.h b/src/zmq_draft.h index 5dd6201b..adc1e895 100644 --- a/src/zmq_draft.h +++ b/src/zmq_draft.h @@ -99,6 +99,7 @@ unsigned long zmq_stopwatch_intermediate (void *watch_); #define ZMQ_THREAD_AFFINITY_CPU_ADD 7 #define ZMQ_THREAD_AFFINITY_CPU_REMOVE 8 #define ZMQ_THREAD_NAME_PREFIX 9 +#define ZMQ_ZERO_COPY_RECV 10 /* DRAFT Socket methods. */ int zmq_join (void *s, const char *group); diff --git a/tests/test_ctx_options.cpp b/tests/test_ctx_options.cpp index 4fbd8da2..c267f925 100644 --- a/tests/test_ctx_options.cpp +++ b/tests/test_ctx_options.cpp @@ -146,6 +146,56 @@ void test_ctx_thread_opts (void *ctx) #endif } +void test_ctx_zero_copy (void *ctx) +{ +#ifdef ZMQ_ZERO_COPY_RECV + int zero_copy; + // Default value is 1. + zero_copy = zmq_ctx_get (ctx, ZMQ_ZERO_COPY_RECV); + assert (zero_copy == 1); + + // Test we can set it to 0. + assert (0 == zmq_ctx_set (ctx, ZMQ_ZERO_COPY_RECV, 0)); + zero_copy = zmq_ctx_get (ctx, ZMQ_ZERO_COPY_RECV); + assert (zero_copy == 0); + + // Create a TCP socket pair using the context and test that messages can be + // received. Note that inproc sockets cannot be used for this test. + void *pull = zmq_socket (ctx, ZMQ_PULL); + assert (0 == zmq_bind (pull, "tcp://127.0.0.1:*")); + + void *push = zmq_socket (ctx, ZMQ_PUSH); + size_t endpoint_len = MAX_SOCKET_STRING; + char endpoint[MAX_SOCKET_STRING]; + assert ( + 0 == zmq_getsockopt (pull, ZMQ_LAST_ENDPOINT, endpoint, &endpoint_len)); + assert (0 == zmq_connect (push, endpoint)); + + const char *small_str = "abcd"; + const char *large_str = + "01234567890123456789012345678901234567890123456789"; + + assert (4 == zmq_send (push, (void *) small_str, 4, 0)); + assert (40 == zmq_send (push, (void *) large_str, 40, 0)); + + zmq_msg_t small_msg, large_msg; + zmq_msg_init (&small_msg); + zmq_msg_init (&large_msg); + assert (4 == zmq_msg_recv (&small_msg, pull, 0)); + assert (40 == zmq_msg_recv (&large_msg, pull, 0)); + assert (!strncmp (small_str, (const char *) zmq_msg_data (&small_msg), 4)); + assert (!strncmp (large_str, (const char *) zmq_msg_data (&large_msg), 40)); + + // Clean up. + assert (0 == zmq_close (push)); + assert (0 == zmq_close (pull)); + assert (0 == zmq_msg_close (&small_msg)); + assert (0 == zmq_msg_close (&large_msg)); + assert (0 == zmq_ctx_set (ctx, ZMQ_ZERO_COPY_RECV, 1)); + zero_copy = zmq_ctx_get (ctx, ZMQ_ZERO_COPY_RECV); + assert (zero_copy == 1); +#endif +} int main (void) { @@ -173,6 +223,7 @@ int main (void) assert (zmq_ctx_get (ctx, ZMQ_IPV6) == 1); test_ctx_thread_opts (ctx); + test_ctx_zero_copy (ctx); void *router = zmq_socket (ctx, ZMQ_ROUTER); int value; diff --git a/tests/testutil.hpp b/tests/testutil.hpp index 296a6cd8..9ab92331 100644 --- a/tests/testutil.hpp +++ b/tests/testutil.hpp @@ -43,7 +43,7 @@ // settled. Tested to work reliably at 1 msec on a fast PC. #define SETTLE_TIME 300 // In msec // Commonly used buffer size for ZMQ_LAST_ENDPOINT -#define MAX_SOCKET_STRING sizeof ("tcp://127.0.0.1:65536") +#define MAX_SOCKET_STRING sizeof ("tcp://[::ffff:127.127.127.127]:65536") // We need to test codepaths with non-random bind ports. List them here to // keep them unique, to allow parallel test runs.