0
0
mirror of https://github.com/zeromq/libzmq.git synced 2025-01-14 01:37:56 +08:00

Add ZMQ_TOPICS_COUNT socket option (#4459)

This commit is contained in:
Francesco Montorsi 2022-11-29 13:00:11 +01:00 committed by GitHub
parent 20de92ac0a
commit c59104a01d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 428 additions and 92 deletions

1
.gitignore vendored
View File

@ -125,3 +125,4 @@ zeromq-*.tar.gz
zeromq-*.zip
core
mybuild

View File

@ -472,8 +472,8 @@ test_apps = \
tests/test_unbind_wildcard \
tests/test_ctx_options \
tests/test_ctx_destroy \
tests/test_security_no_zap_handler \
tests/test_security_null \
tests/test_security_no_zap_handler \
tests/test_security_null \
tests/test_security_plain \
tests/test_security_zap \
tests/test_iov \
@ -1067,7 +1067,8 @@ test_apps += tests/test_poller \
tests/test_channel \
tests/test_hiccup_msg \
tests/test_zmq_ppoll_fd \
tests/test_xsub_verbose
tests/test_xsub_verbose \
tests/test_pubsub_topics_count
tests_test_poller_SOURCES = tests/test_poller.cpp
tests_test_poller_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
@ -1145,6 +1146,10 @@ tests_test_xsub_verbose_SOURCES = tests/test_xsub_verbose.cpp
tests_test_xsub_verbose_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
tests_test_xsub_verbose_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
tests_test_pubsub_topics_count_SOURCES = tests/test_pubsub_topics_count.cpp
tests_test_pubsub_topics_count_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
tests_test_pubsub_topics_count_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
if HAVE_FORK
test_apps += tests/test_zmq_ppoll_signals

View File

@ -983,6 +983,20 @@ Default value:: 8192
Applicable socket types:: All, when using TCP, IPC, PGM or NORM transport.
ZMQ_TOPICS_COUNT: Number of topic subscriptions received
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Gets the number of topic (prefix) subscriptions either
* received on a (X)PUB socket from all the connected (X)SUB sockets or
* acknowledged on an (X)SUB socket from all the connected (X)PUB sockets
NOTE: in DRAFT state, not yet available in stable releases.
[horizontal]
Option value type:: int
Option value unit:: N/A
Default value:: 0
Applicable socket types:: ZMQ_PUB, ZMQ_XPUB, ZMQ_SUB, ZMQ_XSUB
RETURN VALUE
------------

View File

@ -680,6 +680,7 @@ ZMQ_EXPORT void zmq_threadclose (void *thread_);
#define ZMQ_BUSY_POLL 113
#define ZMQ_HICCUP_MSG 114
#define ZMQ_XSUB_VERBOSE_UNSUBSCRIBE 115
#define ZMQ_TOPICS_COUNT 116
/* DRAFT ZMQ_RECONNECT_STOP options */
#define ZMQ_RECONNECT_STOP_CONN_REFUSED 0x1

View File

@ -35,6 +35,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
#include "macros.hpp"
#include "stdint.hpp"
#include "atomic_counter.hpp"
namespace zmq
{
@ -83,12 +84,18 @@ template <typename T> class generic_mtrie_t
void (*func_) (value_t *value_, Arg arg_),
Arg arg_);
// Retrieve the number of prefixes stored in this trie (added - removed)
// Note this is a multithread safe function.
uint32_t num_prefixes () const { return _num_prefixes.get (); }
private:
bool is_redundant () const;
typedef std::set<value_t *> pipes_t;
pipes_t *_pipes;
atomic_counter_t _num_prefixes;
unsigned char _min;
unsigned short _count;
unsigned short _live_nodes;

View File

@ -45,7 +45,7 @@ namespace zmq
{
template <typename T>
generic_mtrie_t<T>::generic_mtrie_t () :
_pipes (0), _min (0), _count (0), _live_nodes (0)
_pipes (0), _num_prefixes (0), _min (0), _count (0), _live_nodes (0)
{
}
@ -144,6 +144,8 @@ bool generic_mtrie_t<T>::add (prefix_t prefix_, size_t size_, value_t *pipe_)
if (!it->_pipes) {
it->_pipes = new (std::nothrow) pipes_t;
alloc_assert (it->_pipes);
_num_prefixes.add (1);
}
it->_pipes->insert (pipe_);
@ -535,6 +537,11 @@ generic_mtrie_t<T>::rm (prefix_t prefix_, size_t size_, value_t *pipe_)
}
}
if (ret == last_value_removed) {
zmq_assert (_num_prefixes.get () > 0);
_num_prefixes.sub (1);
}
return ret;
}

