diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index 78b4d862..eee665cc 100644 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -1154,6 +1154,22 @@ Default value:: NULL Applicable socket types:: ZMQ_XPUB +ZMQ_ONLY_FIRST_SUBSCRIBE: Process only fist subscribe/unsubscribe in a multipart message +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +If set, only the first part of the multipart message is processed as +a subscribe/unsubscribe message. The rest are forwarded as user data +regardless of message contents. + +It not set (default), subscribe/unsubscribe messages in a multipart message +are processed as such regardless of their number and order. + +[horizontal] +Option value type:: int +Option value unit:: boolean +Default value:: 0 (false) +Applicable socket types:: ZMQ_XSUB, ZMQ_XPUB + + ZMQ_ZAP_DOMAIN: Set RFC 27 authentication domain ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Sets the domain for ZAP (ZMQ RFC 27) authentication. A ZAP domain must be diff --git a/include/zmq.h b/include/zmq.h index a005aedd..8c39fd07 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -675,6 +675,7 @@ ZMQ_EXPORT void zmq_threadclose (void *thread_); #define ZMQ_WSS_TRUST_PEM 105 #define ZMQ_WSS_HOSTNAME 106 #define ZMQ_WSS_TRUST_SYSTEM 107 +#define ZMQ_ONLY_FIRST_SUBSCRIBE 108 /* DRAFT Context options */ diff --git a/src/xpub.cpp b/src/xpub.cpp index 56131847..089a9c12 100644 --- a/src/xpub.cpp +++ b/src/xpub.cpp @@ -43,6 +43,8 @@ zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : _verbose_unsubs (false), _more_send (false), _more_recv (false), + _process_subscribe (false), + _only_first_subscribe (false), _lossy (true), _manual (false), _send_last_pipe (false), @@ -101,7 +103,10 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) bool subscribe = false; bool is_subscribe_or_cancel = false; - if (!_more_recv) { + bool first_part = !_more_recv; + _more_recv = (msg.flags () & msg_t::more) != 0; + + if (first_part || _process_subscribe) { // Apply the subscription to the trie if (msg.is_subscribe () || msg.is_cancel ()) { data = static_cast (msg.command_body ()); @@ -116,7 +121,9 @@ void zmq::xpub_t::xread_activated (pipe_t *pipe_) } } - _more_recv = (msg.flags () & msg_t::more) != 0; + if (first_part) + _process_subscribe = + !_only_first_subscribe || is_subscribe_or_cancel; if (!is_subscribe_or_cancel) { // Process user message coming upstream from xsub socket @@ -199,7 +206,7 @@ int zmq::xpub_t::xsetsockopt (int option_, { if (option_ == ZMQ_XPUB_VERBOSE || option_ == ZMQ_XPUB_VERBOSER || option_ == ZMQ_XPUB_MANUAL_LAST_VALUE || option_ == ZMQ_XPUB_NODROP - || option_ == ZMQ_XPUB_MANUAL) { + || option_ == ZMQ_XPUB_MANUAL || option_ == ZMQ_ONLY_FIRST_SUBSCRIBE) { if (optvallen_ != sizeof (int) || *static_cast (optval_) < 0) { errno = EINVAL; @@ -218,6 +225,8 @@ int zmq::xpub_t::xsetsockopt (int option_, _lossy = (*static_cast (optval_) == 0); else if (option_ == ZMQ_XPUB_MANUAL) _manual = (*static_cast (optval_) != 0); + else if (option_ == ZMQ_ONLY_FIRST_SUBSCRIBE) + _only_first_subscribe = (*static_cast (optval_) != 0); } else if (option_ == ZMQ_SUBSCRIBE && _manual) { if (_last_pipe != NULL) _subscriptions.add ((unsigned char *) optval_, optvallen_, diff --git a/src/xpub.hpp b/src/xpub.hpp index c5afcaf5..b6796dd3 100644 --- a/src/xpub.hpp +++ b/src/xpub.hpp @@ -96,6 +96,15 @@ class xpub_t : public socket_base_t // True if we are in the middle of receiving a multi-part message. bool _more_recv; + // If true, subscribe and cancel messages are processed for the rest + // of multipart message. + bool _process_subscribe; + + // This option is enabled with ZMQ_ONLY_FIRST_SUBSCRIBE. + // If true, messages following subscribe/unsubscribe in a multipart + // message are treated as user data regardless of the first byte. + bool _only_first_subscribe; + // Drop messages if HWM reached, otherwise return with EAGAIN bool _lossy; diff --git a/src/xsub.cpp b/src/xsub.cpp index ca1be4bc..8f9aa1c6 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -38,7 +38,9 @@ zmq::xsub_t::xsub_t (class ctx_t *parent_, uint32_t tid_, int sid_) : socket_base_t (parent_, tid_, sid_), _has_message (false), _more_send (false), - _more_recv (false) + _more_recv (false), + _process_subscribe (false), + _only_first_subscribe (false) { options.type = ZMQ_XSUB; @@ -95,17 +97,38 @@ void zmq::xsub_t::xhiccuped (pipe_t *pipe_) pipe_->flush (); } +int zmq::xsub_t::xsetsockopt (int option_, + const void *optval_, + size_t optvallen_) +{ + if (option_ == ZMQ_ONLY_FIRST_SUBSCRIBE) { + if (optvallen_ != sizeof (int) + || *static_cast (optval_) < 0) { + errno = EINVAL; + return -1; + } + _only_first_subscribe = (*static_cast (optval_) != 0); + return 0; + } else { + errno = EINVAL; + return -1; + } +} + int zmq::xsub_t::xsend (msg_t *msg_) { size_t size = msg_->size (); unsigned char *data = static_cast (msg_->data ()); - bool send_more = _more_send; + bool first_part = !_more_send; _more_send = (msg_->flags () & msg_t::more) != 0; - if (send_more) + if (first_part) { + _process_subscribe = !_only_first_subscribe; + } else if (!_process_subscribe) { // User message sent upstream to XPUB socket return _dist.send_to_all (msg_); + } if (msg_->is_subscribe () || (size > 0 && *data == 1)) { // Process subscribe message @@ -121,6 +144,7 @@ int zmq::xsub_t::xsend (msg_t *msg_) size = size - 1; } _subscriptions.add (data, size); + _process_subscribe = true; return _dist.send_to_all (msg_); } if (msg_->is_cancel () || (size > 0 && *data == 0)) { @@ -132,6 +156,7 @@ int zmq::xsub_t::xsend (msg_t *msg_) data = data + 1; size = size - 1; } + _process_subscribe = true; if (_subscriptions.rm (data, size)) return _dist.send_to_all (msg_); } else diff --git a/src/xsub.hpp b/src/xsub.hpp old mode 100755 new mode 100644 index 5cd98280..d7cedb56 --- a/src/xsub.hpp +++ b/src/xsub.hpp @@ -57,6 +57,7 @@ class xsub_t : public socket_base_t void xattach_pipe (zmq::pipe_t *pipe_, bool subscribe_to_all_, bool locally_initiated_); + int xsetsockopt (int option_, const void *optval_, size_t optvallen_); int xsend (zmq::msg_t *msg_); bool xhas_out (); int xrecv (zmq::msg_t *msg_); @@ -101,6 +102,15 @@ class xsub_t : public socket_base_t // there are following parts still waiting. bool _more_recv; + // If true, subscribe and cancel messages are processed for the rest + // of multipart message. + bool _process_subscribe; + + // This option is enabled with ZMQ_ONLY_FIRST_SUBSCRIBE. + // If true, messages following subscribe/unsubscribe in a multipart + // message are treated as user data regardless of the first byte. + bool _only_first_subscribe; + xsub_t (const xsub_t &); const xsub_t &operator= (const xsub_t &); }; diff --git a/src/zmq_draft.h b/src/zmq_draft.h index e558958c..fbe33427 100644 --- a/src/zmq_draft.h +++ b/src/zmq_draft.h @@ -57,6 +57,8 @@ #define ZMQ_SOCKS_PASSWORD 100 #define ZMQ_IN_BATCH_SIZE 101 #define ZMQ_OUT_BATCH_SIZE 102 +#define ZMQ_ONLY_FIRST_SUBSCRIBE 108 + /* DRAFT Context options */ #define ZMQ_ZERO_COPY_RECV 10 diff --git a/tests/test_xpub_manual.cpp b/tests/test_xpub_manual.cpp index cc04c0a5..012f765d 100644 --- a/tests/test_xpub_manual.cpp +++ b/tests/test_xpub_manual.cpp @@ -456,57 +456,66 @@ void test_user_message () test_context_socket_close (sub); } +#ifdef ZMQ_ONLY_FIRST_SUBSCRIBE void test_user_message_multi () { + const int only_first_subscribe = 1; + // Create a publisher void *pub = test_context_socket (ZMQ_XPUB); TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (pub, "inproc://soname")); + TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (pub, ZMQ_ONLY_FIRST_SUBSCRIBE, + &only_first_subscribe, + sizeof (only_first_subscribe))); // Create a subscriber void *sub = test_context_socket (ZMQ_XSUB); TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, "inproc://soname")); + TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (sub, ZMQ_ONLY_FIRST_SUBSCRIBE, + &only_first_subscribe, + sizeof (only_first_subscribe))); // Send some data that is neither sub nor unsub const uint8_t msg_common[] = {'A', 'B', 'C'}; // Message starts with 0 but should still treated as user - const uint8_t msg_0[] = {0, 'B', 'C'}; + const uint8_t msg_0a[] = {0, 'B', 'C'}; + const uint8_t msg_0b[] = {0, 'C', 'D'}; // Message starts with 1 but should still treated as user - const uint8_t msg_1[] = {1, 'B', 'C'}; + const uint8_t msg_1a[] = {1, 'B', 'C'}; + const uint8_t msg_1b[] = {1, 'C', 'D'}; // Test second message starting with 0 send_array_expect_success (sub, msg_common, ZMQ_SNDMORE); - send_array_expect_success (sub, msg_0, 0); + send_array_expect_success (sub, msg_0a, 0); // Receive messages from subscriber recv_array_expect_success (pub, msg_common, 0); - recv_array_expect_success (pub, msg_0, 0); + recv_array_expect_success (pub, msg_0a, 0); // Test second message starting with 1 send_array_expect_success (sub, msg_common, ZMQ_SNDMORE); - send_array_expect_success (sub, msg_1, 0); + send_array_expect_success (sub, msg_1a, 0); // Receive messages from subscriber recv_array_expect_success (pub, msg_common, 0); - recv_array_expect_success (pub, msg_1, 0); - - char buffer[255]; - // Test first message starting with 0 - send_array_expect_success (sub, msg_0, 0); - - // wait - msleep (SETTLE_TIME); - - int rc = zmq_recv (pub, buffer, sizeof (buffer), ZMQ_DONTWAIT); - TEST_ASSERT_EQUAL_INT (-1, rc); + recv_array_expect_success (pub, msg_1a, 0); // Test first message starting with 1 - send_array_expect_success (sub, msg_1, 0); - recv_array_expect_success (pub, msg_1, 0); + send_array_expect_success (sub, msg_1a, ZMQ_SNDMORE); + send_array_expect_success (sub, msg_1b, 0); + recv_array_expect_success (pub, msg_1a, 0); + recv_array_expect_success (pub, msg_1b, 0); + + send_array_expect_success (sub, msg_0a, ZMQ_SNDMORE); + send_array_expect_success (sub, msg_0b, 0); + recv_array_expect_success (pub, msg_0a, 0); + recv_array_expect_success (pub, msg_0b, 0); // Clean up. test_context_socket_close (pub); test_context_socket_close (sub); } +#endif int main () { @@ -519,7 +528,9 @@ int main () RUN_TEST (test_missing_subscriptions); RUN_TEST (test_unsubscribe_cleanup); RUN_TEST (test_user_message); +#ifdef ZMQ_ONLY_FIRST_SUBSCRIBE RUN_TEST (test_user_message_multi); +#endif return UNITY_END (); }