From ec5592db1f87cd18e61545be4a3a3174709736dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ricardo=20Catalinas=20Jim=C3=A9nez?= Date: Tue, 21 Jul 2015 23:35:50 +0100 Subject: [PATCH] Fix 1478: receive unsubscriptions in XPUB when verbose Fixes not receiving unsubscription messages in XPUB socket with ZMQ_XPUB_VERBOSE and using a XSUB-XPUB proxy in front. This adds two modifications: - It adds a new flag, ZMQ_XPUB_VERBOSE_UNSUBSCRIBE, to enable verbose unsubscription messages, necessary when using a XSUB/XPUB proxy. - It adds a boolean switch to zmq::mtrie_t::rm () to control if the callback is invoked every time or only in the last removal. Necessary when a pipe is terminated and the verbose mode for unsubscriptions is enabled. --- doc/zmq_setsockopt.txt | 24 ++++++++++++++++++++++-- include/zmq.h | 1 + src/mtrie.cpp | 23 ++++++++++++++--------- src/mtrie.hpp | 9 +++++---- src/xpub.cpp | 21 ++++++++++++++------- src/xpub.hpp | 6 +++++- 6 files changed, 61 insertions(+), 23 deletions(-) diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index 903e9761..61392959 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -14,8 +14,9 @@ SYNOPSIS Caution: All options, with the exception of ZMQ_SUBSCRIBE, ZMQ_UNSUBSCRIBE, ZMQ_LINGER, ZMQ_ROUTER_HANDOVER, ZMQ_ROUTER_MANDATORY, ZMQ_PROBE_ROUTER, -ZMQ_XPUB_VERBOSE, ZMQ_REQ_CORRELATE, ZMQ_REQ_RELAXED, ZMQ_SNDHWM -and ZMQ_RCVHWM, only take effect for subsequent socket bind/connects. +ZMQ_XPUB_VERBOSE, ZMQ_XPUB_VERBOSE_UNSUBSCRIBE, ZMQ_REQ_CORRELATE, +ZMQ_REQ_RELAXED, ZMQ_SNDHWM and ZMQ_RCVHWM, only take effect for +subsequent socket bind/connects. Specifically, security options take effect for subsequent bind/connect calls, and can be changed at any time to affect subsequent binds and/or connects. @@ -839,6 +840,25 @@ Default value:: 0 Applicable socket types:: ZMQ_XPUB +ZMQ_XPUB_VERBOSE_UNSUBSCRIBE: provide all unsubscription messages on XPUB sockets +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +Sets the 'XPUB' socket behaviour on new subscriptions and unsubscriptions. +A value of '0' is the default and passes only the last unsubscription message to +upstream. A value of '1' passes all unsubscription messages upstream. + +This behaviour should be enabled in all the intermediary XPUB sockets if +ZMQ_XPUB_VERBOSE is also being used in order to allow the correct forwarding +of all the unsubscription messages. + +NOTE: This behaviour only takes effect when ZMQ_XPUB_VERBOSE is also enabled. + +[horizontal] +Option value type:: int +Option value unit:: 0, 1 +Default value:: 0 +Applicable socket types:: ZMQ_XPUB + + ZMQ_XPUB_MANUAL: change the subscription handling to manual ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Sets the 'XPUB' socket subscription handling mode manual/automatic. diff --git a/include/zmq.h b/include/zmq.h index 0eb31f7a..aaf3d55e 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -319,6 +319,7 @@ ZMQ_EXPORT uint32_t zmq_msg_get_routing_id(zmq_msg_t *msg); #define ZMQ_HEARTBEAT_IVL 75 #define ZMQ_HEARTBEAT_TTL 76 #define ZMQ_HEARTBEAT_TIMEOUT 77 +#define ZMQ_XPUB_VERBOSE_UNSUBSCRIBE 78 /* Message options */ #define ZMQ_MORE 1 diff --git a/src/mtrie.cpp b/src/mtrie.cpp index 6f7c5898..3fa3971d 100644 --- a/src/mtrie.cpp +++ b/src/mtrie.cpp @@ -159,23 +159,28 @@ bool zmq::mtrie_t::add_helper (unsigned char *prefix_, size_t size_, void zmq::mtrie_t::rm (pipe_t *pipe_, void (*func_) (unsigned char *data_, size_t size_, void *arg_), - void *arg_) + void *arg_, bool call_on_uniq_) { unsigned char *buff = NULL; - rm_helper (pipe_, &buff, 0, 0, func_, arg_); + rm_helper (pipe_, &buff, 0, 0, func_, arg_, call_on_uniq_); free (buff); } void zmq::mtrie_t::rm_helper (pipe_t *pipe_, unsigned char **buff_, size_t buffsize_, size_t maxbuffsize_, void (*func_) (unsigned char *data_, size_t size_, void *arg_), - void *arg_) + void *arg_, bool call_on_uniq_) { // Remove the subscription from this node. - if (pipes && pipes->erase (pipe_) && pipes->empty ()) { - func_ (*buff_, buffsize_, arg_); - delete pipes; - pipes = 0; + if (pipes && pipes->erase (pipe_)) { + if (!call_on_uniq_ || pipes->empty ()) { + func_ (*buff_, buffsize_, arg_); + } + + if (pipes->empty ()) { + delete pipes; + pipes = 0; + } } // Adjust the buffer. @@ -194,7 +199,7 @@ void zmq::mtrie_t::rm_helper (pipe_t *pipe_, unsigned char **buff_, (*buff_) [buffsize_] = min; buffsize_++; next.node->rm_helper (pipe_, buff_, buffsize_, maxbuffsize_, - func_, arg_); + func_, arg_, call_on_uniq_); // Prune the node if it was made redundant by the removal if (next.node->is_redundant ()) { @@ -217,7 +222,7 @@ void zmq::mtrie_t::rm_helper (pipe_t *pipe_, unsigned char **buff_, (*buff_) [buffsize_] = min + c; if (next.table [c]) { next.table [c]->rm_helper (pipe_, buff_, buffsize_ + 1, - maxbuffsize_, func_, arg_); + maxbuffsize_, func_, arg_, call_on_uniq_); // Prune redundant nodes from the mtrie if (next.table [c]->is_redundant ()) { diff --git a/src/mtrie.hpp b/src/mtrie.hpp index 6d49380c..55279303 100644 --- a/src/mtrie.hpp +++ b/src/mtrie.hpp @@ -54,11 +54,12 @@ namespace zmq bool add (unsigned char *prefix_, size_t size_, zmq::pipe_t *pipe_); // Remove all subscriptions for a specific peer from the trie. - // If there are no subscriptions left on some topics, invoke the - // supplied callback function. + // The call_on_uniq_ flag controls if the callback is invoked + // when there are no subscriptions left on some topics or on + // every removal. void rm (zmq::pipe_t *pipe_, void (*func_) (unsigned char *data_, size_t size_, void *arg_), - void *arg_); + void *arg_, bool call_on_uniq_); // Remove specific subscription from the trie. Return true is it was // actually removed rather than de-duplicated. @@ -75,7 +76,7 @@ namespace zmq void rm_helper (zmq::pipe_t *pipe_, unsigned char **buff_, size_t buffsize_, size_t maxbuffsize_, void (*func_) (unsigned char *data_, size_t size_, void *arg_), - void *arg_); + void *arg_, bool call_on_uniq_); bool rm_helper (unsigned char *prefix_, size_t size_, zmq::pipe_t *pipe_); bool is_redundant () const; diff --git a/src/xpub.cpp b/src/xpub.cpp index fd086e9e..a5ec28ce 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -36,7 +36,8 @@ zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : socket_base_t (parent_, tid_, sid_), - verbose (false), + verbose_subs (false), + verbose_unsubs (false), more (false), lossy (true), manual(false), @@ -101,9 +102,11 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) else unique = subscriptions.add(data + 1, size - 1, pipe_); - // If the subscription is not a duplicate store it so that it can be - // passed to used on next recv call. (Unsubscribe is not verbose.) - if (options.type == ZMQ_XPUB && (unique || (*data && verbose))) { + // If the (un)subscription is not a duplicate store it so that it can be + // passed to the user on next recv call unless verbose mode is enabled + // which makes to pass always these messages. + if (options.type == ZMQ_XPUB && (unique || (*data == 1 && verbose_subs) || + (*data == 0 && verbose_unsubs && verbose_subs))) { pending_data.push_back(blob_t(data, size)); pending_flags.push_back(0); } @@ -126,7 +129,8 @@ void zmq::xpub_t::xwrite_activated (pipe_t *pipe_) int zmq::xpub_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_) { - if (option_ == ZMQ_XPUB_VERBOSE || option_ == ZMQ_XPUB_NODROP || option_ == ZMQ_XPUB_MANUAL) + if (option_ == ZMQ_XPUB_VERBOSE || option_ == ZMQ_XPUB_VERBOSE_UNSUBSCRIBE || + option_ == ZMQ_XPUB_NODROP || option_ == ZMQ_XPUB_MANUAL) { if (optvallen_ != sizeof(int) || *static_cast (optval_) < 0) { errno = EINVAL; @@ -134,7 +138,10 @@ int zmq::xpub_t::xsetsockopt (int option_, const void *optval_, } if (option_ == ZMQ_XPUB_VERBOSE) - verbose = (*static_cast (optval_) != 0); + verbose_subs = (*static_cast (optval_) != 0); + else + if (option_ == ZMQ_XPUB_VERBOSE_UNSUBSCRIBE) + verbose_unsubs = (*static_cast (optval_) != 0); else if (option_ == ZMQ_XPUB_NODROP) lossy = (*static_cast (optval_) == 0); @@ -173,7 +180,7 @@ void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_) // Remove the pipe from the trie. If there are topics that nobody // is interested in anymore, send corresponding unsubscriptions // upstream. - subscriptions.rm (pipe_, send_unsubscription, this); + subscriptions.rm (pipe_, send_unsubscription, this, !verbose_unsubs); dist.pipe_terminated (pipe_); } diff --git a/src/xpub.hpp b/src/xpub.hpp index 261cac44..4be7af5f 100644 --- a/src/xpub.hpp +++ b/src/xpub.hpp @@ -84,7 +84,11 @@ namespace zmq // If true, send all subscription messages upstream, not just // unique ones - bool verbose; + bool verbose_subs; + + // If true, send all unsubscription messages upstream, not just + // unique ones + bool verbose_unsubs; // True if we are in the middle of sending a multi-part message. bool more;