diff --git a/src/xpub.cpp b/src/xpub.cpp index 677af36a..93baa9da 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -131,7 +131,57 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) _process_subscribe = !_only_first_subscribe || is_subscribe_or_cancel; - if (!is_subscribe_or_cancel && options.type != ZMQ_PUB) { + if (is_subscribe_or_cancel) { + if (_manual) { + // Store manual subscription to use on termination + if (!subscribe) + _manual_subscriptions.rm (data, size, pipe_); + else + _manual_subscriptions.add (data, size, pipe_); + + _pending_pipes.push_back (pipe_); + } else { + if (!subscribe) { + const mtrie_t::rm_result rm_result = + _subscriptions.rm (data, size, pipe_); + // TODO reconsider what to do if rm_result == mtrie_t::not_found + notify = + rm_result != mtrie_t::values_remain || _verbose_unsubs; + } else { + const bool first_added = + _subscriptions.add (data, size, pipe_); + notify = first_added || _verbose_subs; + } + } + + // If the request was a new subscription, or the subscription + // was removed, or verbose mode or manual mode are enabled, store it + // so that it can be passed to the user on next recv call. + if (_manual || (options.type == ZMQ_XPUB && notify)) { + // ZMTP 3.1 hack: we need to support sub/cancel commands, but + // we can't give them back to userspace as it would be an API + // breakage since the payload of the message is completely + // different. Manually craft an old-style message instead. + // Although with other transports it would be possible to simply + // reuse the same buffer and prefix a 0/1 byte to the topic, with + // inproc the subscribe/cancel command string is not present in + // the message, so this optimization is not possible. + // The pushback makes a copy of the data array anyway, so the + // number of buffer copies does not change. + blob_t notification (size + 1); + if (subscribe) + *notification.data () = 1; + else + *notification.data () = 0; + memcpy (notification.data () + 1, data, size); + + _pending_data.push_back (ZMQ_MOVE (notification)); + if (metadata) + metadata->add_ref (); + _pending_metadata.push_back (metadata); + _pending_flags.push_back (0); + } + } else if (options.type != ZMQ_PUB) { // Process user message coming upstream from xsub socket, // but not if the type is PUB, which never processes user // messages @@ -140,56 +190,6 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) metadata->add_ref (); _pending_metadata.push_back (metadata); _pending_flags.push_back (msg.flags ()); - msg.close (); - continue; - } - - if (_manual) { - // Store manual subscription to use on termination - if (!subscribe) - _manual_subscriptions.rm (data, size, pipe_); - else - _manual_subscriptions.add (data, size, pipe_); - - _pending_pipes.push_back (pipe_); - } else { - if (!subscribe) { - const mtrie_t::rm_result rm_result = - _subscriptions.rm (data, size, pipe_); - // TODO reconsider what to do if rm_result == mtrie_t::not_found - notify = rm_result != mtrie_t::values_remain || _verbose_unsubs; - } else { - const bool first_added = _subscriptions.add (data, size, pipe_); - notify = first_added || _verbose_subs; - } - } - - // If the request was a new subscription, or the subscription - // was removed, or verbose mode or manual mode are enabled, store it - // so that it can be passed to the user on next recv call. - if (_manual || (options.type == ZMQ_XPUB && notify)) { - // ZMTP 3.1 hack: we need to support sub/cancel commands, but - // we can't give them back to userspace as it would be an API - // breakage since the payload of the message is completely - // different. Manually craft an old-style message instead. - // Although with other transports it would be possible to simply - // reuse the same buffer and prefix a 0/1 byte to the topic, with - // inproc the subscribe/cancel command string is not present in - // the message, so this optimization is not possible. - // The pushback makes a copy of the data array anyway, so the - // number of buffer copies does not change. - blob_t notification (size + 1); - if (subscribe) - *notification.data () = 1; - else - *notification.data () = 0; - memcpy (notification.data () + 1, data, size); - - _pending_data.push_back (ZMQ_MOVE (notification)); - if (metadata) - metadata->add_ref (); - _pending_metadata.push_back (metadata); - _pending_flags.push_back (0); } msg.close ();