0
0
mirror of https://github.com/zeromq/libzmq.git synced 2025-01-15 10:18:01 +08:00

Merge pull request #4205 from somdoron/master

problem: no way to know when connection is temporarly dropped
This commit is contained in:
Luca Boccassi 2021-06-06 16:21:24 +01:00 committed by GitHub
commit e86237da58
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 141 additions and 3 deletions

View File

@ -1056,7 +1056,8 @@ test_apps += tests/test_poller \
tests/test_msg_init \
tests/test_hello_msg \
tests/test_disconnect_msg \
tests/test_channel
tests/test_channel \
tests/test_hiccup_msg
tests_test_poller_SOURCES = tests/test_poller.cpp
tests_test_poller_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
@ -1121,6 +1122,10 @@ tests_test_disconnect_msg_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
tests_test_channel_SOURCES = tests/test_channel.cpp
tests_test_channel_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
tests_test_channel_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
tests_test_hiccup_msg_SOURCES = tests/test_hiccup_msg.cpp
tests_test_hiccup_msg_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
tests_test_hiccup_msg_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
endif
if FUZZING_ENGINE_LIB

View File

@ -256,6 +256,21 @@ Option value unit:: N/A
Default value:: NULL
Applicable socket types:: ZMQ_ROUTER, ZMQ_SERVER and ZMQ_PEER
ZMQ_HICCUP_MSG: set a hiccup message that the socket will generate when connected peer temporarly disconnect
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
When set, the socket will generate a hiccup message when connect peer has been disconnected.
You may set this on DEALER, CLIENT and PEER sockets.
The combination with ZMQ_HEARTBEAT_IVL is powerful and simplify protocols, when heartbeat recognize a connection drop it
will generate a hiccup message that can match the protocol of the application.
[horizontal]
Option value type:: binary data
Option value unit:: N/A
Default value:: NULL
Applicable socket types:: ZMQ_DEALER, ZMQ_CLIENT and ZMQ_PEER
ZMQ_GSSAPI_PLAINTEXT: Disable GSSAPI encryption
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Defines whether communications on the socket will be encrypted, see

View File

@ -669,6 +669,7 @@ ZMQ_EXPORT void zmq_threadclose (void *thread_);
#define ZMQ_DISCONNECT_MSG 111
#define ZMQ_PRIORITY 112
#define ZMQ_BUSY_POLL 113
#define ZMQ_HICCUP_MSG 114
/* DRAFT ZMQ_RECONNECT_STOP options */
#define ZMQ_RECONNECT_STOP_CONN_REFUSED 0x1

View File

@ -38,6 +38,7 @@ zmq::client_t::client_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
{
options.type = ZMQ_CLIENT;
options.can_send_hello_msg = true;
options.can_recv_hiccup_msg = true;
}
zmq::client_t::~client_t ()

View File

@ -39,6 +39,7 @@ zmq::dealer_t::dealer_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
{
options.type = ZMQ_DEALER;
options.can_send_hello_msg = true;
options.can_recv_hiccup_msg = true;
}
zmq::dealer_t::~dealer_t ()

View File

@ -255,6 +255,8 @@ zmq::options_t::options_t () :
can_send_hello_msg (false),
disconnect_msg (),
can_recv_disconnect_msg (false),
hiccup_msg (),
can_recv_hiccup_msg (false),
busy_poll (0)
{
memset (curve_public_key, 0, CURVE_KEYSIZE);
@ -859,6 +861,18 @@ int zmq::options_t::setsockopt (int option_,
}
break;
case ZMQ_HICCUP_MSG:
if (optvallen_ > 0) {
unsigned char *bytes = (unsigned char *) optval_;
hiccup_msg =
std::vector<unsigned char> (bytes, bytes + optvallen_);
} else {
hiccup_msg = std::vector<unsigned char> ();
}
return 0;
#endif
default:

View File

@ -309,6 +309,10 @@ struct options_t
std::vector<unsigned char> disconnect_msg;
bool can_recv_disconnect_msg;
// Hiccup msg
std::vector<unsigned char> hiccup_msg;
bool can_recv_hiccup_msg;
// This option removes several delays caused by scheduling, interrupts and context switching.
int busy_poll;
};

