mirror of
https://github.com/zeromq/libzmq.git
synced 2024-12-31 01:43:02 +08:00
Merge pull request #2969 from skaes/master
Problem: enormous memory increase due to zero copy decoding
This commit is contained in:
commit
4ea1e78d9d
@ -39,6 +39,12 @@ The 'ZMQ_MAX_MSGSZ' argument returns the maximum size of a message
|
||||
allowed for this context. Default value is INT_MAX.
|
||||
|
||||
|
||||
ZMQ_ZERO_COPY_RCV: Get message decoding strategy
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
The 'ZMQ_ZERO_COPY_RCV' argument return whether message decoder uses a zero copy
|
||||
strategy when receiving messages. Default value is 1.
|
||||
|
||||
|
||||
ZMQ_SOCKET_LIMIT: Get largest configurable number of sockets
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
The 'ZMQ_SOCKET_LIMIT' argument returns the largest number of sockets that
|
||||
|
@ -127,6 +127,18 @@ Default value:: INT_MAX
|
||||
Maximum value:: INT_MAX
|
||||
|
||||
|
||||
ZMQ_ZERO_COPY_RCV: Specify message decoding strategy
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
The 'ZMQ_ZERO_COPY_RCV' argument specifies whether the message decoder should
|
||||
use a zero copy strategy when receiving messages. The zero copy strategy can
|
||||
lead to increased memory usage in some cases. This option allows you to use the
|
||||
older copying strategy. You can query the value of this option with
|
||||
linkzmq:zmq_ctx_get[3] using the 'ZMQ_ZERO_COPY_RECV' option.
|
||||
|
||||
[horizontal]
|
||||
Default value:: 1
|
||||
|
||||
|
||||
ZMQ_MAX_SOCKETS: Set maximum number of sockets
|
||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||
The 'ZMQ_MAX_SOCKETS' argument sets the maximum number of sockets allowed
|
||||
|
@ -628,6 +628,7 @@ ZMQ_EXPORT void zmq_threadclose (void *thread);
|
||||
#define ZMQ_THREAD_AFFINITY_CPU_ADD 7
|
||||
#define ZMQ_THREAD_AFFINITY_CPU_REMOVE 8
|
||||
#define ZMQ_THREAD_NAME_PREFIX 9
|
||||
#define ZMQ_ZERO_COPY_RECV 10
|
||||
|
||||
/* DRAFT Socket methods. */
|
||||
ZMQ_EXPORT int zmq_join (void *s, const char *group);
|
||||
|
10
src/ctx.cpp
10
src/ctx.cpp
@ -76,7 +76,8 @@ zmq::ctx_t::ctx_t () :
|
||||
max_msgsz (INT_MAX),
|
||||
io_thread_count (ZMQ_IO_THREADS_DFLT),
|
||||
blocky (true),
|
||||
ipv6 (false)
|
||||
ipv6 (false),
|
||||
zero_copy (true)
|
||||
{
|
||||
#ifdef HAVE_FORK
|
||||
pid = getpid ();
|
||||
@ -245,6 +246,9 @@ int zmq::ctx_t::set (int option_, int optval_)
|
||||
} else if (option_ == ZMQ_MAX_MSGSZ && optval_ >= 0) {
|
||||
scoped_lock_t locker (opt_sync);
|
||||
max_msgsz = optval_ < INT_MAX ? optval_ : INT_MAX;
|
||||
} else if (option_ == ZMQ_ZERO_COPY_RECV && optval_ >= 0) {
|
||||
scoped_lock_t locker (opt_sync);
|
||||
zero_copy = (optval_ != 0);
|
||||
} else {
|
||||
rc = thread_ctx_t::set (option_, optval_);
|
||||
}
|
||||
@ -268,7 +272,9 @@ int zmq::ctx_t::get (int option_)
|
||||
rc = max_msgsz;
|
||||
else if (option_ == ZMQ_MSG_T_SIZE)
|
||||
rc = sizeof (zmq_msg_t);
|
||||
else {
|
||||
else if (option_ == ZMQ_ZERO_COPY_RECV) {
|
||||
rc = zero_copy;
|
||||
} else {
|
||||
errno = EINVAL;
|
||||
rc = -1;
|
||||
}
|
||||
|
@ -234,6 +234,9 @@ class ctx_t : public thread_ctx_t
|
||||
// Is IPv6 enabled on this context?
|
||||
bool ipv6;
|
||||
|
||||
// Should we use zero copy message decoding in this context?
|
||||
bool zero_copy;
|
||||
|
||||
ctx_t (const ctx_t &);
|
||||
const ctx_t &operator= (const ctx_t &);
|
||||
|
||||
|
@ -405,8 +405,8 @@ void zmq::norm_engine_t::recv_data (NormObjectHandle object)
|
||||
(NormRxStreamState *) NormObjectGetUserData (object);
|
||||
if (NULL == rxState) {
|
||||
// This is a new stream, so create rxState with zmq decoder, etc
|
||||
rxState =
|
||||
new (std::nothrow) NormRxStreamState (object, options.maxmsgsize);
|
||||
rxState = new (std::nothrow)
|
||||
NormRxStreamState (object, options.maxmsgsize, options.zero_copy);
|
||||
errno_assert (rxState);
|
||||
|
||||
if (!rxState->Init ()) {
|
||||
@ -547,9 +547,10 @@ void zmq::norm_engine_t::recv_data (NormObjectHandle object)
|
||||
} // end zmq::norm_engine_t::recv_data()
|
||||
|
||||
zmq::norm_engine_t::NormRxStreamState::NormRxStreamState (
|
||||
NormObjectHandle normStream, int64_t maxMsgSize) :
|
||||
NormObjectHandle normStream, int64_t maxMsgSize, bool zeroCopy) :
|
||||
norm_stream (normStream),
|
||||
max_msg_size (maxMsgSize),
|
||||
zero_copy (zeroCopy),
|
||||
in_sync (false),
|
||||
rx_ready (false),
|
||||
zmq_decoder (NULL),
|
||||
@ -582,7 +583,8 @@ bool zmq::norm_engine_t::NormRxStreamState::Init ()
|
||||
if (NULL != zmq_decoder)
|
||||
delete zmq_decoder;
|
||||
// Note "in_batch_size" comes from config.h
|
||||
zmq_decoder = new (std::nothrow) v2_decoder_t (in_batch_size, max_msg_size);
|
||||
zmq_decoder =
|
||||
new (std::nothrow) v2_decoder_t (in_batch_size, max_msg_size, zero_copy);
|
||||
alloc_assert (zmq_decoder);
|
||||
if (NULL != zmq_decoder) {
|
||||
buffer_count = 0;
|
||||
|
@ -68,7 +68,9 @@ class norm_engine_t : public io_object_t, public i_engine
|
||||
class NormRxStreamState
|
||||
{
|
||||
public:
|
||||
NormRxStreamState (NormObjectHandle normStream, int64_t maxMsgSize);
|
||||
NormRxStreamState (NormObjectHandle normStream,
|
||||
int64_t maxMsgSize,
|
||||
bool zeroCopy);
|
||||
~NormRxStreamState ();
|
||||
|
||||
NormObjectHandle GetStreamHandle () const { return norm_stream; }
|
||||
@ -132,6 +134,7 @@ class norm_engine_t : public io_object_t, public i_engine
|
||||
private:
|
||||
NormObjectHandle norm_stream;
|
||||
int64_t max_msg_size;
|
||||
bool zero_copy;
|
||||
bool in_sync;
|
||||
bool rx_ready;
|
||||
v2_decoder_t *zmq_decoder;
|
||||
|
@ -91,7 +91,8 @@ zmq::options_t::options_t () :
|
||||
heartbeat_timeout (-1),
|
||||
use_fd (-1),
|
||||
zap_enforce_domain (false),
|
||||
loopback_fastpath (false)
|
||||
loopback_fastpath (false),
|
||||
zero_copy (true)
|
||||
{
|
||||
memset (curve_public_key, 0, CURVE_KEYSIZE);
|
||||
memset (curve_secret_key, 0, CURVE_KEYSIZE);
|
||||
|
@ -251,6 +251,9 @@ struct options_t
|
||||
|
||||
// Use of loopback fastpath.
|
||||
bool loopback_fastpath;
|
||||
|
||||
// Use zero copy strategy for storing message content when decoding.
|
||||
bool zero_copy;
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -206,6 +206,7 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_,
|
||||
options.socket_id = sid_;
|
||||
options.ipv6 = (parent_->get (ZMQ_IPV6) != 0);
|
||||
options.linger.store (parent_->get (ZMQ_BLOCKY) ? -1 : 0);
|
||||
options.zero_copy = parent_->get (ZMQ_ZERO_COPY_RECV);
|
||||
|
||||
if (thread_safe) {
|
||||
mailbox = new (std::nothrow) mailbox_safe_t (&sync);
|
||||
|
@ -636,15 +636,15 @@ bool zmq::stream_engine_t::handshake ()
|
||||
encoder = new (std::nothrow) v2_encoder_t (out_batch_size);
|
||||
alloc_assert (encoder);
|
||||
|
||||
decoder =
|
||||
new (std::nothrow) v2_decoder_t (in_batch_size, options.maxmsgsize);
|
||||
decoder = new (std::nothrow)
|
||||
v2_decoder_t (in_batch_size, options.maxmsgsize, options.zero_copy);
|
||||
alloc_assert (decoder);
|
||||
} else {
|
||||
encoder = new (std::nothrow) v2_encoder_t (out_batch_size);
|
||||
alloc_assert (encoder);
|
||||
|
||||
decoder =
|
||||
new (std::nothrow) v2_decoder_t (in_batch_size, options.maxmsgsize);
|
||||
decoder = new (std::nothrow)
|
||||
v2_decoder_t (in_batch_size, options.maxmsgsize, options.zero_copy);
|
||||
alloc_assert (decoder);
|
||||
|
||||
if (options.mechanism == ZMQ_NULL
|
||||
|
@ -38,10 +38,13 @@
|
||||
#include "wire.hpp"
|
||||
#include "err.hpp"
|
||||
|
||||
zmq::v2_decoder_t::v2_decoder_t (size_t bufsize_, int64_t maxmsgsize_) :
|
||||
zmq::v2_decoder_t::v2_decoder_t (size_t bufsize_,
|
||||
int64_t maxmsgsize_,
|
||||
bool zero_copy_) :
|
||||
shared_message_memory_allocator (bufsize_),
|
||||
decoder_base_t<v2_decoder_t, shared_message_memory_allocator> (this),
|
||||
msg_flags (0),
|
||||
zero_copy (zero_copy_),
|
||||
maxmsgsize (maxmsgsize_)
|
||||
{
|
||||
int rc = in_progress.init ();
|
||||
@ -111,8 +114,9 @@ int zmq::v2_decoder_t::size_ready (uint64_t msg_size,
|
||||
// the current message can exceed the current buffer. We have to copy the buffer
|
||||
// data into a new message and complete it in the next receive.
|
||||
|
||||
if (unlikely ((unsigned char *) read_pos + msg_size
|
||||
> (data () + size ()))) {
|
||||
if (unlikely (
|
||||
!zero_copy
|
||||
|| ((unsigned char *) read_pos + msg_size > (data () + size ())))) {
|
||||
// a new message has started, but the size would exceed the pre-allocated arena
|
||||
// this happens every time when a message does not fit completely into the buffer
|
||||
rc = in_progress.init_size (static_cast<size_t> (msg_size));
|
||||
|
@ -44,7 +44,7 @@ class v2_decoder_t :
|
||||
public decoder_base_t<v2_decoder_t, shared_message_memory_allocator>
|
||||
{
|
||||
public:
|
||||
v2_decoder_t (size_t bufsize_, int64_t maxmsgsize_);
|
||||
v2_decoder_t (size_t bufsize_, int64_t maxmsgsize_, bool zero_copy_);
|
||||
virtual ~v2_decoder_t ();
|
||||
|
||||
// i_decoder interface.
|
||||
@ -62,6 +62,7 @@ class v2_decoder_t :
|
||||
unsigned char msg_flags;
|
||||
msg_t in_progress;
|
||||
|
||||
const bool zero_copy;
|
||||
const int64_t maxmsgsize;
|
||||
|
||||
v2_decoder_t (const v2_decoder_t &);
|
||||
|
@ -99,6 +99,7 @@ unsigned long zmq_stopwatch_intermediate (void *watch_);
|
||||
#define ZMQ_THREAD_AFFINITY_CPU_ADD 7
|
||||
#define ZMQ_THREAD_AFFINITY_CPU_REMOVE 8
|
||||
#define ZMQ_THREAD_NAME_PREFIX 9
|
||||
#define ZMQ_ZERO_COPY_RECV 10
|
||||
|
||||
/* DRAFT Socket methods. */
|
||||
int zmq_join (void *s, const char *group);
|
||||
|
@ -146,6 +146,56 @@ void test_ctx_thread_opts (void *ctx)
|
||||
#endif
|
||||
}
|
||||
|
||||
void test_ctx_zero_copy (void *ctx)
|
||||
{
|
||||
#ifdef ZMQ_ZERO_COPY_RECV
|
||||
int zero_copy;
|
||||
// Default value is 1.
|
||||
zero_copy = zmq_ctx_get (ctx, ZMQ_ZERO_COPY_RECV);
|
||||
assert (zero_copy == 1);
|
||||
|
||||
// Test we can set it to 0.
|
||||
assert (0 == zmq_ctx_set (ctx, ZMQ_ZERO_COPY_RECV, 0));
|
||||
zero_copy = zmq_ctx_get (ctx, ZMQ_ZERO_COPY_RECV);
|
||||
assert (zero_copy == 0);
|
||||
|
||||
// Create a TCP socket pair using the context and test that messages can be
|
||||
// received. Note that inproc sockets cannot be used for this test.
|
||||
void *pull = zmq_socket (ctx, ZMQ_PULL);
|
||||
assert (0 == zmq_bind (pull, "tcp://127.0.0.1:*"));
|
||||
|
||||
void *push = zmq_socket (ctx, ZMQ_PUSH);
|
||||
size_t endpoint_len = MAX_SOCKET_STRING;
|
||||
char endpoint[MAX_SOCKET_STRING];
|
||||
assert (
|
||||
0 == zmq_getsockopt (pull, ZMQ_LAST_ENDPOINT, endpoint, &endpoint_len));
|
||||
assert (0 == zmq_connect (push, endpoint));
|
||||
|
||||
const char *small_str = "abcd";
|
||||
const char *large_str =
|
||||
"01234567890123456789012345678901234567890123456789";
|
||||
|
||||
assert (4 == zmq_send (push, (void *) small_str, 4, 0));
|
||||
assert (40 == zmq_send (push, (void *) large_str, 40, 0));
|
||||
|
||||
zmq_msg_t small_msg, large_msg;
|
||||
zmq_msg_init (&small_msg);
|
||||
zmq_msg_init (&large_msg);
|
||||
assert (4 == zmq_msg_recv (&small_msg, pull, 0));
|
||||
assert (40 == zmq_msg_recv (&large_msg, pull, 0));
|
||||
assert (!strncmp (small_str, (const char *) zmq_msg_data (&small_msg), 4));
|
||||
assert (!strncmp (large_str, (const char *) zmq_msg_data (&large_msg), 40));
|
||||
|
||||
// Clean up.
|
||||
assert (0 == zmq_close (push));
|
||||
assert (0 == zmq_close (pull));
|
||||
assert (0 == zmq_msg_close (&small_msg));
|
||||
assert (0 == zmq_msg_close (&large_msg));
|
||||
assert (0 == zmq_ctx_set (ctx, ZMQ_ZERO_COPY_RECV, 1));
|
||||
zero_copy = zmq_ctx_get (ctx, ZMQ_ZERO_COPY_RECV);
|
||||
assert (zero_copy == 1);
|
||||
#endif
|
||||
}
|
||||
|
||||
int main (void)
|
||||
{
|
||||
@ -173,6 +223,7 @@ int main (void)
|
||||
assert (zmq_ctx_get (ctx, ZMQ_IPV6) == 1);
|
||||
|
||||
test_ctx_thread_opts (ctx);
|
||||
test_ctx_zero_copy (ctx);
|
||||
|
||||
void *router = zmq_socket (ctx, ZMQ_ROUTER);
|
||||
int value;
|
||||
|
@ -43,7 +43,7 @@
|
||||
// settled. Tested to work reliably at 1 msec on a fast PC.
|
||||
#define SETTLE_TIME 300 // In msec
|
||||
// Commonly used buffer size for ZMQ_LAST_ENDPOINT
|
||||
#define MAX_SOCKET_STRING sizeof ("tcp://127.0.0.1:65536")
|
||||
#define MAX_SOCKET_STRING sizeof ("tcp://[::ffff:127.127.127.127]:65536")
|
||||
|
||||
// We need to test codepaths with non-random bind ports. List them here to
|
||||
// keep them unique, to allow parallel test runs.
|
||||
|
Loading…
x
Reference in New Issue
Block a user