diff --git a/.gitignore b/.gitignore
index 6170b15a..22223e57 100644
--- a/.gitignore
+++ b/.gitignore
@@ -125,3 +125,4 @@ zeromq-*.tar.gz
zeromq-*.zip
core
+mybuild
diff --git a/Makefile.am b/Makefile.am
index 5008f7e4..7b796570 100755
--- a/Makefile.am
+++ b/Makefile.am
@@ -472,8 +472,8 @@ test_apps = \
tests/test_unbind_wildcard \
tests/test_ctx_options \
tests/test_ctx_destroy \
- tests/test_security_no_zap_handler \
- tests/test_security_null \
+ tests/test_security_no_zap_handler \
+ tests/test_security_null \
tests/test_security_plain \
tests/test_security_zap \
tests/test_iov \
@@ -1067,7 +1067,8 @@ test_apps += tests/test_poller \
tests/test_channel \
tests/test_hiccup_msg \
tests/test_zmq_ppoll_fd \
- tests/test_xsub_verbose
+ tests/test_xsub_verbose \
+ tests/test_pubsub_topics_count
tests_test_poller_SOURCES = tests/test_poller.cpp
tests_test_poller_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
@@ -1145,6 +1146,10 @@ tests_test_xsub_verbose_SOURCES = tests/test_xsub_verbose.cpp
tests_test_xsub_verbose_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
tests_test_xsub_verbose_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
+tests_test_pubsub_topics_count_SOURCES = tests/test_pubsub_topics_count.cpp
+tests_test_pubsub_topics_count_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
+tests_test_pubsub_topics_count_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
+
if HAVE_FORK
test_apps += tests/test_zmq_ppoll_signals
diff --git a/doc/zmq_getsockopt.txt b/doc/zmq_getsockopt.txt
index 67eb6181..d912980d 100644
--- a/doc/zmq_getsockopt.txt
+++ b/doc/zmq_getsockopt.txt
@@ -983,6 +983,20 @@ Default value:: 8192
Applicable socket types:: All, when using TCP, IPC, PGM or NORM transport.
+ZMQ_TOPICS_COUNT: Number of topic subscriptions received
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+Gets the number of topic (prefix) subscriptions either
+* received on a (X)PUB socket from all the connected (X)SUB sockets or
+* acknowledged on an (X)SUB socket from all the connected (X)PUB sockets
+
+NOTE: in DRAFT state, not yet available in stable releases.
+
+[horizontal]
+Option value type:: int
+Option value unit:: N/A
+Default value:: 0
+Applicable socket types:: ZMQ_PUB, ZMQ_XPUB, ZMQ_SUB, ZMQ_XSUB
+
RETURN VALUE
------------
diff --git a/include/zmq.h b/include/zmq.h
index 4f1fb975..5f226018 100644
--- a/include/zmq.h
+++ b/include/zmq.h
@@ -680,6 +680,7 @@ ZMQ_EXPORT void zmq_threadclose (void *thread_);
#define ZMQ_BUSY_POLL 113
#define ZMQ_HICCUP_MSG 114
#define ZMQ_XSUB_VERBOSE_UNSUBSCRIBE 115
+#define ZMQ_TOPICS_COUNT 116
/* DRAFT ZMQ_RECONNECT_STOP options */
#define ZMQ_RECONNECT_STOP_CONN_REFUSED 0x1
diff --git a/src/generic_mtrie.hpp b/src/generic_mtrie.hpp
index 15dfad63..6c309894 100644
--- a/src/generic_mtrie.hpp
+++ b/src/generic_mtrie.hpp
@@ -35,6 +35,7 @@ along with this program. If not, see .
#include "macros.hpp"
#include "stdint.hpp"
+#include "atomic_counter.hpp"
namespace zmq
{
@@ -83,12 +84,18 @@ template class generic_mtrie_t
void (*func_) (value_t *value_, Arg arg_),
Arg arg_);
+ // Retrieve the number of prefixes stored in this trie (added - removed)
+ // Note this is a multithread safe function.
+ uint32_t num_prefixes () const { return _num_prefixes.get (); }
+
private:
bool is_redundant () const;
typedef std::set pipes_t;
pipes_t *_pipes;
+ atomic_counter_t _num_prefixes;
+
unsigned char _min;
unsigned short _count;
unsigned short _live_nodes;
diff --git a/src/generic_mtrie_impl.hpp b/src/generic_mtrie_impl.hpp
index 19f3694e..96eec584 100644
--- a/src/generic_mtrie_impl.hpp
+++ b/src/generic_mtrie_impl.hpp
@@ -45,7 +45,7 @@ namespace zmq
{
template
generic_mtrie_t::generic_mtrie_t () :
- _pipes (0), _min (0), _count (0), _live_nodes (0)
+ _pipes (0), _num_prefixes (0), _min (0), _count (0), _live_nodes (0)
{
}
@@ -144,6 +144,8 @@ bool generic_mtrie_t::add (prefix_t prefix_, size_t size_, value_t *pipe_)
if (!it->_pipes) {
it->_pipes = new (std::nothrow) pipes_t;
alloc_assert (it->_pipes);
+
+ _num_prefixes.add (1);
}
it->_pipes->insert (pipe_);
@@ -535,6 +537,11 @@ generic_mtrie_t::rm (prefix_t prefix_, size_t size_, value_t *pipe_)
}
}
+ if (ret == last_value_removed) {
+ zmq_assert (_num_prefixes.get () > 0);
+ _num_prefixes.sub (1);
+ }
+
return ret;
}
diff --git a/src/radix_tree.cpp b/src/radix_tree.cpp
index 2a38ebf0..12e1534c 100644
--- a/src/radix_tree.cpp
+++ b/src/radix_tree.cpp
@@ -326,7 +326,7 @@ bool zmq::radix_tree_t::add (const unsigned char *key_, size_t key_size_)
_root._data = current_node._data;
else
parent_node.set_node_at (edge_index, current_node);
- ++_size;
+ _size.add (1);
return true;
}
@@ -362,7 +362,7 @@ bool zmq::radix_tree_t::add (const unsigned char *key_, size_t key_size_)
current_node.set_edge_at (0, key_node.prefix ()[0], key_node);
current_node.set_edge_at (1, split_node.prefix ()[0], split_node);
- ++_size;
+ _size.add (1);
parent_node.set_node_at (edge_index, current_node);
return true;
}
@@ -394,7 +394,7 @@ bool zmq::radix_tree_t::add (const unsigned char *key_, size_t key_size_)
current_node.set_edge_at (0, split_node.prefix ()[0], split_node);
current_node.set_refcount (1);
- ++_size;
+ _size.add (1);
parent_node.set_node_at (edge_index, current_node);
return true;
}
@@ -402,7 +402,7 @@ bool zmq::radix_tree_t::add (const unsigned char *key_, size_t key_size_)
zmq_assert (key_bytes_matched == key_size_);
zmq_assert (prefix_bytes_matched == current_node.prefix_length ());
- ++_size;
+ _size.add (1);
current_node.set_refcount (current_node.refcount () + 1);
return current_node.refcount () == 1;
}
@@ -424,7 +424,7 @@ bool zmq::radix_tree_t::rm (const unsigned char *key_, size_t key_size_)
return false;
current_node.set_refcount (current_node.refcount () - 1);
- --_size;
+ _size.sub (1);
if (current_node.refcount () > 0)
return false;
@@ -574,5 +574,5 @@ void zmq::radix_tree_t::apply (
size_t zmq::radix_tree_t::size () const
{
- return _size;
+ return _size.get ();
}
diff --git a/src/radix_tree.hpp b/src/radix_tree.hpp
index 02e74969..4c4aa0c0 100644
--- a/src/radix_tree.hpp
+++ b/src/radix_tree.hpp
@@ -33,6 +33,7 @@
#include
#include "stdint.hpp"
+#include "atomic_counter.hpp"
// Wrapper type for a node's data layout.
//
@@ -133,6 +134,7 @@ class radix_tree_t
void apply (void (*func_) (unsigned char *data, size_t size, void *arg),
void *arg_);
+ // Retrieve size of the radix tree. Note this is a multithread safe function.
size_t size () const;
private:
@@ -140,7 +142,7 @@ class radix_tree_t
match (const unsigned char *key_, size_t key_size_, bool is_lookup_) const;
node_t _root;
- size_t _size;
+ atomic_counter_t _size;
};
}
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index ee78d8b7..26fb277e 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -460,6 +460,12 @@ int zmq::socket_base_t::getsockopt (int option_,
return -1;
}
+ // First, check whether specific socket type overloads the option.
+ int rc = xgetsockopt (option_, optval_, optvallen_);
+ if (rc == 0 || errno != EINVAL) {
+ return rc;
+ }
+
if (option_ == ZMQ_RCVMORE) {
return do_getsockopt (optval_, optvallen_, _rcvmore ? 1 : 0);
}
@@ -1619,6 +1625,12 @@ int zmq::socket_base_t::xsetsockopt (int, const void *, size_t)
return -1;
}
+int zmq::socket_base_t::xgetsockopt (int, void *, size_t *)
+{
+ errno = EINVAL;
+ return -1;
+}
+
bool zmq::socket_base_t::xhas_out ()
{
return false;
diff --git a/src/socket_base.hpp b/src/socket_base.hpp
index 92deb9f7..01920e72 100644
--- a/src/socket_base.hpp
+++ b/src/socket_base.hpp
@@ -186,6 +186,11 @@ class socket_base_t : public own_t,
virtual int
xsetsockopt (int option_, const void *optval_, size_t optvallen_);
+ // The default implementation assumes there are no specific socket
+ // options for the particular socket type. If not so, ZMQ_FINAL this
+ // method.
+ virtual int xgetsockopt (int option_, void *optval_, size_t *optvallen_);
+
// The default implementation assumes that send is not supported.
virtual bool xhas_out ();
virtual int xsend (zmq::msg_t *msg_);
diff --git a/src/trie.hpp b/src/trie.hpp
index 32a2c2cb..8457c3ce 100644
--- a/src/trie.hpp
+++ b/src/trie.hpp
@@ -34,6 +34,7 @@
#include "macros.hpp"
#include "stdint.hpp"
+#include "atomic_counter.hpp"
namespace zmq
{
@@ -80,6 +81,53 @@ class trie_t
ZMQ_NON_COPYABLE_NOR_MOVABLE (trie_t)
};
+
+
+// lightweight wrapper around trie_t adding tracking of total number of prefixes
+class trie_with_size_t
+{
+ public:
+ trie_with_size_t () {}
+ ~trie_with_size_t () {}
+
+ bool add (unsigned char *prefix_, size_t size_)
+ {
+ if (_trie.add (prefix_, size_)) {
+ _num_prefixes.add (1);
+ return true;
+ } else
+ return false;
+ }
+
+ bool rm (unsigned char *prefix_, size_t size_)
+ {
+ if (_trie.rm (prefix_, size_)) {
+ _num_prefixes.sub (1);
+ return true;
+ } else
+ return false;
+ }
+
+ bool check (const unsigned char *data_, size_t size_) const
+ {
+ return _trie.check (data_, size_);
+ }
+
+ void apply (void (*func_) (unsigned char *data_, size_t size_, void *arg_),
+ void *arg_)
+ {
+ _trie.apply (func_, arg_);
+ }
+
+ // Retrieve the number of prefixes stored in this trie (added - removed)
+ // Note this is a multithread safe function.
+ uint32_t num_prefixes () const { return _num_prefixes.get (); }
+
+ private:
+ atomic_counter_t _num_prefixes;
+ trie_t _trie;
+};
+
}
#endif
diff --git a/src/xpub.cpp b/src/xpub.cpp
index a71543a3..acaed4c9 100644
--- a/src/xpub.cpp
+++ b/src/xpub.cpp
@@ -255,6 +255,21 @@ int zmq::xpub_t::xsetsockopt (int option_,
return 0;
}
+int zmq::xpub_t::xgetsockopt (int option_, void *optval_, size_t *optvallen_)
+{
+ if (option_ == ZMQ_TOPICS_COUNT) {
+ // make sure to use a multi-thread safe function to avoid race conditions with I/O threads
+ // where subscriptions are processed:
+ return do_getsockopt (optval_, optvallen_,
+ (int) _subscriptions.num_prefixes ());
+ }
+
+ // room for future options here
+
+ errno = EINVAL;
+ return -1;
+}
+
static void stub (zmq::mtrie_t::prefix_t data_, size_t size_, void *arg_)
{
LIBZMQ_UNUSED (data_);
diff --git a/src/xpub.hpp b/src/xpub.hpp
index 82504b0e..32d7325a 100644
--- a/src/xpub.hpp
+++ b/src/xpub.hpp
@@ -62,6 +62,7 @@ class xpub_t : public socket_base_t
void xwrite_activated (zmq::pipe_t *pipe_) ZMQ_FINAL;
int
xsetsockopt (int option_, const void *optval_, size_t optvallen_) ZMQ_FINAL;
+ int xgetsockopt (int option_, void *optval_, size_t *optvallen_) ZMQ_FINAL;
void xpipe_terminated (zmq::pipe_t *pipe_) ZMQ_FINAL;
private:
diff --git a/src/xsub.cpp b/src/xsub.cpp
index 795e4bf3..d2192c90 100644
--- a/src/xsub.cpp
+++ b/src/xsub.cpp
@@ -121,6 +121,27 @@ int zmq::xsub_t::xsetsockopt (int option_,
return -1;
}
+int zmq::xsub_t::xgetsockopt (int option_, void *optval_, size_t *optvallen_)
+{
+ if (option_ == ZMQ_TOPICS_COUNT) {
+ // make sure to use a multi-thread safe function to avoid race conditions with I/O threads
+ // where subscriptions are processed:
+#ifdef ZMQ_USE_RADIX_TREE
+ uint64_t num_subscriptions = _subscriptions.size ();
+#else
+ uint64_t num_subscriptions = _subscriptions.num_prefixes ();
+#endif
+
+ return do_getsockopt (optval_, optvallen_,
+ (int) num_subscriptions);
+ }
+
+ // room for future options here
+
+ errno = EINVAL;
+ return -1;
+}
+
int zmq::xsub_t::xsend (msg_t *msg_)
{
size_t size = msg_->size ();
diff --git a/src/xsub.hpp b/src/xsub.hpp
index 2fc37ea5..14f3c7bb 100644
--- a/src/xsub.hpp
+++ b/src/xsub.hpp
@@ -60,6 +60,7 @@ class xsub_t : public socket_base_t
int xsetsockopt (int option_,
const void *optval_,
size_t optvallen_) ZMQ_OVERRIDE;
+ int xgetsockopt (int option_, void *optval_, size_t *optvallen_) ZMQ_FINAL;
int xsend (zmq::msg_t *msg_) ZMQ_OVERRIDE;
bool xhas_out () ZMQ_OVERRIDE;
int xrecv (zmq::msg_t *msg_) ZMQ_FINAL;
@@ -88,7 +89,7 @@ class xsub_t : public socket_base_t
#ifdef ZMQ_USE_RADIX_TREE
radix_tree_t _subscriptions;
#else
- trie_t _subscriptions;
+ trie_with_size_t _subscriptions;
#endif
// If true, send all unsubscription messages upstream, not just
diff --git a/src/zmq_draft.h b/src/zmq_draft.h
index 5378ce51..dfb005ad 100644
--- a/src/zmq_draft.h
+++ b/src/zmq_draft.h
@@ -72,6 +72,7 @@
#define ZMQ_BUSY_POLL 113
#define ZMQ_HICCUP_MSG 114
#define ZMQ_XSUB_VERBOSE_UNSUBSCRIBE 115
+#define ZMQ_TOPICS_COUNT 116
/* DRAFT ZMQ_RECONNECT_STOP options */
#define ZMQ_RECONNECT_STOP_CONN_REFUSED 0x1
diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt
index 63e3c37b..469e60d4 100644
--- a/tests/CMakeLists.txt
+++ b/tests/CMakeLists.txt
@@ -5,76 +5,76 @@ cmake_minimum_required(VERSION "2.8.1")
project(tests)
set(tests
- test_ancillaries
- test_system
- test_pair_inproc
- test_pair_tcp
- test_reqrep_inproc
- test_reqrep_tcp
- test_hwm
- test_hwm_pubsub
- test_reqrep_device
- test_sub_forward
- test_invalid_rep
- test_msg_flags
- test_msg_ffn
- test_connect_resolve
- test_immediate
- test_last_endpoint
- test_term_endpoint
- test_router_mandatory
- test_probe_router
- test_stream
- test_stream_empty
- test_stream_disconnect
- test_disconnect_inproc
- test_unbind_wildcard
- test_ctx_options
- test_ctx_destroy
- test_security_no_zap_handler
- test_security_null
- test_security_plain
- test_security_zap
- test_iov
- test_spec_req
- test_spec_rep
- test_spec_dealer
- test_spec_router
- test_spec_pushpull
- test_req_correlate
- test_req_relaxed
- test_conflate
- test_inproc_connect
- test_issue_566
- test_shutdown_stress
- test_timeo
- test_many_sockets
- test_diffserv
- test_connect_rid
- test_xpub_nodrop
- test_pub_invert_matching
- test_setsockopt
- test_sockopt_hwm
- test_heartbeats
- test_atomics
- test_bind_src_address
- test_capabilities
- test_metadata
- test_router_handover
- test_srcfd
- test_stream_timeout
- test_xpub_manual
- test_xpub_welcome_msg
- test_xpub_verbose
- test_base85
- test_bind_after_connect_tcp
- test_sodium
- test_monitor
- test_socket_null
- test_reconnect_ivl
- test_reconnect_options
- test_tcp_accept_filter
- test_mock_pub_sub)
+ test_ancillaries
+ test_system
+ test_pair_inproc
+ test_pair_tcp
+ test_reqrep_inproc
+ test_reqrep_tcp
+ test_hwm
+ test_hwm_pubsub
+ test_reqrep_device
+ test_sub_forward
+ test_invalid_rep
+ test_msg_flags
+ test_msg_ffn
+ test_connect_resolve
+ test_immediate
+ test_last_endpoint
+ test_term_endpoint
+ test_router_mandatory
+ test_probe_router
+ test_stream
+ test_stream_empty
+ test_stream_disconnect
+ test_disconnect_inproc
+ test_unbind_wildcard
+ test_ctx_options
+ test_ctx_destroy
+ test_security_no_zap_handler
+ test_security_null
+ test_security_plain
+ test_security_zap
+ test_iov
+ test_spec_req
+ test_spec_rep
+ test_spec_dealer
+ test_spec_router
+ test_spec_pushpull
+ test_req_correlate
+ test_req_relaxed
+ test_conflate
+ test_inproc_connect
+ test_issue_566
+ test_shutdown_stress
+ test_timeo
+ test_many_sockets
+ test_diffserv
+ test_connect_rid
+ test_xpub_nodrop
+ test_pub_invert_matching
+ test_setsockopt
+ test_sockopt_hwm
+ test_heartbeats
+ test_atomics
+ test_bind_src_address
+ test_capabilities
+ test_metadata
+ test_router_handover
+ test_srcfd
+ test_stream_timeout
+ test_xpub_manual
+ test_xpub_welcome_msg
+ test_xpub_verbose
+ test_base85
+ test_bind_after_connect_tcp
+ test_sodium
+ test_monitor
+ test_socket_null
+ test_reconnect_ivl
+ test_reconnect_options
+ test_tcp_accept_filter
+ test_mock_pub_sub)
if(NOT WIN32)
list(APPEND tests test_security_gssapi test_socks test_connect_null_fuzzer test_bind_null_fuzzer test_connect_fuzzer test_bind_fuzzer)
@@ -85,12 +85,14 @@ if(ZMQ_HAVE_CURVE)
if(NOT CMAKE_SYSTEM_NAME MATCHES "Linux")
list(APPEND tests test_security_curve)
endif()
+
if(NOT WIN32)
list(APPEND tests test_connect_curve_fuzzer test_bind_curve_fuzzer test_z85_decode_fuzzer)
endif()
endif()
option(ENABLE_CAPSH "Run tests that require sudo and capsh (for cap_net_admin)" OFF)
+
if(ENABLE_CAPSH)
find_program(CAPSH_PROGRAM NAMES capsh)
@@ -119,11 +121,14 @@ if(NOT WIN32)
test_router_mandatory_hwm
test_use_fd
test_zmq_poll_fd)
+
if(HAVE_FORK)
list(APPEND tests test_fork)
endif()
+
if(CMAKE_SYSTEM_NAME MATCHES "Linux")
list(APPEND tests test_abstract_ipc)
+
if(ZMQ_HAVE_TIPC)
list(
APPEND
@@ -167,10 +172,13 @@ if(ENABLE_DRAFTS)
test_hiccup_msg
test_zmq_ppoll_fd
test_xsub_verbose
-)
+ test_pubsub_topics_count
+ )
+
if(HAVE_FORK)
list(APPEND tests test_zmq_ppoll_signals)
endif()
+
if(ZMQ_HAVE_BUSY_POLL)
list(APPEND tests test_busy_poll)
endif()
@@ -178,6 +186,7 @@ endif()
if(ZMQ_HAVE_WS)
list(APPEND tests test_ws_transport)
+
if(ZMQ_HAVE_WSS)
list(APPEND tests test_wss_transport)
endif()
@@ -187,6 +196,7 @@ endif()
if(WIN32)
add_definitions(-DZMQ_CUSTOM_PLATFORM_HPP)
add_definitions(-D_WINSOCK_DEPRECATED_NO_WARNINGS)
+
# Same name on 64bit systems
link_libraries(ws2_32.lib)
endif()
@@ -200,22 +210,25 @@ target_compile_definitions(unity PUBLIC "UNITY_USE_COMMAND_LINE_ARGS" "UNITY_EXC
target_include_directories(unity PUBLIC "${CMAKE_CURRENT_LIST_DIR}/../external/unity")
set(TESTUTIL_SOURCES
- testutil.cpp
- testutil.hpp
- testutil_monitoring.cpp
- testutil_monitoring.hpp
- testutil_security.cpp
- testutil_security.hpp
- testutil_unity.cpp
- testutil_unity.hpp)
+ testutil.cpp
+ testutil.hpp
+ testutil_monitoring.cpp
+ testutil_monitoring.hpp
+ testutil_security.cpp
+ testutil_security.hpp
+ testutil_unity.cpp
+ testutil_unity.hpp)
+
if(BUILD_STATIC)
add_library(testutil-static STATIC ${TESTUTIL_SOURCES})
target_link_libraries(testutil-static libzmq-static ${OPTIONAL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} unity)
endif()
+
if(BUILD_SHARED)
add_library(testutil STATIC ${TESTUTIL_SOURCES})
target_link_libraries(testutil libzmq ${OPTIONAL_LIBRARIES} ${CMAKE_THREAD_LIBS_INIT} unity)
endif()
+
if(BUILD_STATIC AND NOT BUILD_SHARED)
# use testutil-static for both tests and unit tests
set(TESTUTIL_LIB testutil-static)
@@ -234,6 +247,7 @@ endif()
# add include dirs for all targets
include_directories("${ZeroMQ_SOURCE_DIR}/../include" "${ZeroMQ_BINARY_DIR}")
+
if(WIN32)
add_definitions(-D_CRT_NONSTDC_NO_DEPRECATE)
endif()
@@ -250,11 +264,14 @@ foreach(test ${tests})
else()
add_executable(${test} ${test}.cpp)
endif()
+
target_link_libraries(${test} ${TESTUTIL_LIB})
+
if(WIN32)
# This is the output for Debug dynamic builds on Visual Studio 6.0 You should provide the correct directory, don't
# know how to do it automatically
find_path(LIBZMQ_PATH "libzmq.lib" PATHS "../bin/Win32/Debug/v120/dynamic")
+
if(NOT ${LIBZMQ_PATH} STREQUAL "LIBZMQ_PATH-NOTFOUND")
set_target_properties(${test} PROPERTIES LINK_FLAGS "/LIBPATH:${LIBZMQ_PATH}")
endif()
@@ -286,6 +303,7 @@ foreach(test ${tests})
add_test(NAME ${test} COMMAND ${test})
endif()
endif()
+
set_tests_properties(${test} PROPERTIES TIMEOUT 10)
set_tests_properties(${test} PROPERTIES SKIP_RETURN_CODE 77)
endforeach()
@@ -301,6 +319,7 @@ if(NOT CMAKE_SYSTEM_NAME MATCHES "Linux")
if(ZMQ_HAVE_CURVE)
set_tests_properties(test_security_curve PROPERTIES TIMEOUT 60)
endif()
+
# add additional required flags ZMQ_USE_TWEETNACL will already be defined when not using sodium
if(ZMQ_HAVE_CURVE AND NOT ZMQ_USE_TWEETNACL)
target_compile_definitions(test_security_curve PRIVATE "-DZMQ_USE_TWEETNACL")
@@ -313,9 +332,11 @@ set_tests_properties(test_reconnect_ivl PROPERTIES TIMEOUT 15)
# Check whether all tests in the current folder are present
file(READ "${CMAKE_CURRENT_LIST_FILE}" CURRENT_LIST_FILE_CONTENT)
file(GLOB ALL_TEST_SOURCES "test_*.cpp")
+
foreach(TEST_SOURCE ${ALL_TEST_SOURCES})
get_filename_component(TESTNAME "${TEST_SOURCE}" NAME_WE)
string(REGEX MATCH "${TESTNAME}" MATCH_TESTNAME "${CURRENT_LIST_FILE_CONTENT}")
+
if(NOT MATCH_TESTNAME)
message(AUTHOR_WARNING "Test '${TESTNAME}' is not known to CTest.")
endif()
diff --git a/tests/test_pubsub_topics_count.cpp b/tests/test_pubsub_topics_count.cpp
new file mode 100644
index 00000000..f4fdbcc0
--- /dev/null
+++ b/tests/test_pubsub_topics_count.cpp
@@ -0,0 +1,151 @@
+/*
+ Copyright (c) 2007-2020 Contributors as noted in the AUTHORS file
+
+ This file is part of libzmq, the ZeroMQ core engine in C++.
+
+ libzmq is free software; you can redistribute it and/or modify it under
+ the terms of the GNU Lesser General Public License (LGPL) as published
+ by the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ As a special exception, the Contributors give you permission to link
+ this library with independent modules to produce an executable,
+ regardless of the license terms of these independent modules, and to
+ copy and distribute the resulting executable under terms of your choice,
+ provided that you also meet, for each linked independent module, the
+ terms and conditions of the license of that module. An independent
+ module is a module which is not derived from or based on this library.
+ If you modify this library, you must extend this exception to your
+ version of the library.
+
+ libzmq is distributed in the hope that it will be useful, but WITHOUT
+ ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
+ License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see .
+*/
+
+#include "testutil.hpp"
+#include "testutil_unity.hpp"
+#include
+
+SETUP_TEARDOWN_TESTCONTEXT
+
+
+void settle_subscriptions (void *skt)
+{
+ // To kick the application thread, do a dummy getsockopt - users here
+ // should use the monitor and the other sockets in a poll.
+ unsigned long int dummy;
+ size_t dummy_size = sizeof (dummy);
+ msleep (SETTLE_TIME);
+ zmq_getsockopt (skt, ZMQ_EVENTS, &dummy, &dummy_size);
+}
+
+int get_subscription_count (void *skt)
+{
+ int num_subs = 0;
+ size_t num_subs_len = sizeof (num_subs);
+
+ settle_subscriptions (skt);
+ TEST_ASSERT_SUCCESS_ERRNO (
+ zmq_getsockopt (skt, ZMQ_TOPICS_COUNT, &num_subs, &num_subs_len));
+
+ return num_subs;
+}
+
+void test_independent_topic_prefixes ()
+{
+ // Create a publisher
+ void *publisher = test_context_socket (ZMQ_PUB);
+ char my_endpoint[MAX_SOCKET_STRING];
+
+ // Bind publisher
+ test_bind (publisher, "inproc://soname", my_endpoint, MAX_SOCKET_STRING);
+
+ // Create a subscriber
+ void *subscriber = test_context_socket (ZMQ_SUB);
+ TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (subscriber, my_endpoint));
+
+ // Subscribe to 3 topics
+ TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
+ subscriber, ZMQ_SUBSCRIBE, "topicprefix1", strlen ("topicprefix1")));
+ TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
+ subscriber, ZMQ_SUBSCRIBE, "topicprefix2", strlen ("topicprefix2")));
+ TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
+ subscriber, ZMQ_SUBSCRIBE, "topicprefix3", strlen ("topicprefix3")));
+ TEST_ASSERT_EQUAL_INT (get_subscription_count (subscriber), 3);
+ TEST_ASSERT_EQUAL_INT (get_subscription_count (publisher), 3);
+
+ // Remove first subscription and check subscriptions went 3 -> 2
+ TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
+ subscriber, ZMQ_UNSUBSCRIBE, "topicprefix3", strlen ("topicprefix3")));
+ TEST_ASSERT_EQUAL_INT (get_subscription_count (subscriber), 2);
+ TEST_ASSERT_EQUAL_INT (get_subscription_count (publisher), 2);
+
+ // Remove other 2 subscriptions and check we're back to 0 subscriptions
+ TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
+ subscriber, ZMQ_UNSUBSCRIBE, "topicprefix1", strlen ("topicprefix1")));
+ TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
+ subscriber, ZMQ_UNSUBSCRIBE, "topicprefix2", strlen ("topicprefix2")));
+ TEST_ASSERT_EQUAL_INT (get_subscription_count (subscriber), 0);
+ TEST_ASSERT_EQUAL_INT (get_subscription_count (publisher), 0);
+
+ // Clean up.
+ test_context_socket_close (publisher);
+ test_context_socket_close (subscriber);
+}
+
+void test_nested_topic_prefixes ()
+{
+ // Create a publisher
+ void *publisher = test_context_socket (ZMQ_PUB);
+ char my_endpoint[MAX_SOCKET_STRING];
+
+ // Bind publisher
+ test_bind (publisher, "inproc://soname", my_endpoint, MAX_SOCKET_STRING);
+
+ // Create a subscriber
+ void *subscriber = test_context_socket (ZMQ_SUB);
+ TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (subscriber, my_endpoint));
+
+ // Subscribe to 3 (nested) topics
+ TEST_ASSERT_SUCCESS_ERRNO (
+ zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "a", strlen ("a")));
+ TEST_ASSERT_SUCCESS_ERRNO (
+ zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "ab", strlen ("ab")));
+ TEST_ASSERT_SUCCESS_ERRNO (
+ zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "abc", strlen ("abc")));
+
+ // Even if the subscriptions are nested one into the other, the number of subscriptions
+ // received on the subscriber/publisher socket will be 3:
+ TEST_ASSERT_EQUAL_INT (get_subscription_count (subscriber), 3);
+ TEST_ASSERT_EQUAL_INT (get_subscription_count (publisher), 3);
+
+ // Subscribe to other 3 (nested) topics
+ TEST_ASSERT_SUCCESS_ERRNO (
+ zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "xyz", strlen ("a")));
+ TEST_ASSERT_SUCCESS_ERRNO (
+ zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "xy", strlen ("ab")));
+ TEST_ASSERT_SUCCESS_ERRNO (
+ zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE, "x", strlen ("abc")));
+
+ TEST_ASSERT_EQUAL_INT (get_subscription_count (subscriber), 6);
+ TEST_ASSERT_EQUAL_INT (get_subscription_count (publisher), 6);
+
+ // Clean up.
+ test_context_socket_close (publisher);
+ test_context_socket_close (subscriber);
+}
+
+int main ()
+{
+ setup_test_environment ();
+
+ UNITY_BEGIN ();
+ RUN_TEST (test_independent_topic_prefixes);
+ RUN_TEST (test_nested_topic_prefixes);
+ return UNITY_END ();
+}
diff --git a/unittests/unittest_mtrie.cpp b/unittests/unittest_mtrie.cpp
index e85f7f36..a03ace2c 100644
--- a/unittests/unittest_mtrie.cpp
+++ b/unittests/unittest_mtrie.cpp
@@ -81,6 +81,8 @@ void test_add_single_entry_match_exact ()
bool res = mtrie.add (test_name, getlen (test_name), &pipe);
TEST_ASSERT_TRUE (res);
+ TEST_ASSERT_EQUAL_INT (1, mtrie.num_prefixes ());
+
int count = 0;
mtrie.match (test_name, getlen (test_name), mtrie_count, &count);
TEST_ASSERT_EQUAL_INT (1, count);
@@ -96,9 +98,11 @@ void test_add_single_entry_twice_match_exact ()
bool res = mtrie.add (test_name, getlen (test_name), &pipe);
TEST_ASSERT_TRUE (res);
+ TEST_ASSERT_EQUAL_INT (1, mtrie.num_prefixes ());
res = mtrie.add (test_name, getlen (test_name), &pipe);
TEST_ASSERT_FALSE (res);
+ TEST_ASSERT_EQUAL_INT (1, mtrie.num_prefixes ());
int count = 0;
mtrie.match (test_name, getlen (test_name), mtrie_count, &count);
@@ -115,9 +119,11 @@ void test_add_two_entries_with_same_name_match_exact ()
bool res = mtrie.add (test_name, getlen (test_name), &pipe_1);
TEST_ASSERT_TRUE (res);
+ TEST_ASSERT_EQUAL_INT (1, mtrie.num_prefixes ());
res = mtrie.add (test_name, getlen (test_name), &pipe_2);
TEST_ASSERT_FALSE (res);
+ TEST_ASSERT_EQUAL_INT (1, mtrie.num_prefixes ());
int count = 0;
mtrie.match (test_name, getlen (test_name), mtrie_count, &count);
@@ -136,9 +142,11 @@ void test_add_two_entries_match_prefix_and_exact ()
bool res = mtrie.add (test_name_prefix, getlen (test_name_prefix), &pipe_1);
TEST_ASSERT_TRUE (res);
+ TEST_ASSERT_EQUAL_INT (1, mtrie.num_prefixes ());
res = mtrie.add (test_name_full, getlen (test_name_full), &pipe_2);
TEST_ASSERT_TRUE (res);
+ TEST_ASSERT_EQUAL_INT (2, mtrie.num_prefixes ());
int count = 0;
mtrie.match (test_name_full, getlen (test_name_full), mtrie_count, &count);
@@ -153,9 +161,11 @@ void test_add_rm_single_entry_match_exact ()
reinterpret_cast::prefix_t> ("foo");
mtrie.add (test_name, getlen (test_name), &pipe);
+ TEST_ASSERT_EQUAL_INT (1, mtrie.num_prefixes ());
zmq::generic_mtrie_t::rm_result res =
mtrie.rm (test_name, getlen (test_name), &pipe);
TEST_ASSERT_EQUAL (zmq::generic_mtrie_t::last_value_removed, res);
+ TEST_ASSERT_EQUAL_INT (0, mtrie.num_prefixes ());
int count = 0;
mtrie.match (test_name, getlen (test_name), mtrie_count, &count);
@@ -169,6 +179,7 @@ void test_rm_nonexistent_0_size_empty ()
zmq::generic_mtrie_t::rm_result res = mtrie.rm (0, 0, &pipe);
TEST_ASSERT_EQUAL (zmq::generic_mtrie_t::not_found, res);
+ TEST_ASSERT_EQUAL_INT (0, mtrie.num_prefixes ());
}
void test_rm_nonexistent_empty ()
@@ -181,6 +192,7 @@ void test_rm_nonexistent_empty ()
zmq::generic_mtrie_t::rm_result res =
mtrie.rm (test_name, getlen (test_name), &pipe);
TEST_ASSERT_EQUAL (zmq::generic_mtrie_t::not_found, res);
+ TEST_ASSERT_EQUAL_INT (0, mtrie.num_prefixes ());
int count = 0;
mtrie.match (test_name, getlen (test_name), mtrie_count, &count);
@@ -197,10 +209,12 @@ void test_add_and_rm_other (const char *add_name_, const char *rm_name_)
reinterpret_cast::prefix_t> (rm_name_);
mtrie.add (add_name_data, getlen (add_name_data), &addpipe);
+ TEST_ASSERT_EQUAL_INT (1, mtrie.num_prefixes ());
zmq::generic_mtrie_t::rm_result res =
mtrie.rm (rm_name_data, getlen (rm_name_data), &rmpipe);
TEST_ASSERT_EQUAL (zmq::generic_mtrie_t::not_found, res);
+ TEST_ASSERT_EQUAL_INT (1, mtrie.num_prefixes ());
{
int count = 0;
@@ -249,7 +263,9 @@ void add_indexed_expect_unique (zmq::generic_mtrie_t &mtrie_,
reinterpret_cast::prefix_t> (names_[i_]);
bool res = mtrie_.add (name_data, getlen (name_data), &pipes_[i_]);
- TEST_ASSERT_EQUAL (zmq::generic_mtrie_t::last_value_removed, res);
+ TEST_ASSERT_EQUAL (
+ zmq::generic_mtrie_t::last_value_removed,
+ res); // FIXME asserting equality between enum and bool? I think first arg for macro should be "true"
}
void test_rm_nonexistent_between ()
@@ -260,6 +276,7 @@ void test_rm_nonexistent_between ()
zmq::generic_mtrie_t mtrie;
add_indexed_expect_unique (mtrie, pipes, names, 0);
add_indexed_expect_unique (mtrie, pipes, names, 2);
+ TEST_ASSERT_EQUAL_INT (2, mtrie.num_prefixes ());
const zmq::generic_mtrie_t::prefix_t name_data =
reinterpret_cast::prefix_t> (names[1]);
@@ -267,6 +284,7 @@ void test_rm_nonexistent_between ()
zmq::generic_mtrie_t::rm_result res =
mtrie.rm (name_data, getlen (name_data), &pipes[1]);
TEST_ASSERT_EQUAL (zmq::generic_mtrie_t::not_found, res);
+ TEST_ASSERT_EQUAL_INT (2, mtrie.num_prefixes ());
}
template
@@ -277,6 +295,7 @@ void add_entries (zmq::generic_mtrie_t &mtrie_,
for (size_t i = 0; i < N; ++i) {
add_indexed_expect_unique (mtrie_, pipes_, names_, i);
}
+ TEST_ASSERT_EQUAL_INT (N, mtrie_.num_prefixes ());
}
void test_add_multiple ()
@@ -306,6 +325,7 @@ void test_add_multiple_reverse ()
add_indexed_expect_unique (mtrie, pipes, names,
static_cast (i));
}
+ TEST_ASSERT_EQUAL_INT (3, mtrie.num_prefixes ());
for (size_t i = 0; i < 3; ++i) {
const zmq::generic_mtrie_t::prefix_t name_data =
@@ -330,6 +350,7 @@ template void add_and_rm_entries (const char *(&names_)[N])
mtrie.rm (name_data, getlen (name_data), &pipes[i]);
TEST_ASSERT_EQUAL (zmq::generic_mtrie_t::last_value_removed, res);
}
+ TEST_ASSERT_EQUAL_INT (0, mtrie.num_prefixes ());
}
void test_rm_multiple_in_order ()
@@ -394,8 +415,10 @@ void add_duplicate_entry (zmq::generic_mtrie_t &mtrie_, int (&pipes_)[2])
bool res = mtrie_.add (name_data, getlen (name_data), &pipes_[0]);
TEST_ASSERT_TRUE (res);
+ TEST_ASSERT_EQUAL_INT (1, mtrie_.num_prefixes ());
res = mtrie_.add (name_data, getlen (name_data), &pipes_[1]);
TEST_ASSERT_FALSE (res);
+ TEST_ASSERT_EQUAL_INT (1, mtrie_.num_prefixes ());
}
void test_rm_with_callback_duplicate ()