diff --git a/Makefile.am b/Makefile.am index 4d43c2ca..8ceba66c 100644 --- a/Makefile.am +++ b/Makefile.am @@ -358,7 +358,8 @@ test_apps = \ tests/test_server_drop_more \ tests/test_client_drop_more \ tests/test_thread_safe \ - tests/test_socketopt_hwm + tests/test_socketopt_hwm \ + tests/test_heartbeats tests_test_system_SOURCES = tests/test_system.cpp tests_test_system_LDADD = src/libzmq.la @@ -554,7 +555,8 @@ tests_test_thread_safe_LDADD = src/libzmq.la tests_test_socketopt_hwm_SOURCES = tests/test_sockopt_hwm.cpp tests_test_socketopt_hwm_LDADD = src/libzmq.la - +tests_test_heartbeats_SOURCES = tests/test_heartbeats.cpp +tests_test_heartbeats_LDADD = src/libzmq.la if !ON_MINGW if !ON_CYGWIN diff --git a/include/zmq.h b/include/zmq.h index c99338c5..0eb31f7a 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -316,6 +316,9 @@ ZMQ_EXPORT uint32_t zmq_msg_get_routing_id(zmq_msg_t *msg); #define ZMQ_XPUB_WELCOME_MSG 72 #define ZMQ_STREAM_NOTIFY 73 #define ZMQ_INVERT_MATCHING 74 +#define ZMQ_HEARTBEAT_IVL 75 +#define ZMQ_HEARTBEAT_TTL 76 +#define ZMQ_HEARTBEAT_TIMEOUT 77 /* Message options */ #define ZMQ_MORE 1 diff --git a/src/curve_client.cpp b/src/curve_client.cpp index 0b25b32c..a7c2cc10 100644 --- a/src/curve_client.cpp +++ b/src/curve_client.cpp @@ -130,6 +130,8 @@ int zmq::curve_client_t::encode (msg_t *msg_) uint8_t flags = 0; if (msg_->flags () & msg_t::more) flags |= 0x01; + if (msg_->flags () & msg_t::command) + flags |= 0x02; uint8_t message_nonce [crypto_box_NONCEBYTES]; memcpy (message_nonce, "CurveZMQMESSAGEC", 16); @@ -223,6 +225,8 @@ int zmq::curve_client_t::decode (msg_t *msg_) const uint8_t flags = message_plaintext [crypto_box_ZEROBYTES]; if (flags & 0x01) msg_->set_flags (msg_t::more); + if (flags & 0x02) + msg_->set_flags (msg_t::command); memcpy (msg_->data (), message_plaintext + crypto_box_ZEROBYTES + 1, diff --git a/src/curve_server.cpp b/src/curve_server.cpp index e85b10b8..b47f5dfd 100644 --- a/src/curve_server.cpp +++ b/src/curve_server.cpp @@ -142,6 +142,8 @@ int zmq::curve_server_t::encode (msg_t *msg_) uint8_t flags = 0; if (msg_->flags () & msg_t::more) flags |= 0x01; + if (msg_->flags () & msg_t::command) + flags |= 0x02; uint8_t *message_plaintext = static_cast (malloc (mlen)); alloc_assert (message_plaintext); @@ -232,6 +234,8 @@ int zmq::curve_server_t::decode (msg_t *msg_) const uint8_t flags = message_plaintext [crypto_box_ZEROBYTES]; if (flags & 0x01) msg_->set_flags (msg_t::more); + if (flags & 0x02) + msg_->set_flags (msg_t::command); memcpy (msg_->data (), message_plaintext + crypto_box_ZEROBYTES + 1, diff --git a/src/gssapi_mechanism_base.cpp b/src/gssapi_mechanism_base.cpp index 355f1528..bdd18361 100644 --- a/src/gssapi_mechanism_base.cpp +++ b/src/gssapi_mechanism_base.cpp @@ -80,6 +80,8 @@ int zmq::gssapi_mechanism_base_t::encode_message (msg_t *msg_) uint8_t flags = 0; if (msg_->flags () & msg_t::more) flags |= 0x01; + if (msg ->flags () & msg_t::command) + flags |= 0x02; uint8_t *plaintext_buffer = static_cast (malloc(msg_->size ()+1)); plaintext_buffer[0] = flags; @@ -177,6 +179,8 @@ int zmq::gssapi_mechanism_base_t::decode_message (msg_t *msg_) const uint8_t flags = static_cast (plaintext.value)[0]; if (flags & 0x01) msg_->set_flags (msg_t::more); + if (flags & 0x02) + msg_->set_flags (msg_t::command); memcpy (msg_->data (), static_cast (plaintext.value)+1, plaintext.length-1); diff --git a/src/options.cpp b/src/options.cpp index 40116996..ebc35290 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -69,7 +69,10 @@ zmq::options_t::options_t () : socket_id (0), conflate (false), handshake_ivl (30000), - connected (false) + connected (false), + heartbeat_ttl (0), + heartbeat_interval (0), + heartbeat_timeout (0) { } @@ -519,6 +522,27 @@ int zmq::options_t::setsockopt (int option_, const void *optval_, } break; + case ZMQ_HEARTBEAT_IVL: + if (is_int && value >= 0) { + heartbeat_interval = value; + return 0; + } + break; + + case ZMQ_HEARTBEAT_TTL: + if (is_int && value >= 0 && value < 0xffff) { + heartbeat_ttl = (uint16_t)value; + return 0; + } + break; + + case ZMQ_HEARTBEAT_TIMEOUT: + if (is_int && value >= 0) { + heartbeat_timeout = value; + return 0; + } + break; + default: #if defined (ZMQ_ACT_MILITANT) // There are valid scenarios for probing with unknown socket option @@ -872,6 +896,27 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_) } break; + case ZMQ_HEARTBEAT_IVL: + if (is_int) { + *value = heartbeat_interval; + return 0; + } + break; + + case ZMQ_HEARTBEAT_TTL: + if (is_int) { + *(uint16_t*)value = heartbeat_ttl; + return 0; + } + break; + + case ZMQ_HEARTBEAT_TIMEOUT: + if (is_int) { + *value = heartbeat_timeout; + return 0; + } + break; + default: #if defined (ZMQ_ACT_MILITANT) malformed = false; diff --git a/src/options.hpp b/src/options.hpp index cb83616f..bdaea966 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -198,6 +198,15 @@ namespace zmq int handshake_ivl; bool connected; + // If remote peer receives a PING message and doesn't receive another + // message within the ttl value, it should close the connection + // (measured in tenths of a second) + uint16_t heartbeat_ttl; + // Time in milliseconds between sending heartbeat PING messages. + int heartbeat_interval; + // Time in milliseconds to wait for a PING response before disconnecting + int heartbeat_timeout; + }; } diff --git a/src/session_base.cpp b/src/session_base.cpp index 9b2bcf78..be425f20 100644 --- a/src/session_base.cpp +++ b/src/session_base.cpp @@ -137,6 +137,8 @@ int zmq::session_base_t::pull_msg (msg_t *msg_) int zmq::session_base_t::push_msg (msg_t *msg_) { + if(msg_->flags() & msg_t::command) + return 0; if (pipe && pipe->write (msg_)) { int rc = msg_->init (); errno_assert (rc == 0); diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 8c8eb8a4..6190395c 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -95,6 +95,9 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, input_stopped (false), output_stopped (false), has_handshake_timer (false), + has_ttl_timer (false), + has_timeout_timer (false), + has_heartbeat_timer (false), socket (NULL) { int rc = tx_msg.init (); @@ -250,6 +253,20 @@ void zmq::stream_engine_t::unplug () has_handshake_timer = false; } + if (has_ttl_timer) { + cancel_timer (heartbeat_ttl_timer_id); + has_ttl_timer = false; + } + + if (has_timeout_timer) { + cancel_timer (heartbeat_timeout_timer_id); + has_timeout_timer = false; + } + + if (has_heartbeat_timer) { + cancel_timer (heartbeat_ivl_timer_id); + has_heartbeat_timer = false; + } // Cancel all fd subscriptions. if (!io_error) rm_fd (handle); @@ -686,6 +703,11 @@ bool zmq::stream_engine_t::handshake () } next_msg = &stream_engine_t::next_handshake_command; process_msg = &stream_engine_t::process_handshake_command; + + if(options.heartbeat_interval > 0) { + add_timer(options.heartbeat_interval, heartbeat_ivl_timer_id); + has_heartbeat_timer = true; + } } // Start polling for output if necessary. @@ -883,6 +905,23 @@ int zmq::stream_engine_t::decode_and_push (msg_t *msg_) if (mechanism->decode (msg_) == -1) return -1; + + if(has_timeout_timer) { + has_timeout_timer = false; + cancel_timer(heartbeat_timeout_timer_id); + } + + if(has_ttl_timer) { + has_ttl_timer = false; + cancel_timer(heartbeat_ttl_timer_id); + } + + if(msg_->flags() & msg_t::command) { + uint8_t cmd_id = *((uint8_t*)msg_->data()); + if(cmd_id == 4) + process_heartbeat_message(msg_); + } + if (metadata) msg_->set_metadata (metadata); if (session->push_msg (msg_) == -1) { @@ -954,9 +993,86 @@ bool zmq::stream_engine_t::init_properties (properties_t & properties) { void zmq::stream_engine_t::timer_event (int id_) { - zmq_assert (id_ == handshake_timer_id); - has_handshake_timer = false; - - // handshake timer expired before handshake completed, so engine fails - error (timeout_error); + if(id_ == handshake_timer_id) { + has_handshake_timer = false; + // handshake timer expired before handshake completed, so engine fail + error (timeout_error); + } + else if(id_ == heartbeat_ivl_timer_id) { + next_msg = &stream_engine_t::produce_ping_message; + out_event(); + add_timer(options.heartbeat_interval, heartbeat_ivl_timer_id); + } + else if(id_ == heartbeat_ttl_timer_id) { + has_ttl_timer = false; + error(timeout_error); + } + else if(id_ == heartbeat_timeout_timer_id) { + has_timeout_timer = false; + error(timeout_error); + } + else + // There are no other valid timer ids! + assert(false); +} + +int zmq::stream_engine_t::produce_ping_message(msg_t * msg_) +{ + int rc = 0; + zmq_assert (mechanism != NULL); + + // 16-bit TTL + \4PING == 7 + msg_->init_size(7); + msg_->set_flags(msg_t::command); + // Copy in the command message + memcpy(msg_->data(), "\4PING", 5); + + uint16_t ttl_val = htons(options.heartbeat_ttl); + memcpy(((uint8_t*)msg_->data()) + 5, &ttl_val, sizeof(ttl_val)); + + rc = mechanism->encode (msg_); + next_msg = &stream_engine_t::pull_and_encode; + if(!has_timeout_timer && options.heartbeat_timeout > 0) { + add_timer(options.heartbeat_timeout, heartbeat_timeout_timer_id); + has_timeout_timer = true; + } + return rc; +} + +int zmq::stream_engine_t::produce_pong_message(msg_t * msg_) +{ + int rc = 0; + zmq_assert (mechanism != NULL); + + msg_->init_size(5); + msg_->set_flags(msg_t::command); + + memcpy(msg_->data(), "\4PONG", 5); + + rc = mechanism->encode (msg_); + next_msg = &stream_engine_t::pull_and_encode; + return rc; +} + +int zmq::stream_engine_t::process_heartbeat_message(msg_t * msg_) +{ + if(memcmp(msg_->data(), "\4PING", 5) == 0) { + uint16_t remote_heartbeat_ttl; + // Get the remote heartbeat TTL to setup the timer + memcpy(&remote_heartbeat_ttl, (uint8_t*)msg_->data() + 5, 2); + remote_heartbeat_ttl = ntohs(remote_heartbeat_ttl); + // The remote heartbeat is in 10ths of a second + // so we multiply it by 10 to get the timer interval. + remote_heartbeat_ttl *= 10; + + if(!has_ttl_timer && remote_heartbeat_ttl > 0) { + add_timer(remote_heartbeat_ttl, heartbeat_ttl_timer_id); + has_ttl_timer = true; + } + + next_msg = &stream_engine_t::produce_pong_message; + out_event(); + } + + return 0; } diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp index fad77f57..5d661d89 100644 --- a/src/stream_engine.hpp +++ b/src/stream_engine.hpp @@ -127,6 +127,10 @@ namespace zmq typedef metadata_t::dict_t properties_t; bool init_properties (properties_t & properties); + int produce_ping_message(msg_t * msg_); + int process_heartbeat_message(msg_t * msg_); + int produce_pong_message(msg_t * msg_); + // Underlying socket. fd_t s; @@ -206,6 +210,16 @@ namespace zmq // True is linger timer is running. bool has_handshake_timer; + // Heartbeat stuff + enum { + heartbeat_ivl_timer_id = 0x80, + heartbeat_timeout_timer_id = 0x81, + heartbeat_ttl_timer_id = 0x82 + }; + bool has_ttl_timer; + bool has_timeout_timer; + bool has_heartbeat_timer; + // Socket zmq::socket_base_t *socket; diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 0086b651..8256cdbc 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -49,6 +49,7 @@ set(tests test_thread_safe test_client_server test_sockopt_hwm + test_heartbeats ) if(NOT WIN32) list(APPEND tests diff --git a/tests/test_heartbeats.cpp b/tests/test_heartbeats.cpp new file mode 100644 index 00000000..7bfbc7d9 --- /dev/null +++ b/tests/test_heartbeats.cpp @@ -0,0 +1,329 @@ +/* + Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "testutil.hpp" +#if defined (ZMQ_HAVE_WINDOWS) +# include +# include +# include +# define close closesocket +#else +# include +# include +# include +# include +#endif + +// Read one event off the monitor socket; return value and address +// by reference, if not null, and event number by value. Returns -1 +// in case of error. + +static int +get_monitor_event (void *monitor) +{ + for(int i = 0; i < 2; i++) { + // First frame in message contains event number and value + zmq_msg_t msg; + zmq_msg_init (&msg); + if (zmq_msg_recv (&msg, monitor, ZMQ_DONTWAIT) == -1) { + msleep(150); + continue; // Interruped, presumably + } + assert (zmq_msg_more (&msg)); + + uint8_t *data = (uint8_t *) zmq_msg_data (&msg); + uint16_t event = *(uint16_t *) (data); + + // Second frame in message contains event address + zmq_msg_init (&msg); + if (zmq_msg_recv (&msg, monitor, 0) == -1) { + return -1; // Interruped, presumably + } + assert (!zmq_msg_more (&msg)); + + return event; + } + return -1; +} + +static void +mock_handshake (int fd) { + const uint8_t zmtp_greeting[33] = { 0xff, 0, 0, 0, 0, 0, 0, 0, 0, 0x7f, 3, 0, 'N', 'U', 'L', 'L', 0 }; + char buffer[128]; + memset(buffer, 0, sizeof(buffer)); + memcpy(buffer, zmtp_greeting, sizeof(zmtp_greeting)); + + int rc = send(fd, buffer, 64, 0); + assert(rc == 64); + + rc = recv(fd, buffer, 64, 0); + assert(rc == 64); + + const uint8_t zmtp_ready[43] = { + 4, 41, 5, 'R', 'E', 'A', 'D', 'Y', 11, 'S', 'o', 'c', 'k', 'e', 't', '-', 'T', 'y', 'p', 'e', + 0, 0, 0, 6, 'D', 'E', 'A', 'L', 'E', 'R', 8, 'I', 'd', 'e', 'n', 't', 'i', 't', 'y', + 0, 0, 0, 0 + }; + + memset(buffer, 0, sizeof(buffer)); + memcpy(buffer, zmtp_ready, 43); + rc = send(fd, buffer, 43, 0); + assert(rc == 43); + + rc = recv(fd, buffer, 43, 0); + assert(rc == 43); +} + +static void +setup_curve(void * socket, int is_server) { + const char *secret_key; + const char *public_key; + const char *server_key; + + if(is_server) { + secret_key = "JTKVSB%%)wK0E.X)V>+}o?pNmC{O&4W4b!Ni{Lh6"; + public_key = "rq:rM>}U?@Lns47E1%kR.o@n%FcmmsL/@{H8]yf7"; + server_key = NULL; + } + else { + secret_key = "D:)Q[IlAW!ahhC2ac:9*A}h:p?([4%wOTJ%JR%cs"; + public_key = "Yne@$w-vo -1); + + // Mock a ZMTP 3 client so we can forcibly time out a connection + mock_handshake(s); + + // By now everything should report as connected + rc = get_monitor_event(server_mon); + assert(rc == ZMQ_EVENT_ACCEPTED); + + // We should have been disconnected + rc = get_monitor_event(server_mon); + assert(rc == ZMQ_EVENT_DISCONNECTED); + + close(s); + + rc = zmq_close (server); + assert (rc == 0); + + rc = zmq_close (server_mon); + assert (rc == 0); + + rc = zmq_ctx_term (ctx); + assert (rc == 0); +} + +// This checks that peers respect the TTL value in ping messages +// We set up a mock ZMTP 3 client and send a ping message with a TLL +// to a server that is not doing any heartbeating. Then we sleep, +// if the server disconnects the client, then we know the TTL did +// its thing correctly. +static void +test_heartbeat_ttl (void) +{ + int rc; + + // Set up our context and sockets + void *ctx = zmq_ctx_new (); + assert (ctx); + + void * server, * server_mon; + prep_server_socket(ctx, 0, 0, &server, &server_mon); + + struct sockaddr_in ip4addr; + int s; + + ip4addr.sin_family = AF_INET; + ip4addr.sin_port = htons(5556); + inet_pton(AF_INET, "127.0.0.1", &ip4addr.sin_addr); + + s = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP); + rc = connect (s, (struct sockaddr*) &ip4addr, sizeof ip4addr); + assert (rc > -1); + + // Mock a ZMTP 3 client so we can forcibly time out a connection + mock_handshake(s); + + // By now everything should report as connected + rc = get_monitor_event(server_mon); + assert(rc == ZMQ_EVENT_ACCEPTED); + + // This is a ping message with a 0.5 second TTL. + uint8_t ping_message[] = { + 0x4, // This specifies that this is a command message + 0x7, // The total payload length is 8 bytes + 0x4, 'P', 'I', 'N', 'G', // The command name + 0, 10 // This is a network-order 16-bit TTL value + }; + rc = send(s, ping_message, sizeof(ping_message), 0); + assert(rc == sizeof(ping_message)); + + uint8_t pong_buffer[8] = { 0 }; + rc = recv(s, pong_buffer, 7, 0); + assert(rc == 7 && memcmp(pong_buffer, "\4\5\4PONG", 7) == 0); + + // We should have been disconnected + rc = get_monitor_event(server_mon); + assert(rc == ZMQ_EVENT_DISCONNECTED); + + close(s); + + rc = zmq_close (server); + assert (rc == 0); + + rc = zmq_close (server_mon); + assert (rc == 0); + + rc = zmq_ctx_term (ctx); + assert (rc == 0); +} + +// This checks for normal operation - that is pings and pongs being +// exchanged normally. There should be an accepted event on the server, +// and then no event afterwards. +static void +test_heartbeat_notimeout (int is_curve) +{ + int rc; + + // Set up our context and sockets + void *ctx = zmq_ctx_new (); + assert (ctx); + + void * server, * server_mon; + prep_server_socket(ctx, 1, is_curve, &server, &server_mon); + + void * client = zmq_socket(ctx, ZMQ_DEALER); + if(is_curve) + setup_curve(client, 0); + rc = zmq_connect(client, "tcp://127.0.0.1:5556"); + + // Give it a sec to connect and handshake + msleep(100); + + // By now everything should report as connected + rc = get_monitor_event(server_mon); + assert(rc == ZMQ_EVENT_ACCEPTED); + + // We should still be connected because pings and pongs are happenin' + rc = get_monitor_event(server_mon); + assert(rc == -1); + + rc = zmq_close (client); + assert (rc == 0); + + rc = zmq_close (server); + assert (rc == 0); + + rc = zmq_close (server_mon); + assert (rc == 0); + + rc = zmq_ctx_term (ctx); + assert (rc == 0); +} + +int main (void) +{ + setup_test_environment(); + test_heartbeat_timeout(); + test_heartbeat_ttl(); + // Run this test without curve + test_heartbeat_notimeout(0); + // Then rerun it with curve + test_heartbeat_notimeout(1); +}