From 62c66ae7f764b2cfc24e7b8594549bd4c08064cb Mon Sep 17 00:00:00 2001 From: Pieter Hintjens Date: Tue, 9 Feb 2016 10:51:51 +0100 Subject: [PATCH] Problem: test_large_msg kills my system temporarily And I'm on a reasonably sized laptop. I think allocating INT_MAX memory is dangerous in a test case. Solution: expose this as a context option. I've used ZMQ_MAX_MSGSZ and documented it and implemented the API. However I don't know how to get the parent context for a socket, so the code in zmq.cpp is still unfinished. --- doc/zmq_ctx_get.txt | 10 ++++ doc/zmq_ctx_set.txt | 16 ++++++ include/zmq.h | 2 +- src/ctx.cpp | 15 +++++- src/ctx.hpp | 3 ++ src/socket_base.hpp | 2 +- src/zmq.cpp | 41 +++++++------- tests/test_setsockopt.cpp | 111 +++++++++++++++++++------------------- 8 files changed, 121 insertions(+), 79 deletions(-) diff --git a/doc/zmq_ctx_get.txt b/doc/zmq_ctx_get.txt index 0a3825c3..46e978cd 100644 --- a/doc/zmq_ctx_get.txt +++ b/doc/zmq_ctx_get.txt @@ -26,20 +26,30 @@ ZMQ_IO_THREADS: Get number of I/O threads The 'ZMQ_IO_THREADS' argument returns the size of the 0MQ thread pool for this context. + ZMQ_MAX_SOCKETS: Get maximum number of sockets ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ The 'ZMQ_MAX_SOCKETS' argument returns the maximum number of sockets allowed for this context. + +ZMQ_MAX_MSGSZ: Get maximum message size +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_MAX_MSGSZ' argument returns the maximum size of a message +allowed for this context. Default value is INT_MAX. + + ZMQ_SOCKET_LIMIT: Get largest configurable number of sockets ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ The 'ZMQ_SOCKET_LIMIT' argument returns the largest number of sockets that linkzmq:zmq_ctx_set[3] will accept. + ZMQ_IPV6: Set IPv6 option ~~~~~~~~~~~~~~~~~~~~~~~~~ The 'ZMQ_IPV6' argument returns the IPv6 option for the context. + ZMQ_BLOCKY: Get blocky setting ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ The 'ZMQ_BLOCKY' argument returns 1 if the context will block on terminate, diff --git a/doc/zmq_ctx_set.txt b/doc/zmq_ctx_set.txt index ba67745a..86e97b33 100644 --- a/doc/zmq_ctx_set.txt +++ b/doc/zmq_ctx_set.txt @@ -47,6 +47,7 @@ context. [horizontal] Default value:: 1 + ZMQ_THREAD_SCHED_POLICY: Set scheduling policy for I/O threads ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ The 'ZMQ_THREAD_SCHED_POLICY' argument sets the scheduling policy for @@ -58,6 +59,7 @@ This option only applies before creating any sockets on the context. [horizontal] Default value:: -1 + ZMQ_THREAD_PRIORITY: Set scheduling priority for I/O threads ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ The 'ZMQ_THREAD_PRIORITY' argument sets scheduling priority for @@ -69,6 +71,19 @@ This option only applies before creating any sockets on the context. [horizontal] Default value:: -1 + +ZMQ_MAX_MSGSZ: Set maximum message size +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +The 'ZMQ_MAX_MSGSZ' argument sets the maximum allowed size +of a message sent in the context. You can query the maximal +allowed value with linkzmq:zmq_ctx_get[3] using the +'ZMQ_MAX_MSGSZ' option. + +[horizontal] +Default value:: INT_MAX +Maximum value:: INT_MAX + + ZMQ_MAX_SOCKETS: Set maximum number of sockets ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ The 'ZMQ_MAX_SOCKETS' argument sets the maximum number of sockets allowed @@ -78,6 +93,7 @@ linkzmq:zmq_ctx_get[3] using the 'ZMQ_SOCKET_LIMIT' option. [horizontal] Default value:: 1024 + ZMQ_IPV6: Set IPv6 option ~~~~~~~~~~~~~~~~~~~~~~~~~ The 'ZMQ_IPV6' argument sets the IPv6 value for all sockets created in diff --git a/include/zmq.h b/include/zmq.h index 24fe3f4a..790e9612 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -185,13 +185,13 @@ ZMQ_EXPORT void zmq_version (int *major, int *minor, int *patch); /* 0MQ infrastructure (a.k.a. context) initialisation & termination. */ /******************************************************************************/ -/* New API */ /* Context options */ #define ZMQ_IO_THREADS 1 #define ZMQ_MAX_SOCKETS 2 #define ZMQ_SOCKET_LIMIT 3 #define ZMQ_THREAD_PRIORITY 3 #define ZMQ_THREAD_SCHED_POLICY 4 +#define ZMQ_MAX_MSGSZ 5 /* Default for new contexts */ #define ZMQ_IO_THREADS_DFLT 1 diff --git a/src/ctx.cpp b/src/ctx.cpp index 8f3023a7..90c7b918 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -36,6 +36,7 @@ #endif #include +#include #include #include @@ -79,6 +80,7 @@ zmq::ctx_t::ctx_t () : slot_count (0), slots (NULL), max_sockets (clipped_maxsocket (ZMQ_MAX_SOCKETS_DFLT)), + max_msgsz (INT_MAX), io_thread_count (ZMQ_IO_THREADS_DFLT), blocky (true), ipv6 (false), @@ -251,13 +253,13 @@ int zmq::ctx_t::set (int option_, int optval_) if (option_ == ZMQ_THREAD_PRIORITY && optval_ >= 0) { opt_sync.lock(); thread_priority = optval_; - opt_sync.unlock(); + opt_sync.unlock (); } else if (option_ == ZMQ_THREAD_SCHED_POLICY && optval_ >= 0) { opt_sync.lock(); thread_sched_policy = optval_; - opt_sync.unlock(); + opt_sync.unlock (); } else if (option_ == ZMQ_BLOCKY && optval_ >= 0) { @@ -265,6 +267,12 @@ int zmq::ctx_t::set (int option_, int optval_) blocky = (optval_ != 0); opt_sync.unlock (); } + else + if (option_ == ZMQ_MAX_MSGSZ && optval_ >= 0) { + opt_sync.lock (); + max_msgsz = optval_ < INT_MAX? optval_: INT_MAX; + opt_sync.unlock (); + } else { errno = EINVAL; rc = -1; @@ -289,6 +297,9 @@ int zmq::ctx_t::get (int option_) else if (option_ == ZMQ_BLOCKY) rc = blocky; + else + if (option_ == ZMQ_MAX_MSGSZ) + rc = max_msgsz; else { errno = EINVAL; rc = -1; diff --git a/src/ctx.hpp b/src/ctx.hpp index 73946239..7a47a9e2 100644 --- a/src/ctx.hpp +++ b/src/ctx.hpp @@ -199,6 +199,9 @@ namespace zmq // Maximum number of sockets that can be opened at the same time. int max_sockets; + // Maximum allowed message size + int max_msgsz; + // Number of I/O threads to launch. int io_thread_count; diff --git a/src/socket_base.hpp b/src/socket_base.hpp index 1ce2f021..2f459e9c 100644 --- a/src/socket_base.hpp +++ b/src/socket_base.hpp @@ -245,7 +245,7 @@ namespace zmq void update_pipe_options(int option_); // Socket's mailbox object. - i_mailbox* mailbox; + i_mailbox *mailbox; // List of attached pipes. typedef array_t pipes_t; diff --git a/src/zmq.cpp b/src/zmq.cpp index 475b5d24..b26edb57 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -164,12 +164,12 @@ void *zmq_ctx_new (void) int zmq_ctx_term (void *ctx_) { - if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) { + if (!ctx_ || !((zmq::ctx_t *) ctx_)->check_tag ()) { errno = EFAULT; return -1; } - int rc = ((zmq::ctx_t*) ctx_)->terminate (); + int rc = ((zmq::ctx_t *) ctx_)->terminate (); int en = errno; // Shut down only if termination was not interrupted by a signal. @@ -193,30 +193,29 @@ int zmq_ctx_term (void *ctx_) int zmq_ctx_shutdown (void *ctx_) { - if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) { + if (!ctx_ || !((zmq::ctx_t *) ctx_)->check_tag ()) { errno = EFAULT; return -1; } - - return ((zmq::ctx_t*) ctx_)->shutdown (); + return ((zmq::ctx_t *) ctx_)->shutdown (); } int zmq_ctx_set (void *ctx_, int option_, int optval_) { - if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) { + if (!ctx_ || !((zmq::ctx_t *) ctx_)->check_tag ()) { errno = EFAULT; return -1; } - return ((zmq::ctx_t*) ctx_)->set (option_, optval_); + return ((zmq::ctx_t *) ctx_)->set (option_, optval_); } int zmq_ctx_get (void *ctx_, int option_) { - if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) { + if (!ctx_ || !((zmq::ctx_t *) ctx_)->check_tag ()) { errno = EFAULT; return -1; } - return ((zmq::ctx_t*) ctx_)->get (option_); + return ((zmq::ctx_t *) ctx_)->get (option_); } // Stable/legacy context API @@ -247,11 +246,11 @@ int zmq_ctx_destroy (void *ctx_) void *zmq_socket (void *ctx_, int type_) { - if (!ctx_ || !((zmq::ctx_t*) ctx_)->check_tag ()) { + if (!ctx_ || !((zmq::ctx_t *) ctx_)->check_tag ()) { errno = EFAULT; return NULL; } - zmq::ctx_t *ctx = (zmq::ctx_t*) ctx_; + zmq::ctx_t *ctx = (zmq::ctx_t *) ctx_; zmq::socket_base_t *s = ctx->create_socket (type_); return (void *) s; } @@ -366,15 +365,20 @@ int zmq_disconnect (void *s_, const char *addr_) // Sending functions. -static int +static inline int s_sendmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_) { size_t sz = zmq_msg_size (msg_); int rc = s_->send ((zmq::msg_t *) msg_, flags_); if (unlikely (rc < 0)) return -1; - // truncate returned size to INT_MAX to avoid overflow to negative values - return (int) (sz < INT_MAX ? sz : INT_MAX); + + // This is what I'd like to do, my C++ fu is too weak -- PH 2016/02/09 + // int max_msgsz = s_->parent->get (ZMQ_MAX_MSGSZ); + size_t max_msgsz = INT_MAX; + + // Truncate returned size to INT_MAX to avoid overflow to negative values + return (int) (sz < max_msgsz? sz: max_msgsz); } /* To be deprecated once zmq_msg_send() is stable */ @@ -407,7 +411,6 @@ int zmq_send (void *s_, const void *buf_, size_t len_, int flags_) errno = err; return -1; } - // Note the optimisation here. We don't close the msg object as it is // empty anyway. This may change when implementation of zmq_msg_t changes. return rc; @@ -433,7 +436,6 @@ int zmq_send_const (void *s_, const void *buf_, size_t len_, int flags_) errno = err; return -1; } - // Note the optimisation here. We don't close the msg object as it is // empty anyway. This may change when implementation of zmq_msg_t changes. return rc; @@ -484,12 +486,13 @@ int zmq_sendiov (void *s_, iovec *a_, size_t count_, int flags_) static int s_recvmsg (zmq::socket_base_t *s_, zmq_msg_t *msg_, int flags_) { - int rc = s_->recv ((zmq::msg_t*) msg_, flags_); + int rc = s_->recv ((zmq::msg_t *) msg_, flags_); if (unlikely (rc < 0)) return -1; - // truncate returned size to INT_MAX to avoid overflow to negative values + + // Truncate returned size to INT_MAX to avoid overflow to negative values size_t sz = zmq_msg_size (msg_); - return (int) (sz < INT_MAX ? sz : INT_MAX); + return (int) (sz < INT_MAX? sz: INT_MAX); } /* To be deprecated once zmq_msg_recv() is stable */ diff --git a/tests/test_setsockopt.cpp b/tests/test_setsockopt.cpp index 33a8ec61..1dcc3266 100644 --- a/tests/test_setsockopt.cpp +++ b/tests/test_setsockopt.cpp @@ -1,106 +1,105 @@ #include "testutil.hpp" -void test_setsockopt_tcp_recv_buffer() +void test_setsockopt_tcp_recv_buffer (void) { int rc; - void *ctx = zmq_ctx_new(); - void *socket = zmq_socket(ctx, ZMQ_PUSH); + void *ctx = zmq_ctx_new (); + void *socket = zmq_socket (ctx, ZMQ_PUSH); int val = 0; - size_t placeholder = sizeof(val); + size_t placeholder = sizeof (val); - rc = zmq_getsockopt(socket, ZMQ_TCP_RECV_BUFFER, &val, &placeholder); - assert(rc == 0); - assert(val == 8192); + rc = zmq_getsockopt (socket, ZMQ_RCVBUF, &val, &placeholder); + assert (rc == 0); + assert (val == 8192); - rc = zmq_setsockopt(socket, ZMQ_TCP_RECV_BUFFER, &val, sizeof(val)); - assert(rc == 0); - assert(val == 8192); + rc = zmq_setsockopt (socket, ZMQ_RCVBUF, &val, sizeof (val)); + assert (rc == 0); + assert (val == 8192); - rc = zmq_getsockopt(socket, ZMQ_TCP_RECV_BUFFER, &val, &placeholder); - assert(rc == 0); - assert(val == 8192); + rc = zmq_getsockopt (socket, ZMQ_RCVBUF, &val, &placeholder); + assert (rc == 0); + assert (val == 8192); val = 16384; - rc = zmq_setsockopt(socket, ZMQ_TCP_RECV_BUFFER, &val, sizeof(val)); - assert(rc == 0); - assert(val == 16384); + rc = zmq_setsockopt (socket, ZMQ_RCVBUF, &val, sizeof (val)); + assert (rc == 0); + assert (val == 16384); - rc = zmq_getsockopt(socket, ZMQ_TCP_RECV_BUFFER, &val, &placeholder); - assert(rc == 0); - assert(val == 16384); + rc = zmq_getsockopt (socket, ZMQ_RCVBUF, &val, &placeholder); + assert (rc == 0); + assert (val == 16384); - zmq_close(socket); - zmq_ctx_term(ctx); + zmq_close (socket); + zmq_ctx_term (ctx); } -void test_setsockopt_tcp_send_buffer() +void test_setsockopt_tcp_send_buffer (void) { int rc; - void *ctx = zmq_ctx_new(); - void *socket = zmq_socket(ctx, ZMQ_PUSH); + void *ctx = zmq_ctx_new (); + void *socket = zmq_socket (ctx, ZMQ_PUSH); int val = 0; - size_t placeholder = sizeof(val); + size_t placeholder = sizeof (val); - rc = zmq_getsockopt(socket, ZMQ_TCP_SEND_BUFFER, &val, &placeholder); - assert(rc == 0); - assert(val == 8192); + rc = zmq_getsockopt (socket, ZMQ_SNDBUF, &val, &placeholder); + assert (rc == 0); + assert (val == 8192); - rc = zmq_setsockopt(socket, ZMQ_TCP_SEND_BUFFER, &val, sizeof(val)); - assert(rc == 0); - assert(val == 8192); + rc = zmq_setsockopt (socket, ZMQ_SNDBUF, &val, sizeof (val)); + assert (rc == 0); + assert (val == 8192); - rc = zmq_getsockopt(socket, ZMQ_TCP_SEND_BUFFER, &val, &placeholder); - assert(rc == 0); - assert(val == 8192); + rc = zmq_getsockopt (socket, ZMQ_SNDBUF, &val, &placeholder); + assert (rc == 0); + assert (val == 8192); val = 16384; - rc = zmq_setsockopt(socket, ZMQ_TCP_SEND_BUFFER, &val, sizeof(val)); - assert(rc == 0); - assert(val == 16384); + rc = zmq_setsockopt (socket, ZMQ_SNDBUF, &val, sizeof (val)); + assert (rc == 0); + assert (val == 16384); - rc = zmq_getsockopt(socket, ZMQ_TCP_SEND_BUFFER, &val, &placeholder); - assert(rc == 0); - assert(val == 16384); + rc = zmq_getsockopt (socket, ZMQ_SNDBUF, &val, &placeholder); + assert (rc == 0); + assert (val == 16384); - zmq_close(socket); - zmq_ctx_term(ctx); + zmq_close (socket); + zmq_ctx_term (ctx); } -void test_setsockopt_use_fd() +void test_setsockopt_use_fd () { int rc; - void *ctx = zmq_ctx_new(); - void *socket = zmq_socket(ctx, ZMQ_PUSH); + void *ctx = zmq_ctx_new (); + void *socket = zmq_socket (ctx, ZMQ_PUSH); int val = 0; - size_t placeholder = sizeof(val); + size_t placeholder = sizeof (val); - rc = zmq_getsockopt(socket, ZMQ_USE_FD, &val, &placeholder); + rc = zmq_getsockopt (socket, ZMQ_USE_FD, &val, &placeholder); assert(rc == 0); assert(val == -1); val = 3; - rc = zmq_setsockopt(socket, ZMQ_USE_FD, &val, sizeof(val)); + rc = zmq_setsockopt (socket, ZMQ_USE_FD, &val, sizeof(val)); assert(rc == 0); assert(val == 3); - rc = zmq_getsockopt(socket, ZMQ_USE_FD, &val, &placeholder); + rc = zmq_getsockopt (socket, ZMQ_USE_FD, &val, &placeholder); assert(rc == 0); assert(val == 3); - zmq_close(socket); - zmq_ctx_term(ctx); + zmq_close (socket); + zmq_ctx_term (ctx); } - -int main() +int main (void) { - test_setsockopt_tcp_recv_buffer(); - test_setsockopt_tcp_send_buffer(); - test_setsockopt_use_fd(); + test_setsockopt_tcp_recv_buffer (); + test_setsockopt_tcp_send_buffer (); + test_setsockopt_use_fd (); }