From f0cf4095b5055eb41e9cd8f841542944ddf7e771 Mon Sep 17 00:00:00 2001 From: Pieter Hintjens Date: Thu, 18 Apr 2013 17:23:57 +0200 Subject: [PATCH] Fixed issue #525 - multipart upstreaming from xsub to xpub --- src/xpub.cpp | 36 +++++++++++++++++++++--------------- src/xpub.hpp | 4 ++-- src/xsub.cpp | 2 +- 3 files changed, 24 insertions(+), 18 deletions(-) diff --git a/src/xpub.cpp b/src/xpub.cpp index 455148b1..ad98beb2 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -56,9 +56,8 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) // There are some subscriptions waiting. Let's process them. msg_t sub; while (pipe_->read (&sub)) { - - // Apply the subscription to the trie. - unsigned char *const data = (unsigned char*) sub.data (); + // Apply the subscription to the trie + unsigned char *const data = (unsigned char *) sub.data (); const size_t size = sub.size (); if (size > 0 && (*data == 0 || *data == 1)) { bool unique; @@ -69,13 +68,16 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) // If the subscription is not a duplicate store it so that it can be // passed to used on next recv call. (Unsubscribe is not verbose.) - if (options.type == ZMQ_XPUB && (unique || (*data && verbose))) - pending.push_back (blob_t (data, size)); + if (options.type == ZMQ_XPUB && (unique || (*data && verbose))) { + pending_data.push_back (blob_t (data, size)); + pending_flags.push_back (0); + } } - else + else { // Process user message coming upstream from xsub socket - pending.push_back (blob_t (data, size)); - + pending_data.push_back (blob_t (data, size)); + pending_flags.push_back (sub.flags ()); + } sub.close (); } } @@ -149,24 +151,27 @@ bool zmq::xpub_t::xhas_out () int zmq::xpub_t::xrecv (msg_t *msg_) { // If there is at least one - if (pending.empty ()) { + if (pending_data.empty ()) { errno = EAGAIN; return -1; } int rc = msg_->close (); errno_assert (rc == 0); - rc = msg_->init_size (pending.front ().size ()); + rc = msg_->init_size (pending_data.front ().size ()); errno_assert (rc == 0); - memcpy (msg_->data (), pending.front ().data (), - pending.front ().size ()); - pending.pop_front (); + memcpy (msg_->data (), + pending_data.front ().data (), + pending_data.front ().size ()); + msg_->set_flags (pending_flags.front ()); + pending_data.pop_front (); + pending_flags.pop_front (); return 0; } bool zmq::xpub_t::xhas_in () { - return !pending.empty (); + return !pending_data.empty (); } void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_, @@ -180,7 +185,8 @@ void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_, blob_t unsub (size_ + 1, 0); unsub [0] = 0; memcpy (&unsub [1], data_, size_); - self->pending.push_back (unsub); + self->pending_data.push_back (unsub); + self->pending_flags.push_back (0); } } diff --git a/src/xpub.hpp b/src/xpub.hpp index 0a799a77..457630ad 100644 --- a/src/xpub.hpp +++ b/src/xpub.hpp @@ -82,8 +82,8 @@ namespace zmq // List of pending (un)subscriptions, ie. those that were already // applied to the trie, but not yet received by the user. typedef std::basic_string blob_t; - typedef std::deque pending_t; - pending_t pending; + std::deque pending_data; + std::deque pending_flags; xpub_t (const xpub_t&); const xpub_t &operator = (const xpub_t&); diff --git a/src/xsub.cpp b/src/xsub.cpp index f37d944c..d30efbe8 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -83,7 +83,7 @@ void zmq::xsub_t::xhiccuped (pipe_t *pipe_) int zmq::xsub_t::xsend (msg_t *msg_) { size_t size = msg_->size (); - unsigned char *data = (unsigned char*) msg_->data (); + unsigned char *data = (unsigned char *) msg_->data (); if (size > 0 && *data == 1) { // Process subscribe message