Problem: XPUB treats non-sub/cancel as sub/cancel

Solution: only process for sub/cancel if the messages are actually sub/cancel.
Regression introduced by cf9ccbb which tried to skip non-sub/cancel processing
for PUB.
This commit is contained in:
Luca Boccassi 2020-06-13 15:09:05 +01:00
parent b4aa912467
commit e0e3ce081e

View File

@ -131,7 +131,57 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
_process_subscribe =
!_only_first_subscribe || is_subscribe_or_cancel;
if (!is_subscribe_or_cancel && options.type != ZMQ_PUB) {
if (is_subscribe_or_cancel) {
if (_manual) {
// Store manual subscription to use on termination
if (!subscribe)
_manual_subscriptions.rm (data, size, pipe_);
else
_manual_subscriptions.add (data, size, pipe_);
_pending_pipes.push_back (pipe_);
} else {
if (!subscribe) {
const mtrie_t::rm_result rm_result =
_subscriptions.rm (data, size, pipe_);
// TODO reconsider what to do if rm_result == mtrie_t::not_found
notify =
rm_result != mtrie_t::values_remain || _verbose_unsubs;
} else {
const bool first_added =
_subscriptions.add (data, size, pipe_);
notify = first_added || _verbose_subs;
}
}
// If the request was a new subscription, or the subscription
// was removed, or verbose mode or manual mode are enabled, store it
// so that it can be passed to the user on next recv call.
if (_manual || (options.type == ZMQ_XPUB && notify)) {
// ZMTP 3.1 hack: we need to support sub/cancel commands, but
// we can't give them back to userspace as it would be an API
// breakage since the payload of the message is completely
// different. Manually craft an old-style message instead.
// Although with other transports it would be possible to simply
// reuse the same buffer and prefix a 0/1 byte to the topic, with
// inproc the subscribe/cancel command string is not present in
// the message, so this optimization is not possible.
// The pushback makes a copy of the data array anyway, so the
// number of buffer copies does not change.
blob_t notification (size + 1);
if (subscribe)
*notification.data () = 1;
else
*notification.data () = 0;
memcpy (notification.data () + 1, data, size);
_pending_data.push_back (ZMQ_MOVE (notification));
if (metadata)
metadata->add_ref ();
_pending_metadata.push_back (metadata);
_pending_flags.push_back (0);
}
} else if (options.type != ZMQ_PUB) {
// Process user message coming upstream from xsub socket,
// but not if the type is PUB, which never processes user
// messages
@ -140,56 +190,6 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_)
metadata->add_ref ();
_pending_metadata.push_back (metadata);
_pending_flags.push_back (msg.flags ());
msg.close ();
continue;
}
if (_manual) {
// Store manual subscription to use on termination
if (!subscribe)
_manual_subscriptions.rm (data, size, pipe_);
else
_manual_subscriptions.add (data, size, pipe_);
_pending_pipes.push_back (pipe_);
} else {
if (!subscribe) {
const mtrie_t::rm_result rm_result =
_subscriptions.rm (data, size, pipe_);
// TODO reconsider what to do if rm_result == mtrie_t::not_found
notify = rm_result != mtrie_t::values_remain || _verbose_unsubs;
} else {
const bool first_added = _subscriptions.add (data, size, pipe_);
notify = first_added || _verbose_subs;
}
}
// If the request was a new subscription, or the subscription
// was removed, or verbose mode or manual mode are enabled, store it
// so that it can be passed to the user on next recv call.
if (_manual || (options.type == ZMQ_XPUB && notify)) {
// ZMTP 3.1 hack: we need to support sub/cancel commands, but
// we can't give them back to userspace as it would be an API
// breakage since the payload of the message is completely
// different. Manually craft an old-style message instead.
// Although with other transports it would be possible to simply
// reuse the same buffer and prefix a 0/1 byte to the topic, with
// inproc the subscribe/cancel command string is not present in
// the message, so this optimization is not possible.
// The pushback makes a copy of the data array anyway, so the
// number of buffer copies does not change.
blob_t notification (size + 1);
if (subscribe)
*notification.data () = 1;
else
*notification.data () = 0;
memcpy (notification.data () + 1, data, size);
_pending_data.push_back (ZMQ_MOVE (notification));
if (metadata)
metadata->add_ref ();
_pending_metadata.push_back (metadata);
_pending_flags.push_back (0);
}
msg.close ();