From 25bf30bebe2580bfa36fe9970cb2fae834896a7b Mon Sep 17 00:00:00 2001 From: Luca Boccassi Date: Sun, 23 Oct 2016 21:30:51 +0100 Subject: [PATCH] Problem: 2 connects with same sourceip:port to different destip:port fail Solution: during a connect with a TCP endpoint if a source address is passed set the SO_REUSEADDR flag on the socket before the bind system call. Add unit test to cover this case for both IPv4 and IPv6. --- src/tcp_connecter.cpp | 12 +++ tests/test_reqrep_tcp.cpp | 152 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 164 insertions(+) diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index 64691af6..eb365e5e 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -314,6 +314,18 @@ int zmq::tcp_connecter_t::open () // Set a source address for conversations if (tcp_addr->has_src_addr ()) { + // Allow reusing of the address, to connect to different servers + // using the same source port on the client. + int flag = 1; +#ifdef ZMQ_HAVE_WINDOWS + rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, (const char*) &flag, + sizeof (int)); + wsa_assert (rc != SOCKET_ERROR); +#else + rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int)); + errno_assert (rc == 0); +#endif + rc = ::bind (s, tcp_addr->src_addr (), tcp_addr->src_addrlen ()); if (rc == -1) return -1; diff --git a/tests/test_reqrep_tcp.cpp b/tests/test_reqrep_tcp.cpp index ba41743a..36707c87 100644 --- a/tests/test_reqrep_tcp.cpp +++ b/tests/test_reqrep_tcp.cpp @@ -131,6 +131,73 @@ void test_multi_connect_ipv4 (void) assert (rc == 0); } +void test_multi_connect_ipv4_same_port (void) +{ + void *ctx = zmq_ctx_new (); + assert (ctx); + + void *sb0 = zmq_socket (ctx, ZMQ_REP); + assert (sb0); + int rc = zmq_bind (sb0, "tcp://127.0.0.1:5560"); + assert (rc == 0); + + void *sb1 = zmq_socket (ctx, ZMQ_REP); + assert (sb1); + rc = zmq_bind (sb1, "tcp://127.0.0.1:5561"); + assert (rc == 0); + + void *sc0 = zmq_socket (ctx, ZMQ_REQ); + assert (sc0); + rc = zmq_connect (sc0, "tcp://127.0.0.1:5564;127.0.0.1:5560"); + assert (rc == 0); + rc = zmq_connect (sc0, "tcp://127.0.0.1:5565;127.0.0.1:5561"); + assert (rc == 0); + + void *sc1 = zmq_socket (ctx, ZMQ_REQ); + assert (sc1); + rc = zmq_connect (sc1, "tcp://127.0.0.1:5565;127.0.0.1:5560"); + assert (rc == 0); + rc = zmq_connect (sc1, "tcp://127.0.0.1:5564;127.0.0.1:5561"); + assert (rc == 0); + + bounce (sb0, sc0); + bounce (sb1, sc0); + bounce (sb0, sc1); + bounce (sb1, sc1); + bounce (sb0, sc0); + bounce (sb1, sc0); + + rc = zmq_disconnect (sc1, "tcp://127.0.0.1:5565;127.0.0.1:5560"); + assert (rc == 0); + rc = zmq_disconnect (sc1, "tcp://127.0.0.1:5564;127.0.0.1:5561"); + assert (rc == 0); + rc = zmq_disconnect (sc0, "tcp://127.0.0.1:5564;127.0.0.1:5560"); + assert (rc == 0); + rc = zmq_disconnect (sc0, "tcp://127.0.0.1:5565;127.0.0.1:5561"); + assert (rc == 0); + + rc = zmq_unbind (sb0, "tcp://127.0.0.1:5560"); + assert (rc == 0); + + rc = zmq_unbind (sb1, "tcp://127.0.0.1:5561"); + assert (rc == 0); + + rc = zmq_close (sc0); + assert (rc == 0); + + rc = zmq_close (sc1); + assert (rc == 0); + + rc = zmq_close (sb0); + assert (rc == 0); + + rc = zmq_close (sb1); + assert (rc == 0); + + rc = zmq_ctx_term (ctx); + assert (rc == 0); +} + void test_single_connect_ipv6 (void) { void *ctx = zmq_ctx_new (); @@ -257,6 +324,87 @@ void test_multi_connect_ipv6 (void) assert (rc == 0); } +void test_multi_connect_ipv6_same_port (void) +{ + void *ctx = zmq_ctx_new (); + assert (ctx); + + if (!is_ipv6_available ()) { + zmq_ctx_term (ctx); + return; + } + + void *sb0 = zmq_socket (ctx, ZMQ_REP); + assert (sb0); + int ipv6 = 1; + int rc = zmq_setsockopt (sb0, ZMQ_IPV6, &ipv6, sizeof (int)); + assert (rc == 0); + rc = zmq_bind (sb0, "tcp://[::1]:5560"); + assert (rc == 0); + + void *sb1 = zmq_socket (ctx, ZMQ_REP); + assert (sb1); + rc = zmq_setsockopt (sb1, ZMQ_IPV6, &ipv6, sizeof (int)); + assert (rc == 0); + rc = zmq_bind (sb1, "tcp://[::1]:5561"); + assert (rc == 0); + + void *sc0 = zmq_socket (ctx, ZMQ_REQ); + assert (sc0); + rc = zmq_setsockopt (sc0, ZMQ_IPV6, &ipv6, sizeof (int)); + assert (rc == 0); + rc = zmq_connect (sc0, "tcp://[::1]:5564;[::1]:5560"); + assert (rc == 0); + rc = zmq_connect (sc0, "tcp://[::1]:5565;[::1]:5561"); + assert (rc == 0); + + void *sc1 = zmq_socket (ctx, ZMQ_REQ); + assert (sc1); + rc = zmq_setsockopt (sc1, ZMQ_IPV6, &ipv6, sizeof (int)); + assert (rc == 0); + rc = zmq_connect (sc1, "tcp://[::1]:5565;[::1]:5560"); + assert (rc == 0); + rc = zmq_connect (sc1, "tcp://[::1]:5564;[::1]:5561"); + assert (rc == 0); + + bounce (sb0, sc0); + bounce (sb1, sc0); + bounce (sb0, sc1); + bounce (sb1, sc1); + bounce (sb0, sc0); + bounce (sb1, sc0); + + rc = zmq_disconnect (sc1, "tcp://[::1]:5565;[::1]:5560"); + assert (rc == 0); + rc = zmq_disconnect (sc1, "tcp://[::1]:5564;[::1]:5561"); + assert (rc == 0); + rc = zmq_disconnect (sc0, "tcp://[::1]:5564;[::1]:5560"); + assert (rc == 0); + rc = zmq_disconnect (sc0, "tcp://[::1]:5565;[::1]:5561"); + assert (rc == 0); + + rc = zmq_unbind (sb0, "tcp://[::1]:5560"); + assert (rc == 0); + + rc = zmq_unbind (sb1, "tcp://[::1]:5561"); + assert (rc == 0); + + rc = zmq_close (sc0); + assert (rc == 0); + + rc = zmq_close (sc1); + assert (rc == 0); + + rc = zmq_close (sb0); + assert (rc == 0); + + rc = zmq_close (sb1); + assert (rc == 0); + + rc = zmq_ctx_term (ctx); + assert (rc == 0); +} + int main (void) { setup_test_environment (); @@ -265,9 +413,13 @@ int main (void) test_multi_connect_ipv4 (); + test_multi_connect_ipv4_same_port (); + test_single_connect_ipv6 (); test_multi_connect_ipv6 (); + test_multi_connect_ipv6_same_port (); + return 0 ; }