diff --git a/src/xpub.cpp b/src/xpub.cpp index a5ec28ce..b9e5769a 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -92,6 +92,7 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) { last_pipe = pipe_; pending_data.push_back(blob_t(data, size)); + pending_metadata.push_back(sub.metadata()); pending_flags.push_back(0); } else @@ -108,6 +109,7 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) if (options.type == ZMQ_XPUB && (unique || (*data == 1 && verbose_subs) || (*data == 0 && verbose_unsubs && verbose_subs))) { pending_data.push_back(blob_t(data, size)); + pending_metadata.push_back(sub.metadata()); pending_flags.push_back(0); } } @@ -115,6 +117,7 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) else { // Process user message coming upstream from xsub socket pending_data.push_back (blob_t (data, size)); + pending_metadata.push_back (sub.metadata ()); pending_flags.push_back (sub.flags ()); } sub.close (); @@ -241,8 +244,10 @@ int zmq::xpub_t::xrecv (msg_t *msg_) memcpy (msg_->data (), pending_data.front ().data (), pending_data.front ().size ()); + msg_->set_metadata (pending_metadata.front ()); msg_->set_flags (pending_flags.front ()); pending_data.pop_front (); + pending_metadata.pop_front (); pending_flags.pop_front (); return 0; } @@ -265,6 +270,7 @@ void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_, if (size_ > 0) memcpy (&unsub [1], data_, size_); self->pending_data.push_back (unsub); + self->pending_metadata.push_back (NULL); self->pending_flags.push_back (0); } } diff --git a/src/xpub.hpp b/src/xpub.hpp index 4be7af5f..1515529a 100644 --- a/src/xpub.hpp +++ b/src/xpub.hpp @@ -109,6 +109,7 @@ namespace zmq // applied to the trie, but not yet received by the user. typedef std::basic_string blob_t; std::deque pending_data; + std::deque pending_metadata; std::deque pending_flags; xpub_t (const xpub_t&);