From 2df7ab6aeec2c21f4ba5803f7bb650c796b5c068 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tarmo=20T=C3=A4nav?= Date: Mon, 8 Feb 2021 04:35:08 +0200 Subject: [PATCH] Problem: XPUB socket allows manual subscription on terminated pipe Solution: Avoid setting pipe as _last_pipe if it has been terminated --- src/dist.cpp | 12 ++++++++++++ src/dist.hpp | 3 +++ src/xpub.cpp | 12 ++++++++++++ 3 files changed, 27 insertions(+) diff --git a/src/dist.cpp b/src/dist.cpp index 7795bb36..a205f70f 100644 --- a/src/dist.cpp +++ b/src/dist.cpp @@ -64,6 +64,18 @@ void zmq::dist_t::attach (pipe_t *pipe_) } } +bool zmq::dist_t::has_pipe (pipe_t *pipe_) +{ + std::size_t claimed_index = _pipes.index (pipe_); + + // If pipe claims to be outside the available index space it can't be in the distributor. + if (claimed_index >= _pipes.size ()) { + return false; + } + + return _pipes[claimed_index] == pipe_; +} + void zmq::dist_t::match (pipe_t *pipe_) { // If pipe is already matching do nothing. diff --git a/src/dist.hpp b/src/dist.hpp index f772a716..a5926cfb 100644 --- a/src/dist.hpp +++ b/src/dist.hpp @@ -51,6 +51,9 @@ class dist_t // Adds the pipe to the distributor object. void attach (zmq::pipe_t *pipe_); + // Checks if this pipe is present in the distributor. + bool has_pipe (zmq::pipe_t *pipe_); + // Activates pipe that have previously reached high watermark. void activated (zmq::pipe_t *pipe_); diff --git a/src/xpub.cpp b/src/xpub.cpp index e806b8e4..a71543a3 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -272,6 +272,12 @@ void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_) // care of by the manual call above. subscriptions is the real mtrie, // so the pipe must be removed from there or it will be left over. _subscriptions.rm (pipe_, stub, static_cast (NULL), false); + + // In case the pipe is currently set as last we must clear it to prevent + // subscriptions from being re-added. + if (pipe_ == _last_pipe) { + _last_pipe = NULL; + } } else { // Remove the pipe from the trie. If there are topics that nobody // is interested in anymore, send corresponding unsubscriptions @@ -348,6 +354,12 @@ int zmq::xpub_t::xrecv (msg_t *msg_) if (_manual && !_pending_pipes.empty ()) { _last_pipe = _pending_pipes.front (); _pending_pipes.pop_front (); + + // If the distributor doesn't know about this pipe it must have already + // been terminated and thus we can't allow manual subscriptions. + if (_last_pipe != NULL && !_dist.has_pipe (_last_pipe)) { + _last_pipe = NULL; + } } int rc = msg_->close ();