mirror of
https://github.com/zeromq/libzmq.git
synced 2025-03-09 15:26:04 +00:00
Problem: linger values other than -1 or 0 are unsafe
Solution: set defaults back to infinity, and add new context option, ZMQ_BLOCKY that the user can set to false to get a less surprising behavior on context termination. Eg. zmq_ctx_set (ctx, ZMQ_BLOCKY, false);
This commit is contained in:
parent
f448af948d
commit
b6e61d72b2
@ -40,6 +40,12 @@ ZMQ_IPV6: Set IPv6 option
|
|||||||
~~~~~~~~~~~~~~~~~~~~~~~~~
|
~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
The 'ZMQ_IPV6' argument returns the IPv6 option for the context.
|
The 'ZMQ_IPV6' argument returns the IPv6 option for the context.
|
||||||
|
|
||||||
|
ZMQ_BLOCKY: Get blocky setting
|
||||||
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
|
The 'ZMQ_BLOCKY' argument returns 1 if the context will block on terminate,
|
||||||
|
zero if the "block forever on context termination" gambit was disabled by
|
||||||
|
setting ZMQ_BLOCKY to false on all new contexts.
|
||||||
|
|
||||||
|
|
||||||
RETURN VALUE
|
RETURN VALUE
|
||||||
------------
|
------------
|
||||||
@ -63,6 +69,10 @@ zmq_ctx_set (context, ZMQ_MAX_SOCKETS, 256);
|
|||||||
int max_sockets = zmq_ctx_get (context, ZMQ_MAX_SOCKETS);
|
int max_sockets = zmq_ctx_get (context, ZMQ_MAX_SOCKETS);
|
||||||
assert (max_sockets == 256);
|
assert (max_sockets == 256);
|
||||||
----
|
----
|
||||||
|
.Switching off the context deadlock gambit
|
||||||
|
----
|
||||||
|
zmq_ctx_set (ctx, ZMQ_BLOCKY, false);
|
||||||
|
----
|
||||||
|
|
||||||
|
|
||||||
SEE ALSO
|
SEE ALSO
|
||||||
|
@ -21,6 +21,21 @@ The _zmq_ctx_set()_ function shall set the option specified by the
|
|||||||
The _zmq_ctx_set()_ function accepts the following options:
|
The _zmq_ctx_set()_ function accepts the following options:
|
||||||
|
|
||||||
|
|
||||||
|
ZMQ_BLOCKY: Fix blocky behavior
|
||||||
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
|
By default the context will block, forever, on a zmq_ctx_term call. The
|
||||||
|
assumption behind this behavior is that abrupt termination will cause
|
||||||
|
message loss. Most real applications use some form of handshaking to ensure
|
||||||
|
applications receive termination messages, and then terminate the context
|
||||||
|
with 'ZMQ_LINGER' set to zero on all sockets. This setting is an easier way
|
||||||
|
to get the same result. When 'ZMQ_BLOCKY' is set to false, all new sockets
|
||||||
|
are given a linger timeout of zero. You must still close all sockets before
|
||||||
|
calling zmq_term.
|
||||||
|
|
||||||
|
[horizontal]
|
||||||
|
Default value:: false (old behavior)
|
||||||
|
|
||||||
|
|
||||||
ZMQ_IO_THREADS: Set number of I/O threads
|
ZMQ_IO_THREADS: Set number of I/O threads
|
||||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
The 'ZMQ_IO_THREADS' argument specifies the size of the 0MQ thread pool to
|
The 'ZMQ_IO_THREADS' argument specifies the size of the 0MQ thread pool to
|
||||||
|
@ -304,6 +304,7 @@ ZMQ_EXPORT const char *zmq_msg_gets (zmq_msg_t *msg, const char *property);
|
|||||||
#define ZMQ_IDENTITY_FD 67
|
#define ZMQ_IDENTITY_FD 67
|
||||||
#define ZMQ_SOCKS_PROXY 68
|
#define ZMQ_SOCKS_PROXY 68
|
||||||
#define ZMQ_XPUB_NODROP 69
|
#define ZMQ_XPUB_NODROP 69
|
||||||
|
#define ZMQ_BLOCKY 70
|
||||||
|
|
||||||
/* Message options */
|
/* Message options */
|
||||||
#define ZMQ_MORE 1
|
#define ZMQ_MORE 1
|
||||||
|
10
src/ctx.cpp
10
src/ctx.cpp
@ -65,6 +65,7 @@ zmq::ctx_t::ctx_t () :
|
|||||||
slots (NULL),
|
slots (NULL),
|
||||||
max_sockets (clipped_maxsocket (ZMQ_MAX_SOCKETS_DFLT)),
|
max_sockets (clipped_maxsocket (ZMQ_MAX_SOCKETS_DFLT)),
|
||||||
io_thread_count (ZMQ_IO_THREADS_DFLT),
|
io_thread_count (ZMQ_IO_THREADS_DFLT),
|
||||||
|
blocky (true),
|
||||||
ipv6 (false),
|
ipv6 (false),
|
||||||
thread_priority (ZMQ_THREAD_PRIORITY_DFLT),
|
thread_priority (ZMQ_THREAD_PRIORITY_DFLT),
|
||||||
thread_sched_policy (ZMQ_THREAD_SCHED_POLICY_DFLT)
|
thread_sched_policy (ZMQ_THREAD_SCHED_POLICY_DFLT)
|
||||||
@ -222,6 +223,12 @@ int zmq::ctx_t::set (int option_, int optval_)
|
|||||||
thread_sched_policy = optval_;
|
thread_sched_policy = optval_;
|
||||||
opt_sync.unlock();
|
opt_sync.unlock();
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
if (option_ == ZMQ_BLOCKY && optval_ >= 0) {
|
||||||
|
opt_sync.lock ();
|
||||||
|
blocky = (optval_ != 0);
|
||||||
|
opt_sync.unlock ();
|
||||||
|
}
|
||||||
else {
|
else {
|
||||||
errno = EINVAL;
|
errno = EINVAL;
|
||||||
rc = -1;
|
rc = -1;
|
||||||
@ -243,6 +250,9 @@ int zmq::ctx_t::get (int option_)
|
|||||||
else
|
else
|
||||||
if (option_ == ZMQ_IPV6)
|
if (option_ == ZMQ_IPV6)
|
||||||
rc = ipv6;
|
rc = ipv6;
|
||||||
|
else
|
||||||
|
if (option_ == ZMQ_BLOCKY)
|
||||||
|
rc = blocky;
|
||||||
else {
|
else {
|
||||||
errno = EINVAL;
|
errno = EINVAL;
|
||||||
rc = -1;
|
rc = -1;
|
||||||
|
@ -187,6 +187,9 @@ namespace zmq
|
|||||||
// Number of I/O threads to launch.
|
// Number of I/O threads to launch.
|
||||||
int io_thread_count;
|
int io_thread_count;
|
||||||
|
|
||||||
|
// Does context wait (possibly forever) on termination?
|
||||||
|
bool blocky;
|
||||||
|
|
||||||
// Is IPv6 enabled on this context?
|
// Is IPv6 enabled on this context?
|
||||||
bool ipv6;
|
bool ipv6;
|
||||||
|
|
||||||
|
@ -35,7 +35,7 @@ zmq::options_t::options_t () :
|
|||||||
rcvbuf (0),
|
rcvbuf (0),
|
||||||
tos (0),
|
tos (0),
|
||||||
type (-1),
|
type (-1),
|
||||||
linger (30000),
|
linger (-1),
|
||||||
reconnect_ivl (100),
|
reconnect_ivl (100),
|
||||||
reconnect_ivl_max (0),
|
reconnect_ivl_max (0),
|
||||||
backlog (100),
|
backlog (100),
|
||||||
|
@ -143,6 +143,7 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_, int sid_) :
|
|||||||
{
|
{
|
||||||
options.socket_id = sid_;
|
options.socket_id = sid_;
|
||||||
options.ipv6 = (parent_->get (ZMQ_IPV6) != 0);
|
options.ipv6 = (parent_->get (ZMQ_IPV6) != 0);
|
||||||
|
options.linger = parent_->get (ZMQ_BLOCKY)? -1: 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
zmq::socket_base_t::~socket_base_t ()
|
zmq::socket_base_t::~socket_base_t ()
|
||||||
|
@ -43,12 +43,23 @@ int main (void)
|
|||||||
assert (zmq_ctx_get (ctx, ZMQ_IPV6) == 1);
|
assert (zmq_ctx_get (ctx, ZMQ_IPV6) == 1);
|
||||||
|
|
||||||
void *router = zmq_socket (ctx, ZMQ_ROUTER);
|
void *router = zmq_socket (ctx, ZMQ_ROUTER);
|
||||||
int ipv6;
|
int value;
|
||||||
size_t optsize = sizeof (int);
|
size_t optsize = sizeof (int);
|
||||||
rc = zmq_getsockopt (router, ZMQ_IPV6, &ipv6, &optsize);
|
rc = zmq_getsockopt (router, ZMQ_IPV6, &value, &optsize);
|
||||||
|
assert (rc == 0);
|
||||||
|
assert (value == 1);
|
||||||
|
rc = zmq_getsockopt (router, ZMQ_LINGER, &value, &optsize);
|
||||||
|
assert (rc == 0);
|
||||||
|
assert (value == -1);
|
||||||
|
rc = zmq_close (router);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
assert (ipv6);
|
|
||||||
|
|
||||||
|
rc = zmq_ctx_set (ctx, ZMQ_BLOCKY, false);
|
||||||
|
assert (zmq_ctx_get (ctx, ZMQ_BLOCKY) == 0);
|
||||||
|
router = zmq_socket (ctx, ZMQ_ROUTER);
|
||||||
|
rc = zmq_getsockopt (router, ZMQ_LINGER, &value, &optsize);
|
||||||
|
assert (rc == 0);
|
||||||
|
assert (value == 0);
|
||||||
rc = zmq_close (router);
|
rc = zmq_close (router);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user