From d38a2d507d7cfe0d0c30933c374612be566a33ba Mon Sep 17 00:00:00 2001 From: Luca Boccassi Date: Sat, 24 Sep 2016 18:07:23 +0100 Subject: [PATCH] Problem: zmq_ctx_term asserts with connect-before-bind and sockets with identity over inproc transport Solution: check if the connecting inproc socket has been closed before trying to send the identity. Otherwise the pipe will be in waiting_for_delimiter state causing writes to fail and the connect to assert when the context is being torn down and the pending inproc connects are resolved. Add test case that covers this behaviour. --- src/ctx.cpp | 8 +++++++- tests/test_inproc_connect.cpp | 25 +++++++++++++++++++++++++ 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/src/ctx.cpp b/src/ctx.cpp index 666bd312..0ce48882 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -547,7 +547,13 @@ void zmq::ctx_t::connect_inproc_sockets (zmq::socket_base_t *bind_socket_, else pending_connection_.connect_pipe->send_bind (bind_socket_, pending_connection_.bind_pipe, false); - if (pending_connection_.endpoint.options.recv_identity) { + // When a ctx is terminated all pending inproc connection will be + // connected, but the socket will already be closed and the pipe will be + // in waiting_for_delimiter state, which means no more writes can be done + // and the identity write fails and causes an assert. Check if the socket + // is open before sending. + if (pending_connection_.endpoint.options.recv_identity && + pending_connection_.endpoint.socket->check_tag ()) { msg_t id; int rc = id.init_size (bind_options.identity_size); errno_assert (rc == 0); diff --git a/tests/test_inproc_connect.cpp b/tests/test_inproc_connect.cpp index 106765db..c634738b 100644 --- a/tests/test_inproc_connect.cpp +++ b/tests/test_inproc_connect.cpp @@ -212,6 +212,30 @@ void test_connect_before_bind_pub_sub () assert (rc == 0); } +void test_connect_before_bind_ctx_term () +{ + void *ctx = zmq_ctx_new (); + assert (ctx); + + for (int i = 0; i < 20; ++i) { + // Connect first + void *connectSocket = zmq_socket (ctx, ZMQ_ROUTER); + assert (connectSocket); + + char ep[20]; + sprintf(ep, "inproc://cbbrr%d", i); + int rc = zmq_connect (connectSocket, ep); + assert (rc == 0); + + // Cleanup + rc = zmq_close (connectSocket); + assert (rc == 0); + } + + int rc = zmq_ctx_term (ctx); + assert (rc == 0); +} + void test_multiple_connects () { const unsigned int no_of_connects = 10; @@ -478,6 +502,7 @@ int main (void) test_bind_before_connect (); test_connect_before_bind (); test_connect_before_bind_pub_sub (); + test_connect_before_bind_ctx_term (); test_multiple_connects (); test_multiple_threads (); test_simultaneous_connect_bind_threads ();