From 81444136d55743b62481f51d7eb0de72e476b1b2 Mon Sep 17 00:00:00 2001 From: Doron Somech Date: Fri, 17 Apr 2020 13:20:57 +0300 Subject: [PATCH] problem: router doesn't know when peer disconnected ZMQ_ROUTER_NOTIFY doesn't have a context and doesn't play nice with protocols. with ZMQ_DISCONNECT_MSG we can set it to a protocol message, like DISCONNECT in majordomo. Router will send it when a peer is disconnected. Another advantage of ZMQ_DISCONNECT_MSG is that it also works on inproc. Together with ZMQ_HEARTBEAT it allows to build very reliable protocols, and much simpler as well. --- Makefile.am | 7 +- doc/zmq_setsockopt.txt | 12 ++++ include/zmq.h | 1 + src/ctx.cpp | 7 ++ src/options.cpp | 14 +++- src/options.hpp | 4 ++ src/peer.cpp | 1 + src/pipe.cpp | 23 +++++++ src/pipe.hpp | 8 ++- src/router.cpp | 1 + src/server.cpp | 1 + src/session_base.cpp | 15 ++++- src/session_base.hpp | 2 +- src/socket_base.cpp | 10 ++- src/stream_engine_base.cpp | 6 +- src/udp_engine.cpp | 2 +- src/zmq_draft.h | 1 + tests/test_disconnect_msg.cpp | 118 ++++++++++++++++++++++++++++++++++ 18 files changed, 224 insertions(+), 9 deletions(-) create mode 100644 tests/test_disconnect_msg.cpp diff --git a/Makefile.am b/Makefile.am index 7db8297f..d70058df 100755 --- a/Makefile.am +++ b/Makefile.am @@ -1042,7 +1042,8 @@ test_apps += tests/test_poller \ tests/test_peer \ tests/test_reconnect_options \ tests/test_msg_init \ - tests/test_hello_msg + tests/test_hello_msg \ + tests/test_disconnect_msg tests_test_poller_SOURCES = tests/test_poller.cpp tests_test_poller_LDADD = ${TESTUTIL_LIBS} src/libzmq.la @@ -1099,6 +1100,10 @@ tests_test_msg_init_CPPFLAGS = ${TESTUTIL_CPPFLAGS} tests_test_hello_msg_SOURCES = tests/test_hello_msg.cpp tests_test_hello_msg_LDADD = ${TESTUTIL_LIBS} src/libzmq.la tests_test_hello_msg_CPPFLAGS = ${TESTUTIL_CPPFLAGS} + +tests_test_disconnect_msg_SOURCES = tests/test_disconnect_msg.cpp +tests_test_disconnect_msg_LDADD = ${TESTUTIL_LIBS} src/libzmq.la +tests_test_disconnect_msg_CPPFLAGS = ${TESTUTIL_CPPFLAGS} endif if ENABLE_STATIC diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt index 11e8b96b..c7414a5a 100755 --- a/doc/zmq_setsockopt.txt +++ b/doc/zmq_setsockopt.txt @@ -228,6 +228,18 @@ Option value size:: 32 or 41 Default value:: NULL Applicable socket types:: all, when using TCP transport +ZMQ_DISCONNECT_MSG: set a disconnect message that the socket will generate when accepted peer disconnect +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +When set, the socket will generate a disconnect message when accepted peer has been disconnected. +You may set this on ROUTER, SERVER and PEER sockets. +The combination with ZMQ_HEARTBEAT_IVL is powerful and simplify protocols, when heartbeat recognize a connection drop it +will generate a disconnect message that can match the protocol of the application. + +[horizontal] +Option value type:: binary data +Option value unit:: N/A +Default value:: NULL +Applicable socket types:: ZMQ_ROUTER, ZMQ_SERVER and ZMQ_PEER ZMQ_GSSAPI_PLAINTEXT: Disable GSSAPI encryption ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/include/zmq.h b/include/zmq.h index d7b33b4d..fcf45be1 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -679,6 +679,7 @@ ZMQ_EXPORT void zmq_threadclose (void *thread_); #define ZMQ_ONLY_FIRST_SUBSCRIBE 108 #define ZMQ_RECONNECT_STOP 109 #define ZMQ_HELLO_MSG 110 +#define ZMQ_DISCONNECT_MSG 111 /* DRAFT ZMQ_RECONNECT_STOP options */ #define ZMQ_RECONNECT_STOP_CONN_REFUSED 0x1 diff --git a/src/ctx.cpp b/src/ctx.cpp index 7d00e63a..b736b18b 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -828,6 +828,13 @@ void zmq::ctx_t::connect_inproc_sockets ( pending_connection_.bind_pipe->set_hwms (-1, -1); } +#ifdef ZMQ_BUILD_DRAFT_API + if (bind_options_.can_recv_disconnect_msg + && !bind_options_.disconnect_msg.empty ()) + pending_connection_.connect_pipe->set_disconnect_msg ( + bind_options_.disconnect_msg); +#endif + if (side_ == bind_side) { command_t cmd; cmd.type = command_t::bind; diff --git a/src/options.cpp b/src/options.cpp index 029896aa..b257fe14 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -251,7 +251,9 @@ zmq::options_t::options_t () : monitor_event_version (1), wss_trust_system (false), hello_msg (), - can_send_hello_msg (false) + can_send_hello_msg (false), + disconnect_msg (), + can_recv_disconnect_msg (false) { memset (curve_public_key, 0, CURVE_KEYSIZE); memset (curve_secret_key, 0, CURVE_KEYSIZE); @@ -825,6 +827,16 @@ int zmq::options_t::setsockopt (int option_, hello_msg = std::vector (); } + return 0; + + case ZMQ_DISCONNECT_MSG: + if (optvallen_ > 0) { + unsigned char *bytes = (unsigned char *) optval_; + disconnect_msg = + std::vector (bytes, bytes + optvallen_); + } else { + disconnect_msg = std::vector (); + } return 0; diff --git a/src/options.hpp b/src/options.hpp index afee4e7f..3dd24b90 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -301,6 +301,10 @@ struct options_t // Hello msg std::vector hello_msg; bool can_send_hello_msg; + + // Disconnect msg + std::vector disconnect_msg; + bool can_recv_disconnect_msg; }; inline bool get_effective_conflate_option (const options_t &options) diff --git a/src/peer.cpp b/src/peer.cpp index d4cd61e0..4d7e7461 100644 --- a/src/peer.cpp +++ b/src/peer.cpp @@ -41,6 +41,7 @@ zmq::peer_t::peer_t (class ctx_t *parent_, uint32_t tid_, int sid_) : { options.type = ZMQ_PEER; options.can_send_hello_msg = true; + options.can_recv_disconnect_msg = true; } uint32_t zmq::peer_t::connect_peer (const char *endpoint_uri_) diff --git a/src/pipe.cpp b/src/pipe.cpp index 873d4201..69d5e4f6 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -124,10 +124,12 @@ zmq::pipe_t::pipe_t (object_t *parent_, _server_socket_routing_id (0), _conflate (conflate_) { + _disconnect_msg.init (); } zmq::pipe_t::~pipe_t () { + _disconnect_msg.close (); } void zmq::pipe_t::set_peer (pipe_t *peer_) @@ -591,3 +593,24 @@ void zmq::pipe_t::process_pipe_peer_stats (uint64_t queue_count_, send_pipe_stats_publish (socket_base_, queue_count_, _msgs_written - _peers_msgs_read, endpoint_pair_); } + +void zmq::pipe_t::send_disconnect_msg () +{ + if (_disconnect_msg.size () > 0) { + // Rollback any incomplete message in the pipe, and push the disconnect message. + rollback (); + + _out_pipe->write (_disconnect_msg, false); + flush (); + _disconnect_msg.init (); + } +} + +void zmq::pipe_t::set_disconnect_msg ( + const std::vector &disconnect_) +{ + _disconnect_msg.close (); + const int rc = + _disconnect_msg.init_buffer (&disconnect_[0], disconnect_.size ()); + errno_assert (rc == 0); +} diff --git a/src/pipe.hpp b/src/pipe.hpp index 13dc1895..8136b860 100644 --- a/src/pipe.hpp +++ b/src/pipe.hpp @@ -38,10 +38,10 @@ #include "blob.hpp" #include "options.hpp" #include "endpoint.hpp" +#include "msg.hpp" namespace zmq { -class msg_t; class pipe_t; // Create a pipepair for bi-directional transfer of messages. @@ -147,6 +147,9 @@ class pipe_t ZMQ_FINAL : public object_t, void send_stats_to_peer (own_t *socket_base_); + void send_disconnect_msg (); + void set_disconnect_msg (const std::vector &disconnect_); + private: // Type of the underlying lock-free pipe. typedef ypipe_base_t upipe_t; @@ -257,6 +260,9 @@ class pipe_t ZMQ_FINAL : public object_t, // The endpoints of this pipe. endpoint_uri_pair_t _endpoint_pair; + // Disconnect msg + msg_t _disconnect_msg; + ZMQ_NON_COPYABLE_NOR_MOVABLE (pipe_t) }; diff --git a/src/router.cpp b/src/router.cpp index cdbf3434..1dfbb86a 100644 --- a/src/router.cpp +++ b/src/router.cpp @@ -56,6 +56,7 @@ zmq::router_t::router_t (class ctx_t *parent_, uint32_t tid_, int sid_) : options.recv_routing_id = true; options.raw_socket = false; options.can_send_hello_msg = true; + options.can_recv_disconnect_msg = true; _prefetched_id.init (); _prefetched_msg.init (); diff --git a/src/server.cpp b/src/server.cpp index 3aaccf92..6f0cf269 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -42,6 +42,7 @@ zmq::server_t::server_t (class ctx_t *parent_, uint32_t tid_, int sid_) : { options.type = ZMQ_SERVER; options.can_send_hello_msg = true; + options.can_recv_disconnect_msg = true; } zmq::server_t::~server_t () diff --git a/src/session_base.cpp b/src/session_base.cpp index b0491b47..88a8e701 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -442,15 +442,26 @@ void zmq::session_base_t::process_attach (i_engine *engine_) _engine->plug (_io_thread, this); } -void zmq::session_base_t::engine_error (zmq::i_engine::error_reason_t reason_) +void zmq::session_base_t::engine_error (bool handshaked_, + zmq::i_engine::error_reason_t reason_) { // Engine is dead. Let's forget about it. _engine = NULL; // Remove any half-done messages from the pipes. - if (_pipe) + if (_pipe) { clean_pipes (); +#ifdef ZMQ_BUILD_DRAFT_API + // Only send disconnect message if socket was accepted and handshake was completed + if (!_active && handshaked_ && options.can_recv_disconnect_msg + && !options.disconnect_msg.empty ()) { + _pipe->set_disconnect_msg (options.disconnect_msg); + _pipe->send_disconnect_msg (); + } +#endif + } + zmq_assert (reason_ == i_engine::connection_error || reason_ == i_engine::timeout_error || reason_ == i_engine::protocol_error); diff --git a/src/session_base.hpp b/src/session_base.hpp index c95cb9d2..043f9a6f 100644 --- a/src/session_base.hpp +++ b/src/session_base.hpp @@ -62,7 +62,7 @@ class session_base_t : public own_t, public io_object_t, public i_pipe_events virtual void reset (); void flush (); void rollback (); - void engine_error (zmq::i_engine::error_reason_t reason_); + void engine_error (bool handshaked_, zmq::i_engine::error_reason_t reason_); // i_pipe_events interface implementation. void read_activated (zmq::pipe_t *pipe_) ZMQ_FINAL; diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 34c023ee..dcc494ef 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -122,8 +122,10 @@ int zmq::socket_base_t::inprocs_t::erase_pipes ( return -1; } - for (map_t::iterator it = range.first; it != range.second; ++it) + for (map_t::iterator it = range.first; it != range.second; ++it) { + it->second->send_disconnect_msg (); it->second->terminate (true); + } _inprocs.erase (range.first, range.second); return 0; } @@ -864,6 +866,10 @@ int zmq::socket_base_t::connect_internal (const char *endpoint_uri_) && peer.options.hello_msg.size () > 0) { send_hello_msg (new_pipes[1], peer.options); } + + if (peer.options.can_recv_disconnect_msg + && peer.options.disconnect_msg.size () > 0) + new_pipes[0]->set_disconnect_msg (peer.options.disconnect_msg); #endif // Attach remote end of the pipe to the peer socket. Note that peer's @@ -1530,6 +1536,8 @@ void zmq::socket_base_t::process_term (int linger_) // Ask all attached pipes to terminate. for (pipes_t::size_type i = 0, size = _pipes.size (); i != size; ++i) { + // Only inprocs might have a disconnect message set + _pipes[i]->send_disconnect_msg (); _pipes[i]->terminate (false); } register_term_acks (static_cast (_pipes.size ())); diff --git a/src/stream_engine_base.cpp b/src/stream_engine_base.cpp index 68703fde..8d02a376 100644 --- a/src/stream_engine_base.cpp +++ b/src/stream_engine_base.cpp @@ -685,7 +685,11 @@ void zmq::stream_engine_base_t::error (error_reason_t reason_) _socket->event_disconnected (_endpoint_uri_pair, _s); _session->flush (); - _session->engine_error (reason_); + _session->engine_error ( + !_handshaking + && (_mechanism == NULL + || _mechanism->status () != mechanism_t::handshaking), + reason_); unplug (); delete this; } diff --git a/src/udp_engine.cpp b/src/udp_engine.cpp index 9d90634f..0f7f58b3 100644 --- a/src/udp_engine.cpp +++ b/src/udp_engine.cpp @@ -344,7 +344,7 @@ int zmq::udp_engine_t::add_membership (fd_t s_, const udp_address_t *addr_) void zmq::udp_engine_t::error (error_reason_t reason_) { zmq_assert (_session); - _session->engine_error (reason_); + _session->engine_error (false, reason_); terminate (); } diff --git a/src/zmq_draft.h b/src/zmq_draft.h index 5c612ca0..a1cae92a 100644 --- a/src/zmq_draft.h +++ b/src/zmq_draft.h @@ -66,6 +66,7 @@ #define ZMQ_ONLY_FIRST_SUBSCRIBE 108 #define ZMQ_RECONNECT_STOP 109 #define ZMQ_HELLO_MSG 110 +#define ZMQ_DISCONNECT_MSG 111 /* DRAFT ZMQ_RECONNECT_STOP options */ #define ZMQ_RECONNECT_STOP_CONN_REFUSED 0x1 diff --git a/tests/test_disconnect_msg.cpp b/tests/test_disconnect_msg.cpp new file mode 100644 index 00000000..73efef3d --- /dev/null +++ b/tests/test_disconnect_msg.cpp @@ -0,0 +1,118 @@ +/* + Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "testutil.hpp" +#include "testutil_unity.hpp" + +SETUP_TEARDOWN_TESTCONTEXT + +void test (const char *address) +{ + // Create a server + void *server = test_context_socket (ZMQ_SERVER); + + // set server socket options + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (server, ZMQ_DISCONNECT_MSG, "D", 1)); + + // bind server + TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (server, address)); + + // Create a client + void *client = test_context_socket (ZMQ_CLIENT); + TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (client, ZMQ_HELLO_MSG, "H", 1)); + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (client, address)); + + // Receive the hello message from client + recv_string_expect_success (server, "H", 0); + + // Kill the client + test_context_socket_close (client); + + // Receive the disconnect message + recv_string_expect_success (server, "D", 0); + + // Clean up. + test_context_socket_close (server); +} + +void test_tcp () +{ + test ("tcp://127.0.0.1:5569"); +} + +void test_inproc () +{ + test ("inproc://disconnect-msg"); +} + + +void test_inproc_disconnect () +{ + const char *address = "inproc://disconnect-msg"; + + // Create a server + void *server = test_context_socket (ZMQ_SERVER); + + // set server socket options + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_setsockopt (server, ZMQ_DISCONNECT_MSG, "D", 1)); + + // bind server + TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (server, address)); + + // Create a client + void *client = test_context_socket (ZMQ_CLIENT); + TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (client, ZMQ_HELLO_MSG, "H", 1)); + TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (client, address)); + + // Receive the hello message from client + recv_string_expect_success (server, "H", 0); + + // disconnect the client + TEST_ASSERT_SUCCESS_ERRNO (zmq_disconnect (client, address)); + + // Receive the disconnect message + recv_string_expect_success (server, "D", 0); + + // Clean up. + test_context_socket_close (client); + test_context_socket_close (server); +} + +int main () +{ + setup_test_environment (); + + UNITY_BEGIN (); + RUN_TEST (test_tcp); + RUN_TEST (test_inproc); + RUN_TEST (test_inproc_disconnect); + return UNITY_END (); +}