From dd5eec35be98c924d895ce0959d1f5e03bf84650 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pontus=20Sk=C3=B6ldstr=C3=B6m?= Date: Mon, 12 Mar 2018 01:41:33 +0100 Subject: [PATCH] Support application metadata through ZMQ_METADATA Lets the application set per-connection metadata. Metadata is specified as "X-key:value" and set using zmq_setsockopt, eg: zmq_setsockopt (s, ZMQ_METADATA, "X-key:value", 11); The peer can then obtain the metadata from a received message: char *data = zmq_msg_gets(msg, "X-key"); --- Makefile.am | 7 +- include/zmq.h | 1 + src/mechanism.cpp | 17 ++++ src/options.cpp | 20 +++++ src/options.hpp | 4 + src/zmq_draft.h | 1 + tests/CMakeLists.txt | 1 + tests/test_address_tipc.cpp | 13 ++- tests/test_app_meta.cpp | 167 ++++++++++++++++++++++++++++++++++++ 9 files changed, 226 insertions(+), 5 deletions(-) create mode 100644 tests/test_app_meta.cpp diff --git a/Makefile.am b/Makefile.am index 4c567797..8b8ef10c 100644 --- a/Makefile.am +++ b/Makefile.am @@ -846,7 +846,8 @@ test_apps += tests/test_poller \ tests/test_radio_dish \ tests/test_udp \ tests/test_scatter_gather \ - tests/test_dgram + tests/test_dgram \ + tests/test_app_meta tests_test_poller_SOURCES = tests/test_poller.cpp tests_test_poller_LDADD = src/libzmq.la @@ -871,6 +872,10 @@ tests_test_scatter_gather_LDADD = src/libzmq.la tests_test_dgram_SOURCES = tests/test_dgram.cpp tests_test_dgram_LDADD = src/libzmq.la + +tests_test_app_meta_SOURCES = tests/test_app_meta.cpp +tests_test_app_meta_LDADD = src/libzmq.la ${UNITY_LIBS} +tests_test_app_meta_CPPFLAGS = ${UNITY_CPPFLAGS} endif if ENABLE_STATIC diff --git a/include/zmq.h b/include/zmq.h index 5791b9a7..613e0a62 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -585,6 +585,7 @@ ZMQ_EXPORT void zmq_threadclose (void *thread); #define ZMQ_BINDTODEVICE 92 #define ZMQ_ZAP_ENFORCE_DOMAIN 93 #define ZMQ_LOOPBACK_FASTPATH 94 +#define ZMQ_METADATA 95 /* DRAFT 0MQ socket events and monitoring */ /* Unspecified system errors during handshake. Event value is an errno. */ diff --git a/src/mechanism.cpp b/src/mechanism.cpp index d88a578f..a85f8da1 100644 --- a/src/mechanism.cpp +++ b/src/mechanism.cpp @@ -170,13 +170,30 @@ size_t zmq::mechanism_t::add_basic_properties (unsigned char *buf, options.routing_id, options.routing_id_size); } + + for (std::map::const_iterator it = + options.app_metadata.begin (); + it != options.app_metadata.end (); ++it) + ptr += + add_property (ptr, buf_capacity - (ptr - buf), it->first.c_str (), + it->second.c_str (), strlen (it->second.c_str ())); + return ptr - buf; } size_t zmq::mechanism_t::basic_properties_len () const { const char *socket_type = socket_type_string (options.type); + int meta_len = 0; + + for (std::map::const_iterator it = + options.app_metadata.begin (); + it != options.app_metadata.end (); ++it) + meta_len += + property_len (it->first.c_str (), strlen (it->second.c_str ())); + return property_len (ZMTP_PROPERTY_SOCKET_TYPE, strlen (socket_type)) + + meta_len + ((options.type == ZMQ_REQ || options.type == ZMQ_DEALER || options.type == ZMQ_ROUTER) ? property_len (ZMTP_PROPERTY_IDENTITY, options.routing_id_size) diff --git a/src/options.cpp b/src/options.cpp index 754f6256..1eea4087 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -706,6 +706,26 @@ int zmq::options_t::setsockopt (int option_, return do_setsockopt_int_as_bool_relaxed (optval_, optvallen_, &loopback_fastpath); + case ZMQ_METADATA: + if (optvallen_ > 0 && !is_int) { + std::string s ((char *) optval_); + size_t pos = 0; + std::string key, val, delimiter = ":"; + pos = s.find (delimiter); + if (pos != std::string::npos && pos != 0 + && pos != s.length () - 1) { + key = s.substr (0, pos); + if (key.compare (0, 2, "X-") == 0 && key.length () < 256) { + val = s.substr (pos + 1, s.length ()); + app_metadata.insert ( + std::pair (key, val)); + return 0; + } + } + } + errno = EINVAL; + return -1; + break; default: #if defined(ZMQ_ACT_MILITANT) // There are valid scenarios for probing with unknown socket option diff --git a/src/options.hpp b/src/options.hpp index 3a663566..4228353a 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -33,6 +33,7 @@ #include #include #include +#include #include "atomic_ptr.hpp" #include "stddef.h" @@ -258,6 +259,9 @@ struct options_t // Use zero copy strategy for storing message content when decoding. bool zero_copy; + + // Application metadata + std::map app_metadata; }; int do_getsockopt (void *const optval_, diff --git a/src/zmq_draft.h b/src/zmq_draft.h index adc1e895..30d5586a 100644 --- a/src/zmq_draft.h +++ b/src/zmq_draft.h @@ -56,6 +56,7 @@ unsigned long zmq_stopwatch_intermediate (void *watch_); #define ZMQ_BINDTODEVICE 92 #define ZMQ_ZAP_ENFORCE_DOMAIN 93 #define ZMQ_LOOPBACK_FASTPATH 94 +#define ZMQ_METADATA 95 /* DRAFT 0MQ socket events and monitoring */ /* Unspecified system errors during handshake. Event value is an errno. */ diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 930b29b1..76d5f171 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -135,6 +135,7 @@ IF (ENABLE_DRAFTS) test_udp test_scatter_gather test_dgram + test_app_meta ) ENDIF (ENABLE_DRAFTS) diff --git a/tests/test_address_tipc.cpp b/tests/test_address_tipc.cpp index 55aa0d93..a9d0e152 100644 --- a/tests/test_address_tipc.cpp +++ b/tests/test_address_tipc.cpp @@ -30,20 +30,17 @@ #include "testutil.hpp" #include -void *ctx; void setUp () { - ctx = zmq_ctx_new (); } void tearDown () { - zmq_ctx_term (ctx); - ctx = NULL; } void test_tipc_port_name_and_domain () { + void *ctx = zmq_ctx_new (); TEST_ASSERT_NOT_NULL (ctx); // test Port Name addressing @@ -64,6 +61,8 @@ void test_tipc_port_name_and_domain () rc = zmq_close (sb); TEST_ASSERT_EQUAL_INT (0, rc); + + zmq_ctx_term (ctx); } void test_tipc_port_identity () @@ -72,6 +71,7 @@ void test_tipc_port_identity () size_t size = 256; unsigned int z, c, n, ref; + void *ctx = zmq_ctx_new (); TEST_ASSERT_NOT_NULL (ctx); void *sb = zmq_socket (ctx, ZMQ_REP); @@ -101,10 +101,13 @@ void test_tipc_port_identity () rc = zmq_close (sb); TEST_ASSERT_EQUAL_INT (0, rc); + + zmq_ctx_term (ctx); } void test_tipc_bad_addresses () { + void *ctx = zmq_ctx_new (); TEST_ASSERT_NOT_NULL (ctx); // Test Port Name addressing @@ -124,6 +127,8 @@ void test_tipc_bad_addresses () // Clean up rc = zmq_close (sb); TEST_ASSERT_EQUAL_INT (0, rc); + + zmq_ctx_term (ctx); } diff --git a/tests/test_app_meta.cpp b/tests/test_app_meta.cpp new file mode 100644 index 00000000..1e2200d1 --- /dev/null +++ b/tests/test_app_meta.cpp @@ -0,0 +1,167 @@ +/* + Copyright (c) 2007-2018 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "testutil.hpp" +#include + + +void setUp () +{ +} +void tearDown () +{ +} + +void test_app_meta_reqrep () +{ + void *ctx; + zmq_msg_t msg; + void *rep_sock, *req_sock; + const char *req_hello = "X-hello:hello"; + const char *req_connection = "X-connection:primary"; + const char *req_z85 = "X-bin:009c6"; + const char *rep_hello = "X-hello:world"; + const char *rep_connection = "X-connection:backup"; + const char *bad_strings[] = { + ":", + "key:", + ":value", + "keyvalue", + "", + "X-" + "KeyTooLongKeyTooLongKeyTooLongKeyTooLongKeyTooLongKeyTooLongKeyTooLongKe" + "yTooLongKeyTooLongKeyTooLongKeyTooLongKeyTooLongKeyTooLongKeyTooLongKeyT" + "ooLongKeyTooLongKeyTooLongKeyTooLongKeyTooLongKeyTooLongKeyTooLongKeyToo" + "LongKeyTooLongKeyTooLongKeyTooLongKeyTooLong:value"}; + + ctx = zmq_ctx_new (); + rep_sock = zmq_socket (ctx, ZMQ_REP); + TEST_ASSERT_NOT_NULL (rep_sock); + req_sock = zmq_socket (ctx, ZMQ_REQ); + TEST_ASSERT_NOT_NULL (req_sock); + + int rc = + zmq_setsockopt (rep_sock, ZMQ_METADATA, rep_hello, strlen (rep_hello)); + TEST_ASSERT_EQUAL_INT (0, rc); + + int l = 0; + rc = zmq_setsockopt (rep_sock, ZMQ_LINGER, &l, sizeof (l)); + TEST_ASSERT_EQUAL_INT (0, rc); + + rc = zmq_setsockopt (rep_sock, ZMQ_METADATA, rep_connection, + strlen (rep_connection)); + TEST_ASSERT_EQUAL_INT (0, rc); + + for (int i = 0; i < 6; i++) { + rc = zmq_setsockopt (rep_sock, ZMQ_METADATA, bad_strings[i], + strlen (bad_strings[i])); + TEST_ASSERT_EQUAL_INT (-1, rc); + } + + rc = zmq_bind (rep_sock, "tcp://127.0.0.1:5555"); + TEST_ASSERT_EQUAL_INT (0, rc); + + l = 0; + rc = zmq_setsockopt (req_sock, ZMQ_LINGER, &l, sizeof (l)); + TEST_ASSERT_EQUAL_INT (0, rc); + + rc = zmq_setsockopt (req_sock, ZMQ_METADATA, req_hello, strlen (req_hello)); + TEST_ASSERT_EQUAL_INT (0, rc); + + rc = zmq_setsockopt (req_sock, ZMQ_METADATA, req_connection, + strlen (req_connection)); + TEST_ASSERT_EQUAL_INT (0, rc); + + rc = zmq_setsockopt (req_sock, ZMQ_METADATA, req_z85, strlen (req_z85)); + TEST_ASSERT_EQUAL_INT (0, rc); + + rc = zmq_connect (req_sock, "tcp://127.0.0.1:5555"); + TEST_ASSERT_EQUAL_INT (0, rc); + + rc = zmq_msg_init_size (&msg, 1); + TEST_ASSERT_EQUAL_INT (0, rc); + + char *data = (char *) zmq_msg_data (&msg); + data[0] = 1; + + rc = zmq_msg_send (&msg, req_sock, 0); + TEST_ASSERT_EQUAL_INT (1, rc); + + rc = zmq_msg_init (&msg); + TEST_ASSERT_EQUAL_INT (0, rc); + + rc = zmq_msg_recv (&msg, rep_sock, 0); + TEST_ASSERT_EQUAL_INT (1, rc); + + TEST_ASSERT_EQUAL_STRING ("hello", zmq_msg_gets (&msg, "X-hello")); + TEST_ASSERT_EQUAL_STRING ("primary", zmq_msg_gets (&msg, "X-connection")); + char *bindata = (char *) zmq_msg_gets (&msg, "X-bin"); + TEST_ASSERT_NOT_NULL (bindata); + uint8_t rawdata[4]; + void *ret = zmq_z85_decode (rawdata, bindata); + TEST_ASSERT_NOT_NULL (ret); + TEST_ASSERT_EQUAL_UINT8 (0, rawdata[0]); + TEST_ASSERT_EQUAL_UINT8 (1, rawdata[1]); + TEST_ASSERT_EQUAL_UINT8 (2, rawdata[2]); + TEST_ASSERT_EQUAL_UINT8 (3, rawdata[3]); + + TEST_ASSERT_NULL (zmq_msg_gets (&msg, "X-foobar")); + TEST_ASSERT_NULL (zmq_msg_gets (&msg, "foobar")); + + rc = zmq_msg_send (&msg, rep_sock, 0); + TEST_ASSERT_EQUAL_INT (1, rc); + + rc = zmq_msg_recv (&msg, req_sock, 0); + TEST_ASSERT_EQUAL_INT (1, rc); + + TEST_ASSERT_EQUAL_STRING ("world", zmq_msg_gets (&msg, "X-hello")); + TEST_ASSERT_EQUAL_STRING ("backup", zmq_msg_gets (&msg, "X-connection")); + + rc = zmq_msg_close (&msg); + TEST_ASSERT_EQUAL_INT (0, rc); + + rc = zmq_close (req_sock); + TEST_ASSERT_EQUAL_INT (0, rc); + + rc = zmq_close (rep_sock); + TEST_ASSERT_EQUAL_INT (0, rc); + + zmq_ctx_term (ctx); +} + + +int main () +{ + setup_test_environment (); + + UNITY_BEGIN (); + RUN_TEST (test_app_meta_reqrep); + + return UNITY_END (); +}