mirror of
https://github.com/zeromq/libzmq.git
synced 2025-03-20 02:17:55 +00:00
Problem: test_large_msg kills my system temporarily
And I'm on a reasonably sized laptop. I think allocating INT_MAX memory is dangerous in a test case. Solution: expose this as a context option. I've used ZMQ_MAX_MSGSZ and documented it and implemented the API. However I don't know how to get the parent context for a socket, so the code in zmq.cpp is still unfinished.
This commit is contained in:
parent
7470c00d4d
commit
62c66ae7f7
@ -26,20 +26,30 @@ ZMQ_IO_THREADS: Get number of I/O threads
|
|||||||
The 'ZMQ_IO_THREADS' argument returns the size of the 0MQ thread pool
|
The 'ZMQ_IO_THREADS' argument returns the size of the 0MQ thread pool
|
||||||
for this context.
|
for this context.
|
||||||
|
|
||||||
|
|
||||||
ZMQ_MAX_SOCKETS: Get maximum number of sockets
|
ZMQ_MAX_SOCKETS: Get maximum number of sockets
|
||||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
The 'ZMQ_MAX_SOCKETS' argument returns the maximum number of sockets
|
The 'ZMQ_MAX_SOCKETS' argument returns the maximum number of sockets
|
||||||
allowed for this context.
|
allowed for this context.
|
||||||
|
|
||||||
|
|
||||||
|
ZMQ_MAX_MSGSZ: Get maximum message size
|
||||||
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
|
The 'ZMQ_MAX_MSGSZ' argument returns the maximum size of a message
|
||||||
|
allowed for this context. Default value is INT_MAX.
|
||||||
|
|
||||||
|
|
||||||
ZMQ_SOCKET_LIMIT: Get largest configurable number of sockets
|
ZMQ_SOCKET_LIMIT: Get largest configurable number of sockets
|
||||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
The 'ZMQ_SOCKET_LIMIT' argument returns the largest number of sockets that
|
The 'ZMQ_SOCKET_LIMIT' argument returns the largest number of sockets that
|
||||||
linkzmq:zmq_ctx_set[3] will accept.
|
linkzmq:zmq_ctx_set[3] will accept.
|
||||||
|
|
||||||
|
|
||||||
ZMQ_IPV6: Set IPv6 option
|
ZMQ_IPV6: Set IPv6 option
|
||||||
~~~~~~~~~~~~~~~~~~~~~~~~~
|
~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
The 'ZMQ_IPV6' argument returns the IPv6 option for the context.
|
The 'ZMQ_IPV6' argument returns the IPv6 option for the context.
|
||||||
|
|
||||||
|
|
||||||
ZMQ_BLOCKY: Get blocky setting
|
ZMQ_BLOCKY: Get blocky setting
|
||||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
The 'ZMQ_BLOCKY' argument returns 1 if the context will block on terminate,
|
The 'ZMQ_BLOCKY' argument returns 1 if the context will block on terminate,
|
||||||
|
@ -47,6 +47,7 @@ context.
|
|||||||
[horizontal]
|
[horizontal]
|
||||||
Default value:: 1
|
Default value:: 1
|
||||||
|
|
||||||
|
|
||||||
ZMQ_THREAD_SCHED_POLICY: Set scheduling policy for I/O threads
|
ZMQ_THREAD_SCHED_POLICY: Set scheduling policy for I/O threads
|
||||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
The 'ZMQ_THREAD_SCHED_POLICY' argument sets the scheduling policy for
|
The 'ZMQ_THREAD_SCHED_POLICY' argument sets the scheduling policy for
|
||||||
@ -58,6 +59,7 @@ This option only applies before creating any sockets on the context.
|
|||||||
[horizontal]
|
[horizontal]
|
||||||
Default value:: -1
|
Default value:: -1
|
||||||
|
|
||||||
|
|
||||||
ZMQ_THREAD_PRIORITY: Set scheduling priority for I/O threads
|
ZMQ_THREAD_PRIORITY: Set scheduling priority for I/O threads
|
||||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
The 'ZMQ_THREAD_PRIORITY' argument sets scheduling priority for
|
The 'ZMQ_THREAD_PRIORITY' argument sets scheduling priority for
|
||||||
@ -69,6 +71,19 @@ This option only applies before creating any sockets on the context.
|
|||||||
[horizontal]
|
[horizontal]
|
||||||
Default value:: -1
|
Default value:: -1
|
||||||
|
|
||||||
|
|
||||||
|
ZMQ_MAX_MSGSZ: Set maximum message size
|
||||||
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
|
The 'ZMQ_MAX_MSGSZ' argument sets the maximum allowed size
|
||||||
|
of a message sent in the context. You can query the maximal
|
||||||
|
allowed value with linkzmq:zmq_ctx_get[3] using the
|
||||||
|
'ZMQ_MAX_MSGSZ' option.
|
||||||
|
|
||||||
|
[horizontal]
|
||||||
|
Default value:: INT_MAX
|
||||||
|
Maximum value:: INT_MAX
|
||||||
|
|
||||||
|
|
||||||
ZMQ_MAX_SOCKETS: Set maximum number of sockets
|
ZMQ_MAX_SOCKETS: Set maximum number of sockets
|
||||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
The 'ZMQ_MAX_SOCKETS' argument sets the maximum number of sockets allowed
|
The 'ZMQ_MAX_SOCKETS' argument sets the maximum number of sockets allowed
|
||||||
@ -78,6 +93,7 @@ linkzmq:zmq_ctx_get[3] using the 'ZMQ_SOCKET_LIMIT' option.
|
|||||||
[horizontal]
|
[horizontal]
|
||||||
Default value:: 1024
|
Default value:: 1024
|
||||||
|
|
||||||
|
|
||||||
ZMQ_IPV6: Set IPv6 option
|
ZMQ_IPV6: Set IPv6 option
|
||||||
~~~~~~~~~~~~~~~~~~~~~~~~~
|
~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
The 'ZMQ_IPV6' argument sets the IPv6 value for all sockets created in
|
The 'ZMQ_IPV6' argument sets the IPv6 value for all sockets created in
|
||||||
|
@ -185,13 +185,13 @@ ZMQ_EXPORT void zmq_version (int *major, int *minor, int *patch);
|
|||||||
/* 0MQ infrastructure (a.k.a. context) initialisation & termination. */
|
/* 0MQ infrastructure (a.k.a. context) initialisation & termination. */
|
||||||
/******************************************************************************/
|
/******************************************************************************/
|
||||||
|
|
||||||
/* New API */
|
|
||||||
/* Context options */
|
/* Context options */
|
||||||
#define ZMQ_IO_THREADS 1
|
#define ZMQ_IO_THREADS 1
|
||||||
#define ZMQ_MAX_SOCKETS 2
|
#define ZMQ_MAX_SOCKETS 2
|
||||||
#define ZMQ_SOCKET_LIMIT 3
|
#define ZMQ_SOCKET_LIMIT 3
|
||||||
#define ZMQ_THREAD_PRIORITY 3
|
#define ZMQ_THREAD_PRIORITY 3
|
||||||
#define ZMQ_THREAD_SCHED_POLICY 4
|
#define ZMQ_THREAD_SCHED_POLICY 4
|
||||||
|
#define ZMQ_MAX_MSGSZ 5
|
||||||
|
|
||||||
/* Default for new contexts */
|
/* Default for new contexts */
|
||||||
#define ZMQ_IO_THREADS_DFLT 1
|
#define ZMQ_IO_THREADS_DFLT 1
|
||||||
|
11
src/ctx.cpp
11
src/ctx.cpp
@ -36,6 +36,7 @@
|
|||||||
#endif
|
#endif
|
||||||
|
|
||||||
#include <limits>
|
#include <limits>
|
||||||
|
#include <climits>
|
||||||
#include <new>
|
#include <new>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
|
|
||||||
@ -79,6 +80,7 @@ zmq::ctx_t::ctx_t () :
|
|||||||
slot_count (0),
|
slot_count (0),
|
||||||
slots (NULL),
|
slots (NULL),
|
||||||
max_sockets (clipped_maxsocket (ZMQ_MAX_SOCKETS_DFLT)),
|
max_sockets (clipped_maxsocket (ZMQ_MAX_SOCKETS_DFLT)),
|
||||||
|
max_msgsz (INT_MAX),
|
||||||
io_thread_count (ZMQ_IO_THREADS_DFLT),
|
io_thread_count (ZMQ_IO_THREADS_DFLT),
|
||||||
blocky (true),
|
blocky (true),
|
||||||
ipv6 (false),
|
ipv6 (false),
|
||||||
@ -265,6 +267,12 @@ int zmq::ctx_t::set (int option_, int optval_)
|
|||||||
blocky = (optval_ != 0);
|
blocky = (optval_ != 0);
|
||||||
opt_sync.unlock ();
|
opt_sync.unlock ();
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
if (option_ == ZMQ_MAX_MSGSZ && optval_ >= 0) {
|
||||||
|
opt_sync.lock ();
|
||||||
|
max_msgsz = optval_ < INT_MAX? optval_: INT_MAX;
|
||||||
|
opt_sync.unlock ();
|
||||||
|
}
|
||||||
else {
|
else {
|
||||||
errno = EINVAL;
|
errno = EINVAL;
|
||||||
rc = -1;
|
rc = -1;
|
||||||
@ -289,6 +297,9 @@ int zmq::ctx_t::get (int option_)
|
|||||||
else
|
else
|
||||||
if (option_ == ZMQ_BLOCKY)
|
if (option_ == ZMQ_BLOCKY)
|
||||||
rc = blocky;
|
rc = blocky;
|
||||||
|
else
|
||||||
|
if (option_ == ZMQ_MAX_MSGSZ)
|
||||||
|
rc = max_msgsz;
|
||||||
else {
|
else {
|
||||||
errno = EINVAL;
|
errno = EINVAL;
|
||||||
rc = -1;
|
rc = -1;
|
||||||
|
@ -199,6 +199,9 @@ namespace zmq
|
|||||||
// Maximum number of sockets that can be opened at the same time.
|
// Maximum number of sockets that can be opened at the same time.
|
||||||
int max_sockets;
|
int max_sockets;
|
||||||
|
|
||||||
|
// Maximum allowed message size
|
||||||
|
int max_msgsz;
|
||||||
|
|
||||||
// Number of I/O threads to launch.
|
// Number of I/O threads to launch.
|
||||||
int io_thread_count;
|
int io_thread_count;
|
||||||
|
|
||||||
|
17
src/zmq.cpp
17
src/zmq.cpp
@ -197,7 +197,6 @@ int zmq_ctx_shutdown (void *ctx_)
|
|||||||
errno = EFAULT;
|
errno = EFAULT;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
return ((zmq::ctx_t *) ctx_)->shutdown ();
|
return ((zmq::ctx_t *) ctx_)->shutdown ();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -366,15 +365,20 @@ int zmq_disconnect (void *s_, const char *addr_)
|
|||||||
|
|
||||||
// Sending functions.
|
// Sending functions.
|
||||||
|
|
||||||
static int
|
static inline int
|
||||||
s_sendmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
|
s_sendmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
|
||||||
{
|
{
|
||||||
size_t sz = zmq_msg_size (msg_);
|
size_t sz = zmq_msg_size (msg_);
|
||||||
int rc = s_->send ((zmq::msg_t *) msg_, flags_);
|
int rc = s_->send ((zmq::msg_t *) msg_, flags_);
|
||||||
if (unlikely (rc < 0))
|
if (unlikely (rc < 0))
|
||||||
return -1;
|
return -1;
|
||||||
// truncate returned size to INT_MAX to avoid overflow to negative values
|
|
||||||
return (int) (sz < INT_MAX ? sz : INT_MAX);
|
// This is what I'd like to do, my C++ fu is too weak -- PH 2016/02/09
|
||||||
|
// int max_msgsz = s_->parent->get (ZMQ_MAX_MSGSZ);
|
||||||
|
size_t max_msgsz = INT_MAX;
|
||||||
|
|
||||||
|
// Truncate returned size to INT_MAX to avoid overflow to negative values
|
||||||
|
return (int) (sz < max_msgsz? sz: max_msgsz);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* To be deprecated once zmq_msg_send() is stable */
|
/* To be deprecated once zmq_msg_send() is stable */
|
||||||
@ -407,7 +411,6 @@ int zmq_send (void *s_, const void *buf_, size_t len_, int flags_)
|
|||||||
errno = err;
|
errno = err;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Note the optimisation here. We don't close the msg object as it is
|
// Note the optimisation here. We don't close the msg object as it is
|
||||||
// empty anyway. This may change when implementation of zmq_msg_t changes.
|
// empty anyway. This may change when implementation of zmq_msg_t changes.
|
||||||
return rc;
|
return rc;
|
||||||
@ -433,7 +436,6 @@ int zmq_send_const (void *s_, const void *buf_, size_t len_, int flags_)
|
|||||||
errno = err;
|
errno = err;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Note the optimisation here. We don't close the msg object as it is
|
// Note the optimisation here. We don't close the msg object as it is
|
||||||
// empty anyway. This may change when implementation of zmq_msg_t changes.
|
// empty anyway. This may change when implementation of zmq_msg_t changes.
|
||||||
return rc;
|
return rc;
|
||||||
@ -487,7 +489,8 @@ s_recvmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_)
|
|||||||
int rc = s_->recv ((zmq::msg_t *) msg_, flags_);
|
int rc = s_->recv ((zmq::msg_t *) msg_, flags_);
|
||||||
if (unlikely (rc < 0))
|
if (unlikely (rc < 0))
|
||||||
return -1;
|
return -1;
|
||||||
// truncate returned size to INT_MAX to avoid overflow to negative values
|
|
||||||
|
// Truncate returned size to INT_MAX to avoid overflow to negative values
|
||||||
size_t sz = zmq_msg_size (msg_);
|
size_t sz = zmq_msg_size (msg_);
|
||||||
return (int) (sz < INT_MAX? sz: INT_MAX);
|
return (int) (sz < INT_MAX? sz: INT_MAX);
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
#include "testutil.hpp"
|
#include "testutil.hpp"
|
||||||
|
|
||||||
void test_setsockopt_tcp_recv_buffer()
|
void test_setsockopt_tcp_recv_buffer (void)
|
||||||
{
|
{
|
||||||
int rc;
|
int rc;
|
||||||
void *ctx = zmq_ctx_new ();
|
void *ctx = zmq_ctx_new ();
|
||||||
@ -9,25 +9,25 @@ void test_setsockopt_tcp_recv_buffer()
|
|||||||
int val = 0;
|
int val = 0;
|
||||||
size_t placeholder = sizeof (val);
|
size_t placeholder = sizeof (val);
|
||||||
|
|
||||||
rc = zmq_getsockopt(socket, ZMQ_TCP_RECV_BUFFER, &val, &placeholder);
|
rc = zmq_getsockopt (socket, ZMQ_RCVBUF, &val, &placeholder);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
assert (val == 8192);
|
assert (val == 8192);
|
||||||
|
|
||||||
rc = zmq_setsockopt(socket, ZMQ_TCP_RECV_BUFFER, &val, sizeof(val));
|
rc = zmq_setsockopt (socket, ZMQ_RCVBUF, &val, sizeof (val));
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
assert (val == 8192);
|
assert (val == 8192);
|
||||||
|
|
||||||
rc = zmq_getsockopt(socket, ZMQ_TCP_RECV_BUFFER, &val, &placeholder);
|
rc = zmq_getsockopt (socket, ZMQ_RCVBUF, &val, &placeholder);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
assert (val == 8192);
|
assert (val == 8192);
|
||||||
|
|
||||||
val = 16384;
|
val = 16384;
|
||||||
|
|
||||||
rc = zmq_setsockopt(socket, ZMQ_TCP_RECV_BUFFER, &val, sizeof(val));
|
rc = zmq_setsockopt (socket, ZMQ_RCVBUF, &val, sizeof (val));
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
assert (val == 16384);
|
assert (val == 16384);
|
||||||
|
|
||||||
rc = zmq_getsockopt(socket, ZMQ_TCP_RECV_BUFFER, &val, &placeholder);
|
rc = zmq_getsockopt (socket, ZMQ_RCVBUF, &val, &placeholder);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
assert (val == 16384);
|
assert (val == 16384);
|
||||||
|
|
||||||
@ -35,7 +35,7 @@ void test_setsockopt_tcp_recv_buffer()
|
|||||||
zmq_ctx_term (ctx);
|
zmq_ctx_term (ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
void test_setsockopt_tcp_send_buffer()
|
void test_setsockopt_tcp_send_buffer (void)
|
||||||
{
|
{
|
||||||
int rc;
|
int rc;
|
||||||
void *ctx = zmq_ctx_new ();
|
void *ctx = zmq_ctx_new ();
|
||||||
@ -44,25 +44,25 @@ void test_setsockopt_tcp_send_buffer()
|
|||||||
int val = 0;
|
int val = 0;
|
||||||
size_t placeholder = sizeof (val);
|
size_t placeholder = sizeof (val);
|
||||||
|
|
||||||
rc = zmq_getsockopt(socket, ZMQ_TCP_SEND_BUFFER, &val, &placeholder);
|
rc = zmq_getsockopt (socket, ZMQ_SNDBUF, &val, &placeholder);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
assert (val == 8192);
|
assert (val == 8192);
|
||||||
|
|
||||||
rc = zmq_setsockopt(socket, ZMQ_TCP_SEND_BUFFER, &val, sizeof(val));
|
rc = zmq_setsockopt (socket, ZMQ_SNDBUF, &val, sizeof (val));
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
assert (val == 8192);
|
assert (val == 8192);
|
||||||
|
|
||||||
rc = zmq_getsockopt(socket, ZMQ_TCP_SEND_BUFFER, &val, &placeholder);
|
rc = zmq_getsockopt (socket, ZMQ_SNDBUF, &val, &placeholder);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
assert (val == 8192);
|
assert (val == 8192);
|
||||||
|
|
||||||
val = 16384;
|
val = 16384;
|
||||||
|
|
||||||
rc = zmq_setsockopt(socket, ZMQ_TCP_SEND_BUFFER, &val, sizeof(val));
|
rc = zmq_setsockopt (socket, ZMQ_SNDBUF, &val, sizeof (val));
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
assert (val == 16384);
|
assert (val == 16384);
|
||||||
|
|
||||||
rc = zmq_getsockopt(socket, ZMQ_TCP_SEND_BUFFER, &val, &placeholder);
|
rc = zmq_getsockopt (socket, ZMQ_SNDBUF, &val, &placeholder);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
assert (val == 16384);
|
assert (val == 16384);
|
||||||
|
|
||||||
@ -97,8 +97,7 @@ void test_setsockopt_use_fd()
|
|||||||
zmq_ctx_term (ctx);
|
zmq_ctx_term (ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int main (void)
|
||||||
int main()
|
|
||||||
{
|
{
|
||||||
test_setsockopt_tcp_recv_buffer ();
|
test_setsockopt_tcp_recv_buffer ();
|
||||||
test_setsockopt_tcp_send_buffer ();
|
test_setsockopt_tcp_send_buffer ();
|
||||||
|
Loading…
x
Reference in New Issue
Block a user