/* Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file This file is part of 0MQ. 0MQ is free software; you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation; either version 3 of the License, or (at your option) any later version. 0MQ is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with this program. If not, see . */ #include "testutil.hpp" #if defined (ZMQ_HAVE_WINDOWS) # include # include # include # define close closesocket #else # include # include # include # include #endif // Read one event off the monitor socket; return value and address // by reference, if not null, and event number by value. Returns -1 // in case of error. static int get_monitor_event (void *monitor) { for(int i = 0; i < 2; i++) { // First frame in message contains event number and value zmq_msg_t msg; zmq_msg_init (&msg); if (zmq_msg_recv (&msg, monitor, ZMQ_DONTWAIT) == -1) { msleep(150); continue; // Interruped, presumably } assert (zmq_msg_more (&msg)); uint8_t *data = (uint8_t *) zmq_msg_data (&msg); uint16_t event = *(uint16_t *) (data); // Second frame in message contains event address zmq_msg_init (&msg); if (zmq_msg_recv (&msg, monitor, 0) == -1) { return -1; // Interruped, presumably } assert (!zmq_msg_more (&msg)); return event; } return -1; } static void recv_with_retry (int fd, char *buffer, int bytes) { int received = 0; while (true) { int rc = recv(fd, buffer + received, bytes - received, 0); assert(rc > 0); received += rc; assert(received <= bytes); if (received == bytes) break; } } static void mock_handshake (int fd) { const uint8_t zmtp_greeting[33] = { 0xff, 0, 0, 0, 0, 0, 0, 0, 0, 0x7f, 3, 0, 'N', 'U', 'L', 'L', 0 }; char buffer[128]; memset(buffer, 0, sizeof(buffer)); memcpy(buffer, zmtp_greeting, sizeof(zmtp_greeting)); int rc = send(fd, buffer, 64, 0); assert(rc == 64); recv_with_retry(fd, buffer, 64); const uint8_t zmtp_ready[43] = { 4, 41, 5, 'R', 'E', 'A', 'D', 'Y', 11, 'S', 'o', 'c', 'k', 'e', 't', '-', 'T', 'y', 'p', 'e', 0, 0, 0, 6, 'D', 'E', 'A', 'L', 'E', 'R', 8, 'I', 'd', 'e', 'n', 't', 'i', 't', 'y', 0, 0, 0, 0 }; memset(buffer, 0, sizeof(buffer)); memcpy(buffer, zmtp_ready, 43); rc = send(fd, buffer, 43, 0); assert(rc == 43); recv_with_retry(fd, buffer, 43); } static void setup_curve(void * socket, int is_server) { const char *secret_key; const char *public_key; const char *server_key; if(is_server) { secret_key = "JTKVSB%%)wK0E.X)V>+}o?pNmC{O&4W4b!Ni{Lh6"; public_key = "rq:rM>}U?@Lns47E1%kR.o@n%FcmmsL/@{H8]yf7"; server_key = NULL; } else { secret_key = "D:)Q[IlAW!ahhC2ac:9*A}h:p?([4%wOTJ%JR%cs"; public_key = "Yne@$w-vo -1); // Mock a ZMTP 3 client so we can forcibly time out a connection mock_handshake(s); // By now everything should report as connected rc = get_monitor_event(server_mon); assert(rc == ZMQ_EVENT_ACCEPTED); // We should have been disconnected rc = get_monitor_event(server_mon); assert(rc == ZMQ_EVENT_DISCONNECTED); close(s); rc = zmq_close (server); assert (rc == 0); rc = zmq_close (server_mon); assert (rc == 0); rc = zmq_ctx_term (ctx); assert (rc == 0); } // This checks that peers respect the TTL value in ping messages // We set up a mock ZMTP 3 client and send a ping message with a TLL // to a server that is not doing any heartbeating. Then we sleep, // if the server disconnects the client, then we know the TTL did // its thing correctly. static void test_heartbeat_ttl (void) { int rc, value; // Set up our context and sockets void *ctx = zmq_ctx_new (); assert (ctx); void * server, * server_mon, *client; prep_server_socket(ctx, 0, 0, &server, &server_mon); client = zmq_socket(ctx, ZMQ_DEALER); assert(client != NULL); // Set the heartbeat TTL to 0.1 seconds value = 100; zmq_setsockopt(client, ZMQ_HEARTBEAT_TTL, &value, sizeof(value)); // Set the heartbeat interval to much longer than the TTL so that // the socket times out oon the remote side. value = 250; zmq_setsockopt(client, ZMQ_HEARTBEAT_IVL, &value, sizeof(value)); rc = zmq_connect(client, "tcp://localhost:5556"); assert(rc == 0); // By now everything should report as connected rc = get_monitor_event(server_mon); assert(rc == ZMQ_EVENT_ACCEPTED); msleep(100); // We should have been disconnected rc = get_monitor_event(server_mon); assert(rc == ZMQ_EVENT_DISCONNECTED); rc = zmq_close (server); assert (rc == 0); rc = zmq_close (server_mon); assert (rc == 0); rc = zmq_close (client); assert (rc == 0); rc = zmq_ctx_term (ctx); assert (rc == 0); } // This checks for normal operation - that is pings and pongs being // exchanged normally. There should be an accepted event on the server, // and then no event afterwards. static void test_heartbeat_notimeout (int is_curve) { int rc; // Set up our context and sockets void *ctx = zmq_ctx_new (); assert (ctx); void * server, * server_mon; prep_server_socket(ctx, 1, is_curve, &server, &server_mon); void * client = zmq_socket(ctx, ZMQ_DEALER); if(is_curve) setup_curve(client, 0); rc = zmq_connect(client, "tcp://127.0.0.1:5556"); // Give it a sec to connect and handshake msleep(100); // By now everything should report as connected rc = get_monitor_event(server_mon); assert(rc == ZMQ_EVENT_ACCEPTED); // We should still be connected because pings and pongs are happenin' rc = get_monitor_event(server_mon); assert(rc == -1); rc = zmq_close (client); assert (rc == 0); rc = zmq_close (server); assert (rc == 0); rc = zmq_close (server_mon); assert (rc == 0); rc = zmq_ctx_term (ctx); assert (rc == 0); } int main (void) { setup_test_environment(); test_heartbeat_timeout(); test_heartbeat_ttl(); // Run this test without curve test_heartbeat_notimeout(0); // Then rerun it with curve test_heartbeat_notimeout(1); }