mirror of
https://github.com/zeromq/libzmq.git
synced 2025-03-09 15:26:04 +00:00
Fix 1478: receive unsubscriptions in XPUB when verbose
Fixes not receiving unsubscription messages in XPUB socket with ZMQ_XPUB_VERBOSE and using a XSUB-XPUB proxy in front. This adds two modifications: - It adds a new flag, ZMQ_XPUB_VERBOSE_UNSUBSCRIBE, to enable verbose unsubscription messages, necessary when using a XSUB/XPUB proxy. - It adds a boolean switch to zmq::mtrie_t::rm () to control if the callback is invoked every time or only in the last removal. Necessary when a pipe is terminated and the verbose mode for unsubscriptions is enabled.
This commit is contained in:
parent
305c07583d
commit
ec5592db1f
@ -14,8 +14,9 @@ SYNOPSIS
|
|||||||
|
|
||||||
Caution: All options, with the exception of ZMQ_SUBSCRIBE, ZMQ_UNSUBSCRIBE,
|
Caution: All options, with the exception of ZMQ_SUBSCRIBE, ZMQ_UNSUBSCRIBE,
|
||||||
ZMQ_LINGER, ZMQ_ROUTER_HANDOVER, ZMQ_ROUTER_MANDATORY, ZMQ_PROBE_ROUTER,
|
ZMQ_LINGER, ZMQ_ROUTER_HANDOVER, ZMQ_ROUTER_MANDATORY, ZMQ_PROBE_ROUTER,
|
||||||
ZMQ_XPUB_VERBOSE, ZMQ_REQ_CORRELATE, ZMQ_REQ_RELAXED, ZMQ_SNDHWM
|
ZMQ_XPUB_VERBOSE, ZMQ_XPUB_VERBOSE_UNSUBSCRIBE, ZMQ_REQ_CORRELATE,
|
||||||
and ZMQ_RCVHWM, only take effect for subsequent socket bind/connects.
|
ZMQ_REQ_RELAXED, ZMQ_SNDHWM and ZMQ_RCVHWM, only take effect for
|
||||||
|
subsequent socket bind/connects.
|
||||||
|
|
||||||
Specifically, security options take effect for subsequent bind/connect calls,
|
Specifically, security options take effect for subsequent bind/connect calls,
|
||||||
and can be changed at any time to affect subsequent binds and/or connects.
|
and can be changed at any time to affect subsequent binds and/or connects.
|
||||||
@ -839,6 +840,25 @@ Default value:: 0
|
|||||||
Applicable socket types:: ZMQ_XPUB
|
Applicable socket types:: ZMQ_XPUB
|
||||||
|
|
||||||
|
|
||||||
|
ZMQ_XPUB_VERBOSE_UNSUBSCRIBE: provide all unsubscription messages on XPUB sockets
|
||||||
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
|
Sets the 'XPUB' socket behaviour on new subscriptions and unsubscriptions.
|
||||||
|
A value of '0' is the default and passes only the last unsubscription message to
|
||||||
|
upstream. A value of '1' passes all unsubscription messages upstream.
|
||||||
|
|
||||||
|
This behaviour should be enabled in all the intermediary XPUB sockets if
|
||||||
|
ZMQ_XPUB_VERBOSE is also being used in order to allow the correct forwarding
|
||||||
|
of all the unsubscription messages.
|
||||||
|
|
||||||
|
NOTE: This behaviour only takes effect when ZMQ_XPUB_VERBOSE is also enabled.
|
||||||
|
|
||||||
|
[horizontal]
|
||||||
|
Option value type:: int
|
||||||
|
Option value unit:: 0, 1
|
||||||
|
Default value:: 0
|
||||||
|
Applicable socket types:: ZMQ_XPUB
|
||||||
|
|
||||||
|
|
||||||
ZMQ_XPUB_MANUAL: change the subscription handling to manual
|
ZMQ_XPUB_MANUAL: change the subscription handling to manual
|
||||||
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
|
||||||
Sets the 'XPUB' socket subscription handling mode manual/automatic.
|
Sets the 'XPUB' socket subscription handling mode manual/automatic.
|
||||||
|
@ -319,6 +319,7 @@ ZMQ_EXPORT uint32_t zmq_msg_get_routing_id(zmq_msg_t *msg);
|
|||||||
#define ZMQ_HEARTBEAT_IVL 75
|
#define ZMQ_HEARTBEAT_IVL 75
|
||||||
#define ZMQ_HEARTBEAT_TTL 76
|
#define ZMQ_HEARTBEAT_TTL 76
|
||||||
#define ZMQ_HEARTBEAT_TIMEOUT 77
|
#define ZMQ_HEARTBEAT_TIMEOUT 77
|
||||||
|
#define ZMQ_XPUB_VERBOSE_UNSUBSCRIBE 78
|
||||||
|
|
||||||
/* Message options */
|
/* Message options */
|
||||||
#define ZMQ_MORE 1
|
#define ZMQ_MORE 1
|
||||||
|
@ -159,23 +159,28 @@ bool zmq::mtrie_t::add_helper (unsigned char *prefix_, size_t size_,
|
|||||||
|
|
||||||
void zmq::mtrie_t::rm (pipe_t *pipe_,
|
void zmq::mtrie_t::rm (pipe_t *pipe_,
|
||||||
void (*func_) (unsigned char *data_, size_t size_, void *arg_),
|
void (*func_) (unsigned char *data_, size_t size_, void *arg_),
|
||||||
void *arg_)
|
void *arg_, bool call_on_uniq_)
|
||||||
{
|
{
|
||||||
unsigned char *buff = NULL;
|
unsigned char *buff = NULL;
|
||||||
rm_helper (pipe_, &buff, 0, 0, func_, arg_);
|
rm_helper (pipe_, &buff, 0, 0, func_, arg_, call_on_uniq_);
|
||||||
free (buff);
|
free (buff);
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::mtrie_t::rm_helper (pipe_t *pipe_, unsigned char **buff_,
|
void zmq::mtrie_t::rm_helper (pipe_t *pipe_, unsigned char **buff_,
|
||||||
size_t buffsize_, size_t maxbuffsize_,
|
size_t buffsize_, size_t maxbuffsize_,
|
||||||
void (*func_) (unsigned char *data_, size_t size_, void *arg_),
|
void (*func_) (unsigned char *data_, size_t size_, void *arg_),
|
||||||
void *arg_)
|
void *arg_, bool call_on_uniq_)
|
||||||
{
|
{
|
||||||
// Remove the subscription from this node.
|
// Remove the subscription from this node.
|
||||||
if (pipes && pipes->erase (pipe_) && pipes->empty ()) {
|
if (pipes && pipes->erase (pipe_)) {
|
||||||
func_ (*buff_, buffsize_, arg_);
|
if (!call_on_uniq_ || pipes->empty ()) {
|
||||||
delete pipes;
|
func_ (*buff_, buffsize_, arg_);
|
||||||
pipes = 0;
|
}
|
||||||
|
|
||||||
|
if (pipes->empty ()) {
|
||||||
|
delete pipes;
|
||||||
|
pipes = 0;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Adjust the buffer.
|
// Adjust the buffer.
|
||||||
@ -194,7 +199,7 @@ void zmq::mtrie_t::rm_helper (pipe_t *pipe_, unsigned char **buff_,
|
|||||||
(*buff_) [buffsize_] = min;
|
(*buff_) [buffsize_] = min;
|
||||||
buffsize_++;
|
buffsize_++;
|
||||||
next.node->rm_helper (pipe_, buff_, buffsize_, maxbuffsize_,
|
next.node->rm_helper (pipe_, buff_, buffsize_, maxbuffsize_,
|
||||||
func_, arg_);
|
func_, arg_, call_on_uniq_);
|
||||||
|
|
||||||
// Prune the node if it was made redundant by the removal
|
// Prune the node if it was made redundant by the removal
|
||||||
if (next.node->is_redundant ()) {
|
if (next.node->is_redundant ()) {
|
||||||
@ -217,7 +222,7 @@ void zmq::mtrie_t::rm_helper (pipe_t *pipe_, unsigned char **buff_,
|
|||||||
(*buff_) [buffsize_] = min + c;
|
(*buff_) [buffsize_] = min + c;
|
||||||
if (next.table [c]) {
|
if (next.table [c]) {
|
||||||
next.table [c]->rm_helper (pipe_, buff_, buffsize_ + 1,
|
next.table [c]->rm_helper (pipe_, buff_, buffsize_ + 1,
|
||||||
maxbuffsize_, func_, arg_);
|
maxbuffsize_, func_, arg_, call_on_uniq_);
|
||||||
|
|
||||||
// Prune redundant nodes from the mtrie
|
// Prune redundant nodes from the mtrie
|
||||||
if (next.table [c]->is_redundant ()) {
|
if (next.table [c]->is_redundant ()) {
|
||||||
|
@ -54,11 +54,12 @@ namespace zmq
|
|||||||
bool add (unsigned char *prefix_, size_t size_, zmq::pipe_t *pipe_);
|
bool add (unsigned char *prefix_, size_t size_, zmq::pipe_t *pipe_);
|
||||||
|
|
||||||
// Remove all subscriptions for a specific peer from the trie.
|
// Remove all subscriptions for a specific peer from the trie.
|
||||||
// If there are no subscriptions left on some topics, invoke the
|
// The call_on_uniq_ flag controls if the callback is invoked
|
||||||
// supplied callback function.
|
// when there are no subscriptions left on some topics or on
|
||||||
|
// every removal.
|
||||||
void rm (zmq::pipe_t *pipe_,
|
void rm (zmq::pipe_t *pipe_,
|
||||||
void (*func_) (unsigned char *data_, size_t size_, void *arg_),
|
void (*func_) (unsigned char *data_, size_t size_, void *arg_),
|
||||||
void *arg_);
|
void *arg_, bool call_on_uniq_);
|
||||||
|
|
||||||
// Remove specific subscription from the trie. Return true is it was
|
// Remove specific subscription from the trie. Return true is it was
|
||||||
// actually removed rather than de-duplicated.
|
// actually removed rather than de-duplicated.
|
||||||
@ -75,7 +76,7 @@ namespace zmq
|
|||||||
void rm_helper (zmq::pipe_t *pipe_, unsigned char **buff_,
|
void rm_helper (zmq::pipe_t *pipe_, unsigned char **buff_,
|
||||||
size_t buffsize_, size_t maxbuffsize_,
|
size_t buffsize_, size_t maxbuffsize_,
|
||||||
void (*func_) (unsigned char *data_, size_t size_, void *arg_),
|
void (*func_) (unsigned char *data_, size_t size_, void *arg_),
|
||||||
void *arg_);
|
void *arg_, bool call_on_uniq_);
|
||||||
bool rm_helper (unsigned char *prefix_, size_t size_,
|
bool rm_helper (unsigned char *prefix_, size_t size_,
|
||||||
zmq::pipe_t *pipe_);
|
zmq::pipe_t *pipe_);
|
||||||
bool is_redundant () const;
|
bool is_redundant () const;
|
||||||
|
21
src/xpub.cpp
21
src/xpub.cpp
@ -36,7 +36,8 @@
|
|||||||
|
|
||||||
zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
|
zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
|
||||||
socket_base_t (parent_, tid_, sid_),
|
socket_base_t (parent_, tid_, sid_),
|
||||||
verbose (false),
|
verbose_subs (false),
|
||||||
|
verbose_unsubs (false),
|
||||||
more (false),
|
more (false),
|
||||||
lossy (true),
|
lossy (true),
|
||||||
manual(false),
|
manual(false),
|
||||||
@ -101,9 +102,11 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
|
|||||||
else
|
else
|
||||||
unique = subscriptions.add(data + 1, size - 1, pipe_);
|
unique = subscriptions.add(data + 1, size - 1, pipe_);
|
||||||
|
|
||||||
// If the subscription is not a duplicate store it so that it can be
|
// If the (un)subscription is not a duplicate store it so that it can be
|
||||||
// passed to used on next recv call. (Unsubscribe is not verbose.)
|
// passed to the user on next recv call unless verbose mode is enabled
|
||||||
if (options.type == ZMQ_XPUB && (unique || (*data && verbose))) {
|
// which makes to pass always these messages.
|
||||||
|
if (options.type == ZMQ_XPUB && (unique || (*data == 1 && verbose_subs) ||
|
||||||
|
(*data == 0 && verbose_unsubs && verbose_subs))) {
|
||||||
pending_data.push_back(blob_t(data, size));
|
pending_data.push_back(blob_t(data, size));
|
||||||
pending_flags.push_back(0);
|
pending_flags.push_back(0);
|
||||||
}
|
}
|
||||||
@ -126,7 +129,8 @@ void zmq::xpub_t::xwrite_activated (pipe_t *pipe_)
|
|||||||
int zmq::xpub_t::xsetsockopt (int option_, const void *optval_,
|
int zmq::xpub_t::xsetsockopt (int option_, const void *optval_,
|
||||||
size_t optvallen_)
|
size_t optvallen_)
|
||||||
{
|
{
|
||||||
if (option_ == ZMQ_XPUB_VERBOSE || option_ == ZMQ_XPUB_NODROP || option_ == ZMQ_XPUB_MANUAL)
|
if (option_ == ZMQ_XPUB_VERBOSE || option_ == ZMQ_XPUB_VERBOSE_UNSUBSCRIBE ||
|
||||||
|
option_ == ZMQ_XPUB_NODROP || option_ == ZMQ_XPUB_MANUAL)
|
||||||
{
|
{
|
||||||
if (optvallen_ != sizeof(int) || *static_cast <const int*> (optval_) < 0) {
|
if (optvallen_ != sizeof(int) || *static_cast <const int*> (optval_) < 0) {
|
||||||
errno = EINVAL;
|
errno = EINVAL;
|
||||||
@ -134,7 +138,10 @@ int zmq::xpub_t::xsetsockopt (int option_, const void *optval_,
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (option_ == ZMQ_XPUB_VERBOSE)
|
if (option_ == ZMQ_XPUB_VERBOSE)
|
||||||
verbose = (*static_cast <const int*> (optval_) != 0);
|
verbose_subs = (*static_cast <const int*> (optval_) != 0);
|
||||||
|
else
|
||||||
|
if (option_ == ZMQ_XPUB_VERBOSE_UNSUBSCRIBE)
|
||||||
|
verbose_unsubs = (*static_cast <const int*> (optval_) != 0);
|
||||||
else
|
else
|
||||||
if (option_ == ZMQ_XPUB_NODROP)
|
if (option_ == ZMQ_XPUB_NODROP)
|
||||||
lossy = (*static_cast <const int*> (optval_) == 0);
|
lossy = (*static_cast <const int*> (optval_) == 0);
|
||||||
@ -173,7 +180,7 @@ void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_)
|
|||||||
// Remove the pipe from the trie. If there are topics that nobody
|
// Remove the pipe from the trie. If there are topics that nobody
|
||||||
// is interested in anymore, send corresponding unsubscriptions
|
// is interested in anymore, send corresponding unsubscriptions
|
||||||
// upstream.
|
// upstream.
|
||||||
subscriptions.rm (pipe_, send_unsubscription, this);
|
subscriptions.rm (pipe_, send_unsubscription, this, !verbose_unsubs);
|
||||||
|
|
||||||
dist.pipe_terminated (pipe_);
|
dist.pipe_terminated (pipe_);
|
||||||
}
|
}
|
||||||
|
@ -84,7 +84,11 @@ namespace zmq
|
|||||||
|
|
||||||
// If true, send all subscription messages upstream, not just
|
// If true, send all subscription messages upstream, not just
|
||||||
// unique ones
|
// unique ones
|
||||||
bool verbose;
|
bool verbose_subs;
|
||||||
|
|
||||||
|
// If true, send all unsubscription messages upstream, not just
|
||||||
|
// unique ones
|
||||||
|
bool verbose_unsubs;
|
||||||
|
|
||||||
// True if we are in the middle of sending a multi-part message.
|
// True if we are in the middle of sending a multi-part message.
|
||||||
bool more;
|
bool more;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user