View File

@ -326,7 +326,7 @@ bool zmq::radix_tree_t::add (const unsigned char *key_, size_t key_size_)
_root._data = current_node._data;
else
parent_node.set_node_at (edge_index, current_node);
++_size;
_size.add (1);
return true;
}
@ -362,7 +362,7 @@ bool zmq::radix_tree_t::add (const unsigned char *key_, size_t key_size_)
current_node.set_edge_at (0, key_node.prefix ()[0], key_node);
current_node.set_edge_at (1, split_node.prefix ()[0], split_node);
++_size;
_size.add (1);
parent_node.set_node_at (edge_index, current_node);
return true;
}
@ -394,7 +394,7 @@ bool zmq::radix_tree_t::add (const unsigned char *key_, size_t key_size_)
current_node.set_edge_at (0, split_node.prefix ()[0], split_node);
current_node.set_refcount (1);
++_size;
_size.add (1);
parent_node.set_node_at (edge_index, current_node);
return true;
}
@ -402,7 +402,7 @@ bool zmq::radix_tree_t::add (const unsigned char *key_, size_t key_size_)
zmq_assert (key_bytes_matched == key_size_);
zmq_assert (prefix_bytes_matched == current_node.prefix_length ());
++_size;
_size.add (1);
current_node.set_refcount (current_node.refcount () + 1);
return current_node.refcount () == 1;
}
@ -424,7 +424,7 @@ bool zmq::radix_tree_t::rm (const unsigned char *key_, size_t key_size_)
return false;
current_node.set_refcount (current_node.refcount () - 1);
--_size;
_size.sub (1);
if (current_node.refcount () > 0)
return false;
@ -574,5 +574,5 @@ void zmq::radix_tree_t::apply (
size_t zmq::radix_tree_t::size () const
{
return _size;
return _size.get ();
}

View File

@ -33,6 +33,7 @@
#include <stddef.h>
#include "stdint.hpp"
#include "atomic_counter.hpp"
// Wrapper type for a node's data layout.
//
@ -133,6 +134,7 @@ class radix_tree_t
void apply (void (*func_) (unsigned char *data, size_t size, void *arg),
void *arg_);
// Retrieve size of the radix tree. Note this is a multithread safe function.
size_t size () const;
private:
@ -140,7 +142,7 @@ class radix_tree_t
match (const unsigned char *key_, size_t key_size_, bool is_lookup_) const;
node_t _root;
size_t _size;
atomic_counter_t _size;
};
}

View File

@ -460,6 +460,12 @@ int zmq::socket_base_t::getsockopt (int option_,
return -1;
}
// First, check whether specific socket type overloads the option.
int rc = xgetsockopt (option_, optval_, optvallen_);
if (rc == 0 || errno != EINVAL) {
return rc;
}
if (option_ == ZMQ_RCVMORE) {
return do_getsockopt<int> (optval_, optvallen_, _rcvmore ? 1 : 0);
}
@ -1619,6 +1625,12 @@ int zmq::socket_base_t::xsetsockopt (int, const void *, size_t)
return -1;
}
int zmq::socket_base_t::xgetsockopt (int, void *, size_t *)
{
errno = EINVAL;
return -1;
}
bool zmq::socket_base_t::xhas_out ()
{
return false;

View File

@ -186,6 +186,11 @@ class socket_base_t : public own_t,
virtual int
xsetsockopt (int option_, const void *optval_, size_t optvallen_);
// The default implementation assumes there are no specific socket
// options for the particular socket type. If not so, ZMQ_FINAL this
// method.
virtual int xgetsockopt (int option_, void *optval_, size_t *optvallen_);
// The default implementation assumes that send is not supported.
virtual bool xhas_out ();
virtual int xsend (zmq::msg_t *msg_);

View File

@ -34,6 +34,7 @@
#include "macros.hpp"
#include "stdint.hpp"
#include "atomic_counter.hpp"
namespace zmq
{
@ -80,6 +81,53 @@ class trie_t
ZMQ_NON_COPYABLE_NOR_MOVABLE (trie_t)
};
// lightweight wrapper around trie_t adding tracking of total number of prefixes
class trie_with_size_t
{
public:
trie_with_size_t () {}
~trie_with_size_t () {}
bool add (unsigned char *prefix_, size_t size_)
{
if (_trie.add (prefix_, size_)) {
_num_prefixes.add (1);
return true;
} else
return false;
}
bool rm (unsigned char *prefix_, size_t size_)
{
if (_trie.rm (prefix_, size_)) {
_num_prefixes.sub (1);
return true;
} else
return false;
}
bool check (const unsigned char *data_, size_t size_) const
{
return _trie.check (data_, size_);
}
void apply (void (*func_) (unsigned char *data_, size_t size_, void *arg_),
void *arg_)
{
_trie.apply (func_, arg_);
}
// Retrieve the number of prefixes stored in this trie (added - removed)
// Note this is a multithread safe function.
uint32_t num_prefixes () const { return _num_prefixes.get (); }
private:
atomic_counter_t _num_prefixes;
trie_t _trie;
};
}
#endif

