diff --git a/Makefile.am b/Makefile.am index 98b20691..7db8297f 100755 --- a/Makefile.am +++ b/Makefile.am @@ -1041,7 +1041,8 @@ test_apps += tests/test_poller \ tests/test_router_notify \ tests/test_peer \ tests/test_reconnect_options \ - tests/test_msg_init + tests/test_msg_init \ + tests/test_hello_msg tests_test_poller_SOURCES = tests/test_poller.cpp tests_test_poller_LDADD = ${TESTUTIL_LIBS} src/libzmq.la @@ -1094,6 +1095,10 @@ tests_test_reconnect_options_CPPFLAGS = ${TESTUTIL_CPPFLAGS} tests_test_msg_init_SOURCES = tests/test_msg_init.cpp tests_test_msg_init_LDADD = ${TESTUTIL_LIBS} src/libzmq.la tests_test_msg_init_CPPFLAGS = ${TESTUTIL_CPPFLAGS} + +tests_test_hello_msg_SOURCES = tests/test_hello_msg.cpp +tests_test_hello_msg_LDADD = ${TESTUTIL_LIBS} src/libzmq.la +tests_test_hello_msg_CPPFLAGS = ${TESTUTIL_CPPFLAGS} endif if ENABLE_STATIC diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index e5a9b02a..11e8b96b 100755 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -322,6 +322,18 @@ Option value unit:: milliseconds Default value:: 30000 Applicable socket types:: all but ZMQ_STREAM, only for connection-oriented transports +ZMQ_HELLO_MSG: set an hello message that will be sent when a new peer connect +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +When set, the socket will automatically send an hello message when a new connection is made or accepted. +You may set this on DEALER, ROUTER, CLIENT, SERVER and PEER sockets. +The combination with ZMQ_HEARTBEAT_IVL is powerful and simplify protocols, +as now heartbeat and sending the hello message can be left out of protocols and be handled by zeromq. + +[horizontal] +Option value type:: binary data +Option value unit:: N/A +Default value:: NULL +Applicable socket types:: ZMQ_ROUTER, ZMQ_DEALER, ZMQ_CLIENT, ZMQ_SERVER and ZMQ_PEER ZMQ_HEARTBEAT_IVL: Set interval between sending ZMTP heartbeats ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/include/zmq.h b/include/zmq.h index 3389c77e..d7b33b4d 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -678,6 +678,7 @@ ZMQ_EXPORT void zmq_threadclose (void *thread_); #define ZMQ_WSS_TRUST_SYSTEM 107 #define ZMQ_ONLY_FIRST_SUBSCRIBE 108 #define ZMQ_RECONNECT_STOP 109 +#define ZMQ_HELLO_MSG 110 /* DRAFT ZMQ_RECONNECT_STOP options */ #define ZMQ_RECONNECT_STOP_CONN_REFUSED 0x1 diff --git a/src/client.cpp b/src/client.cpp index b0d4db46..f26b1386 100644 --- a/src/client.cpp +++ b/src/client.cpp @@ -37,6 +37,7 @@ zmq::client_t::client_t (class ctx_t *parent_, uint32_t tid_, int sid_) : socket_base_t (parent_, tid_, sid_, true) { options.type = ZMQ_CLIENT; + options.can_send_hello_msg = true; } zmq::client_t::~client_t () diff --git a/src/ctx.cpp b/src/ctx.cpp index 56fbd4ae..7d00e63a 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -848,6 +848,14 @@ void zmq::ctx_t::connect_inproc_sockets ( && pending_connection_.endpoint.socket->check_tag ()) { send_routing_id (pending_connection_.bind_pipe, bind_options_); } + +#ifdef ZMQ_BUILD_DRAFT_API + // If set, send the hello msg of the bind socket to the pending connection. + if (bind_options_.can_send_hello_msg + && bind_options_.hello_msg.size () > 0) { + send_hello_msg (pending_connection_.bind_pipe, bind_options_); + } +#endif } #ifdef ZMQ_HAVE_VMCI diff --git a/src/dealer.cpp b/src/dealer.cpp index d6c2710d..bf1c41da 100644 --- a/src/dealer.cpp +++ b/src/dealer.cpp @@ -38,6 +38,7 @@ zmq::dealer_t::dealer_t (class ctx_t *parent_, uint32_t tid_, int sid_) : _probe_router (false) { options.type = ZMQ_DEALER; + options.can_send_hello_msg = true; } zmq::dealer_t::~dealer_t () diff --git a/src/options.cpp b/src/options.cpp index 5cbbb799..029896aa 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -249,7 +249,9 @@ zmq::options_t::options_t () : zero_copy (true), router_notify (0), monitor_event_version (1), - wss_trust_system (false) + wss_trust_system (false), + hello_msg (), + can_send_hello_msg (false) { memset (curve_public_key, 0, CURVE_KEYSIZE); memset (curve_secret_key, 0, CURVE_KEYSIZE); @@ -813,6 +815,19 @@ int zmq::options_t::setsockopt (int option_, return do_setsockopt_int_as_bool_strict (optval_, optvallen_, &wss_trust_system); #endif + + case ZMQ_HELLO_MSG: + if (optvallen_ > 0) { + unsigned char *bytes = (unsigned char *) optval_; + hello_msg = + std::vector (bytes, bytes + optvallen_); + } else { + hello_msg = std::vector (); + } + + + return 0; + #endif default: diff --git a/src/options.hpp b/src/options.hpp index fc60b4a8..afee4e7f 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -297,6 +297,10 @@ struct options_t std::string wss_trust_pem; std::string wss_hostname; bool wss_trust_system; + + // Hello msg + std::vector hello_msg; + bool can_send_hello_msg; }; inline bool get_effective_conflate_option (const options_t &options) diff --git a/src/peer.cpp b/src/peer.cpp index 027fb57a..d4cd61e0 100644 --- a/src/peer.cpp +++ b/src/peer.cpp @@ -40,6 +40,7 @@ zmq::peer_t::peer_t (class ctx_t *parent_, uint32_t tid_, int sid_) : server_t (parent_, tid_, sid_) { options.type = ZMQ_PEER; + options.can_send_hello_msg = true; } uint32_t zmq::peer_t::connect_peer (const char *endpoint_uri_) diff --git a/src/pipe.cpp b/src/pipe.cpp index b183a97c..873d4201 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -88,6 +88,17 @@ void zmq::send_routing_id (pipe_t *pipe_, const options_t &options_) pipe_->flush (); } +void zmq::send_hello_msg (pipe_t *pipe_, const options_t &options_) +{ + zmq::msg_t hello; + const int rc = + hello.init_buffer (&options_.hello_msg[0], options_.hello_msg.size ()); + errno_assert (rc == 0); + const bool written = pipe_->write (&hello); + zmq_assert (written); + pipe_->flush (); +} + zmq::pipe_t::pipe_t (object_t *parent_, upipe_t *inpipe_, upipe_t *outpipe_, diff --git a/src/pipe.hpp b/src/pipe.hpp index 32b3daca..13dc1895 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -261,6 +261,8 @@ class pipe_t ZMQ_FINAL : public object_t, }; void send_routing_id (pipe_t *pipe_, const options_t &options_); + +void send_hello_msg (pipe_t *pipe_, const options_t &options_); } #endif diff --git a/src/router.cpp b/src/router.cpp index 193ba183..cdbf3434 100644 --- a/src/router.cpp +++ b/src/router.cpp @@ -55,6 +55,7 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) : options.type = ZMQ_ROUTER; options.recv_routing_id = true; options.raw_socket = false; + options.can_send_hello_msg = true; _prefetched_id.init (); _prefetched_msg.init (); diff --git a/src/server.cpp b/src/server.cpp index 7295567b..3aaccf92 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -41,6 +41,7 @@ zmq::server_t::server_t (class ctx_t *parent_, uint32_t tid_, int sid_) : _next_routing_id (generate_random ()) { options.type = ZMQ_SERVER; + options.can_send_hello_msg = true; } zmq::server_t::~server_t () diff --git a/src/session_base.cpp b/src/session_base.cpp index 49176c25..b0491b47 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -88,9 +88,21 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_, case ZMQ_SCATTER: case ZMQ_DGRAM: case ZMQ_PEER: +#ifdef ZMQ_BUILD_DRAFT_API + if (options_.can_send_hello_msg && options_.hello_msg.size () > 0) + s = new (std::nothrow) hello_msg_session_t ( + io_thread_, active_, socket_, options_, addr_); + else + s = new (std::nothrow) session_base_t ( + io_thread_, active_, socket_, options_, addr_); + + break; +#else s = new (std::nothrow) session_base_t (io_thread_, active_, socket_, options_, addr_); break; +#endif + default: errno = EINVAL; return NULL; @@ -807,3 +819,39 @@ void zmq::session_base_t::start_connecting_udp (io_thread_t * /*io_thread_*/) send_attach (this, engine); } + +zmq::hello_msg_session_t::hello_msg_session_t (io_thread_t *io_thread_, + bool connect_, + socket_base_t *socket_, + const options_t &options_, + address_t *addr_) : + session_base_t (io_thread_, connect_, socket_, options_, addr_), + _new_pipe (true) +{ +} + +zmq::hello_msg_session_t::~hello_msg_session_t () +{ +} + + +int zmq::hello_msg_session_t::pull_msg (msg_t *msg_) +{ + if (_new_pipe) { + _new_pipe = false; + + const int rc = + msg_->init_buffer (&options.hello_msg[0], options.hello_msg.size ()); + errno_assert (rc == 0); + + return 0; + } + + return session_base_t::pull_msg (msg_); +} + +void zmq::hello_msg_session_t::reset () +{ + session_base_t::reset (); + _new_pipe = true; +} diff --git a/src/session_base.hpp b/src/session_base.hpp index 138a9aa2..c95cb9d2 100644 --- a/src/session_base.hpp +++ b/src/session_base.hpp @@ -201,6 +201,26 @@ class session_base_t : public own_t, public io_object_t, public i_pipe_events ZMQ_NON_COPYABLE_NOR_MOVABLE (session_base_t) }; + +class hello_msg_session_t ZMQ_FINAL : public session_base_t +{ + public: + hello_msg_session_t (zmq::io_thread_t *io_thread_, + bool connect_, + zmq::socket_base_t *socket_, + const options_t &options_, + address_t *addr_); + ~hello_msg_session_t (); + + // Overrides of the functions from session_base_t. + int pull_msg (msg_t *msg_); + void reset (); + + private: + bool _new_pipe; + + ZMQ_NON_COPYABLE_NOR_MOVABLE (hello_msg_session_t) +}; } #endif diff --git a/src/socket_base.cpp b/src/socket_base.cpp index a8d769b2..34c023ee 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -833,6 +833,13 @@ int zmq::socket_base_t::connect_internal (const char *endpoint_uri_) // the peer doesn't expect it. send_routing_id (new_pipes[0], options); +#ifdef ZMQ_BUILD_DRAFT_API + // If set, send the hello msg of the local socket to the peer. + if (options.can_send_hello_msg && options.hello_msg.size () > 0) { + send_hello_msg (new_pipes[0], options); + } +#endif + const endpoint_t endpoint = {this, options}; pend_connection (std::string (endpoint_uri_), endpoint, new_pipes); } else { @@ -846,6 +853,19 @@ int zmq::socket_base_t::connect_internal (const char *endpoint_uri_) send_routing_id (new_pipes[1], peer.options); } +#ifdef ZMQ_BUILD_DRAFT_API + // If set, send the hello msg of the local socket to the peer. + if (options.can_send_hello_msg && options.hello_msg.size () > 0) { + send_hello_msg (new_pipes[0], options); + } + + // If set, send the hello msg of the peer to the local socket. + if (options.can_send_hello_msg + && peer.options.hello_msg.size () > 0) { + send_hello_msg (new_pipes[1], peer.options); + } +#endif + // Attach remote end of the pipe to the peer socket. Note that peer's // seqnum was incremented in find_endpoint function. We don't need it // increased here. diff --git a/src/zmq_draft.h b/src/zmq_draft.h index 376cd8ac..5c612ca0 100644 --- a/src/zmq_draft.h +++ b/src/zmq_draft.h @@ -65,6 +65,7 @@ #define ZMQ_WSS_TRUST_SYSTEM 107 #define ZMQ_ONLY_FIRST_SUBSCRIBE 108 #define ZMQ_RECONNECT_STOP 109 +#define ZMQ_HELLO_MSG 110 /* DRAFT ZMQ_RECONNECT_STOP options */ #define ZMQ_RECONNECT_STOP_CONN_REFUSED 0x1 diff --git a/tests/test_hello_msg.cpp b/tests/test_hello_msg.cpp new file mode 100644 index 00000000..98455347 --- /dev/null +++ b/tests/test_hello_msg.cpp @@ -0,0 +1,106 @@ +/* + Copyright (c) 2007-2020 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "testutil.hpp" +#include "testutil_unity.hpp" + +SETUP_TEARDOWN_TESTCONTEXT + +void test (const char *address) +{ + // Create a router + void *router = test_context_socket (ZMQ_ROUTER); + + // set router socket options + TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (router, ZMQ_HELLO_MSG, "H", 1)); + + // bind router + TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (router, address)); + + // Create a dealer + void *dealer = test_context_socket (ZMQ_DEALER); + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (dealer, address)); + + // Receive the hello message + recv_string_expect_success (dealer, "H", 0); + + // Clean up. + test_context_socket_close (dealer); + test_context_socket_close (router); +} + +void test_tcp () +{ + test ("tcp://127.0.0.1:5569"); +} + +void test_inproc () +{ + test ("inproc://hello-msg"); +} + +void test_inproc_late_bind () +{ + char address[] = "inproc://late-hello-msg"; + + // Create a server + void *server = test_context_socket (ZMQ_SERVER); + + // set server socket options + TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (server, ZMQ_HELLO_MSG, "W", 1)); + + // Create a dealer + void *client = test_context_socket (ZMQ_CLIENT); + TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (client, ZMQ_HELLO_MSG, "H", 1)); + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (client, address)); + + // bind server after the dealer + TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (server, address)); + + // Receive the welcome message from server + recv_string_expect_success (client, "W", 0); + + // Receive the hello message from client + recv_string_expect_success (server, "H", 0); + + // Clean up. + test_context_socket_close (client); + test_context_socket_close (server); +} + +int main () +{ + setup_test_environment (); + + UNITY_BEGIN (); + RUN_TEST (test_tcp); + RUN_TEST (test_inproc); + RUN_TEST (test_inproc_late_bind); + return UNITY_END (); +}