View File

@ -42,6 +42,7 @@ zmq::peer_t::peer_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
options.type = ZMQ_PEER;
options.can_send_hello_msg = true;
options.can_recv_disconnect_msg = true;
options.can_recv_hiccup_msg = true;
}
uint32_t zmq::peer_t::connect_peer (const char *endpoint_uri_)

View File

@ -615,3 +615,15 @@ void zmq::pipe_t::set_disconnect_msg (
_disconnect_msg.init_buffer (&disconnect_[0], disconnect_.size ());
errno_assert (rc == 0);
}
void zmq::pipe_t::send_hiccup_msg (const std::vector<unsigned char> &hiccup_)
{
if (!hiccup_.empty () && _out_pipe) {
msg_t msg;
const int rc = msg.init_buffer (&hiccup_[0], hiccup_.size ());
errno_assert (rc == 0);
_out_pipe->write (msg, false);
flush ();
}
}

View File

@ -150,6 +150,8 @@ class pipe_t ZMQ_FINAL : public object_t,
void send_disconnect_msg ();
void set_disconnect_msg (const std::vector<unsigned char> &disconnect_);
void send_hiccup_msg (const std::vector<unsigned char> &hiccup_);
private:
// Type of the underlying lock-free pipe.
typedef ypipe_base_t<msg_t> upipe_t;

View File

@ -460,14 +460,18 @@ void zmq::session_base_t::engine_error (bool handshaked_,
if (_pipe) {
clean_pipes ();
#ifdef ZMQ_BUILD_DRAFT_API
// Only send disconnect message if socket was accepted and handshake was completed
if (!_active && handshaked_ && options.can_recv_disconnect_msg
&& !options.disconnect_msg.empty ()) {
_pipe->set_disconnect_msg (options.disconnect_msg);
_pipe->send_disconnect_msg ();
}
#endif
// Only send hiccup message if socket was connected and handshake was completed
if (_active && handshaked_ && options.can_recv_hiccup_msg
&& !options.hiccup_msg.empty ()) {
_pipe->send_hiccup_msg (options.hiccup_msg);
}
}
zmq_assert (reason_ == i_engine::connection_error

View File

@ -70,6 +70,7 @@
#define ZMQ_DISCONNECT_MSG 111
#define ZMQ_PRIORITY 112
#define ZMQ_BUSY_POLL 113
#define ZMQ_HICCUP_MSG 114
/* DRAFT ZMQ_RECONNECT_STOP options */
#define ZMQ_RECONNECT_STOP_CONN_REFUSED 0x1

View File

@ -161,6 +161,7 @@ if(ENABLE_DRAFTS)
test_channel
test_hello_msg
test_disconnect_msg
test_hiccup_msg
)
if(ZMQ_HAVE_BUSY_POLL)
list(APPEND tests test_busy_poll)

76
tests/test_hiccup_msg.cpp Normal file
View File

@ -0,0 +1,76 @@
/*
Copyright (c) 2007-2021 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "testutil.hpp"
#include "testutil_unity.hpp"
SETUP_TEARDOWN_TESTCONTEXT
void test ()
{
char address[MAX_SOCKET_STRING];
size_t addr_length = sizeof (address);
// Create a server
void *server = test_context_socket (ZMQ_SERVER);
// bind server
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (server, "tcp://127.0.0.1:*"));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_getsockopt (server, ZMQ_LAST_ENDPOINT, address, &addr_length));
// Create a client
void *client = test_context_socket (ZMQ_CLIENT);
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (client, ZMQ_HELLO_MSG, "HELLO", 5));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (client, ZMQ_HICCUP_MSG, "HICCUP", 6));
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (client, address));
// Receive the hello message from client
recv_string_expect_success (server, "HELLO", 0);
// Kill the server
test_context_socket_close (server);
// Receive the hiccup message
recv_string_expect_success (client, "HICCUP", 0);
// Clean up.
test_context_socket_close (client);
}
int main ()
{
setup_test_environment ();
UNITY_BEGIN ();
RUN_TEST (test);
return UNITY_END ();
}