View File

@ -255,6 +255,21 @@ int zmq::xpub_t::xsetsockopt (int option_,
return 0;
}
int zmq::xpub_t::xgetsockopt (int option_, void *optval_, size_t *optvallen_)
{
if (option_ == ZMQ_TOPICS_COUNT) {
// make sure to use a multi-thread safe function to avoid race conditions with I/O threads
// where subscriptions are processed:
return do_getsockopt<int> (optval_, optvallen_,
(int) _subscriptions.num_prefixes ());
}
// room for future options here
errno = EINVAL;
return -1;
}
static void stub (zmq::mtrie_t::prefix_t data_, size_t size_, void *arg_)
{
LIBZMQ_UNUSED (data_);

View File

@ -62,6 +62,7 @@ class xpub_t : public socket_base_t
void xwrite_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;
int
xsetsockopt (int option_, const void *optval_, size_t optvallen_) ZMQ_FINAL;
int xgetsockopt (int option_, void *optval_, size_t *optvallen_) ZMQ_FINAL;
void xpipe_terminated (zmq::pipe_t *pipe_) ZMQ_FINAL;
private:

View File

@ -121,6 +121,27 @@ int zmq::xsub_t::xsetsockopt (int option_,
return -1;
}
int zmq::xsub_t::xgetsockopt (int option_, void *optval_, size_t *optvallen_)
{
if (option_ == ZMQ_TOPICS_COUNT) {
// make sure to use a multi-thread safe function to avoid race conditions with I/O threads
// where subscriptions are processed:
#ifdef ZMQ_USE_RADIX_TREE
uint64_t num_subscriptions = _subscriptions.size ();
#else
uint64_t num_subscriptions = _subscriptions.num_prefixes ();
#endif
return do_getsockopt<int> (optval_, optvallen_,
(int) num_subscriptions);
}
// room for future options here
errno = EINVAL;
return -1;
}
int zmq::xsub_t::xsend (msg_t *msg_)
{
size_t size = msg_->size ();

View File

@ -60,6 +60,7 @@ class xsub_t : public socket_base_t
int xsetsockopt (int option_,
const void *optval_,
size_t optvallen_) ZMQ_OVERRIDE;
int xgetsockopt (int option_, void *optval_, size_t *optvallen_) ZMQ_FINAL;
int xsend (zmq::msg_t *msg_) ZMQ_OVERRIDE;
bool xhas_out () ZMQ_OVERRIDE;
int xrecv (zmq::msg_t *msg_) ZMQ_FINAL;
@ -88,7 +89,7 @@ class xsub_t : public socket_base_t
#ifdef ZMQ_USE_RADIX_TREE
radix_tree_t _subscriptions;
#else
trie_t _subscriptions;
trie_with_size_t _subscriptions;
#endif
// If true, send all unsubscription messages upstream, not just

View File

@ -72,6 +72,7 @@
#define ZMQ_BUSY_POLL 113
#define ZMQ_HICCUP_MSG 114
#define ZMQ_XSUB_VERBOSE_UNSUBSCRIBE 115
#define ZMQ_TOPICS_COUNT 116
/* DRAFT ZMQ_RECONNECT_STOP options */
#define ZMQ_RECONNECT_STOP_CONN_REFUSED 0x1

View File

@ -5,76 +5,76 @@ cmake_minimum_required(VERSION "2.8.1")
project(tests)
set(tests
test_ancillaries
test_system
test_pair_inproc
test_pair_tcp
test_reqrep_inproc
test_reqrep_tcp
test_hwm
test_hwm_pubsub
test_reqrep_device
test_sub_forward
test_invalid_rep
test_msg_flags
test_msg_ffn
test_connect_resolve
test_immediate
test_last_endpoint
test_term_endpoint
test_router_mandatory
test_probe_router
test_stream
test_stream_empty
test_stream_disconnect
test_disconnect_inproc
test_unbind_wildcard
test_ctx_options
test_ctx_destroy
test_security_no_zap_handler
test_security_null
test_security_plain
test_security_zap
test_iov
test_spec_req
test_spec_rep
test_spec_dealer
test_spec_router
test_spec_pushpull
test_req_correlate
test_req_relaxed
test_conflate
test_inproc_connect
test_issue_566
test_shutdown_stress
test_timeo
test_many_sockets
test_diffserv
test_connect_rid
test_xpub_nodrop
test_pub_invert_matching
test_setsockopt
test_sockopt_hwm
test_heartbeats
test_atomics
test_bind_src_address
test_capabilities
test_metadata
test_router_handover
test_srcfd
test_stream_timeout
test_xpub_manual
test_xpub_welcome_msg
test_xpub_verbose
test_base85
test_bind_after_connect_tcp
test_sodium
test_monitor
test_socket_null
test_reconnect_ivl
test_reconnect_options
test_tcp_accept_filter
test_mock_pub_sub)
test_ancillaries
test_system
test_pair_inproc
test_pair_tcp
test_reqrep_inproc
test_reqrep_tcp
test_hwm
test_hwm_pubsub
test_reqrep_device
test_sub_forward
test_invalid_rep
test_msg_flags
test_msg_ffn
test_connect_resolve
test_immediate
test_last_endpoint
test_term_endpoint
test_router_mandatory
test_probe_router
test_stream
test_stream_empty
test_stream_disconnect
test_disconnect_inproc
test_unbind_wildcard
test_ctx_options
test_ctx_destroy
test_security_no_zap_handler
test_security_null
test_security_plain
test_security_zap
test_iov
test_spec_req
test_spec_rep
test_spec_dealer
test_spec_router
test_spec_pushpull
test_req_correlate
test_req_relaxed
test_conflate
test_inproc_connect
test_issue_566
test_shutdown_stress
test_timeo
test_many_sockets
test_diffserv
test_connect_rid
test_xpub_nodrop
test_pub_invert_matching
test_setsockopt
test_sockopt_hwm
test_heartbeats
test_atomics
test_bind_src_address
test_capabilities
test_metadata
test_router_handover
test_srcfd
test_stream_timeout
test_xpub_manual
test_xpub_welcome_msg
test_xpub_verbose
test_base85
test_bind_after_connect_tcp
test_sodium
test_monitor
test_socket_null
test_reconnect_ivl
test_reconnect_options
test_tcp_accept_filter
test_mock_pub_sub)
if(NOT WIN32)
list(APPEND tests test_security_gssapi test_socks test_connect_null_fuzzer test_bind_null_fuzzer test_connect_fuzzer test_bind_fuzzer)
@ -85,12 +85,14 @@ if(ZMQ_HAVE_CURVE)
if(NOT CMAKE_SYSTEM_NAME MATCHES "Linux")
list(APPEND tests test_security_curve)
endif()
if(NOT WIN32)
list(APPEND tests test_connect_curve_fuzzer test_bind_curve_fuzzer test_z85_decode_fuzzer)
endif()
endif()
option(ENABLE_CAPSH "Run tests that require sudo and capsh (for cap_net_admin)" OFF)
if(ENABLE_CAPSH)
find_program(CAPSH_PROGRAM NAMES capsh)
@ -119,11 +121,14 @@ if(NOT WIN32)
test_router_mandatory_hwm
test_use_fd
test_zmq_poll_fd)
if(HAVE_FORK)
list(APPEND tests test_fork)
endif()
if(CMAKE_SYSTEM_NAME MATCHES "Linux")
list(APPEND tests test_abstract_ipc)
if(ZMQ_HAVE_TIPC)
list(
APPEND
@ -167,10 +172,13 @@ if(ENABLE_DRAFTS)
test_hiccup_msg
test_zmq_ppoll_fd
test_xsub_verbose
)
test_pubsub_topics_count
)
if(HAVE_FORK)
list(APPEND tests test_zmq_ppoll_signals)
endif()
if(ZMQ_HAVE_BUSY_POLL)
list(APPEND tests test_busy_poll)
endif()
@ -178,6 +186,7 @@ endif()
if(ZMQ_HAVE_WS)
list(APPEND tests test_ws_transport)
if(ZMQ_HAVE_WSS)
list(APPEND tests test_wss_transport)
endif()
@ -187,6 +196,7 @@ endif()
if(WIN32)
add_definitions(-DZMQ_CUSTOM_PLATFORM_HPP)
add_definitions(-D_WINSOCK_DEPRECATED_NO_WARNINGS)
# Same name on 64bit systems
link_libraries(ws2_32.lib)
endif()
@ -200,22 +210,25 @@ target_compile_definitions(unity PUBLIC "UNITY_USE_COMMAND_LINE_ARGS" "UNITY_EXC
target_include_directories(unity PUBLIC "${CMAKE_CURRENT_LIST_DIR}/../external/unity")
set(TESTUTIL_SOURCES
testutil.cpp
testutil.hpp
testutil_monitoring.cpp
testutil_monitoring.hpp
testutil_security.cpp
testutil_security.hpp
testutil_unity.cpp
testutil_unity.hpp)
testutil.cpp
testutil.hpp
testutil_monitoring.cpp
testutil_monitoring.hpp
testutil_security.cpp
testutil_security.hpp
testutil_unity.cpp
testutil_unity.hpp)
if(BUILD_STATIC)
add_library(testutil-static STATIC ${TESTUTIL_SOURCES})
target_link_libraries(testutil-static libzmq-static ${OPTIONAL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} unity)
endif()
if(BUILD_SHARED)
add_library(testutil STATIC ${TESTUTIL_SOURCES})
target_link_libraries(testutil libzmq ${OPTIONAL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} unity)
endif()
if(BUILD_STATIC AND NOT BUILD_SHARED)
# use testutil-static for both tests and unit tests
set(TESTUTIL_LIB testutil-static)
@ -234,6 +247,7 @@ endif()
# add include dirs for all targets
include_directories("${ZeroMQ_SOURCE_DIR}/../include" "${ZeroMQ_BINARY_DIR}")
if(WIN32)
add_definitions(-D_CRT_NONSTDC_NO_DEPRECATE)
endif()
@ -250,11 +264,14 @@ foreach(test ${tests})
else()
add_executable(${test} ${test}.cpp)
endif()
target_link_libraries(${test} ${TESTUTIL_LIB})
if(WIN32)
# This is the output for Debug dynamic builds on Visual Studio 6.0 You should provide the correct directory, don't
# know how to do it automatically
find_path(LIBZMQ_PATH "libzmq.lib" PATHS "../bin/Win32/Debug/v120/dynamic")
if(NOT ${LIBZMQ_PATH} STREQUAL "LIBZMQ_PATH-NOTFOUND")
set_target_properties(${test} PROPERTIES LINK_FLAGS "/LIBPATH:${LIBZMQ_PATH}")
endif()
@ -286,6 +303,7 @@ foreach(test ${tests})
add_test(NAME ${test} COMMAND ${test})
endif()
endif()
set_tests_properties(${test} PROPERTIES TIMEOUT 10)
set_tests_properties(${test} PROPERTIES SKIP_RETURN_CODE 77)
endforeach()
@ -301,6 +319,7 @@ if(NOT CMAKE_SYSTEM_NAME MATCHES "Linux")
if(ZMQ_HAVE_CURVE)
set_tests_properties(test_security_curve PROPERTIES TIMEOUT 60)
endif()
# add additional required flags ZMQ_USE_TWEETNACL will already be defined when not using sodium
if(ZMQ_HAVE_CURVE AND NOT ZMQ_USE_TWEETNACL)
target_compile_definitions(test_security_curve PRIVATE "-DZMQ_USE_TWEETNACL")
@ -313,9 +332,11 @@ set_tests_properties(test_reconnect_ivl PROPERTIES TIMEOUT 15)
# Check whether all tests in the current folder are present
file(READ "${CMAKE_CURRENT_LIST_FILE}" CURRENT_LIST_FILE_CONTENT)
file(GLOB ALL_TEST_SOURCES "test_*.cpp")
foreach(TEST_SOURCE ${ALL_TEST_SOURCES})
get_filename_component(TESTNAME "${TEST_SOURCE}" NAME_WE)
string(REGEX MATCH "${TESTNAME}" MATCH_TESTNAME "${CURRENT_LIST_FILE_CONTENT}")
if(NOT MATCH_TESTNAME)
message(AUTHOR_WARNING "Test '${TESTNAME}' is not known to CTest.")
endif()

