From 5cf74db6bb796208be9b535a33e5167287112246 Mon Sep 17 00:00:00 2001 From: Joe Eli McIlvain Date: Fri, 9 May 2014 11:46:17 -0700 Subject: [PATCH] Add failing test reproducing issue #1015. There is a race condition when connect and bind on a new inproc endpoint happen "simultaneously" in threads. Causes the error: Assertion failed: ok (ctx.cpp:474) --- tests/test_inproc_connect.cpp | 71 +++++++++++++++++++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/tests/test_inproc_connect.cpp b/tests/test_inproc_connect.cpp index 45c7e581..07771c32 100644 --- a/tests/test_inproc_connect.cpp +++ b/tests/test_inproc_connect.cpp @@ -36,6 +36,40 @@ static void pusher (void *ctx) assert (rc == 0); } +static void simult_conn (void *payload) +{ + // Pull out arguments - context followed by endpoint string + void* ctx = (void*)((void**)payload)[0]; + char* endpt = (char*)((void**)payload)[1]; + + // Connect + void *connectSocket = zmq_socket (ctx, ZMQ_SUB); + assert (connectSocket); + int rc = zmq_connect (connectSocket, endpt); + assert (rc == 0); + + // Cleanup + rc = zmq_close (connectSocket); + assert (rc == 0); +} + +static void simult_bind (void *payload) +{ + // Pull out arguments - context followed by endpoint string + void* ctx = (void*)((void**)payload)[0]; + char* endpt = (char*)((void**)payload)[1]; + + // Bind + void *bindSocket = zmq_socket (ctx, ZMQ_PUB); + assert (bindSocket); + int rc = zmq_bind (bindSocket, endpt); + assert (rc == 0); + + // Cleanup + rc = zmq_close (bindSocket); + assert (rc == 0); +} + void test_bind_before_connect () { void *ctx = zmq_ctx_new (); @@ -268,6 +302,42 @@ void test_multiple_threads () assert (rc == 0); } +void test_simultaneous_connect_bind_threads () +{ + const unsigned int no_of_times = 50; + void *ctx = zmq_ctx_new (); + assert (ctx); + + void *threads[no_of_times*2]; + void *thr_args[no_of_times][2]; + char endpts[no_of_times][20]; + + // Set up thread arguments: context followed by endpoint string + for (unsigned int i = 0; i < no_of_times; ++i) + { + thr_args[i][0] = (void*) ctx; + thr_args[i][1] = (void*) endpts[i]; + sprintf (endpts[i], "inproc://foo_%d", i); + } + + // Spawn all threads as simultaneously as possible + for (unsigned int i = 0; i < no_of_times; ++i) + { + threads[i*2+0] = zmq_threadstart (&simult_conn, (void*)thr_args[i]); + threads[i*2+1] = zmq_threadstart (&simult_bind, (void*)thr_args[i]); + } + + // Close all threads + for (unsigned int i = 0; i < no_of_times; ++i) + { + zmq_threadclose (threads[i*2+0]); + zmq_threadclose (threads[i*2+1]); + } + + int rc = zmq_ctx_term (ctx); + assert (rc == 0); +} + void test_identity () { // Create the infrastructure @@ -350,6 +420,7 @@ int main (void) test_connect_before_bind_pub_sub (); test_multiple_connects (); test_multiple_threads (); + test_simultaneous_connect_bind_threads (); test_identity (); test_connect_only ();