From 354491ddf2ca4d7623f887371b6d4ed250e08b4b Mon Sep 17 00:00:00 2001 From: Luca Boccassi Date: Sun, 25 Feb 2018 20:20:44 +0000 Subject: [PATCH] Problem: no test for ZMQ_XPUB_VERBOSE(R) Solution: add test_xpub_verbose to cover those APIs --- Makefile.am | 5 + tests/CMakeLists.txt | 1 + tests/test_xpub_verbose.cpp | 499 ++++++++++++++++++++++++++++++++++++ 3 files changed, 505 insertions(+) create mode 100644 tests/test_xpub_verbose.cpp diff --git a/Makefile.am b/Makefile.am index b1bde75c..746a56a5 100644 --- a/Makefile.am +++ b/Makefile.am @@ -417,6 +417,7 @@ test_apps = \ tests/test_xpub_nodrop \ tests/test_xpub_manual \ tests/test_xpub_welcome_msg \ + tests/test_xpub_verbose \ tests/test_atomics \ tests/test_sockopt_hwm \ tests/test_heartbeats \ @@ -623,6 +624,10 @@ tests_test_xpub_manual_LDADD = src/libzmq.la tests_test_xpub_welcome_msg_SOURCES = tests/test_xpub_welcome_msg.cpp tests_test_xpub_welcome_msg_LDADD = src/libzmq.la +tests_test_xpub_verbose_SOURCES = tests/test_xpub_verbose.cpp +tests_test_xpub_verbose_LDADD = src/libzmq.la ${UNITY_LIBS} +tests_test_xpub_verbose_CPPFLAGS = ${UNITY_CPPFLAGS} + tests_test_atomics_SOURCES = tests/test_atomics.cpp tests_test_atomics_LDADD = src/libzmq.la diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 345f006e..5f758354 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -65,6 +65,7 @@ set(tests test_stream_timeout test_xpub_manual test_xpub_welcome_msg + test_xpub_verbose test_base85 test_bind_after_connect_tcp test_sodium diff --git a/tests/test_xpub_verbose.cpp b/tests/test_xpub_verbose.cpp new file mode 100644 index 00000000..97906da6 --- /dev/null +++ b/tests/test_xpub_verbose.cpp @@ -0,0 +1,499 @@ +/* + Copyright (c) 2018 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "testutil.hpp" + +#include + +void setUp () +{ +} +void tearDown () +{ +} + +void test_xpub_verbose_one_sub () +{ + int rc; + char buffer[2]; + void *ctx = zmq_ctx_new (); + TEST_ASSERT_NOT_NULL (ctx); + + void *pub = zmq_socket (ctx, ZMQ_XPUB); + TEST_ASSERT_NOT_NULL (pub); + rc = zmq_bind (pub, "inproc://soname"); + TEST_ASSERT_EQUAL_INT (0, rc); + + void *sub = zmq_socket (ctx, ZMQ_SUB); + TEST_ASSERT_NOT_NULL (sub); + rc = zmq_connect (sub, "inproc://soname"); + TEST_ASSERT_EQUAL_INT (0, rc); + + // Subscribe for A + rc = zmq_setsockopt (sub, ZMQ_SUBSCRIBE, "A", 1); + TEST_ASSERT_EQUAL_INT (0, rc); + + // Receive subscriptions from subscriber + rc = zmq_recv (pub, buffer, 2, 0); + TEST_ASSERT_EQUAL_INT (2, rc); + assert (buffer[0] == 1); + assert (buffer[1] == 'A'); + + // Subscribe socket for B instead + rc = zmq_setsockopt (sub, ZMQ_SUBSCRIBE, "B", 1); + TEST_ASSERT_EQUAL_INT (0, rc); + + // Receive subscriptions from subscriber + rc = zmq_recv (pub, buffer, 2, 0); + TEST_ASSERT_EQUAL_INT (2, rc); + assert (buffer[0] == 1); + assert (buffer[1] == 'B'); + + // Subscribe again for A again + rc = zmq_setsockopt (sub, ZMQ_SUBSCRIBE, "A", 1); + TEST_ASSERT_EQUAL_INT (0, rc); + + // This time it is duplicated, so it will be filtered out + rc = zmq_recv (pub, buffer, 1, ZMQ_DONTWAIT); + TEST_ASSERT_EQUAL_INT (-1, rc); + TEST_ASSERT_EQUAL_INT (EAGAIN, errno); + + int verbose = 1; + rc = zmq_setsockopt (pub, ZMQ_XPUB_VERBOSE, &verbose, sizeof (int)); + TEST_ASSERT_EQUAL_INT (0, rc); + + // Subscribe socket for A again + rc = zmq_setsockopt (sub, ZMQ_SUBSCRIBE, "A", 1); + TEST_ASSERT_EQUAL_INT (0, rc); + + // This time with VERBOSE the duplicated sub will be received + rc = zmq_recv (pub, buffer, 2, 0); + TEST_ASSERT_EQUAL_INT (2, rc); + assert (buffer[0] == 1); + assert (buffer[1] == 'A'); + + // Sending A message and B Message + rc = zmq_send_const (pub, "A", 1, 0); + TEST_ASSERT_EQUAL_INT (1, rc); + + rc = zmq_send_const (pub, "B", 1, 0); + TEST_ASSERT_EQUAL_INT (1, rc); + + rc = zmq_recv (sub, buffer, 1, 0); + TEST_ASSERT_EQUAL_INT (1, rc); + assert (buffer[0] == 'A'); + + rc = zmq_recv (sub, buffer, 1, 0); + TEST_ASSERT_EQUAL_INT (1, rc); + assert (buffer[0] == 'B'); + + // Clean up. + rc = zmq_close (pub); + TEST_ASSERT_EQUAL_INT (0, rc); + rc = zmq_close (sub); + TEST_ASSERT_EQUAL_INT (0, rc); + rc = zmq_ctx_term (ctx); + TEST_ASSERT_EQUAL_INT (0, rc); +} + +void test_xpub_verbose_two_subs () +{ + int rc; + char buffer[2]; + void *ctx = zmq_ctx_new (); + TEST_ASSERT_NOT_NULL (ctx); + + void *pub = zmq_socket (ctx, ZMQ_XPUB); + TEST_ASSERT_NOT_NULL (pub); + rc = zmq_bind (pub, "inproc://soname"); + TEST_ASSERT_EQUAL_INT (0, rc); + + void *sub0 = zmq_socket (ctx, ZMQ_SUB); + TEST_ASSERT_NOT_NULL (sub0); + rc = zmq_connect (sub0, "inproc://soname"); + TEST_ASSERT_EQUAL_INT (0, rc); + + void *sub1 = zmq_socket (ctx, ZMQ_SUB); + TEST_ASSERT_NOT_NULL (sub1); + rc = zmq_connect (sub1, "inproc://soname"); + TEST_ASSERT_EQUAL_INT (0, rc); + + // Subscribe for A on the first socket + rc = zmq_setsockopt (sub0, ZMQ_SUBSCRIBE, "A", 1); + TEST_ASSERT_EQUAL_INT (0, rc); + + // Receive subscriptions from subscriber + rc = zmq_recv (pub, buffer, 2, 0); + TEST_ASSERT_EQUAL_INT (2, rc); + assert (buffer[0] == 1); + assert (buffer[1] == 'A'); + + // Subscribe for A on the second socket + rc = zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, "A", 1); + TEST_ASSERT_EQUAL_INT (0, rc); + + // This time it is duplicated, so it will be filtered out + rc = zmq_recv (pub, buffer, 1, ZMQ_DONTWAIT); + TEST_ASSERT_EQUAL_INT (-1, rc); + TEST_ASSERT_EQUAL_INT (EAGAIN, errno); + + // Subscribe socket for B instead + rc = zmq_setsockopt (sub0, ZMQ_SUBSCRIBE, "B", 1); + TEST_ASSERT_EQUAL_INT (0, rc); + + // Receive subscriptions from subscriber + rc = zmq_recv (pub, buffer, 2, 0); + TEST_ASSERT_EQUAL_INT (2, rc); + assert (buffer[0] == 1); + assert (buffer[1] == 'B'); + + int verbose = 1; + rc = zmq_setsockopt (pub, ZMQ_XPUB_VERBOSE, &verbose, sizeof (int)); + TEST_ASSERT_EQUAL_INT (0, rc); + + // Subscribe socket for A again + rc = zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, "A", 1); + TEST_ASSERT_EQUAL_INT (0, rc); + + // This time with VERBOSE the duplicated sub will be received + rc = zmq_recv (pub, buffer, 2, 0); + TEST_ASSERT_EQUAL_INT (2, rc); + assert (buffer[0] == 1); + assert (buffer[1] == 'A'); + + // Sending A message and B Message + rc = zmq_send_const (pub, "A", 1, 0); + TEST_ASSERT_EQUAL_INT (1, rc); + + rc = zmq_send_const (pub, "B", 1, 0); + TEST_ASSERT_EQUAL_INT (1, rc); + + rc = zmq_recv (sub0, buffer, 1, 0); + TEST_ASSERT_EQUAL_INT (1, rc); + assert (buffer[0] == 'A'); + + rc = zmq_recv (sub1, buffer, 1, 0); + TEST_ASSERT_EQUAL_INT (1, rc); + assert (buffer[0] == 'A'); + + rc = zmq_recv (sub0, buffer, 1, 0); + TEST_ASSERT_EQUAL_INT (1, rc); + assert (buffer[0] == 'B'); + + // Clean up. + rc = zmq_close (pub); + TEST_ASSERT_EQUAL_INT (0, rc); + rc = zmq_close (sub0); + TEST_ASSERT_EQUAL_INT (0, rc); + rc = zmq_close (sub1); + TEST_ASSERT_EQUAL_INT (0, rc); + rc = zmq_ctx_term (ctx); + TEST_ASSERT_EQUAL_INT (0, rc); +} + +void test_xpub_verboser_one_sub () +{ + int rc; + char buffer[3]; + void *ctx = zmq_ctx_new (); + TEST_ASSERT_NOT_NULL (ctx); + + // Create a publisher + void *pub = zmq_socket (ctx, ZMQ_XPUB); + TEST_ASSERT_NOT_NULL (pub); + rc = zmq_bind (pub, "inproc://soname"); + TEST_ASSERT_EQUAL_INT (0, rc); + + // Create a subscriber + void *sub = zmq_socket (ctx, ZMQ_SUB); + TEST_ASSERT_NOT_NULL (sub); + rc = zmq_connect (sub, "inproc://soname"); + TEST_ASSERT_EQUAL_INT (0, rc); + + // Unsubscribe for A, does not exist yet + rc = zmq_setsockopt (sub, ZMQ_UNSUBSCRIBE, "A", 1); + TEST_ASSERT_EQUAL_INT (0, rc); + + // Does not exist, so it will be filtered out by XSUB + rc = zmq_recv (pub, buffer, 1, ZMQ_DONTWAIT); + TEST_ASSERT_EQUAL_INT (-1, rc); + TEST_ASSERT_EQUAL_INT (EAGAIN, errno); + + // Subscribe for A + rc = zmq_setsockopt (sub, ZMQ_SUBSCRIBE, "A", 1); + TEST_ASSERT_EQUAL_INT (0, rc); + + // Receive subscriptions from subscriber + rc = zmq_recv (pub, buffer, 2, 0); + TEST_ASSERT_EQUAL_INT (2, rc); + assert (buffer[0] == 1); + assert (buffer[1] == 'A'); + + // Subscribe again for A again, XSUB will increase refcount + rc = zmq_setsockopt (sub, ZMQ_SUBSCRIBE, "A", 1); + TEST_ASSERT_EQUAL_INT (0, rc); + + // This time it is duplicated, so it will be filtered out by XPUB + rc = zmq_recv (pub, buffer, 1, ZMQ_DONTWAIT); + TEST_ASSERT_EQUAL_INT (-1, rc); + TEST_ASSERT_EQUAL_INT (EAGAIN, errno); + + // Unsubscribe for A, this time it exists in XPUB + rc = zmq_setsockopt (sub, ZMQ_UNSUBSCRIBE, "A", 1); + TEST_ASSERT_EQUAL_INT (0, rc); + + // XSUB refcounts and will not actually send unsub to PUB until the number + // of unsubs match the earlier subs + rc = zmq_setsockopt (sub, ZMQ_UNSUBSCRIBE, "A", 1); + TEST_ASSERT_EQUAL_INT (0, rc); + + // Receive unsubscriptions from subscriber + rc = zmq_recv (pub, buffer, 2, 0); + TEST_ASSERT_EQUAL_INT (2, rc); + assert (buffer[0] == 0); + assert (buffer[1] == 'A'); + + // XSUB only sends the last and final unsub, so XPUB will only receive 1 + rc = zmq_recv (pub, buffer, 1, ZMQ_DONTWAIT); + TEST_ASSERT_EQUAL_INT (-1, rc); + TEST_ASSERT_EQUAL_INT (EAGAIN, errno); + + // Unsubscribe for A, does not exist anymore + rc = zmq_setsockopt (sub, ZMQ_UNSUBSCRIBE, "A", 1); + TEST_ASSERT_EQUAL_INT (0, rc); + + // Does not exist, so it will be filtered out by XSUB + rc = zmq_recv (pub, buffer, 1, ZMQ_DONTWAIT); + TEST_ASSERT_EQUAL_INT (-1, rc); + TEST_ASSERT_EQUAL_INT (EAGAIN, errno); + + int verbose = 1; + rc = zmq_setsockopt (pub, ZMQ_XPUB_VERBOSER, &verbose, sizeof (int)); + TEST_ASSERT_EQUAL_INT (0, rc); + + // Subscribe socket for A again + rc = zmq_setsockopt (sub, ZMQ_SUBSCRIBE, "A", 1); + TEST_ASSERT_EQUAL_INT (0, rc); + + // Receive subscriptions from subscriber, did not exist anymore + rc = zmq_recv (pub, buffer, 2, 0); + TEST_ASSERT_EQUAL_INT (2, rc); + assert (buffer[0] == 1); + assert (buffer[1] == 'A'); + + // Sending A message to make sure everything still works + rc = zmq_send_const (pub, "A", 1, 0); + TEST_ASSERT_EQUAL_INT (1, rc); + + rc = zmq_recv (sub, buffer, 1, 0); + TEST_ASSERT_EQUAL_INT (1, rc); + assert (buffer[0] == 'A'); + + // Unsubscribe for A, this time it exists + rc = zmq_setsockopt (sub, ZMQ_UNSUBSCRIBE, "A", 1); + TEST_ASSERT_EQUAL_INT (0, rc); + + // Receive unsubscriptions from subscriber + rc = zmq_recv (pub, buffer, 2, 0); + TEST_ASSERT_EQUAL_INT (2, rc); + assert (buffer[0] == 0); + assert (buffer[1] == 'A'); + + // Unsubscribe for A again, it does not exist anymore so XSUB will filter + rc = zmq_setsockopt (sub, ZMQ_UNSUBSCRIBE, "A", 1); + TEST_ASSERT_EQUAL_INT (0, rc); + + // XSUB only sends unsub if it matched it in its trie, IOW: it will only + // send it if it existed in the first place even with XPUB_VERBBOSER + rc = zmq_recv (pub, buffer, 1, ZMQ_DONTWAIT); + TEST_ASSERT_EQUAL_INT (-1, rc); + TEST_ASSERT_EQUAL_INT (EAGAIN, errno); + + // Clean up. + rc = zmq_close (pub); + TEST_ASSERT_EQUAL_INT (0, rc); + rc = zmq_close (sub); + TEST_ASSERT_EQUAL_INT (0, rc); + rc = zmq_ctx_term (ctx); + TEST_ASSERT_EQUAL_INT (0, rc); +} + +void test_xpub_verboser_two_subs () +{ + int rc; + char buffer[3]; + void *ctx = zmq_ctx_new (); + TEST_ASSERT_NOT_NULL (ctx); + + void *pub = zmq_socket (ctx, ZMQ_XPUB); + TEST_ASSERT_NOT_NULL (pub); + rc = zmq_bind (pub, "inproc://soname"); + TEST_ASSERT_EQUAL_INT (0, rc); + + void *sub0 = zmq_socket (ctx, ZMQ_SUB); + TEST_ASSERT_NOT_NULL (sub0); + rc = zmq_connect (sub0, "inproc://soname"); + TEST_ASSERT_EQUAL_INT (0, rc); + + void *sub1 = zmq_socket (ctx, ZMQ_SUB); + TEST_ASSERT_NOT_NULL (sub1); + rc = zmq_connect (sub1, "inproc://soname"); + TEST_ASSERT_EQUAL_INT (0, rc); + + // Subscribe for A + rc = zmq_setsockopt (sub0, ZMQ_SUBSCRIBE, "A", 1); + TEST_ASSERT_EQUAL_INT (0, rc); + + // Receive subscriptions from subscriber + rc = zmq_recv (pub, buffer, 2, 0); + TEST_ASSERT_EQUAL_INT (2, rc); + assert (buffer[0] == 1); + assert (buffer[1] == 'A'); + + // Subscribe again for A on the other socket + rc = zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, "A", 1); + TEST_ASSERT_EQUAL_INT (0, rc); + + // This time it is duplicated, so it will be filtered out by XPUB + rc = zmq_recv (pub, buffer, 1, ZMQ_DONTWAIT); + TEST_ASSERT_EQUAL_INT (-1, rc); + TEST_ASSERT_EQUAL_INT (EAGAIN, errno); + + // Unsubscribe for A, this time it exists in XPUB + rc = zmq_setsockopt (sub0, ZMQ_UNSUBSCRIBE, "A", 1); + TEST_ASSERT_EQUAL_INT (0, rc); + + // sub1 is still subscribed, so no notification + rc = zmq_recv (pub, buffer, 1, ZMQ_DONTWAIT); + TEST_ASSERT_EQUAL_INT (-1, rc); + TEST_ASSERT_EQUAL_INT (EAGAIN, errno); + + // Unsubscribe the second socket to trigger the notification + rc = zmq_setsockopt (sub1, ZMQ_UNSUBSCRIBE, "A", 1); + TEST_ASSERT_EQUAL_INT (0, rc); + + // Receive unsubscriptions since all sockets are gone + rc = zmq_recv (pub, buffer, 2, 0); + TEST_ASSERT_EQUAL_INT (2, rc); + assert (buffer[0] == 0); + assert (buffer[1] == 'A'); + + // Make really sure there is only one notification + rc = zmq_recv (pub, buffer, 1, ZMQ_DONTWAIT); + TEST_ASSERT_EQUAL_INT (-1, rc); + TEST_ASSERT_EQUAL_INT (EAGAIN, errno); + + int verbose = 1; + rc = zmq_setsockopt (pub, ZMQ_XPUB_VERBOSER, &verbose, sizeof (int)); + TEST_ASSERT_EQUAL_INT (0, rc); + + // Subscribe socket for A again + rc = zmq_setsockopt (sub0, ZMQ_SUBSCRIBE, "A", 1); + TEST_ASSERT_EQUAL_INT (0, rc); + + // Subscribe socket for A again + rc = zmq_setsockopt (sub1, ZMQ_SUBSCRIBE, "A", 1); + TEST_ASSERT_EQUAL_INT (0, rc); + + // Receive subscriptions from subscriber, did not exist anymore + rc = zmq_recv (pub, buffer, 2, 0); + TEST_ASSERT_EQUAL_INT (2, rc); + assert (buffer[0] == 1); + assert (buffer[1] == 'A'); + + // VERBOSER is set, so subs from both sockets are received + rc = zmq_recv (pub, buffer, 2, 0); + TEST_ASSERT_EQUAL_INT (2, rc); + assert (buffer[0] == 1); + assert (buffer[1] == 'A'); + + // Sending A message to make sure everything still works + rc = zmq_send_const (pub, "A", 1, 0); + TEST_ASSERT_EQUAL_INT (1, rc); + + rc = zmq_recv (sub0, buffer, 1, 0); + TEST_ASSERT_EQUAL_INT (1, rc); + assert (buffer[0] == 'A'); + + rc = zmq_recv (sub1, buffer, 1, 0); + TEST_ASSERT_EQUAL_INT (1, rc); + assert (buffer[0] == 'A'); + + // Unsubscribe for A + rc = zmq_setsockopt (sub1, ZMQ_UNSUBSCRIBE, "A", 1); + TEST_ASSERT_EQUAL_INT (0, rc); + + // Receive unsubscriptions from first subscriber due to VERBOSER + rc = zmq_recv (pub, buffer, 2, 0); + TEST_ASSERT_EQUAL_INT (2, rc); + assert (buffer[0] == 0); + assert (buffer[1] == 'A'); + + // Unsubscribe for A again from the other socket + rc = zmq_setsockopt (sub0, ZMQ_UNSUBSCRIBE, "A", 1); + TEST_ASSERT_EQUAL_INT (0, rc); + + // Receive unsubscriptions from first subscriber due to VERBOSER + rc = zmq_recv (pub, buffer, 2, 0); + TEST_ASSERT_EQUAL_INT (2, rc); + assert (buffer[0] == 0); + assert (buffer[1] == 'A'); + + // Unsubscribe again to make sure it gets filtered now + rc = zmq_setsockopt (sub1, ZMQ_UNSUBSCRIBE, "A", 1); + TEST_ASSERT_EQUAL_INT (0, rc); + + // Unmatched, so XSUB filters even with VERBOSER + rc = zmq_recv (pub, buffer, 1, ZMQ_DONTWAIT); + TEST_ASSERT_EQUAL_INT (-1, rc); + TEST_ASSERT_EQUAL_INT (EAGAIN, errno); + + // Clean up. + rc = zmq_close (pub); + TEST_ASSERT_EQUAL_INT (0, rc); + rc = zmq_close (sub0); + TEST_ASSERT_EQUAL_INT (0, rc); + rc = zmq_close (sub1); + TEST_ASSERT_EQUAL_INT (0, rc); + rc = zmq_ctx_term (ctx); + TEST_ASSERT_EQUAL_INT (0, rc); +} + +int main (void) +{ + setup_test_environment (); + + UNITY_BEGIN (); + RUN_TEST (test_xpub_verbose_one_sub); + RUN_TEST (test_xpub_verbose_two_subs); + RUN_TEST (test_xpub_verboser_one_sub); + RUN_TEST (test_xpub_verboser_two_subs); + + return 0; +}