View File

@ -0,0 +1,151 @@
/*
Copyright (c) 2007-2020 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "testutil.hpp"
#include "testutil_unity.hpp"
#include <string.h>
SETUP_TEARDOWN_TESTCONTEXT
void settle_subscriptions (void *skt)
{
// To kick the application thread, do a dummy getsockopt - users here
// should use the monitor and the other sockets in a poll.
unsigned long int dummy;
size_t dummy_size = sizeof (dummy);
msleep (SETTLE_TIME);
zmq_getsockopt (skt, ZMQ_EVENTS, &dummy, &dummy_size);
}
int get_subscription_count (void *skt)
{
int num_subs = 0;
size_t num_subs_len = sizeof (num_subs);
settle_subscriptions (skt);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (skt, ZMQ_TOPICS_COUNT, &num_subs, &num_subs_len));
return num_subs;
}
void test_independent_topic_prefixes ()
{
// Create a publisher
void *publisher = test_context_socket (ZMQ_PUB);
char my_endpoint[MAX_SOCKET_STRING];
// Bind publisher
test_bind (publisher, "inproc://soname", my_endpoint, MAX_SOCKET_STRING);
// Create a subscriber
void *subscriber = test_context_socket (ZMQ_SUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (subscriber, my_endpoint));
// Subscribe to 3 topics
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
subscriber, ZMQ_SUBSCRIBE, "topicprefix1", strlen ("topicprefix1")));
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
subscriber, ZMQ_SUBSCRIBE, "topicprefix2", strlen ("topicprefix2")));
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
subscriber, ZMQ_SUBSCRIBE, "topicprefix3", strlen ("topicprefix3")));
TEST_ASSERT_EQUAL_INT (get_subscription_count (subscriber), 3);
TEST_ASSERT_EQUAL_INT (get_subscription_count (publisher), 3);
// Remove first subscription and check subscriptions went 3 -> 2
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
subscriber, ZMQ_UNSUBSCRIBE, "topicprefix3", strlen ("topicprefix3")));
TEST_ASSERT_EQUAL_INT (get_subscription_count (subscriber), 2);
TEST_ASSERT_EQUAL_INT (get_subscription_count (publisher), 2);
// Remove other 2 subscriptions and check we're back to 0 subscriptions
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
subscriber, ZMQ_UNSUBSCRIBE, "topicprefix1", strlen ("topicprefix1")));
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
subscriber, ZMQ_UNSUBSCRIBE, "topicprefix2", strlen ("topicprefix2")));
TEST_ASSERT_EQUAL_INT (get_subscription_count (subscriber), 0);
TEST_ASSERT_EQUAL_INT (get_subscription_count (publisher), 0);
// Clean up.
test_context_socket_close (publisher);
test_context_socket_close (subscriber);
}
void test_nested_topic_prefixes ()
{
// Create a publisher
void *publisher = test_context_socket (ZMQ_PUB);
char my_endpoint[MAX_SOCKET_STRING];
// Bind publisher
test_bind (publisher, "inproc://soname", my_endpoint, MAX_SOCKET_STRING);
// Create a subscriber
void *subscriber = test_context_socket (ZMQ_SUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (subscriber, my_endpoint));
// Subscribe to 3 (nested) topics
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "a", strlen ("a")));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "ab", strlen ("ab")));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "abc", strlen ("abc")));
// Even if the subscriptions are nested one into the other, the number of subscriptions
// received on the subscriber/publisher socket will be 3:
TEST_ASSERT_EQUAL_INT (get_subscription_count (subscriber), 3);
TEST_ASSERT_EQUAL_INT (get_subscription_count (publisher), 3);
// Subscribe to other 3 (nested) topics
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "xyz", strlen ("a")));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "xy", strlen ("ab")));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "x", strlen ("abc")));
TEST_ASSERT_EQUAL_INT (get_subscription_count (subscriber), 6);
TEST_ASSERT_EQUAL_INT (get_subscription_count (publisher), 6);
// Clean up.
test_context_socket_close (publisher);
test_context_socket_close (subscriber);
}
int main ()
{
setup_test_environment ();
UNITY_BEGIN ();
RUN_TEST (test_independent_topic_prefixes);
RUN_TEST (test_nested_topic_prefixes);
return UNITY_END ();
}

View File

@ -81,6 +81,8 @@ void test_add_single_entry_match_exact ()
bool res = mtrie.add (test_name, getlen (test_name), &pipe);
TEST_ASSERT_TRUE (res);
TEST_ASSERT_EQUAL_INT (1, mtrie.num_prefixes ());
int count = 0;
mtrie.match (test_name, getlen (test_name), mtrie_count, &count);
TEST_ASSERT_EQUAL_INT (1, count);
@ -96,9 +98,11 @@ void test_add_single_entry_twice_match_exact ()
bool res = mtrie.add (test_name, getlen (test_name), &pipe);
TEST_ASSERT_TRUE (res);
TEST_ASSERT_EQUAL_INT (1, mtrie.num_prefixes ());
res = mtrie.add (test_name, getlen (test_name), &pipe);
TEST_ASSERT_FALSE (res);
TEST_ASSERT_EQUAL_INT (1, mtrie.num_prefixes ());
int count = 0;
mtrie.match (test_name, getlen (test_name), mtrie_count, &count);
@ -115,9 +119,11 @@ void test_add_two_entries_with_same_name_match_exact ()
bool res = mtrie.add (test_name, getlen (test_name), &pipe_1);
TEST_ASSERT_TRUE (res);
TEST_ASSERT_EQUAL_INT (1, mtrie.num_prefixes ());
res = mtrie.add (test_name, getlen (test_name), &pipe_2);
TEST_ASSERT_FALSE (res);
TEST_ASSERT_EQUAL_INT (1, mtrie.num_prefixes ());
int count = 0;
mtrie.match (test_name, getlen (test_name), mtrie_count, &count);
@ -136,9 +142,11 @@ void test_add_two_entries_match_prefix_and_exact ()
bool res = mtrie.add (test_name_prefix, getlen (test_name_prefix), &pipe_1);
TEST_ASSERT_TRUE (res);
TEST_ASSERT_EQUAL_INT (1, mtrie.num_prefixes ());
res = mtrie.add (test_name_full, getlen (test_name_full), &pipe_2);
TEST_ASSERT_TRUE (res);
TEST_ASSERT_EQUAL_INT (2, mtrie.num_prefixes ());
int count = 0;
mtrie.match (test_name_full, getlen (test_name_full), mtrie_count, &count);
@ -153,9 +161,11 @@ void test_add_rm_single_entry_match_exact ()
reinterpret_cast<zmq::generic_mtrie_t<int>::prefix_t> ("foo");
mtrie.add (test_name, getlen (test_name), &pipe);
TEST_ASSERT_EQUAL_INT (1, mtrie.num_prefixes ());
zmq::generic_mtrie_t<int>::rm_result res =
mtrie.rm (test_name, getlen (test_name), &pipe);
TEST_ASSERT_EQUAL (zmq::generic_mtrie_t<int>::last_value_removed, res);
TEST_ASSERT_EQUAL_INT (0, mtrie.num_prefixes ());
int count = 0;
mtrie.match (test_name, getlen (test_name), mtrie_count, &count);
@ -169,6 +179,7 @@ void test_rm_nonexistent_0_size_empty ()
zmq::generic_mtrie_t<int>::rm_result res = mtrie.rm (0, 0, &pipe);
TEST_ASSERT_EQUAL (zmq::generic_mtrie_t<int>::not_found, res);
TEST_ASSERT_EQUAL_INT (0, mtrie.num_prefixes ());
}
void test_rm_nonexistent_empty ()
@ -181,6 +192,7 @@ void test_rm_nonexistent_empty ()
zmq::generic_mtrie_t<int>::rm_result res =
mtrie.rm (test_name, getlen (test_name), &pipe);
TEST_ASSERT_EQUAL (zmq::generic_mtrie_t<int>::not_found, res);
TEST_ASSERT_EQUAL_INT (0, mtrie.num_prefixes ());
int count = 0;
mtrie.match (test_name, getlen (test_name), mtrie_count, &count);
@ -197,10 +209,12 @@ void test_add_and_rm_other (const char *add_name_, const char *rm_name_)
reinterpret_cast<zmq::generic_mtrie_t<int>::prefix_t> (rm_name_);
mtrie.add (add_name_data, getlen (add_name_data), &addpipe);
TEST_ASSERT_EQUAL_INT (1, mtrie.num_prefixes ());
zmq::generic_mtrie_t<int>::rm_result res =
mtrie.rm (rm_name_data, getlen (rm_name_data), &rmpipe);
TEST_ASSERT_EQUAL (zmq::generic_mtrie_t<int>::not_found, res);
TEST_ASSERT_EQUAL_INT (1, mtrie.num_prefixes ());
{
int count = 0;
@ -249,7 +263,9 @@ void add_indexed_expect_unique (zmq::generic_mtrie_t<int> &mtrie_,
reinterpret_cast<zmq::generic_mtrie_t<int>::prefix_t> (names_[i_]);
bool res = mtrie_.add (name_data, getlen (name_data), &pipes_[i_]);
TEST_ASSERT_EQUAL (zmq::generic_mtrie_t<int>::last_value_removed, res);
TEST_ASSERT_EQUAL (
zmq::generic_mtrie_t<int>::last_value_removed,
res); // FIXME asserting equality between enum and bool? I think first arg for macro should be "true"
}
void test_rm_nonexistent_between ()
@ -260,6 +276,7 @@ void test_rm_nonexistent_between ()
zmq::generic_mtrie_t<int> mtrie;
add_indexed_expect_unique (mtrie, pipes, names, 0);
add_indexed_expect_unique (mtrie, pipes, names, 2);
TEST_ASSERT_EQUAL_INT (2, mtrie.num_prefixes ());
const zmq::generic_mtrie_t<int>::prefix_t name_data =
reinterpret_cast<zmq::generic_mtrie_t<int>::prefix_t> (names[1]);
@ -267,6 +284,7 @@ void test_rm_nonexistent_between ()
zmq::generic_mtrie_t<int>::rm_result res =
mtrie.rm (name_data, getlen (name_data), &pipes[1]);
TEST_ASSERT_EQUAL (zmq::generic_mtrie_t<int>::not_found, res);
TEST_ASSERT_EQUAL_INT (2, mtrie.num_prefixes ());
}
template <size_t N>
@ -277,6 +295,7 @@ void add_entries (zmq::generic_mtrie_t<int> &mtrie_,
for (size_t i = 0; i < N; ++i) {
add_indexed_expect_unique (mtrie_, pipes_, names_, i);
}
TEST_ASSERT_EQUAL_INT (N, mtrie_.num_prefixes ());
}
void test_add_multiple ()
@ -306,6 +325,7 @@ void test_add_multiple_reverse ()
add_indexed_expect_unique (mtrie, pipes, names,
static_cast<size_t> (i));
}
TEST_ASSERT_EQUAL_INT (3, mtrie.num_prefixes ());
for (size_t i = 0; i < 3; ++i) {
const zmq::generic_mtrie_t<int>::prefix_t name_data =
@ -330,6 +350,7 @@ template <size_t N> void add_and_rm_entries (const char *(&names_)[N])
mtrie.rm (name_data, getlen (name_data), &pipes[i]);
TEST_ASSERT_EQUAL (zmq::generic_mtrie_t<int>::last_value_removed, res);
}
TEST_ASSERT_EQUAL_INT (0, mtrie.num_prefixes ());
}
void test_rm_multiple_in_order ()
@ -394,8 +415,10 @@ void add_duplicate_entry (zmq::generic_mtrie_t<int> &mtrie_, int (&pipes_)[2])
bool res = mtrie_.add (name_data, getlen (name_data), &pipes_[0]);
TEST_ASSERT_TRUE (res);
TEST_ASSERT_EQUAL_INT (1, mtrie_.num_prefixes ());
res = mtrie_.add (name_data, getlen (name_data), &pipes_[1]);
TEST_ASSERT_FALSE (res);
TEST_ASSERT_EQUAL_INT (1, mtrie_.num_prefixes ());
}
void test_rm_with_callback_duplicate ()