From dd35e1db0f2f28387144cde7c13ea28d55ec0ae9 Mon Sep 17 00:00:00 2001 From: David Jelenc Date: Sat, 5 Sep 2015 12:07:50 +0200 Subject: [PATCH] Problem: lingering subscriptions on XPUB sockets (#1566) The patch fixes lingering subscriptions that occur upon disconnection on XPUB sockets with option XPUB_MANUAL when used in a XPUB-XSUB proxies. --- src/xpub.cpp | 17 +++-- tests/test_xpub_manual.cpp | 150 ++++++++++++++++++++++++++++++++++++- 2 files changed, 160 insertions(+), 7 deletions(-) diff --git a/src/xpub.cpp b/src/xpub.cpp index af24f062..e8ad43ff 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -153,11 +153,17 @@ int zmq::xpub_t::xsetsockopt (int option_, const void *optval_, manual = (*static_cast (optval_) != 0); } else - if (option_ == ZMQ_SUBSCRIBE && manual && last_pipe != NULL) - subscriptions.add((unsigned char *)optval_, optvallen_, last_pipe); + if (option_ == ZMQ_SUBSCRIBE && manual) { + if (last_pipe != NULL) { + subscriptions.add((unsigned char *)optval_, optvallen_, last_pipe); + } + } else - if (option_ == ZMQ_UNSUBSCRIBE && manual && last_pipe != NULL) - subscriptions.rm((unsigned char *)optval_, optvallen_, last_pipe); + if (option_ == ZMQ_UNSUBSCRIBE && manual) { + if (last_pipe != NULL) { + subscriptions.rm((unsigned char *)optval_, optvallen_, last_pipe); + } + } else if (option_ == ZMQ_XPUB_WELCOME_MSG) { welcome_msg.close(); @@ -183,7 +189,7 @@ void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_) // Remove the pipe from the trie. If there are topics that nobody // is interested in anymore, send corresponding unsubscriptions // upstream. - subscriptions.rm (pipe_, send_unsubscription, this, !verbose_unsubs); + subscriptions.rm (pipe_, send_unsubscription, this, !(verbose_unsubs || manual)); dist.pipe_terminated (pipe_); } @@ -274,6 +280,7 @@ void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_, unsub [0] = 0; if (size_ > 0) memcpy (&unsub [1], data_, size_); + self->last_pipe = NULL; self->pending_data.push_back (unsub); self->pending_metadata.push_back (NULL); self->pending_flags.push_back (0); diff --git a/tests/test_xpub_manual.cpp b/tests/test_xpub_manual.cpp index b32a008c..bb55e87b 100644 --- a/tests/test_xpub_manual.cpp +++ b/tests/test_xpub_manual.cpp @@ -29,9 +29,8 @@ #include "testutil.hpp" -int main (void) +int test_basic() { - setup_test_environment(); void *ctx = zmq_ctx_new (); assert (ctx); @@ -90,3 +89,150 @@ int main (void) return 0 ; } + +int test_xpub_proxy_unsubscribe_on_disconnect() +{ + const char* frontend = "ipc://frontend"; + const char* backend = "ipc://backend"; + const char* topic = "1"; + const char* payload = "X"; + + int manual = 1; + + void *ctx = zmq_ctx_new (); + assert (ctx); + + // proxy frontend + void *xsub_proxy = zmq_socket (ctx, ZMQ_XSUB); + assert (xsub_proxy); + assert (zmq_bind (xsub_proxy, frontend) == 0); + + // proxy backend + void *xpub_proxy = zmq_socket (ctx, ZMQ_XPUB); + assert (xpub_proxy); + assert (zmq_setsockopt (xpub_proxy, ZMQ_XPUB_MANUAL, &manual, 4) == 0); + assert (zmq_bind (xpub_proxy, backend) == 0); + + // publisher + void *pub = zmq_socket (ctx, ZMQ_PUB); + assert (zmq_connect (pub, frontend) == 0); + + // first subscriber subscribes + void *sub1 = zmq_socket (ctx, ZMQ_SUB); + assert (sub1); + assert (zmq_connect (sub1, backend) == 0); + assert (zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, topic, 1) == 0); + + // wait + assert (zmq_poll (0, 0, 100) == 0); + + // proxy reroutes and confirms subscriptions + char sub_buff[2]; + assert (zmq_recv (xpub_proxy, sub_buff, 2, ZMQ_DONTWAIT) == 2); + assert (sub_buff [0] == 1); + assert (sub_buff [1] == *topic); + assert (zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic, 1) == 0); + assert (zmq_send (xsub_proxy, sub_buff, 2, 0) == 2); + + // second subscriber subscribes + void *sub2 = zmq_socket (ctx, ZMQ_SUB); + assert (sub2); + assert (zmq_connect (sub2, backend) == 0); + assert (zmq_setsockopt (sub2, ZMQ_SUBSCRIBE, topic, 1) == 0); + + // wait + assert (zmq_poll (0, 0, 100) == 0); + + // proxy reroutes + assert (zmq_recv (xpub_proxy, sub_buff, 2, ZMQ_DONTWAIT) == 2); + assert (sub_buff [0] == 1); + assert (sub_buff [1] == *topic); + assert (zmq_setsockopt (xpub_proxy, ZMQ_SUBSCRIBE, topic, 1) == 0); + assert (zmq_send (xsub_proxy, sub_buff, 2, 0) == 2); + + // wait + assert (zmq_poll (0, 0, 100) == 0); + + // let publisher send a msg + assert (zmq_send (pub, topic, 1, ZMQ_SNDMORE) == 1); + assert (zmq_send (pub, payload, 1, 0) == 1); + + // wait + assert (zmq_poll (0, 0, 100) == 0); + + // proxy reroutes data messages to subscribers + char topic_buff[1]; + char data_buff[1]; + assert (zmq_recv (xsub_proxy, topic_buff, 1, ZMQ_DONTWAIT) == 1); + assert (topic_buff [0] == *topic); + assert (zmq_recv (xsub_proxy, data_buff, 1, ZMQ_DONTWAIT) == 1); + assert (data_buff [0] == *payload); + assert (zmq_send (xpub_proxy, topic_buff, 1, ZMQ_SNDMORE) == 1); + assert (zmq_send (xpub_proxy, data_buff, 1, 0) == 1); + + // wait + assert (zmq_poll (0, 0, 100) == 0); + + // each subscriber should now get a message + assert (zmq_recv (sub2, topic_buff, 1, ZMQ_DONTWAIT) == 1); + assert (topic_buff [0] == *topic); + assert (zmq_recv (sub2, data_buff, 1, ZMQ_DONTWAIT) == 1); + assert (data_buff [0] == *payload); + + assert (zmq_recv (sub1, topic_buff, 1, ZMQ_DONTWAIT) == 1); + assert (topic_buff [0] == *topic); + assert (zmq_recv (sub1, data_buff, 1, ZMQ_DONTWAIT) == 1); + assert (data_buff [0] == *payload); + + // Disconnect both subscribers + assert (zmq_close (sub1) == 0); + assert (zmq_close (sub2) == 0); + + // wait + assert (zmq_poll (0, 0, 100) == 0); + + // unsubscribe messages are passed from proxy to publisher + assert (zmq_recv (xpub_proxy, sub_buff, 2, 0) == 2); + assert (sub_buff [0] == 0); + assert (sub_buff [1] == *topic); + assert (zmq_setsockopt (xpub_proxy, ZMQ_UNSUBSCRIBE, topic, 1) == 0); + assert (zmq_send (xsub_proxy, sub_buff, 2, 0) == 2); + + // should receive another unsubscribe msg + assert (zmq_recv (xpub_proxy, sub_buff, 2, ZMQ_DONTWAIT) == 2 + && "Should receive the second unsubscribe message."); + assert (sub_buff [0] == 0); + assert (sub_buff [1] == *topic); + assert (zmq_setsockopt (xpub_proxy, ZMQ_UNSUBSCRIBE, topic, 1) == 0); + assert (zmq_send (xsub_proxy, sub_buff, 2, 0) == 2); + + // wait + assert (zmq_poll (0, 0, 100) == 0); + + // let publisher send a msg + assert (zmq_send (pub, topic, 1, ZMQ_SNDMORE) == 1); + assert (zmq_send (pub, payload, 1, 0) == 1); + + // wait + assert (zmq_poll (0, 0, 100) == 0); + + // nothing should come to the proxy + assert (zmq_recv (xsub_proxy, topic_buff, 1, ZMQ_DONTWAIT) == -1); + assert (errno == EAGAIN); + + assert (zmq_close (pub) == 0); + assert (zmq_close (xpub_proxy) == 0); + assert (zmq_close (xsub_proxy) == 0); + assert (zmq_ctx_term (ctx) == 0); + + return 0; +} + +int main(void) +{ + setup_test_environment (); + test_basic (); + test_xpub_proxy_unsubscribe_on_disconnect (); + + return 0; +}