0
0
mirror of https://github.com/zeromq/libzmq.git synced 2025-01-14 01:37:56 +08:00

Problem: long flag isn't set for subscriptions if topic has between 246 and 255 characters (#4564)

* Problem: long flag isn't set for subscriptions if topic has between 246 and 255 characters

Solution: fix V3.1 encoder to calculate long flag after evaluating the subscribe and cancel commands
This commit is contained in:
Cornelius 2023-06-20 16:17:26 +02:00 committed by GitHub
parent 7af09a0e3b
commit ecc63d0d3b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 95 additions and 2 deletions

View File

@ -493,6 +493,7 @@ test_apps = \
tests/test_capabilities \ tests/test_capabilities \
tests/test_xpub_nodrop \ tests/test_xpub_nodrop \
tests/test_xpub_manual \ tests/test_xpub_manual \
tests/test_xpub_topic \
tests/test_xpub_welcome_msg \ tests/test_xpub_welcome_msg \
tests/test_xpub_verbose \ tests/test_xpub_verbose \
tests/test_atomics \ tests/test_atomics \
@ -766,6 +767,10 @@ tests_test_xpub_manual_SOURCES = tests/test_xpub_manual.cpp
tests_test_xpub_manual_LDADD = ${TESTUTIL_LIBS} src/libzmq.la tests_test_xpub_manual_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
tests_test_xpub_manual_CPPFLAGS = ${TESTUTIL_CPPFLAGS} tests_test_xpub_manual_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
tests_test_xpub_topic_SOURCES = tests/test_xpub_topic.cpp
tests_test_xpub_topic_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
tests_test_xpub_topic_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
tests_test_xpub_welcome_msg_SOURCES = tests/test_xpub_welcome_msg.cpp tests_test_xpub_welcome_msg_SOURCES = tests/test_xpub_welcome_msg.cpp
tests_test_xpub_welcome_msg_LDADD = ${TESTUTIL_LIBS} src/libzmq.la tests_test_xpub_welcome_msg_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
tests_test_xpub_welcome_msg_CPPFLAGS = ${TESTUTIL_CPPFLAGS} tests_test_xpub_welcome_msg_CPPFLAGS = ${TESTUTIL_CPPFLAGS}

View File

@ -29,8 +29,6 @@ void zmq::v3_1_encoder_t::message_ready ()
protocol_flags = 0; protocol_flags = 0;
if (in_progress ()->flags () & msg_t::more) if (in_progress ()->flags () & msg_t::more)
protocol_flags |= v2_protocol_t::more_flag; protocol_flags |= v2_protocol_t::more_flag;
if (in_progress ()->size () > UCHAR_MAX)
protocol_flags |= v2_protocol_t::large_flag;
if (in_progress ()->flags () & msg_t::command if (in_progress ()->flags () & msg_t::command
|| in_progress ()->is_subscribe () || in_progress ()->is_cancel ()) { || in_progress ()->is_subscribe () || in_progress ()->is_cancel ()) {
protocol_flags |= v2_protocol_t::command_flag; protocol_flags |= v2_protocol_t::command_flag;
@ -39,6 +37,10 @@ void zmq::v3_1_encoder_t::message_ready ()
else if (in_progress ()->is_cancel ()) else if (in_progress ()->is_cancel ())
size += zmq::msg_t::cancel_cmd_name_size; size += zmq::msg_t::cancel_cmd_name_size;
} }
// Calculate large_flag after command_flag. Subscribe or cancel commands
// increase the message size.
if (size > UCHAR_MAX)
protocol_flags |= v2_protocol_t::large_flag;
// Encode the message length. For messages less then 256 bytes, // Encode the message length. For messages less then 256 bytes,
// the length is encoded as 8-bit unsigned integer. For larger // the length is encoded as 8-bit unsigned integer. For larger

View File

@ -64,6 +64,7 @@ set(tests
test_srcfd test_srcfd
test_stream_timeout test_stream_timeout
test_xpub_manual test_xpub_manual
test_xpub_topic
test_xpub_welcome_msg test_xpub_welcome_msg
test_xpub_verbose test_xpub_verbose
test_base85 test_base85

85
tests/test_xpub_topic.cpp Normal file
View File

@ -0,0 +1,85 @@
/* SPDX-License-Identifier: MPL-2.0 */
#include "testutil.hpp"
#include "testutil_unity.hpp"
SETUP_TEARDOWN_TESTCONTEXT
const char bind_address[] = "tcp://127.0.0.1:*";
char connect_address[MAX_SOCKET_STRING];
// 245 chars + 10 chars for subscribe command = 255 chars
const char short_topic[] =
"ABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOP"
"ABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOP"
"ABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOP"
"ABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDE";
// 246 chars + 10 chars for subscribe command = 256 chars
const char long_topic[] =
"ABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOP"
"ABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOP"
"ABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOP"
"ABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEFGHIJKLMNOPABCDEF";
template <size_t SIZE>
void test_subscribe_cancel (void *xpub, void *sub, const char (&topic)[SIZE])
{
// Ignore '\0' terminating the topic string.
const size_t topic_len = SIZE - 1;
// Subscribe for topic
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (sub, ZMQ_SUBSCRIBE, topic, topic_len));
// Allow receiving more than the expected number of bytes
char buffer[topic_len + 5];
// Receive subscription
int rc =
TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (xpub, buffer, sizeof (buffer), 0));
TEST_ASSERT_EQUAL_INT (topic_len + 1, rc);
TEST_ASSERT_EQUAL_UINT8 (1, buffer[0]);
TEST_ASSERT_EQUAL_UINT8_ARRAY (topic, buffer + 1, topic_len);
// Unsubscribe from topic
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (sub, ZMQ_UNSUBSCRIBE, topic, topic_len));
// Receive unsubscription
rc =
TEST_ASSERT_SUCCESS_ERRNO (zmq_recv (xpub, buffer, sizeof (buffer), 0));
TEST_ASSERT_EQUAL_INT (topic_len + 1, rc);
TEST_ASSERT_EQUAL_UINT8 (0, buffer[0]);
TEST_ASSERT_EQUAL_UINT8_ARRAY (topic, buffer + 1, topic_len);
}
void test_xpub_subscribe_long_topic ()
{
void *xpub = test_context_socket (ZMQ_XPUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (xpub, bind_address));
size_t len = MAX_SOCKET_STRING;
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (xpub, ZMQ_LAST_ENDPOINT, connect_address, &len));
void *sub = test_context_socket (ZMQ_SUB);
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sub, connect_address));
test_subscribe_cancel (xpub, sub, short_topic);
test_subscribe_cancel (xpub, sub, long_topic);
// Clean up.
test_context_socket_close (xpub);
test_context_socket_close (sub);
}
int main ()
{
setup_test_environment ();
UNITY_BEGIN ();
RUN_TEST (test_xpub_subscribe_long_topic);
return UNITY_END ();
}