diff --git a/src/xpub.cpp b/src/xpub.cpp index e30ada58..174fbe31 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -34,6 +34,7 @@ #include "pipe.hpp" #include "err.hpp" #include "msg.hpp" +#include "macros.hpp" zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : socket_base_t (parent_, tid_, sid_), @@ -203,6 +204,13 @@ int zmq::xpub_t::xsetsockopt (int option_, const void *optval_, return 0; } +static void stub (unsigned char *data_, size_t size_, void *arg_) +{ + LIBZMQ_UNUSED(data_); + LIBZMQ_UNUSED(size_); + LIBZMQ_UNUSED(arg_); +} + void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_) { if (manual) @@ -210,6 +218,10 @@ void zmq::xpub_t::xpipe_terminated (pipe_t *pipe_) // Remove the pipe from the trie and send corresponding manual // unsubscriptions upstream. manual_subscriptions.rm (pipe_, send_unsubscription, this, false); + // Remove pipe without actually sending the message as it was taken + // care of by the manual call above. subscriptions is the real mtrie, + // so the pipe must be removed from there or it will be left over. + subscriptions.rm (pipe_, stub, NULL, false); } else { diff --git a/tests/test_xpub_manual.cpp b/tests/test_xpub_manual.cpp index a64268f9..5ec49edc 100644 --- a/tests/test_xpub_manual.cpp +++ b/tests/test_xpub_manual.cpp @@ -466,6 +466,124 @@ int test_missing_subscriptions(void) } +int test_unsubscribe_cleanup (void) +{ + size_t len = MAX_SOCKET_STRING; + char my_endpoint[MAX_SOCKET_STRING]; + + void *ctx = zmq_ctx_new (); + assert (ctx); + + // Create a publisher + void *pub = zmq_socket (ctx, ZMQ_XPUB); + assert (pub); + int manual = 1; + int rc = zmq_setsockopt (pub, ZMQ_XPUB_MANUAL, &manual, 4); + assert (rc == 0); + rc = zmq_bind (pub, "tcp://127.0.0.1:*"); + assert (rc == 0); + rc = zmq_getsockopt (pub, ZMQ_LAST_ENDPOINT, my_endpoint, &len); + assert (rc == 0); + + // Create a subscriber + void *sub = zmq_socket (ctx, ZMQ_XSUB); + assert (sub); + rc = zmq_connect (sub, my_endpoint); + assert (rc == 0); + + // Subscribe for A + char subscription[2] = { 1, 'A'}; + rc = zmq_send_const (sub, subscription, 2, 0); + assert (rc == 2); + + char buffer[2]; + + // Receive subscriptions from subscriber + rc = zmq_recv(pub, buffer, 2, 0); + assert (rc == 2); + assert (buffer[0] == 1); + assert (buffer[1] == 'A'); + rc = zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "XA", 2); + assert (rc == 0); + + // send 2 messages + rc = zmq_send_const (pub, "XA", 2, 0); + assert (rc == 2); + rc = zmq_send_const (pub, "XB", 2, 0); + assert (rc == 2); + + // receive the single message + rc = zmq_recv (sub, buffer, 2, 0); + assert (rc == 2); + assert (buffer[0] == 'X'); + assert (buffer[1] == 'A'); + + // should be nothing left in the queue + rc = zmq_recv (sub, buffer, 2, ZMQ_DONTWAIT); + assert (rc == -1); + + // close the socket + rc = zmq_close (sub); + assert (rc == 0); + + // closing the socket will result in an unsubscribe event + rc = zmq_recv (pub, buffer, 2, 0); + assert (rc == 2); + assert (buffer[0] == 0); + assert (buffer[1] == 'A'); + + // this doesn't really do anything + // there is no last_pipe set it will just fail silently + rc = zmq_setsockopt (pub, ZMQ_UNSUBSCRIBE, "XA", 2); + assert (rc == 0); + + // reconnect + sub = zmq_socket (ctx, ZMQ_XSUB); + rc = zmq_connect (sub, my_endpoint); + assert (rc == 0); + + // send a subscription for B + subscription[0] = 1; + subscription[1] = 'B'; + rc = zmq_send (sub, subscription, 2, 0); + assert (rc == 2); + + // receive the subscription, overwrite it to XB + rc = zmq_recv (pub, buffer, 2, 0); + assert (rc == 2); + assert (buffer[0] == 1); + assert(buffer[1] == 'B'); + rc = zmq_setsockopt (pub, ZMQ_SUBSCRIBE, "XB", 2); + assert (rc == 0); + + // send 2 messages + rc = zmq_send_const (pub, "XA", 2, 0); + assert (rc == 2); + rc = zmq_send_const (pub, "XB", 2, 0); + assert (rc == 2); + + // receive the single message + rc = zmq_recv (sub, buffer, 2, 0); + assert (rc == 2); + assert (buffer[0] == 'X'); + assert (buffer[1] == 'B'); // this assertion will fail + + // should be nothing left in the queue + rc = zmq_recv (sub, buffer, 2, ZMQ_DONTWAIT); + assert (rc == -1); + + // Clean up. + rc = zmq_close (pub); + assert (rc == 0); + rc = zmq_close (sub); + assert (rc == 0); + rc = zmq_ctx_term (ctx); + assert (rc == 0); + + return 0 ; +} + + int main(void) { setup_test_environment (); @@ -473,6 +591,7 @@ int main(void) test_unsubscribe_manual (); test_xpub_proxy_unsubscribe_on_disconnect (); test_missing_subscriptions (); + test_unsubscribe_cleanup (); return 0; }