mirror of
https://github.com/zeromq/libzmq.git
synced 2025-03-09 15:26:04 +00:00
Merge pull request #2171 from bluca/connect_reuse_addr
Problem: 2 connects with same sourceip:port to different destip:port fail
This commit is contained in:
commit
1e69309e21
@ -314,6 +314,18 @@ int zmq::tcp_connecter_t::open ()
|
||||
|
||||
// Set a source address for conversations
|
||||
if (tcp_addr->has_src_addr ()) {
|
||||
// Allow reusing of the address, to connect to different servers
|
||||
// using the same source port on the client.
|
||||
int flag = 1;
|
||||
#ifdef ZMQ_HAVE_WINDOWS
|
||||
rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, (const char*) &flag,
|
||||
sizeof (int));
|
||||
wsa_assert (rc != SOCKET_ERROR);
|
||||
#else
|
||||
rc = setsockopt (s, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int));
|
||||
errno_assert (rc == 0);
|
||||
#endif
|
||||
|
||||
rc = ::bind (s, tcp_addr->src_addr (), tcp_addr->src_addrlen ());
|
||||
if (rc == -1)
|
||||
return -1;
|
||||
|
@ -131,6 +131,73 @@ void test_multi_connect_ipv4 (void)
|
||||
assert (rc == 0);
|
||||
}
|
||||
|
||||
void test_multi_connect_ipv4_same_port (void)
|
||||
{
|
||||
void *ctx = zmq_ctx_new ();
|
||||
assert (ctx);
|
||||
|
||||
void *sb0 = zmq_socket (ctx, ZMQ_REP);
|
||||
assert (sb0);
|
||||
int rc = zmq_bind (sb0, "tcp://127.0.0.1:5560");
|
||||
assert (rc == 0);
|
||||
|
||||
void *sb1 = zmq_socket (ctx, ZMQ_REP);
|
||||
assert (sb1);
|
||||
rc = zmq_bind (sb1, "tcp://127.0.0.1:5561");
|
||||
assert (rc == 0);
|
||||
|
||||
void *sc0 = zmq_socket (ctx, ZMQ_REQ);
|
||||
assert (sc0);
|
||||
rc = zmq_connect (sc0, "tcp://127.0.0.1:5564;127.0.0.1:5560");
|
||||
assert (rc == 0);
|
||||
rc = zmq_connect (sc0, "tcp://127.0.0.1:5565;127.0.0.1:5561");
|
||||
assert (rc == 0);
|
||||
|
||||
void *sc1 = zmq_socket (ctx, ZMQ_REQ);
|
||||
assert (sc1);
|
||||
rc = zmq_connect (sc1, "tcp://127.0.0.1:5565;127.0.0.1:5560");
|
||||
assert (rc == 0);
|
||||
rc = zmq_connect (sc1, "tcp://127.0.0.1:5564;127.0.0.1:5561");
|
||||
assert (rc == 0);
|
||||
|
||||
bounce (sb0, sc0);
|
||||
bounce (sb1, sc0);
|
||||
bounce (sb0, sc1);
|
||||
bounce (sb1, sc1);
|
||||
bounce (sb0, sc0);
|
||||
bounce (sb1, sc0);
|
||||
|
||||
rc = zmq_disconnect (sc1, "tcp://127.0.0.1:5565;127.0.0.1:5560");
|
||||
assert (rc == 0);
|
||||
rc = zmq_disconnect (sc1, "tcp://127.0.0.1:5564;127.0.0.1:5561");
|
||||
assert (rc == 0);
|
||||
rc = zmq_disconnect (sc0, "tcp://127.0.0.1:5564;127.0.0.1:5560");
|
||||
assert (rc == 0);
|
||||
rc = zmq_disconnect (sc0, "tcp://127.0.0.1:5565;127.0.0.1:5561");
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_unbind (sb0, "tcp://127.0.0.1:5560");
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_unbind (sb1, "tcp://127.0.0.1:5561");
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_close (sc0);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_close (sc1);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_close (sb0);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_close (sb1);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_ctx_term (ctx);
|
||||
assert (rc == 0);
|
||||
}
|
||||
|
||||
void test_single_connect_ipv6 (void)
|
||||
{
|
||||
void *ctx = zmq_ctx_new ();
|
||||
@ -257,6 +324,87 @@ void test_multi_connect_ipv6 (void)
|
||||
assert (rc == 0);
|
||||
}
|
||||
|
||||
void test_multi_connect_ipv6_same_port (void)
|
||||
{
|
||||
void *ctx = zmq_ctx_new ();
|
||||
assert (ctx);
|
||||
|
||||
if (!is_ipv6_available ()) {
|
||||
zmq_ctx_term (ctx);
|
||||
return;
|
||||
}
|
||||
|
||||
void *sb0 = zmq_socket (ctx, ZMQ_REP);
|
||||
assert (sb0);
|
||||
int ipv6 = 1;
|
||||
int rc = zmq_setsockopt (sb0, ZMQ_IPV6, &ipv6, sizeof (int));
|
||||
assert (rc == 0);
|
||||
rc = zmq_bind (sb0, "tcp://[::1]:5560");
|
||||
assert (rc == 0);
|
||||
|
||||
void *sb1 = zmq_socket (ctx, ZMQ_REP);
|
||||
assert (sb1);
|
||||
rc = zmq_setsockopt (sb1, ZMQ_IPV6, &ipv6, sizeof (int));
|
||||
assert (rc == 0);
|
||||
rc = zmq_bind (sb1, "tcp://[::1]:5561");
|
||||
assert (rc == 0);
|
||||
|
||||
void *sc0 = zmq_socket (ctx, ZMQ_REQ);
|
||||
assert (sc0);
|
||||
rc = zmq_setsockopt (sc0, ZMQ_IPV6, &ipv6, sizeof (int));
|
||||
assert (rc == 0);
|
||||
rc = zmq_connect (sc0, "tcp://[::1]:5564;[::1]:5560");
|
||||
assert (rc == 0);
|
||||
rc = zmq_connect (sc0, "tcp://[::1]:5565;[::1]:5561");
|
||||
assert (rc == 0);
|
||||
|
||||
void *sc1 = zmq_socket (ctx, ZMQ_REQ);
|
||||
assert (sc1);
|
||||
rc = zmq_setsockopt (sc1, ZMQ_IPV6, &ipv6, sizeof (int));
|
||||
assert (rc == 0);
|
||||
rc = zmq_connect (sc1, "tcp://[::1]:5565;[::1]:5560");
|
||||
assert (rc == 0);
|
||||
rc = zmq_connect (sc1, "tcp://[::1]:5564;[::1]:5561");
|
||||
assert (rc == 0);
|
||||
|
||||
bounce (sb0, sc0);
|
||||
bounce (sb1, sc0);
|
||||
bounce (sb0, sc1);
|
||||
bounce (sb1, sc1);
|
||||
bounce (sb0, sc0);
|
||||
bounce (sb1, sc0);
|
||||
|
||||
rc = zmq_disconnect (sc1, "tcp://[::1]:5565;[::1]:5560");
|
||||
assert (rc == 0);
|
||||
rc = zmq_disconnect (sc1, "tcp://[::1]:5564;[::1]:5561");
|
||||
assert (rc == 0);
|
||||
rc = zmq_disconnect (sc0, "tcp://[::1]:5564;[::1]:5560");
|
||||
assert (rc == 0);
|
||||
rc = zmq_disconnect (sc0, "tcp://[::1]:5565;[::1]:5561");
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_unbind (sb0, "tcp://[::1]:5560");
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_unbind (sb1, "tcp://[::1]:5561");
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_close (sc0);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_close (sc1);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_close (sb0);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_close (sb1);
|
||||
assert (rc == 0);
|
||||
|
||||
rc = zmq_ctx_term (ctx);
|
||||
assert (rc == 0);
|
||||
}
|
||||
|
||||
int main (void)
|
||||
{
|
||||
setup_test_environment ();
|
||||
@ -265,9 +413,13 @@ int main (void)
|
||||
|
||||
test_multi_connect_ipv4 ();
|
||||
|
||||
test_multi_connect_ipv4_same_port ();
|
||||
|
||||
test_single_connect_ipv6 ();
|
||||
|
||||
test_multi_connect_ipv6 ();
|
||||
|
||||
test_multi_connect_ipv6_same_port ();
|
||||
|
||||
return 0 ;
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user