From 813c738137f430aed3b61beb8e31035b00d43cff Mon Sep 17 00:00:00 2001 From: Fedor Sheremetyev Date: Fri, 17 Jun 2016 11:36:13 +0100 Subject: [PATCH 1/2] Add test for consistent unsubscription in XPUB manual mode. Expect custom messages on both explicit unsubscription and pipe termination. --- tests/test_xpub_manual.cpp | 107 +++++++++++++++++++++++++++++++++++++ 1 file changed, 107 insertions(+) diff --git a/tests/test_xpub_manual.cpp b/tests/test_xpub_manual.cpp index 6f581ab3..1aa53288 100644 --- a/tests/test_xpub_manual.cpp +++ b/tests/test_xpub_manual.cpp @@ -90,6 +90,112 @@ int test_basic() return 0 ; } + +int test_unsubscribe_manual() +{ + void *ctx = zmq_ctx_new (); + assert (ctx); + + // Create a publisher + void *pub = zmq_socket (ctx, ZMQ_XPUB); + assert (pub); + int rc = zmq_bind (pub, "inproc://soname"); + assert (rc == 0); + + // set pub socket options + int manual = 1; + rc = zmq_setsockopt(pub, ZMQ_XPUB_MANUAL, &manual, 4); + assert (rc == 0); + + // Create a subscriber + void *sub = zmq_socket (ctx, ZMQ_XSUB); + assert (sub); + rc = zmq_connect (sub, "inproc://soname"); + assert (rc == 0); + + // Subscribe for A + char subscription1[2] = { 1, 'A'}; + rc = zmq_send_const(sub, subscription1, 2, 0); + assert (rc == 2); + + // Subscribe for B + char subscription2[2] = { 1, 'B'}; + rc = zmq_send_const(sub, subscription2, 2, 0); + assert (rc == 2); + + char buffer[3]; + + // Receive subscription "A" from subscriber + rc = zmq_recv(pub, buffer, 2, 0); + assert(rc == 2); + assert(buffer[0] == 1); + assert(buffer[1] == 'A'); + + // Subscribe socket for XA instead + rc = zmq_setsockopt(pub, ZMQ_SUBSCRIBE, "XA", 2); + assert(rc == 0); + + // Receive subscription "B" from subscriber + rc = zmq_recv(pub, buffer, 2, 0); + assert(rc == 2); + assert(buffer[0] == 1); + assert(buffer[1] == 'B'); + + // Subscribe socket for XB instead + rc = zmq_setsockopt(pub, ZMQ_SUBSCRIBE, "XB", 2); + assert(rc == 0); + + // Unsubscribe from A + char unsubscription1[2] = { 0, 'A'}; + rc = zmq_send_const(sub, unsubscription1, 2, 0); + assert (rc == 2); + + // Receive unsubscription "A" from subscriber + rc = zmq_recv(pub, buffer, 2, 0); + assert(rc == 2); + assert(buffer[0] == 0); + assert(buffer[1] == 'A'); + + // Unsubscribe socket from XA instead + rc = zmq_setsockopt(pub, ZMQ_UNSUBSCRIBE, "XA", 2); + assert(rc == 0); + + // Sending messages XA, XB + rc = zmq_send_const(pub, "XA", 2, 0); + assert(rc == 2); + rc = zmq_send_const(pub, "XB", 2, 0); + assert(rc == 2); + + // Subscriber should receive XB only + rc = zmq_recv(sub, buffer, 2, ZMQ_DONTWAIT); + assert(rc == 2); + assert(buffer[0] == 'X'); + assert(buffer[1] == 'B'); + + // Close subscriber + rc = zmq_close (sub); + assert (rc == 0); + + // Receive unsubscription "B" + rc = zmq_recv(pub, buffer, 2, 0); + assert(rc == 2); + assert(buffer[0] == 0); + assert(buffer[1] == 'B'); + + // Unsubscribe socket from XB instead + rc = zmq_setsockopt(pub, ZMQ_UNSUBSCRIBE, "XB", 2); + assert(rc == 0); + + // Clean up. + rc = zmq_close (pub); + assert (rc == 0); + rc = zmq_ctx_term (ctx); + assert (rc == 0); + + return 0 ; +} + + int test_xpub_proxy_unsubscribe_on_disconnect(const char *frontend, const char *backend) { @@ -349,6 +455,7 @@ int main(void) { setup_test_environment (); test_basic (); + test_unsubscribe_manual (); const char *frontend; const char *backend; From baea4066835a28f2949fec84dfc41e49e14c10e8 Mon Sep 17 00:00:00 2001 From: Fedor Sheremetyev Date: Fri, 17 Jun 2016 11:40:17 +0100 Subject: [PATCH 2/2] Store manual subscriptions in XPUB and send them out on pipe termination. --- src/xpub.cpp | 23 +++++++++++++++++++---- src/xpub.hpp | 3 +++ 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/src/xpub.cpp b/src/xpub.cpp index 565e2758..afc0ba2a 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -92,6 +92,12 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) if (size > 0 && (*data == 0 || *data == 1)) { if (manual) { + // Store manual subscription to use on termination + if (*data == 0) + manual_subscriptions.rm(data + 1, size - 1, pipe_); + else + manual_subscriptions.add(data + 1, size - 1, pipe_); + pending_pipes.push_back(pipe_); pending_data.push_back(blob_t(data, size)); pending_metadata.push_back(sub.metadata()); @@ -191,10 +197,19 @@ int zmq::xpub_t::xsetsockopt (int option_, const void *optval_, void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_) { - // Remove the pipe from the trie. If there are topics that nobody - // is interested in anymore, send corresponding unsubscriptions - // upstream. - subscriptions.rm (pipe_, send_unsubscription, this, !(verbose_unsubs || manual)); + if (manual) + { + // Remove the pipe from the trie and send corresponding manual + // unsubscriptions upstream. + manual_subscriptions.rm (pipe_, send_unsubscription, this, false); + } + else + { + // Remove the pipe from the trie. If there are topics that nobody + // is interested in anymore, send corresponding unsubscriptions + // upstream. + subscriptions.rm (pipe_, send_unsubscription, this, !verbose_unsubs); + } dist.pipe_terminated (pipe_); } diff --git a/src/xpub.hpp b/src/xpub.hpp index 668677c3..e83978c8 100644 --- a/src/xpub.hpp +++ b/src/xpub.hpp @@ -79,6 +79,9 @@ namespace zmq // List of all subscriptions mapped to corresponding pipes. mtrie_t subscriptions; + // List of manual subscriptions mapped to corresponding pipes. + mtrie_t manual_subscriptions; + // Distributor of messages holding the list of outbound pipes. dist_t dist;