Merge pull request #91 from hintjens/master

test_connect_delay was failing, so I backported the code from libzmq master.
This commit is contained in:
Ian Barber 2013-05-21 13:31:51 -07:00
commit 8b6dcc8420

View File

@ -1,25 +1,23 @@
/* /*
Copyright (c) 2012 Ian Barber Copyright (c) 2007-2013 Contributors as noted in the AUTHORS file
Copyright (c) 2012 Other contributors as noted in the AUTHORS file
This file is part of 0MQ.
This file is part of 0MQ. 0MQ is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
0MQ is free software; you can redistribute it and/or modify it under 0MQ is distributed in the hope that it will be useful,
the terms of the GNU Lesser General Public License as published by but WITHOUT ANY WARRANTY; without even the implied warranty of
the Free Software Foundation; either version 3 of the License, or MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
(at your option) any later version. GNU Lesser General Public License for more details.
0MQ is distributed in the hope that it will be useful, You should have received a copy of the GNU Lesser General Public License
but WITHOUT ANY WARRANTY; without even the implied warranty of along with this program. If not, see <http://www.gnu.org/licenses/>.
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "../include/zmq.h" #include "../include/zmq.h"
#include "../include/zmq_utils.h"
#include <errno.h> #include <errno.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
@ -31,12 +29,9 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
int main (void) int main (void)
{ {
fprintf (stderr, "test_connect_delay running...\n");
int val; int val;
int rc; int rc;
char buffer[16]; char buffer[16];
int seen = 0;
// TEST 1. // TEST 1.
// First we're going to attempt to send messages to two // First we're going to attempt to send messages to two
// pipes, one connected, the other not. We should see // pipes, one connected, the other not. We should see
@ -53,7 +48,7 @@ int main (void)
val = 0; val = 0;
rc = zmq_setsockopt(to, ZMQ_LINGER, &val, sizeof(val)); rc = zmq_setsockopt(to, ZMQ_LINGER, &val, sizeof(val));
assert (rc == 0); assert (rc == 0);
rc = zmq_bind(to, "tcp://*:5555"); rc = zmq_bind (to, "tcp://*:6555");
assert (rc == 0); assert (rc == 0);
// Create a socket pushing to two endpoints - only 1 message should arrive. // Create a socket pushing to two endpoints - only 1 message should arrive.
@ -71,26 +66,22 @@ int main (void)
// We send 10 messages, 5 should just get stuck in the queue // We send 10 messages, 5 should just get stuck in the queue
// for the not-yet-connected pipe // for the not-yet-connected pipe
for (int i = 0; i < 10; ++i) for (int i = 0; i < 10; ++i) {
{ rc = zmq_send (from, "Hello", 5, 0);
std::string message("message "); assert (rc == 5);
message += ('0' + i);
rc = zmq_send (from, message.data(), message.size(), 0);
assert(rc >= 0);
} }
// Sleep to allow the messages to be delivered
zmq_sleep (1);
// We now consume from the connected pipe // We now consume from the connected pipe
// - we should see just 5 // - we should see just 5
seen = 0; int timeout = 100;
for (int i = 0; i < 10; ++i) rc = zmq_setsockopt (to, ZMQ_RCVTIMEO, &timeout, sizeof (int));
{ assert (rc == 0);
memset (&buffer, 0, sizeof(buffer));
rc = zmq_recv (to, &buffer, sizeof(buffer), ZMQ_DONTWAIT); int seen = 0;
if( rc == -1) while (true) {
break; rc = zmq_recv (to, &buffer, sizeof (buffer), 0);
if (rc == -1)
break; // Break when we didn't get a message
seen++; seen++;
} }
assert (seen == 5); assert (seen == 5);
@ -101,7 +92,7 @@ int main (void)
rc = zmq_close (to); rc = zmq_close (to);
assert (rc == 0); assert (rc == 0);
rc = zmq_ctx_destroy(context); rc = zmq_term (context);
assert (rc == 0); assert (rc == 0);
// TEST 2 // TEST 2
@ -144,26 +135,21 @@ int main (void)
assert (rc == 0); assert (rc == 0);
// Send 10 messages, all should be routed to the connected pipe // Send 10 messages, all should be routed to the connected pipe
for (int i = 0; i < 10; ++i) for (int i = 0; i < 10; ++i) {
{ rc = zmq_send (from, "Hello", 5, 0);
std::string message("message "); assert (rc == 5);
message += ('0' + i);
rc = zmq_send (from, message.data(), message.size(), 0);
assert (rc >= 0);
} }
rc = zmq_setsockopt (to, ZMQ_RCVTIMEO, &timeout, sizeof (int));
// Sleep to allow the messages to be delivered assert (rc == 0);
zmq_sleep (1);
// Send 10 messages, all should arrive.
seen = 0; seen = 0;
for (int i = 0; i < 10; ++i) while (true) {
{ rc = zmq_recv (to, &buffer, sizeof (buffer), 0);
memset(&buffer, 0, sizeof(buffer)); if (rc == -1)
rc = zmq_recv (to, &buffer, sizeof(buffer), ZMQ_DONTWAIT); break; // Break when we didn't get a message
// If there is a failed delivery, assert! seen++;
assert (rc != -1);
} }
assert (seen == 10);
rc = zmq_close (from); rc = zmq_close (from);
assert (rc == 0); assert (rc == 0);
@ -171,89 +157,81 @@ int main (void)
rc = zmq_close (to); rc = zmq_close (to);
assert (rc == 0); assert (rc == 0);
rc = zmq_ctx_destroy(context); rc = zmq_term (context);
assert (rc == 0); assert (rc == 0);
// TEST 3 // TEST 3
// This time we want to validate that the same blocking behaviour // This time we want to validate that the same blocking behaviour
// occurs with an existing connection that is broken. We will send // occurs with an existing connection that is broken. We will send
// messaages to a connected pipe, disconnect and verify the messages // messages to a connected pipe, disconnect and verify the messages
// block. Then we reconnect and verify messages flow again. // block. Then we reconnect and verify messages flow again.
context = zmq_ctx_new(); context = zmq_ctx_new ();
void *context2 = zmq_ctx_new();
to = zmq_socket (context2, ZMQ_PULL); void *backend = zmq_socket (context, ZMQ_DEALER);
assert (to); assert (backend);
rc = zmq_bind (to, "tcp://*:5560"); void *frontend = zmq_socket (context, ZMQ_DEALER);
assert (frontend);
int zero = 0;
rc = zmq_setsockopt (backend, ZMQ_LINGER, &zero, sizeof (zero));
assert (rc == 0);
rc = zmq_setsockopt (frontend, ZMQ_LINGER, &zero, sizeof (zero));
assert (rc == 0); assert (rc == 0);
val = 0; // Frontend connects to backend using DELAY_ATTACH_ON_CONNECT
rc = zmq_setsockopt (to, ZMQ_LINGER, &val, sizeof(val)); int on = 1;
rc = zmq_setsockopt (frontend, ZMQ_DELAY_ATTACH_ON_CONNECT, &on, sizeof (on));
assert (rc == 0);
rc = zmq_bind (backend, "tcp://*:5560");
assert (rc == 0);
rc = zmq_connect (frontend, "tcp://localhost:5560");
assert (rc == 0); assert (rc == 0);
// Create a socket pushing // Ping backend to frontend so we know when the connection is up
from = zmq_socket (context, ZMQ_PUSH); rc = zmq_send (backend, "Hello", 5, 0);
assert (from); assert (rc == 5);
rc = zmq_recv (frontend, buffer, 255, 0);
val = 0; assert (rc == 5);
rc = zmq_setsockopt (from, ZMQ_LINGER, &val, sizeof(val));
assert (rc == 0); // Send message from frontend to backend
val = 1; rc = zmq_send (frontend, "Hello", 5, ZMQ_DONTWAIT);
rc = zmq_setsockopt (from, ZMQ_DELAY_ATTACH_ON_CONNECT, &val, sizeof(val)); assert (rc == 5);
assert (rc == 0);
rc = zmq_close (backend);
// Connect to the valid socket socket
rc = zmq_connect (from, "tcp://localhost:5560");
assert (rc == 0); assert (rc == 0);
// Allow connections to stabilise // Give time to process disconnect
zmq_sleep(1); // There's no way to do this except with a sleep
struct timespec t = { 0, 250 * 1000000 };
// Send a message, should succeed nanosleep (&t, NULL);
std::string message("message ");
rc = zmq_send (from, message.data(), message.size(), 0);
assert (rc >= 0);
rc = zmq_close (to);
assert (rc == 0);
rc = zmq_ctx_destroy(context2);
assert (rc == 0);
// Give time to process disconnect
zmq_sleep(1);
// Send a message, should fail // Send a message, should fail
rc = zmq_send (from, message.data(), message.size(), ZMQ_DONTWAIT); rc = zmq_send (frontend, "Hello", 5, ZMQ_DONTWAIT);
assert (rc == -1); assert (rc == -1);
context2 = zmq_ctx_new(); // Recreate backend socket
to = zmq_socket (context2, ZMQ_PULL); backend = zmq_socket (context, ZMQ_DEALER);
assert (to); assert (backend);
rc = zmq_bind (to, "tcp://*:5560"); rc = zmq_setsockopt (backend, ZMQ_LINGER, &zero, sizeof (zero));
assert (rc == 0);
rc = zmq_bind (backend, "tcp://*:5560");
assert (rc == 0); assert (rc == 0);
val = 0; // Ping backend to frontend so we know when the connection is up
rc = zmq_setsockopt (to, ZMQ_LINGER, &val, sizeof(val)); rc = zmq_send (backend, "Hello", 5, 0);
assert (rc == 0); assert (rc == 5);
rc = zmq_recv (frontend, buffer, 255, 0);
// Allow connections to stabilise assert (rc == 5);
zmq_sleep(1);
// After the reconnect, should succeed // After the reconnect, should succeed
rc = zmq_send (from, message.data(), message.size(), 0); rc = zmq_send (frontend, "Hello", 5, ZMQ_DONTWAIT);
assert (rc >= 0); assert (rc == 5);
rc = zmq_close (to); rc = zmq_close (backend);
assert (rc == 0); assert (rc == 0);
rc = zmq_close (from); rc = zmq_close (frontend);
assert (rc == 0); assert (rc == 0);
rc = zmq_ctx_destroy(context); rc = zmq_term (context);
assert (rc == 0);
rc = zmq_ctx_destroy(context2);
assert (rc == 0); assert (rc == 0);
} }