diff --git a/src/ctx.cpp b/src/ctx.cpp index 73190a26..bd798eb0 100644 --- a/src/ctx.cpp +++ b/src/ctx.cpp @@ -573,7 +573,13 @@ void zmq::ctx_t::connect_inproc_sockets (zmq::socket_base_t *bind_socket_, else pending_connection_.connect_pipe->send_bind (bind_socket_, pending_connection_.bind_pipe, false); - if (pending_connection_.endpoint.options.recv_identity) { + // When a ctx is terminated all pending inproc connection will be + // connected, but the socket will already be closed and the pipe will be + // in waiting_for_delimiter state, which means no more writes can be done + // and the identity write fails and causes an assert. Check if the socket + // is open before sending. + if (pending_connection_.endpoint.options.recv_identity && + pending_connection_.endpoint.socket->check_tag ()) { msg_t id; const int rc = id.init_size (bind_options.identity_size); errno_assert (rc == 0); diff --git a/tests/test_inproc_connect.cpp b/tests/test_inproc_connect.cpp index c4f9b3eb..a1e76366 100644 --- a/tests/test_inproc_connect.cpp +++ b/tests/test_inproc_connect.cpp @@ -212,6 +212,30 @@ void test_connect_before_bind_pub_sub () assert (rc == 0); } +void test_connect_before_bind_ctx_term () +{ + void *ctx = zmq_ctx_new (); + assert (ctx); + + for (int i = 0; i < 20; ++i) { + // Connect first + void *connectSocket = zmq_socket (ctx, ZMQ_ROUTER); + assert (connectSocket); + + char ep[20]; + sprintf(ep, "inproc://cbbrr%d", i); + int rc = zmq_connect (connectSocket, ep); + assert (rc == 0); + + // Cleanup + rc = zmq_close (connectSocket); + assert (rc == 0); + } + + int rc = zmq_ctx_term (ctx); + assert (rc == 0); +} + void test_multiple_connects () { const unsigned int no_of_connects = 10; @@ -499,6 +523,7 @@ int main (void) test_bind_before_connect (); test_connect_before_bind (); test_connect_before_bind_pub_sub (); + test_connect_before_bind_ctx_term (); test_multiple_connects (); test_multiple_threads (); test_simultaneous_connect_bind_threads ();