From 8fd71d631d740a558b56456e08b55574c19ef761 Mon Sep 17 00:00:00 2001 From: David Jelenc Date: Sat, 5 Sep 2015 14:59:52 +0200 Subject: [PATCH] Fixed missing subscriptions on XPUB with manual subscriptions The patch fixes the issue #1568. --- src/xpub.cpp | 8 ++- src/xpub.hpp | 5 +- tests/test_xpub_manual.cpp | 117 +++++++++++++++++++++++++++++++++++++ 3 files changed, 128 insertions(+), 2 deletions(-) diff --git a/src/xpub.cpp b/src/xpub.cpp index e8ad43ff..21367d9d 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -41,6 +41,7 @@ zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : more (false), lossy (true), manual(false), + pending_pipes (), welcome_msg () { last_pipe = NULL; @@ -90,7 +91,7 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) if (size > 0 && (*data == 0 || *data == 1)) { if (manual) { - last_pipe = pipe_; + pending_pipes.push_back(pipe_); pending_data.push_back(blob_t(data, size)); pending_metadata.push_back(sub.metadata()); pending_flags.push_back(0); @@ -243,6 +244,10 @@ int zmq::xpub_t::xrecv (msg_t *msg_) return -1; } + // User is reading a message, set last_pipe and remove it from the deque + last_pipe = pending_pipes.front (); + pending_pipes.pop_front (); + int rc = msg_->close (); errno_assert (rc == 0); rc = msg_->init_size (pending_data.front ().size ()); @@ -281,6 +286,7 @@ void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_, if (size_ > 0) memcpy (&unsub [1], data_, size_); self->last_pipe = NULL; + self->pending_pipes.push_back (NULL); self->pending_data.push_back (unsub); self->pending_metadata.push_back (NULL); self->pending_flags.push_back (0); diff --git a/src/xpub.hpp b/src/xpub.hpp index 9508b1a0..1789ff53 100644 --- a/src/xpub.hpp +++ b/src/xpub.hpp @@ -99,9 +99,12 @@ namespace zmq // Subscriptions will not bed added automatically, only after calling set option with ZMQ_SUBSCRIBE or ZMQ_UNSUBSCRIBE bool manual; - // Last pipe send subscription message, only used if xpub is on manual + // Last pipe that sent subscription message, only used if xpub is on manual pipe_t *last_pipe; + // Pipes that sent subscriptions messages that have not yet been processed, only used if xpub is on manual + std::deque pending_pipes; + // Welcome message to send to pipe when attached msg_t welcome_msg; diff --git a/tests/test_xpub_manual.cpp b/tests/test_xpub_manual.cpp index bb55e87b..ba4c00ed 100644 --- a/tests/test_xpub_manual.cpp +++ b/tests/test_xpub_manual.cpp @@ -228,11 +228,128 @@ int test_xpub_proxy_unsubscribe_on_disconnect() return 0; } +int test_missing_subscriptions() +{ + const char* frontend = "ipc://frontend"; + const char* backend = "ipc://backend"; + const char* topic1 = "1"; + const char* topic2 = "2"; + const char* payload = "X"; + + int manual = 1; + + void *ctx = zmq_ctx_new (); + assert (ctx); + + // proxy frontend + void *xsub_proxy = zmq_socket (ctx, ZMQ_XSUB); + assert (xsub_proxy); + assert (zmq_bind (xsub_proxy, frontend) == 0); + + // proxy backend + void *xpub_proxy = zmq_socket (ctx, ZMQ_XPUB); + assert (xpub_proxy); + assert (zmq_setsockopt (xpub_proxy, ZMQ_XPUB_MANUAL, &manual, 4) == 0); + assert (zmq_bind (xpub_proxy, backend) == 0); + + // publisher + void *pub = zmq_socket (ctx, ZMQ_PUB); + assert (zmq_connect (pub, frontend) == 0); + + // Here's the problem: because subscribers subscribe in quick succession, + // the proxy is unable to confirm the first subscription before receiving + // the second. This causes the first subscription to get lost. + + // first subscriber + void *sub1 = zmq_socket (ctx, ZMQ_SUB); + assert (sub1); + assert (zmq_connect (sub1, backend) == 0); + assert (zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, topic1, 1) == 0); + + // second subscriber + void *sub2 = zmq_socket (ctx, ZMQ_SUB); + assert (sub2); + assert (zmq_connect (sub2, backend) == 0); + assert (zmq_setsockopt (sub2, ZMQ_SUBSCRIBE, topic2, 1) == 0); + + // wait + assert (zmq_poll (0, 0, 100) == 0); + + // proxy now reroutes and confirms subscriptions + char buffer[2]; + assert (zmq_recv (xpub_proxy, buffer, 2, ZMQ_DONTWAIT) == 2); + assert (buffer [0] == 1); + assert (buffer [1] == *topic1); + assert (zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic1, 1) == 0); + assert (zmq_send (xsub_proxy, buffer, 2, 0) == 2); + + assert (zmq_recv (xpub_proxy, buffer, 2, ZMQ_DONTWAIT) == 2); + assert (buffer [0] == 1); + assert (buffer [1] == *topic2); + assert (zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic2, 1) == 0); + assert (zmq_send (xsub_proxy, buffer, 2, 0) == 2); + + // wait + assert (zmq_poll (0, 0, 100) == 0); + + // let publisher send 2 msgs, each with its own topic + assert (zmq_send (pub, topic1, 1, ZMQ_SNDMORE) == 1); + assert (zmq_send (pub, payload, 1, 0) == 1); + assert (zmq_send (pub, topic2, 1, ZMQ_SNDMORE) == 1); + assert (zmq_send (pub, payload, 1, 0) == 1); + + // wait + assert (zmq_poll (0, 0, 100) == 0); + + // proxy reroutes data messages to subscribers + char topic_buff [1]; + char data_buff [1]; + assert (zmq_recv (xsub_proxy, topic_buff, 1, ZMQ_DONTWAIT) == 1); + assert (topic_buff [0] == *topic1); + assert (zmq_recv (xsub_proxy, data_buff, 1, ZMQ_DONTWAIT) == 1); + assert (data_buff [0] == *payload); + assert (zmq_send (xpub_proxy, topic_buff, 1, ZMQ_SNDMORE) == 1); + assert (zmq_send (xpub_proxy, data_buff, 1, 0) == 1); + + assert (zmq_recv (xsub_proxy, topic_buff, 1, ZMQ_DONTWAIT) == 1); + assert (topic_buff [0] == *topic2); + assert (zmq_recv (xsub_proxy, data_buff, 1, ZMQ_DONTWAIT) == 1); + assert (data_buff [0] == *payload); + assert (zmq_send (xpub_proxy, topic_buff, 1, ZMQ_SNDMORE) == 1); + assert (zmq_send (xpub_proxy, data_buff, 1, 0) == 1); + + // wait + assert (zmq_poll (0, 0, 100) == 0); + + // each subscriber should now get a message + assert (zmq_recv (sub2, topic_buff, 1, ZMQ_DONTWAIT) == 1); + assert (topic_buff [0] == *topic2); + assert (zmq_recv (sub2, data_buff, 1, ZMQ_DONTWAIT) == 1); + assert (data_buff [0] == *payload); + + assert (zmq_recv (sub1, topic_buff, 1, ZMQ_DONTWAIT) == 1); + assert (topic_buff [0] == *topic1); + assert (zmq_recv (sub1, data_buff, 1, ZMQ_DONTWAIT) == 1); + assert (data_buff [0] == *payload); + + // Clean up + assert (zmq_close (sub1) == 0); + assert (zmq_close (sub2) == 0); + assert (zmq_close (pub) == 0); + assert (zmq_close (xpub_proxy) == 0); + assert (zmq_close (xsub_proxy) == 0); + assert (zmq_ctx_term (ctx) == 0); + + return 0; +} + + int main(void) { setup_test_environment (); test_basic (); test_xpub_proxy_unsubscribe_on_disconnect (); + test_missing_subscriptions (); return 0; }