From 9bcfc251f42f699be1e5565826eae68c2f85852d Mon Sep 17 00:00:00 2001 From: Luca Boccassi Date: Sat, 18 May 2019 16:02:30 +0100 Subject: [PATCH 1/5] Problem: new test_xpub_manual_last_value not built by automake Solution: list it --- Makefile.am | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/Makefile.am b/Makefile.am index 74684718..5b63f64a 100644 --- a/Makefile.am +++ b/Makefile.am @@ -966,6 +966,7 @@ test_apps += tests/test_poller \ tests/test_scatter_gather \ tests/test_dgram \ tests/test_app_meta \ + tests/test_xpub_manual_last_value \ tests/test_router_notify tests_test_poller_SOURCES = tests/test_poller.cpp @@ -996,6 +997,10 @@ tests_test_dgram_SOURCES = tests/test_dgram.cpp tests_test_dgram_LDADD = src/libzmq.la ${TESTUTIL_LIBS} tests_test_dgram_CPPFLAGS = ${TESTUTIL_CPPFLAGS} +tests_test_xpub_manual_last_value_SOURCES = tests/test_xpub_manual_last_value.cpp +tests_test_xpub_manual_last_value_LDADD = src/libzmq.la ${TESTUTIL_LIBS} +tests_test_xpub_manual_last_value_CPPFLAGS = ${TESTUTIL_CPPFLAGS} + tests_test_app_meta_SOURCES = tests/test_app_meta.cpp tests_test_app_meta_LDADD = src/libzmq.la ${TESTUTIL_LIBS} tests_test_app_meta_CPPFLAGS = ${TESTUTIL_CPPFLAGS} From a53dfe936a3690f03ff7a45691d09a6be42467ae Mon Sep 17 00:00:00 2001 From: Luca Boccassi Date: Sat, 18 May 2019 16:08:10 +0100 Subject: [PATCH 2/5] Problem: many unnecessary ifdefs covering new xpub option Solution: remove them, only public headers have to be ifdef'd --- src/xpub.cpp | 17 ++--------------- src/xpub.hpp | 2 -- 2 files changed, 2 insertions(+), 17 deletions(-) diff --git a/src/xpub.cpp b/src/xpub.cpp index 689d286a..131d3c90 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -44,9 +44,7 @@ zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : _more (false), _lossy (true), _manual (false), -#ifdef ZMQ_BUILD_DRAFT_API _send_last_pipe (false), -#endif _pending_pipes (), _welcome_msg () { @@ -193,10 +191,8 @@ int zmq::xpub_t::xsetsockopt (int option_, size_t optvallen_) { if (option_ == ZMQ_XPUB_VERBOSE || option_ == ZMQ_XPUB_VERBOSER -#ifdef ZMQ_BUILD_DRAFT_API - || option_ == ZMQ_XPUB_MANUAL_LAST_VALUE -#endif - || option_ == ZMQ_XPUB_NODROP || option_ == ZMQ_XPUB_MANUAL) { + || option_ == ZMQ_XPUB_MANUAL_LAST_VALUE || option_ == ZMQ_XPUB_NODROP + || option_ == ZMQ_XPUB_MANUAL) { if (optvallen_ != sizeof (int) || *static_cast (optval_) < 0) { errno = EINVAL; @@ -208,11 +204,9 @@ int zmq::xpub_t::xsetsockopt (int option_, } else if (option_ == ZMQ_XPUB_VERBOSER) { _verbose_subs = (*static_cast (optval_) != 0); _verbose_unsubs = _verbose_subs; -#ifdef ZMQ_BUILD_DRAFT_API } else if (option_ == ZMQ_XPUB_MANUAL_LAST_VALUE) { _manual = (*static_cast (optval_) != 0); _send_last_pipe = _manual; -#endif } else if (option_ == ZMQ_XPUB_NODROP) _lossy = (*static_cast (optval_) == 0); else if (option_ == ZMQ_XPUB_MANUAL) @@ -276,13 +270,11 @@ void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, xpub_t *self_) self_->_dist.match (pipe_); } -#ifdef ZMQ_BUILD_DRAFT_API void zmq::xpub_t::mark_last_pipe_as_matching (pipe_t *pipe_, xpub_t *self_) { if (self_->_last_pipe == pipe_) self_->_dist.match (pipe_); } -#endif int zmq::xpub_t::xsend (msg_t *msg_) { @@ -290,7 +282,6 @@ int zmq::xpub_t::xsend (msg_t *msg_) // For the first part of multi-part message, find the matching pipes. if (!_more) { -#ifdef ZMQ_BUILD_DRAFT_API if (_manual && _last_pipe && _send_last_pipe) { _subscriptions.match (static_cast (msg_->data ()), msg_->size (), mark_last_pipe_as_matching, @@ -299,10 +290,6 @@ int zmq::xpub_t::xsend (msg_t *msg_) } else _subscriptions.match (static_cast (msg_->data ()), msg_->size (), mark_as_matching, this); -#else - _subscriptions.match (static_cast (msg_->data ()), - msg_->size (), mark_as_matching, this); -#endif // If inverted matching is used, reverse the selection now if (options.invert_matching) { _dist.reverse_match (); diff --git a/src/xpub.hpp b/src/xpub.hpp index 21456faf..f8fa960f 100644 --- a/src/xpub.hpp +++ b/src/xpub.hpp @@ -99,13 +99,11 @@ class xpub_t : public socket_base_t // Subscriptions will not bed added automatically, only after calling set option with ZMQ_SUBSCRIBE or ZMQ_UNSUBSCRIBE bool _manual; -#ifdef ZMQ_BUILD_DRAFT_API // Send message to the last pipe, only used if xpub is on manual and after calling set option with ZMQ_SUBSCRIBE bool _send_last_pipe; // Function to be applied to match the last pipe. static void mark_last_pipe_as_matching (zmq::pipe_t *pipe_, xpub_t *arg_); -#endif // Last pipe that sent subscription message, only used if xpub is on manual pipe_t *_last_pipe; From 41be0f53869b2ccf97a6dfed9024b0dd793dd004 Mon Sep 17 00:00:00 2001 From: Luca Boccassi Date: Sat, 18 May 2019 16:11:33 +0100 Subject: [PATCH 3/5] Problem: new xpub option is in critical path Solution: use unlikely to optimize for the most common case --- src/xpub.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/xpub.cpp b/src/xpub.cpp index 131d3c90..e2015df7 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -282,7 +282,7 @@ int zmq::xpub_t::xsend (msg_t *msg_) // For the first part of multi-part message, find the matching pipes. if (!_more) { - if (_manual && _last_pipe && _send_last_pipe) { + if (unlikely (_manual && _last_pipe && _send_last_pipe)) { _subscriptions.match (static_cast (msg_->data ()), msg_->size (), mark_last_pipe_as_matching, this); From 797439c8e2fe4b845856115c44902d8d9e371b86 Mon Sep 17 00:00:00 2001 From: Luca Boccassi Date: Sat, 18 May 2019 16:14:58 +0100 Subject: [PATCH 4/5] Problem: typos in manpage entry for ZMQ_XPUB_MANUAL_LAST_VALUE Solution: fix them --- doc/zmq_setsockopt.txt | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index 3f1b8c8c..1680a71a 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -1074,12 +1074,12 @@ Applicable socket types:: ZMQ_XPUB ZMQ_XPUB_MANUAL_LAST_VALUE: change the subscription handling to manual -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ This option is similar to ZMQ_XPUB_MANUAL. -What is the difference, ZMQ_XPUB_MANUAL_LAST_VALUE sets the 'XPUB' socket -behaviour to send the first message to the last subscriber after the 'XPUB' socket -recieve a subscription and call setsockopt with ZMQ_SUBSCRIBE on 'XPUB' socket. -This prevent duplicated message when use last value caching(LVC). +The difference is that ZMQ_XPUB_MANUAL_LAST_VALUE changes the 'XPUB' socket +behaviour to send the first message to the last subscriber after the socket +receives a subscription and call setsockopt with ZMQ_SUBSCRIBE on 'XPUB' socket. +This prevents duplicated messages when using last value caching(LVC). NOTE: in DRAFT state, not yet available in stable releases. From 19ff4d0b6a36a0d42c04001a9e3fa90064d76451 Mon Sep 17 00:00:00 2001 From: Luca Boccassi Date: Sat, 18 May 2019 16:22:46 +0100 Subject: [PATCH 5/5] Problem: zmq_socket_monitor_versioned_typed duplicates zmq_socket_monitor_versioned Solution: unify the two APIs, as they are both still in DRAFT state and thus can be changed. --- doc/zmq_socket_monitor_versioned.txt | 22 ++------------ include/zmq.h | 6 +--- src/zmq.cpp | 24 ++++----------- src/zmq_draft.h | 6 +--- tests/test_monitor.cpp | 44 +++++++++++++--------------- 5 files changed, 31 insertions(+), 71 deletions(-) diff --git a/doc/zmq_socket_monitor_versioned.txt b/doc/zmq_socket_monitor_versioned.txt index f70c9a19..5339f404 100644 --- a/doc/zmq_socket_monitor_versioned.txt +++ b/doc/zmq_socket_monitor_versioned.txt @@ -10,9 +10,7 @@ zmq_socket_monitor_versioned - monitor socket events SYNOPSIS -------- -*int zmq_socket_monitor_versioned (void '*socket', char '*endpoint', uint64_t 'events', int 'event_version');* -*int zmq_socket_monitor_versioned_typed ( - void '*socket', char '*endpoint', uint64_t 'events', int 'event_version', int 'type');* +*int zmq_socket_monitor_versioned (void '*socket', char '*endpoint', uint64_t 'events', int 'event_version', int 'type');* *int zmq_socket_monitor_pipes_stats (void '*socket');* @@ -58,11 +56,8 @@ connection uses a bound or connected local endpoint. Note that the format of the second and further frames, and also the number of frames, may be different for events added in the future. -The _zmq_socket_monitor_versioned_typed()_ is a generalisation of -_zmq_socket_monitor_versioned_ that supports more monitoring socket types. The 'type' argument is used to specify the type of the monitoring socket. -Supported types are 'ZMQ_PAIR' (which is the equivalent of -_zmq_socket_monitor_versioned_), 'ZMQ_PUB' and 'ZMQ_PUSH'. Note that consumers +Supported types are 'ZMQ_PAIR', 'ZMQ_PUB' and 'ZMQ_PUSH'. Note that consumers of the events will have to be compatible with the socket type, for instance a monitoring socket of type 'ZMQ_PUB' will require consumers of type 'ZMQ_SUB'. In the case that the monitoring socket type is of 'ZMQ_PUB', the multipart @@ -224,19 +219,6 @@ The 0MQ 'context' associated with the specified 'socket' was terminated. The transport protocol of the monitor 'endpoint' is not supported. Monitor sockets are required to use the inproc:// transport. -*EINVAL*:: -The monitor 'endpoint' supplied does not exist. - - -ERRORS - _zmq_socket_monitor_typed()_ -------------------------------- -*ETERM*:: -The 0MQ 'context' associated with the specified 'socket' was terminated. - -*EPROTONOSUPPORT*:: -The transport protocol of the monitor 'endpoint' is not supported. Monitor -sockets are required to use the inproc:// transport. - *EINVAL*:: The monitor 'endpoint' supplied does not exist or the specified socket 'type' is not supported. diff --git a/include/zmq.h b/include/zmq.h index 8dd2790c..f26cf9ed 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -732,11 +732,7 @@ ZMQ_EXPORT int zmq_socket_get_peer_state (void *socket, #define ZMQ_EVENT_ALL_V1 ZMQ_EVENT_ALL #define ZMQ_EVENT_ALL_V2 ZMQ_EVENT_ALL_V1 | ZMQ_EVENT_PIPES_STATS -ZMQ_EXPORT int zmq_socket_monitor_versioned (void *s_, - const char *addr_, - uint64_t events_, - int event_version_); -ZMQ_EXPORT int zmq_socket_monitor_versioned_typed ( +ZMQ_EXPORT int zmq_socket_monitor_versioned ( void *s_, const char *addr_, uint64_t events_, int event_version_, int type_); ZMQ_EXPORT int zmq_socket_monitor_pipes_stats (void *s); diff --git a/src/zmq.cpp b/src/zmq.cpp index 5bdef399..a2ad7f8a 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -267,32 +267,20 @@ int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_) return s->getsockopt (option_, optval_, optvallen_); } -int zmq_socket_monitor_versioned (void *s_, - const char *addr_, - uint64_t events_, - int event_version_) -{ - zmq::socket_base_t *s = as_socket_base_t (s_); - if (!s) - return -1; - return s->monitor (addr_, events_, event_version_, ZMQ_PAIR); -} - -int zmq_socket_monitor (void *s_, const char *addr_, int events_) -{ - return zmq_socket_monitor_versioned (s_, addr_, events_, 1); -} - -int zmq_socket_monitor_versioned_typed ( +int zmq_socket_monitor_versioned ( void *s_, const char *addr_, uint64_t events_, int event_version_, int type_) { zmq::socket_base_t *s = as_socket_base_t (s_); if (!s) return -1; - return s->monitor (addr_, events_, event_version_, type_); } +int zmq_socket_monitor (void *s_, const char *addr_, int events_) +{ + return zmq_socket_monitor_versioned (s_, addr_, events_, 1, ZMQ_PAIR); +} + int zmq_join (void *s_, const char *group_) { zmq::socket_base_t *s = as_socket_base_t (s_); diff --git a/src/zmq_draft.h b/src/zmq_draft.h index ba3e13ba..c37ba319 100644 --- a/src/zmq_draft.h +++ b/src/zmq_draft.h @@ -130,11 +130,7 @@ int zmq_socket_get_peer_state (void *socket_, #define ZMQ_EVENT_ALL_V1 ZMQ_EVENT_ALL #define ZMQ_EVENT_ALL_V2 ZMQ_EVENT_ALL_V1 | ZMQ_EVENT_PIPES_STATS -int zmq_socket_monitor_versioned (void *s_, - const char *addr_, - uint64_t events_, - int event_version_); -int zmq_socket_monitor_versioned_typed ( +int zmq_socket_monitor_versioned ( void *s_, const char *addr_, uint64_t events_, int event_version_, int type_); int zmq_socket_monitor_pipes_stats (void *s_); diff --git a/tests/test_monitor.cpp b/tests/test_monitor.cpp index b47e8a10..8d9881a9 100644 --- a/tests/test_monitor.cpp +++ b/tests/test_monitor.cpp @@ -126,21 +126,21 @@ void test_monitor_basic () #if (defined ZMQ_CURRENT_EVENT_VERSION && ZMQ_CURRENT_EVENT_VERSION >= 2) \ || (defined ZMQ_CURRENT_EVENT_VERSION \ && ZMQ_CURRENT_EVENT_VERSION_DRAFT >= 2) -void test_monitor_versioned_typed_invalid_socket_type () +void test_monitor_versioned_invalid_socket_type () { void *client = test_context_socket (ZMQ_DEALER); // Socket monitoring only works with ZMQ_PAIR, ZMQ_PUB and ZMQ_PUSH. TEST_ASSERT_FAILURE_ERRNO ( - EINVAL, zmq_socket_monitor_versioned_typed ( + EINVAL, zmq_socket_monitor_versioned ( client, "inproc://invalid-socket-type", 0, 2, ZMQ_CLIENT)); test_context_socket_close_zero_linger (client); } -void test_monitor_versioned_typed_basic (bind_function_t bind_function_, - const char *expected_prefix_, - int type_) +void test_monitor_versioned_basic (bind_function_t bind_function_, + const char *expected_prefix_, + int type_) { char server_endpoint[MAX_SOCKET_STRING]; char client_mon_endpoint[MAX_SOCKET_STRING]; @@ -158,9 +158,9 @@ void test_monitor_versioned_typed_basic (bind_function_t bind_function_, void *server = test_context_socket (ZMQ_DEALER); // Monitor all events on client and server sockets - TEST_ASSERT_SUCCESS_ERRNO (zmq_socket_monitor_versioned_typed ( + TEST_ASSERT_SUCCESS_ERRNO (zmq_socket_monitor_versioned ( client, client_mon_endpoint, ZMQ_EVENT_ALL_V2, 2, type_)); - TEST_ASSERT_SUCCESS_ERRNO (zmq_socket_monitor_versioned_typed ( + TEST_ASSERT_SUCCESS_ERRNO (zmq_socket_monitor_versioned ( server, server_mon_endpoint, ZMQ_EVENT_ALL_V2, 2, type_)); // Choose the appropriate consumer socket type. @@ -267,35 +267,33 @@ void test_monitor_versioned_typed_basic (bind_function_t bind_function_, void test_monitor_versioned_basic_tcp_ipv4 () { static const char prefix[] = "tcp://127.0.0.1:"; - // Calling 'monitor_versioned_typed' with ZMQ_PAIR is the equivalent of - // calling 'monitor_versioned'. - test_monitor_versioned_typed_basic (bind_loopback_ipv4, prefix, ZMQ_PAIR); - test_monitor_versioned_typed_basic (bind_loopback_ipv4, prefix, ZMQ_PUB); - test_monitor_versioned_typed_basic (bind_loopback_ipv4, prefix, ZMQ_PUSH); + test_monitor_versioned_basic (bind_loopback_ipv4, prefix, ZMQ_PAIR); + test_monitor_versioned_basic (bind_loopback_ipv4, prefix, ZMQ_PUB); + test_monitor_versioned_basic (bind_loopback_ipv4, prefix, ZMQ_PUSH); } void test_monitor_versioned_basic_tcp_ipv6 () { static const char prefix[] = "tcp://[::1]:"; - test_monitor_versioned_typed_basic (bind_loopback_ipv6, prefix, ZMQ_PAIR); - test_monitor_versioned_typed_basic (bind_loopback_ipv6, prefix, ZMQ_PUB); - test_monitor_versioned_typed_basic (bind_loopback_ipv6, prefix, ZMQ_PUSH); + test_monitor_versioned_basic (bind_loopback_ipv6, prefix, ZMQ_PAIR); + test_monitor_versioned_basic (bind_loopback_ipv6, prefix, ZMQ_PUB); + test_monitor_versioned_basic (bind_loopback_ipv6, prefix, ZMQ_PUSH); } void test_monitor_versioned_basic_ipc () { static const char prefix[] = "ipc://"; - test_monitor_versioned_typed_basic (bind_loopback_ipc, prefix, ZMQ_PAIR); - test_monitor_versioned_typed_basic (bind_loopback_ipc, prefix, ZMQ_PUB); - test_monitor_versioned_typed_basic (bind_loopback_ipc, prefix, ZMQ_PUSH); + test_monitor_versioned_basic (bind_loopback_ipc, prefix, ZMQ_PAIR); + test_monitor_versioned_basic (bind_loopback_ipc, prefix, ZMQ_PUB); + test_monitor_versioned_basic (bind_loopback_ipc, prefix, ZMQ_PUSH); } void test_monitor_versioned_basic_tipc () { static const char prefix[] = "tipc://"; - test_monitor_versioned_typed_basic (bind_loopback_tipc, prefix, ZMQ_PAIR); - test_monitor_versioned_typed_basic (bind_loopback_tipc, prefix, ZMQ_PUB); - test_monitor_versioned_typed_basic (bind_loopback_tipc, prefix, ZMQ_PUSH); + test_monitor_versioned_basic (bind_loopback_tipc, prefix, ZMQ_PAIR); + test_monitor_versioned_basic (bind_loopback_tipc, prefix, ZMQ_PUB); + test_monitor_versioned_basic (bind_loopback_tipc, prefix, ZMQ_PUSH); } #ifdef ZMQ_EVENT_PIPES_STATS @@ -310,7 +308,7 @@ void test_monitor_versioned_stats (bind_function_t bind_function_, void *push = test_context_socket (ZMQ_PUSH); TEST_ASSERT_SUCCESS_ERRNO (zmq_socket_monitor_versioned ( - push, "inproc://monitor-push", ZMQ_EVENT_PIPES_STATS, 2)); + push, "inproc://monitor-push", ZMQ_EVENT_PIPES_STATS, 2, ZMQ_PAIR)); // Should fail if there are no pipes to monitor TEST_ASSERT_FAILURE_ERRNO (EAGAIN, zmq_socket_monitor_pipes_stats (push)); @@ -437,7 +435,7 @@ int main () #if (defined ZMQ_CURRENT_EVENT_VERSION && ZMQ_CURRENT_EVENT_VERSION >= 2) \ || (defined ZMQ_CURRENT_EVENT_VERSION \ && ZMQ_CURRENT_EVENT_VERSION_DRAFT >= 2) - RUN_TEST (test_monitor_versioned_typed_invalid_socket_type); + RUN_TEST (test_monitor_versioned_invalid_socket_type); RUN_TEST (test_monitor_versioned_basic_tcp_ipv4); RUN_TEST (test_monitor_versioned_basic_tcp_ipv6); RUN_TEST (test_monitor_versioned_basic_ipc);