Backported test_connect_delay.cpp from libzmq

This commit is contained in:
Pieter Hintjens 2013-05-21 10:25:02 +02:00
parent 37a0ec5936
commit 3d353f8306

View File

@ -1,6 +1,5 @@
/* /*
Copyright (c) 2012 Ian Barber Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
Copyright (c) 2012 Other contributors as noted in the AUTHORS file
This file is part of 0MQ. This file is part of 0MQ.
@ -19,7 +18,6 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "../include/zmq.h" #include "../include/zmq.h"
#include "../include/zmq_utils.h"
#include <errno.h> #include <errno.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
@ -31,12 +29,9 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
int main (void) int main (void)
{ {
fprintf (stderr, "test_connect_delay running...\n");
int val; int val;
int rc; int rc;
char buffer[16]; char buffer[16];
int seen = 0;
// TEST 1. // TEST 1.
// First we're going to attempt to send messages to two // First we're going to attempt to send messages to two
// pipes, one connected, the other not. We should see // pipes, one connected, the other not. We should see
@ -53,7 +48,7 @@ int main (void)
val = 0; val = 0;
rc = zmq_setsockopt(to, ZMQ_LINGER, &val, sizeof(val)); rc = zmq_setsockopt(to, ZMQ_LINGER, &val, sizeof(val));
assert (rc == 0); assert (rc == 0);
rc = zmq_bind(to, "tcp://*:5555"); rc = zmq_bind (to, "tcp://*:6555");
assert (rc == 0); assert (rc == 0);
// Create a socket pushing to two endpoints - only 1 message should arrive. // Create a socket pushing to two endpoints - only 1 message should arrive.
@ -71,26 +66,22 @@ int main (void)
// We send 10 messages, 5 should just get stuck in the queue // We send 10 messages, 5 should just get stuck in the queue
// for the not-yet-connected pipe // for the not-yet-connected pipe
for (int i = 0; i < 10; ++i) for (int i = 0; i < 10; ++i) {
{ rc = zmq_send (from, "Hello", 5, 0);
std::string message("message "); assert (rc == 5);
message += ('0' + i);
rc = zmq_send (from, message.data(), message.size(), 0);
assert(rc >= 0);
} }
// Sleep to allow the messages to be delivered
zmq_sleep (1);
// We now consume from the connected pipe // We now consume from the connected pipe
// - we should see just 5 // - we should see just 5
seen = 0; int timeout = 100;
for (int i = 0; i < 10; ++i) rc = zmq_setsockopt (to, ZMQ_RCVTIMEO, &timeout, sizeof (int));
{ assert (rc == 0);
memset (&buffer, 0, sizeof(buffer));
rc = zmq_recv (to, &buffer, sizeof(buffer), ZMQ_DONTWAIT); int seen = 0;
while (true) {
rc = zmq_recv (to, &buffer, sizeof (buffer), 0);
if (rc == -1) if (rc == -1)
break; break; // Break when we didn't get a message
seen++; seen++;
} }
assert (seen == 5); assert (seen == 5);
@ -101,7 +92,7 @@ int main (void)
rc = zmq_close (to); rc = zmq_close (to);
assert (rc == 0); assert (rc == 0);
rc = zmq_ctx_destroy(context); rc = zmq_term (context);
assert (rc == 0); assert (rc == 0);
// TEST 2 // TEST 2
@ -144,26 +135,21 @@ int main (void)
assert (rc == 0); assert (rc == 0);
// Send 10 messages, all should be routed to the connected pipe // Send 10 messages, all should be routed to the connected pipe
for (int i = 0; i < 10; ++i) for (int i = 0; i < 10; ++i) {
{ rc = zmq_send (from, "Hello", 5, 0);
std::string message("message "); assert (rc == 5);
message += ('0' + i);
rc = zmq_send (from, message.data(), message.size(), 0);
assert (rc >= 0);
} }
rc = zmq_setsockopt (to, ZMQ_RCVTIMEO, &timeout, sizeof (int));
assert (rc == 0);
// Sleep to allow the messages to be delivered
zmq_sleep (1);
// Send 10 messages, all should arrive.
seen = 0; seen = 0;
for (int i = 0; i < 10; ++i) while (true) {
{ rc = zmq_recv (to, &buffer, sizeof (buffer), 0);
memset(&buffer, 0, sizeof(buffer)); if (rc == -1)
rc = zmq_recv (to, &buffer, sizeof(buffer), ZMQ_DONTWAIT); break; // Break when we didn't get a message
// If there is a failed delivery, assert! seen++;
assert (rc != -1);
} }
assert (seen == 10);
rc = zmq_close (from); rc = zmq_close (from);
assert (rc == 0); assert (rc == 0);
@ -171,89 +157,81 @@ int main (void)
rc = zmq_close (to); rc = zmq_close (to);
assert (rc == 0); assert (rc == 0);
rc = zmq_ctx_destroy(context); rc = zmq_term (context);
assert (rc == 0); assert (rc == 0);
// TEST 3 // TEST 3
// This time we want to validate that the same blocking behaviour // This time we want to validate that the same blocking behaviour
// occurs with an existing connection that is broken. We will send // occurs with an existing connection that is broken. We will send
// messaages to a connected pipe, disconnect and verify the messages // messages to a connected pipe, disconnect and verify the messages
// block. Then we reconnect and verify messages flow again. // block. Then we reconnect and verify messages flow again.
context = zmq_ctx_new (); context = zmq_ctx_new ();
void *context2 = zmq_ctx_new();
to = zmq_socket (context2, ZMQ_PULL); void *backend = zmq_socket (context, ZMQ_DEALER);
assert (to); assert (backend);
rc = zmq_bind (to, "tcp://*:5560"); void *frontend = zmq_socket (context, ZMQ_DEALER);
assert (frontend);
int zero = 0;
rc = zmq_setsockopt (backend, ZMQ_LINGER, &zero, sizeof (zero));
assert (rc == 0);
rc = zmq_setsockopt (frontend, ZMQ_LINGER, &zero, sizeof (zero));
assert (rc == 0); assert (rc == 0);
val = 0; // Frontend connects to backend using DELAY_ATTACH_ON_CONNECT
rc = zmq_setsockopt (to, ZMQ_LINGER, &val, sizeof(val)); int on = 1;
rc = zmq_setsockopt (frontend, ZMQ_DELAY_ATTACH_ON_CONNECT, &on, sizeof (on));
assert (rc == 0);
rc = zmq_bind (backend, "tcp://*:5560");
assert (rc == 0);
rc = zmq_connect (frontend, "tcp://localhost:5560");
assert (rc == 0); assert (rc == 0);
// Create a socket pushing // Ping backend to frontend so we know when the connection is up
from = zmq_socket (context, ZMQ_PUSH); rc = zmq_send (backend, "Hello", 5, 0);
assert (from); assert (rc == 5);
rc = zmq_recv (frontend, buffer, 255, 0);
assert (rc == 5);
val = 0; // Send message from frontend to backend
rc = zmq_setsockopt (from, ZMQ_LINGER, &val, sizeof(val)); rc = zmq_send (frontend, "Hello", 5, ZMQ_DONTWAIT);
assert (rc == 0); assert (rc == 5);
val = 1;
rc = zmq_setsockopt (from, ZMQ_DELAY_ATTACH_ON_CONNECT, &val, sizeof(val));
assert (rc == 0);
// Connect to the valid socket socket rc = zmq_close (backend);
rc = zmq_connect (from, "tcp://localhost:5560");
assert (rc == 0);
// Allow connections to stabilise
zmq_sleep(1);
// Send a message, should succeed
std::string message("message ");
rc = zmq_send (from, message.data(), message.size(), 0);
assert (rc >= 0);
rc = zmq_close (to);
assert (rc == 0);
rc = zmq_ctx_destroy(context2);
assert (rc == 0); assert (rc == 0);
// Give time to process disconnect // Give time to process disconnect
zmq_sleep(1); // There's no way to do this except with a sleep
struct timespec t = { 0, 250 * 1000000 };
nanosleep (&t, NULL);
// Send a message, should fail // Send a message, should fail
rc = zmq_send (from, message.data(), message.size(), ZMQ_DONTWAIT); rc = zmq_send (frontend, "Hello", 5, ZMQ_DONTWAIT);
assert (rc == -1); assert (rc == -1);
context2 = zmq_ctx_new(); // Recreate backend socket
to = zmq_socket (context2, ZMQ_PULL); backend = zmq_socket (context, ZMQ_DEALER);
assert (to); assert (backend);
rc = zmq_bind (to, "tcp://*:5560"); rc = zmq_setsockopt (backend, ZMQ_LINGER, &zero, sizeof (zero));
assert (rc == 0);
rc = zmq_bind (backend, "tcp://*:5560");
assert (rc == 0); assert (rc == 0);
val = 0; // Ping backend to frontend so we know when the connection is up
rc = zmq_setsockopt (to, ZMQ_LINGER, &val, sizeof(val)); rc = zmq_send (backend, "Hello", 5, 0);
assert (rc == 0); assert (rc == 5);
rc = zmq_recv (frontend, buffer, 255, 0);
// Allow connections to stabilise assert (rc == 5);
zmq_sleep(1);
// After the reconnect, should succeed // After the reconnect, should succeed
rc = zmq_send (from, message.data(), message.size(), 0); rc = zmq_send (frontend, "Hello", 5, ZMQ_DONTWAIT);
assert (rc >= 0); assert (rc == 5);
rc = zmq_close (to); rc = zmq_close (backend);
assert (rc == 0); assert (rc == 0);
rc = zmq_close (from); rc = zmq_close (frontend);
assert (rc == 0); assert (rc == 0);
rc = zmq_ctx_destroy(context); rc = zmq_term (context);
assert (rc == 0);
rc = zmq_ctx_destroy(context2);
assert (rc == 0); assert (rc == 0);
} }