0
0
mirror of https://github.com/zeromq/libzmq.git synced 2024-12-27 07:31:03 +08:00

Merge pull request #3063 from bluca/heartbeat

Problem: heartbeat commands break REP session, unclear documentation
This commit is contained in:
Doron Somech 2018-04-28 18:41:46 +03:00 committed by GitHub
commit 046689bbe7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 43 additions and 15 deletions

View File

@ -344,7 +344,7 @@ traffic will cancel the timeout.
[horizontal]
Option value type:: int
Option value unit:: milliseconds
Default value:: 0
Default value:: 0 normally, ZMQ_HEARTBEAT_IVL if it is set
Applicable socket types:: all, when using connection-oriented transports

View File

@ -285,6 +285,11 @@ zmq::req_session_t::~req_session_t ()
int zmq::req_session_t::push_msg (msg_t *msg_)
{
// Ignore commands, they are processed by the engine and should not
// affect the state machine.
if (unlikely (msg_->flags () & msg_t::command))
return 0;
switch (state) {
case bottom:
if (msg_->flags () == msg_t::more) {

View File

@ -215,6 +215,7 @@ endforeach()
if(ZMQ_HAVE_CURVE)
set_tests_properties(test_security_curve PROPERTIES TIMEOUT 60)
endif()
set_tests_properties(test_heartbeats PROPERTIES TIMEOUT 60)
if(WIN32 AND ${POLLER} MATCHES "poll")
set_tests_properties(test_many_sockets PROPERTIES TIMEOUT 30)

View File

@ -72,6 +72,10 @@ static void recv_with_retry (raw_socket fd, char *buffer, int bytes)
assert (received <= bytes);
if (received == bytes)
break;
// ZMQ_REP READY message is shorter, check the actual socket type
if (received >= 3 && buffer[received - 1] == 'P'
&& buffer[received - 2] == 'E' && buffer[received - 3] == 'R')
break;
}
}
@ -135,11 +139,12 @@ static void prep_server_socket (void *ctx,
void **server_out,
void **mon_out,
char *endpoint,
size_t ep_length)
size_t ep_length,
int socket_type)
{
int rc;
// We'll be using this socket in raw mode
void *server = zmq_socket (ctx, ZMQ_ROUTER);
void *server = zmq_socket (ctx, socket_type);
assert (server);
int value = 0;
@ -180,7 +185,7 @@ static void prep_server_socket (void *ctx,
// This checks for a broken TCP connection (or, in this case a stuck one
// where the peer never responds to PINGS). There should be an accepted event
// then a disconnect event.
static void test_heartbeat_timeout (void)
static void test_heartbeat_timeout (int server_type)
{
int rc;
char my_endpoint[MAX_SOCKET_STRING];
@ -191,7 +196,7 @@ static void test_heartbeat_timeout (void)
void *server, *server_mon;
prep_server_socket (ctx, 1, 0, &server, &server_mon, my_endpoint,
MAX_SOCKET_STRING);
MAX_SOCKET_STRING, server_type);
struct sockaddr_in ip4addr;
raw_socket s;
@ -236,7 +241,7 @@ static void test_heartbeat_timeout (void)
// to a server that is not doing any heartbeating. Then we sleep,
// if the server disconnects the client, then we know the TTL did
// its thing correctly.
static void test_heartbeat_ttl (void)
static void test_heartbeat_ttl (int client_type, int server_type)
{
int rc, value;
char my_endpoint[MAX_SOCKET_STRING];
@ -247,9 +252,9 @@ static void test_heartbeat_ttl (void)
void *server, *server_mon, *client;
prep_server_socket (ctx, 0, 0, &server, &server_mon, my_endpoint,
MAX_SOCKET_STRING);
MAX_SOCKET_STRING, server_type);
client = zmq_socket (ctx, ZMQ_DEALER);
client = zmq_socket (ctx, client_type);
assert (client != NULL);
// Set the heartbeat TTL to 0.1 seconds
@ -292,7 +297,8 @@ static void test_heartbeat_ttl (void)
// This checks for normal operation - that is pings and pongs being
// exchanged normally. There should be an accepted event on the server,
// and then no event afterwards.
static void test_heartbeat_notimeout (int is_curve)
static void
test_heartbeat_notimeout (int is_curve, int client_type, int server_type)
{
int rc;
char my_endpoint[MAX_SOCKET_STRING];
@ -303,9 +309,9 @@ static void test_heartbeat_notimeout (int is_curve)
void *server, *server_mon;
prep_server_socket (ctx, 1, is_curve, &server, &server_mon, my_endpoint,
MAX_SOCKET_STRING);
MAX_SOCKET_STRING, server_type);
void *client = zmq_socket (ctx, ZMQ_DEALER);
void *client = zmq_socket (ctx, client_type);
if (is_curve)
setup_curve (client, 0);
rc = zmq_connect (client, my_endpoint);
@ -337,10 +343,26 @@ static void test_heartbeat_notimeout (int is_curve)
int main (void)
{
setup_test_environment ();
test_heartbeat_timeout ();
test_heartbeat_ttl ();
test_heartbeat_timeout (ZMQ_ROUTER);
test_heartbeat_timeout (ZMQ_REP);
test_heartbeat_ttl (ZMQ_DEALER, ZMQ_ROUTER);
test_heartbeat_ttl (ZMQ_REQ, ZMQ_REP);
test_heartbeat_ttl (ZMQ_PULL, ZMQ_PUSH);
test_heartbeat_ttl (ZMQ_SUB, ZMQ_PUB);
test_heartbeat_ttl (ZMQ_PAIR, ZMQ_PAIR);
// Run this test without curve
test_heartbeat_notimeout (0);
test_heartbeat_notimeout (0, ZMQ_DEALER, ZMQ_ROUTER);
test_heartbeat_notimeout (0, ZMQ_REQ, ZMQ_REP);
test_heartbeat_notimeout (0, ZMQ_PULL, ZMQ_PUSH);
test_heartbeat_notimeout (0, ZMQ_SUB, ZMQ_PUB);
test_heartbeat_notimeout (0, ZMQ_PAIR, ZMQ_PAIR);
// Then rerun it with curve
test_heartbeat_notimeout (1);
test_heartbeat_notimeout (1, ZMQ_DEALER, ZMQ_ROUTER);
test_heartbeat_notimeout (1, ZMQ_REQ, ZMQ_REP);
test_heartbeat_notimeout (1, ZMQ_PULL, ZMQ_PUSH);
test_heartbeat_notimeout (1, ZMQ_SUB, ZMQ_PUB);
test_heartbeat_notimeout (1, ZMQ_PAIR, ZMQ_PAIR);
}