From 3d353f830699eb8e1b0cd0ce8b939e4356de1556 Mon Sep 17 00:00:00 2001 From: Pieter Hintjens Date: Tue, 21 May 2013 10:25:02 +0200 Subject: [PATCH] Backported test_connect_delay.cpp from libzmq --- tests/test_connect_delay.cpp | 204 ++++++++++++++++------------------- 1 file changed, 91 insertions(+), 113 deletions(-) diff --git a/tests/test_connect_delay.cpp b/tests/test_connect_delay.cpp index 92c0d682..a0563512 100644 --- a/tests/test_connect_delay.cpp +++ b/tests/test_connect_delay.cpp @@ -1,25 +1,23 @@ /* -Copyright (c) 2012 Ian Barber -Copyright (c) 2012 Other contributors as noted in the AUTHORS file + Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. -This file is part of 0MQ. + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. -0MQ is free software; you can redistribute it and/or modify it under -the terms of the GNU Lesser General Public License as published by -the Free Software Foundation; either version 3 of the License, or -(at your option) any later version. + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. -0MQ is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU Lesser General Public License for more details. - -You should have received a copy of the GNU Lesser General Public License -along with this program. If not, see . + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . */ #include "../include/zmq.h" -#include "../include/zmq_utils.h" #include #include #include @@ -31,12 +29,9 @@ along with this program. If not, see . int main (void) { - fprintf (stderr, "test_connect_delay running...\n"); int val; int rc; char buffer[16]; - int seen = 0; - // TEST 1. // First we're going to attempt to send messages to two // pipes, one connected, the other not. We should see @@ -53,7 +48,7 @@ int main (void) val = 0; rc = zmq_setsockopt(to, ZMQ_LINGER, &val, sizeof(val)); assert (rc == 0); - rc = zmq_bind(to, "tcp://*:5555"); + rc = zmq_bind (to, "tcp://*:6555"); assert (rc == 0); // Create a socket pushing to two endpoints - only 1 message should arrive. @@ -71,26 +66,22 @@ int main (void) // We send 10 messages, 5 should just get stuck in the queue // for the not-yet-connected pipe - for (int i = 0; i < 10; ++i) - { - std::string message("message "); - message += ('0' + i); - rc = zmq_send (from, message.data(), message.size(), 0); - assert(rc >= 0); + for (int i = 0; i < 10; ++i) { + rc = zmq_send (from, "Hello", 5, 0); + assert (rc == 5); } - // Sleep to allow the messages to be delivered - zmq_sleep (1); - // We now consume from the connected pipe // - we should see just 5 - seen = 0; - for (int i = 0; i < 10; ++i) - { - memset (&buffer, 0, sizeof(buffer)); - rc = zmq_recv (to, &buffer, sizeof(buffer), ZMQ_DONTWAIT); - if( rc == -1) - break; + int timeout = 100; + rc = zmq_setsockopt (to, ZMQ_RCVTIMEO, &timeout, sizeof (int)); + assert (rc == 0); + + int seen = 0; + while (true) { + rc = zmq_recv (to, &buffer, sizeof (buffer), 0); + if (rc == -1) + break; // Break when we didn't get a message seen++; } assert (seen == 5); @@ -101,7 +92,7 @@ int main (void) rc = zmq_close (to); assert (rc == 0); - rc = zmq_ctx_destroy(context); + rc = zmq_term (context); assert (rc == 0); // TEST 2 @@ -144,26 +135,21 @@ int main (void) assert (rc == 0); // Send 10 messages, all should be routed to the connected pipe - for (int i = 0; i < 10; ++i) - { - std::string message("message "); - message += ('0' + i); - rc = zmq_send (from, message.data(), message.size(), 0); - assert (rc >= 0); + for (int i = 0; i < 10; ++i) { + rc = zmq_send (from, "Hello", 5, 0); + assert (rc == 5); } - - // Sleep to allow the messages to be delivered - zmq_sleep (1); - - // Send 10 messages, all should arrive. + rc = zmq_setsockopt (to, ZMQ_RCVTIMEO, &timeout, sizeof (int)); + assert (rc == 0); + seen = 0; - for (int i = 0; i < 10; ++i) - { - memset(&buffer, 0, sizeof(buffer)); - rc = zmq_recv (to, &buffer, sizeof(buffer), ZMQ_DONTWAIT); - // If there is a failed delivery, assert! - assert (rc != -1); + while (true) { + rc = zmq_recv (to, &buffer, sizeof (buffer), 0); + if (rc == -1) + break; // Break when we didn't get a message + seen++; } + assert (seen == 10); rc = zmq_close (from); assert (rc == 0); @@ -171,89 +157,81 @@ int main (void) rc = zmq_close (to); assert (rc == 0); - rc = zmq_ctx_destroy(context); + rc = zmq_term (context); assert (rc == 0); // TEST 3 // This time we want to validate that the same blocking behaviour // occurs with an existing connection that is broken. We will send - // messaages to a connected pipe, disconnect and verify the messages + // messages to a connected pipe, disconnect and verify the messages // block. Then we reconnect and verify messages flow again. - context = zmq_ctx_new(); - void *context2 = zmq_ctx_new(); + context = zmq_ctx_new (); - to = zmq_socket (context2, ZMQ_PULL); - assert (to); - rc = zmq_bind (to, "tcp://*:5560"); + void *backend = zmq_socket (context, ZMQ_DEALER); + assert (backend); + void *frontend = zmq_socket (context, ZMQ_DEALER); + assert (frontend); + int zero = 0; + rc = zmq_setsockopt (backend, ZMQ_LINGER, &zero, sizeof (zero)); + assert (rc == 0); + rc = zmq_setsockopt (frontend, ZMQ_LINGER, &zero, sizeof (zero)); assert (rc == 0); - val = 0; - rc = zmq_setsockopt (to, ZMQ_LINGER, &val, sizeof(val)); + // Frontend connects to backend using DELAY_ATTACH_ON_CONNECT + int on = 1; + rc = zmq_setsockopt (frontend, ZMQ_DELAY_ATTACH_ON_CONNECT, &on, sizeof (on)); + assert (rc == 0); + rc = zmq_bind (backend, "tcp://*:5560"); + assert (rc == 0); + rc = zmq_connect (frontend, "tcp://localhost:5560"); assert (rc == 0); - // Create a socket pushing - from = zmq_socket (context, ZMQ_PUSH); - assert (from); - - val = 0; - rc = zmq_setsockopt (from, ZMQ_LINGER, &val, sizeof(val)); - assert (rc == 0); - val = 1; - rc = zmq_setsockopt (from, ZMQ_DELAY_ATTACH_ON_CONNECT, &val, sizeof(val)); - assert (rc == 0); - - // Connect to the valid socket socket - rc = zmq_connect (from, "tcp://localhost:5560"); + // Ping backend to frontend so we know when the connection is up + rc = zmq_send (backend, "Hello", 5, 0); + assert (rc == 5); + rc = zmq_recv (frontend, buffer, 255, 0); + assert (rc == 5); + + // Send message from frontend to backend + rc = zmq_send (frontend, "Hello", 5, ZMQ_DONTWAIT); + assert (rc == 5); + + rc = zmq_close (backend); assert (rc == 0); - // Allow connections to stabilise - zmq_sleep(1); - - // Send a message, should succeed - std::string message("message "); - rc = zmq_send (from, message.data(), message.size(), 0); - assert (rc >= 0); - - rc = zmq_close (to); - assert (rc == 0); - - rc = zmq_ctx_destroy(context2); - assert (rc == 0); - - // Give time to process disconnect - zmq_sleep(1); + // Give time to process disconnect + // There's no way to do this except with a sleep + struct timespec t = { 0, 250 * 1000000 }; + nanosleep (&t, NULL); // Send a message, should fail - rc = zmq_send (from, message.data(), message.size(), ZMQ_DONTWAIT); + rc = zmq_send (frontend, "Hello", 5, ZMQ_DONTWAIT); assert (rc == -1); - - context2 = zmq_ctx_new(); - to = zmq_socket (context2, ZMQ_PULL); - assert (to); - rc = zmq_bind (to, "tcp://*:5560"); + + // Recreate backend socket + backend = zmq_socket (context, ZMQ_DEALER); + assert (backend); + rc = zmq_setsockopt (backend, ZMQ_LINGER, &zero, sizeof (zero)); + assert (rc == 0); + rc = zmq_bind (backend, "tcp://*:5560"); assert (rc == 0); - val = 0; - rc = zmq_setsockopt (to, ZMQ_LINGER, &val, sizeof(val)); - assert (rc == 0); - - // Allow connections to stabilise - zmq_sleep(1); - + // Ping backend to frontend so we know when the connection is up + rc = zmq_send (backend, "Hello", 5, 0); + assert (rc == 5); + rc = zmq_recv (frontend, buffer, 255, 0); + assert (rc == 5); + // After the reconnect, should succeed - rc = zmq_send (from, message.data(), message.size(), 0); - assert (rc >= 0); + rc = zmq_send (frontend, "Hello", 5, ZMQ_DONTWAIT); + assert (rc == 5); - rc = zmq_close (to); + rc = zmq_close (backend); assert (rc == 0); - rc = zmq_close (from); + rc = zmq_close (frontend); assert (rc == 0); - rc = zmq_ctx_destroy(context); - assert (rc == 0); - - rc = zmq_ctx_destroy(context2); + rc = zmq_term (context); assert (rc == 0); } -