diff --git a/tests/test_connect_delay.cpp b/tests/test_connect_delay.cpp
index 740e021f..e7e21bba 100644
--- a/tests/test_connect_delay.cpp
+++ b/tests/test_connect_delay.cpp
@@ -25,114 +25,10 @@ along with this program. If not, see .
#include
#include
#include
-#include
#undef NDEBUG
#include
-static void *server (void *)
-{
- void *socket, *context;
- char buffer[16];
- int rc, val;
-
- context = zmq_init (1);
- assert (context);
-
- socket = zmq_socket (context, ZMQ_PULL);
- assert (socket);
-
- val = 0;
- rc = zmq_setsockopt(socket, ZMQ_LINGER, &val, sizeof(val));
- assert (rc == 0);
-
- rc = zmq_bind (socket, "ipc:///tmp/recon");
- assert (rc == 0);
-
- memset (&buffer, 0, sizeof(buffer));
- rc = zmq_recv (socket, &buffer, sizeof(buffer), 0);
-
- // Intentionally bail out
- rc = zmq_close (socket);
- assert (rc == 0);
-
- rc = zmq_term (context);
- assert (rc == 0);
-
- usleep (200000);
-
- context = zmq_init (1);
- assert (context);
-
- socket = zmq_socket (context, ZMQ_PULL);
- assert (socket);
-
- val = 0;
- rc = zmq_setsockopt(socket, ZMQ_LINGER, &val, sizeof(val));
- assert (rc == 0);
-
- rc = zmq_bind (socket, "ipc:///tmp/recon");
- assert (rc == 0);
-
- usleep (200000);
-
- memset (&buffer, 0, sizeof(buffer));
- rc = zmq_recv (socket, &buffer, sizeof(buffer), ZMQ_DONTWAIT);
- assert (rc != -1);
-
- // Start closing the socket while the connecting process is underway.
- rc = zmq_close (socket);
- assert (rc == 0);
-
- rc = zmq_term (context);
- assert (rc == 0);
-
- return NULL;
-}
-
-static void *worker (void *)
-{
- void *socket, *context;
- int rc, hadone, val;
-
- context = zmq_init (1);
- assert (context);
-
- socket = zmq_socket (context, ZMQ_PUSH);
- assert (socket);
-
- val = 0;
- rc = zmq_setsockopt(socket, ZMQ_LINGER, &val, sizeof(val));
- assert (rc == 0);
-
- val = 1;
- rc = zmq_setsockopt (socket, ZMQ_DELAY_ATTACH_ON_CONNECT, &val, sizeof(val));
- assert (rc == 0);
-
- rc = zmq_connect (socket, "ipc:///tmp/recon");
- assert (rc == 0);
-
- hadone = 0;
- // Not checking RC as some may be -1
- for (int i = 0; i < 6; i++) {
- usleep(200000);
- rc = zmq_send (socket, "hi", 2, ZMQ_DONTWAIT);
- if (rc != -1)
- hadone ++;
- }
-
- assert (hadone >= 2);
- assert (hadone < 4);
-
- rc = zmq_close (socket);
- assert (rc == 0);
-
- rc = zmq_term (context);
- assert (rc == 0);
-
- return NULL;
-}
-
int main (void)
{
fprintf (stderr, "test_connect_delay running...\n");
@@ -141,15 +37,23 @@ int main (void)
char buffer[16];
int seen = 0;
+ // TEST 1.
+ // First we're going to attempt to send messages to two
+ // pipes, one connected, the other not. We should see
+ // the PUSH load balancing to both pipes, and hence half
+ // of the messages getting queued, as connect() creates a
+ // pipe immediately.
+
void *context = zmq_ctx_new();
assert (context);
void *to = zmq_socket(context, ZMQ_PULL);
assert (to);
+ // Bind the one valid receiver
val = 0;
rc = zmq_setsockopt(to, ZMQ_LINGER, &val, sizeof(val));
assert (rc == 0);
- rc = zmq_bind(to, "tcp://*:6555");
+ rc = zmq_bind(to, "tcp://*:5555");
assert (rc == 0);
// Create a socket pushing to two endpoints - only 1 message should arrive.
@@ -158,11 +62,15 @@ int main (void)
val = 0;
zmq_setsockopt (from, ZMQ_LINGER, &val, sizeof(val));
- rc = zmq_connect (from, "tcp://localhost:6556");
+ // This pipe will not connect
+ rc = zmq_connect (from, "tcp://localhost:5556");
assert (rc == 0);
- rc = zmq_connect (from, "tcp://localhost:6555");
+ // This pipe will
+ rc = zmq_connect (from, "tcp://localhost:5555");
assert (rc == 0);
+ // We send 10 messages, 5 should just get stuck in the queue
+ // for the not-yet-connected pipe
for (int i = 0; i < 10; ++i)
{
std::string message("message ");
@@ -171,7 +79,11 @@ int main (void)
assert(rc >= 0);
}
+ // Sleep to allow the messages to be delivered
zmq_sleep (1);
+
+ // We now consume from the connected pipe
+ // - we should see just 5
seen = 0;
for (int i = 0; i < 10; ++i)
{
@@ -192,9 +104,17 @@ int main (void)
rc = zmq_ctx_destroy(context);
assert (rc == 0);
+ // TEST 2
+ // This time we will do the same thing, connect two pipes,
+ // one of which will succeed in connecting to a bound
+ // receiver, the other of which will fail. However, we will
+ // also set the delay attach on connect flag, which should
+ // cause the pipe attachment to be delayed until the connection
+ // succeeds.
context = zmq_ctx_new();
fprintf (stderr, " Rerunning with DELAY_ATTACH_ON_CONNECT\n");
+ // Bind the valid socket
to = zmq_socket (context, ZMQ_PULL);
assert (to);
rc = zmq_bind (to, "tcp://*:5560");
@@ -212,16 +132,19 @@ int main (void)
rc = zmq_setsockopt (from, ZMQ_LINGER, &val, sizeof(val));
assert (rc == 0);
+ // Set the key flag
val = 1;
rc = zmq_setsockopt (from, ZMQ_DELAY_ATTACH_ON_CONNECT, &val, sizeof(val));
assert (rc == 0);
+ // Connect to the invalid socket
rc = zmq_connect (from, "tcp://localhost:5561");
assert (rc == 0);
-
+ // Connect to the valid socket
rc = zmq_connect (from, "tcp://localhost:5560");
assert (rc == 0);
+ // Send 10 messages, all should be routed to the connected pipe
for (int i = 0; i < 10; ++i)
{
std::string message("message ");
@@ -230,13 +153,16 @@ int main (void)
assert (rc >= 0);
}
+ // Sleep to allow the messages to be delivered
zmq_sleep (1);
+ // Send 10 messages, all should arrive.
seen = 0;
for (int i = 0; i < 10; ++i)
{
memset(&buffer, 0, sizeof(buffer));
rc = zmq_recv (to, &buffer, sizeof(buffer), ZMQ_DONTWAIT);
+ // If there is a failed delivery, assert!
assert (rc != -1);
}
@@ -249,15 +175,86 @@ int main (void)
rc = zmq_ctx_destroy(context);
assert (rc == 0);
+ // TEST 3
+ // This time we want to validate that the same blocking behaviour
+ // occurs with an existing connection that is broken. We will send
+ // messaages to a connected pipe, disconnect and verify the messages
+ // block. Then we reconnect and verify messages flow again.
+ context = zmq_ctx_new();
+ void *context2 = zmq_ctx_new();
fprintf (stderr, " Running DELAY_ATTACH_ON_CONNECT with disconnect\n");
- pthread_t serv, work;
-
- rc = pthread_create (&serv, NULL, server, NULL);
+ to = zmq_socket (context2, ZMQ_PULL);
+ assert (to);
+ rc = zmq_bind (to, "tcp://*:5560");
assert (rc == 0);
- rc = pthread_create (&work, NULL, worker, NULL);
+ val = 0;
+ rc = zmq_setsockopt (to, ZMQ_LINGER, &val, sizeof(val));
+ assert (rc == 0);
+
+ // Create a socket pushing
+ from = zmq_socket (context, ZMQ_PUSH);
+ assert (from);
+
+ val = 0;
+ rc = zmq_setsockopt (from, ZMQ_LINGER, &val, sizeof(val));
+ assert (rc == 0);
+ val = 1;
+ rc = zmq_setsockopt (from, ZMQ_DELAY_ATTACH_ON_CONNECT, &val, sizeof(val));
+ assert (rc == 0);
+
+ // Connect to the valid socket socket
+ rc = zmq_connect (from, "tcp://localhost:5560");
assert (rc == 0);
- pthread_exit(NULL);
-}
+ // Allow connections to stabilise
+ zmq_sleep(1);
+
+ // Send a message, should succeed
+ std::string message("message ");
+ rc = zmq_send (from, message.data(), message.size(), 0);
+ assert (rc >= 0);
+
+ rc = zmq_close (to);
+ assert (rc == 0);
+
+ rc = zmq_ctx_destroy(context2);
+ assert (rc == 0);
+
+ // Give time to process disconnect
+ zmq_sleep(1);
+
+ // Send a message, should fail
+ rc = zmq_send (from, message.data(), message.size(), ZMQ_DONTWAIT);
+ assert (rc == -1);
+
+ context2 = zmq_ctx_new();
+ to = zmq_socket (context2, ZMQ_PULL);
+ assert (to);
+ rc = zmq_bind (to, "tcp://*:5560");
+ assert (rc == 0);
+
+ val = 0;
+ rc = zmq_setsockopt (to, ZMQ_LINGER, &val, sizeof(val));
+ assert (rc == 0);
+
+ // Allow connections to stabilise
+ zmq_sleep(1);
+
+ // After the reconnect, should succeed
+ rc = zmq_send (from, message.data(), message.size(), 0);
+ assert (rc >= 0);
+
+ rc = zmq_close (to);
+ assert (rc == 0);
+
+ rc = zmq_close (from);
+ assert (rc == 0);
+
+ rc = zmq_ctx_destroy(context);
+ assert (rc == 0);
+
+ rc = zmq_ctx_destroy(context2);
+ assert (rc == 0);
+}
\ No newline at end of file