Merge pull request #146 from bluca/dealer_router_async

Problem: zmq_ctx_term asserts with connect-before-bind and sockets with identity over inproc transport
This commit is contained in:
Constantin Rack 2016-09-24 21:33:05 +02:00 committed by GitHub
commit 44c1d81506
4 changed files with 38 additions and 1 deletions

View File

@ -18,6 +18,10 @@ env:
- PKG_CONFIG_PATH=${BUILD_PREFIX}/lib/pkgconfig
- DISTCHECK_CONFIGURE_FLAGS="--with-libsodium --prefix=${BUILD_PREFIX}"
before_install:
# workaround for Travis OSX CI bug, hasn't been fixed in a month so time for a hack
- if [ $TRAVIS_OS_NAME == "osx" ] ; then brew uninstall libtool && brew install libtool ; fi
# Build required projects first
before_script:
- mkdir tmp

2
NEWS
View File

@ -9,6 +9,8 @@
* Fixed #2107 - zmq_connect with IPv6 "source:port;dest:port" broken
* Fixed #2117 - ctx_term assert with inproc zmq_router connect-before-bind
0MQ version 4.1.5 stable, released on 2016/06/17
================================================

View File

@ -547,7 +547,13 @@ void zmq::ctx_t::connect_inproc_sockets (zmq::socket_base_t *bind_socket_,
else
pending_connection_.connect_pipe->send_bind (bind_socket_, pending_connection_.bind_pipe, false);
if (pending_connection_.endpoint.options.recv_identity) {
// When a ctx is terminated all pending inproc connection will be
// connected, but the socket will already be closed and the pipe will be
// in waiting_for_delimiter state, which means no more writes can be done
// and the identity write fails and causes an assert. Check if the socket
// is open before sending.
if (pending_connection_.endpoint.options.recv_identity &&
pending_connection_.endpoint.socket->check_tag ()) {
msg_t id;
int rc = id.init_size (bind_options.identity_size);
errno_assert (rc == 0);

View File

@ -212,6 +212,30 @@ void test_connect_before_bind_pub_sub ()
assert (rc == 0);
}
void test_connect_before_bind_ctx_term ()
{
void *ctx = zmq_ctx_new ();
assert (ctx);
for (int i = 0; i < 20; ++i) {
// Connect first
void *connectSocket = zmq_socket (ctx, ZMQ_ROUTER);
assert (connectSocket);
char ep[20];
sprintf(ep, "inproc://cbbrr%d", i);
int rc = zmq_connect (connectSocket, ep);
assert (rc == 0);
// Cleanup
rc = zmq_close (connectSocket);
assert (rc == 0);
}
int rc = zmq_ctx_term (ctx);
assert (rc == 0);
}
void test_multiple_connects ()
{
const unsigned int no_of_connects = 10;
@ -478,6 +502,7 @@ int main (void)
test_bind_before_connect ();
test_connect_before_bind ();
test_connect_before_bind_pub_sub ();
test_connect_before_bind_ctx_term ();
test_multiple_connects ();
test_multiple_threads ();
test_simultaneous_connect_bind_threads ();