From acb68070415691413ade304c2a75beeb69106909 Mon Sep 17 00:00:00 2001 From: nikita kozlov Date: Wed, 30 Apr 2014 14:43:37 +0200 Subject: [PATCH] Allow to set up a source address and port for outgoing tcp connections in zmq_connect() Few examples: tcp:://192.168.0.4:55555;192.168.0.254:1234 tcp:://192.168.0.4:0;192.168.0.254:1234 tcp:://eth2:55555;192.168.0.254:1234 --- doc/zmq_tcp.txt | 9 +++ src/tcp_address.cpp | 115 +++++++++++++++++++++++++------- src/tcp_address.hpp | 19 ++++-- src/tcp_connecter.cpp | 9 +++ tests/Makefile.am | 2 + tests/test_bind_src_address.cpp | 47 +++++++++++++ 6 files changed, 174 insertions(+), 27 deletions(-) create mode 100644 tests/test_bind_src_address.cpp diff --git a/doc/zmq_tcp.txt b/doc/zmq_tcp.txt index a82d4fca..0185674d 100644 --- a/doc/zmq_tcp.txt +++ b/doc/zmq_tcp.txt @@ -52,6 +52,9 @@ Connecting a socket When connecting a socket to a peer address using _zmq_connect()_ with the 'tcp' transport, the 'endpoint' shall be interpreted as a 'peer address' followed by a colon and the TCP port number to use. +You can optionally specify a 'source_endpoint' which will be used as the source +address for your connection; tcp://'source_endpoint';'endpoint', see the +'interface' description above for details. A 'peer address' may be specified by either of the following: @@ -84,6 +87,12 @@ assert (rc == 0); // Connecting using a DNS name rc = zmq_connect(socket, "tcp://server1:5555"); assert (rc == 0); +// Connecting using a DNS name and bind to eth1 +rc = zmq_connect(socket, "tcp://eth1:0;server1:5555"); +assert (rc == 0); +// Connecting using a IP address and bind to an IP address +rc = zmq_connect(socket, "tcp://192.168.1.17:5555;192.168.1.1:5555"); +assert (rc == 0); ---- diff --git a/src/tcp_address.cpp b/src/tcp_address.cpp index 51b35d0e..8ec65111 100644 --- a/src/tcp_address.cpp +++ b/src/tcp_address.cpp @@ -50,7 +50,7 @@ #include // On Solaris platform, network interface name can be queried by ioctl. -int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv6_) +int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv6_, bool is_src_) { // TODO: Unused parameter, IPv6 support not implemented for Solaris. (void) ipv6_; @@ -89,7 +89,11 @@ int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv6_) rc = ioctl (fd, SIOCGLIFADDR, (char*) ifrp); errno_assert (rc != -1); if (ifrp->lifr_addr.ss_family == AF_INET) { - address.ipv4 = *(sockaddr_in*) &ifrp->lifr_addr; + if (is_src_) { + source_address.ipv4 = *(sockaddr_in*) &ifrp->lifr_addr; + } else { + address.ipv4 = *(sockaddr_in*) &ifrp->lifr_addr; + } found = true; break; } @@ -114,7 +118,7 @@ int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv6_) #include #include -int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv6_) +int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv6_, bool is_src_) { // TODO: Unused parameter, IPv6 support not implemented for AIX or HP/UX. (void) ipv6_; @@ -138,8 +142,13 @@ int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv6_) errno = ENODEV; return -1; } - memcpy (&address.ipv4.sin_addr, &((sockaddr_in*) &ifr.ifr_addr)->sin_addr, - sizeof (in_addr)); + if (is_src_) { + memcpy (&source_address.ipv4.sin_addr, &((sockaddr_in*) &ifr.ifr_addr)->sin_addr, + sizeof (in_addr)); + } else { + memcpy (&address.ipv4.sin_addr, &((sockaddr_in*) &ifr.ifr_addr)->sin_addr, + sizeof (in_addr)); + } return 0; } @@ -153,7 +162,7 @@ int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv6_) // On these platforms, network interface name can be queried // using getifaddrs function. -int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv6_) +int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv6_, bool is_src_) { // Get the addresses. ifaddrs *ifa = NULL; @@ -171,9 +180,15 @@ int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv6_) int family = ifp->ifa_addr->sa_family; if ((family == AF_INET || (ipv6_ && family == AF_INET6)) && !strcmp (nic_, ifp->ifa_name)) { - memcpy (&address, ifp->ifa_addr, - (family == AF_INET) ? sizeof (struct sockaddr_in) - : sizeof (struct sockaddr_in6)); + if (is_src_) { + memcpy (&source_address, ifp->ifa_addr, + (family == AF_INET) ? sizeof (struct sockaddr_in) + : sizeof (struct sockaddr_in6)); + } else { + memcpy (&address, ifp->ifa_addr, + (family == AF_INET) ? sizeof (struct sockaddr_in) + : sizeof (struct sockaddr_in6)); + } found = true; break; } @@ -193,7 +208,7 @@ int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv6_) // On other platforms we assume there are no sane interface names. // This is true especially of Windows. -int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv6_) +int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv6_, bool is_src_) { // All unused parameters. (void) nic_; @@ -205,7 +220,7 @@ int zmq::tcp_address_t::resolve_nic_name (const char *nic_, bool ipv6_) #endif -int zmq::tcp_address_t::resolve_interface (const char *interface_, bool ipv6_) +int zmq::tcp_address_t::resolve_interface (const char *interface_, bool ipv6_, bool is_src_) { // Initialize temporary output pointers with storage address. sockaddr_storage ss; @@ -233,12 +248,16 @@ int zmq::tcp_address_t::resolve_interface (const char *interface_, bool ipv6_) // "*" resolves to INADDR_ANY or in6addr_any. if (strcmp (interface_, "*") == 0) { zmq_assert (out_addrlen <= sizeof address); - memcpy (&address, out_addr, out_addrlen); + if (is_src_) { + memcpy (&source_address, out_addr, out_addrlen); + } else { + memcpy (&address, out_addr, out_addrlen); + } return 0; } // Try to resolve the string as a NIC name. - int rc = resolve_nic_name (interface_, ipv6_); + int rc = resolve_nic_name (interface_, ipv6_, is_src_); if (rc != 0 && errno != ENODEV) return rc; if (rc == 0) @@ -286,7 +305,11 @@ int zmq::tcp_address_t::resolve_interface (const char *interface_, bool ipv6_) // Use the first result. zmq_assert (res != NULL); zmq_assert ((size_t) (res->ai_addrlen) <= sizeof (address)); - memcpy (&address, res->ai_addr, res->ai_addrlen); + if (is_src_) { + memcpy (&source_address, res->ai_addr, res->ai_addrlen); + } else { + memcpy (&address, res->ai_addr, res->ai_addrlen); + } // Cleanup getaddrinfo after copying the possibly referenced result. freeaddrinfo (res); @@ -294,7 +317,7 @@ int zmq::tcp_address_t::resolve_interface (const char *interface_, bool ipv6_) return 0; } -int zmq::tcp_address_t::resolve_hostname (const char *hostname_, bool ipv6_) +int zmq::tcp_address_t::resolve_hostname (const char *hostname_, bool ipv6_, bool is_src_) { // Set up the query. #if defined ZMQ_HAVE_OPENVMS && defined __ia64 && __INITIAL_POINTER_SIZE == 64 @@ -344,7 +367,11 @@ int zmq::tcp_address_t::resolve_hostname (const char *hostname_, bool ipv6_) // Copy first result to output addr with hostname and service. zmq_assert ((size_t) (res->ai_addrlen) <= sizeof (address)); - memcpy (&address, res->ai_addr, res->ai_addrlen); + if (is_src_) { + memcpy (&source_address, res->ai_addr, res->ai_addrlen); + } else { + memcpy (&address, res->ai_addr, res->ai_addrlen); + } freeaddrinfo (res); @@ -354,6 +381,7 @@ int zmq::tcp_address_t::resolve_hostname (const char *hostname_, bool ipv6_) zmq::tcp_address_t::tcp_address_t () { memset (&address, 0, sizeof (address)); + memset (&source_address, 0, sizeof (source_address)); } zmq::tcp_address_t::tcp_address_t (const sockaddr *sa, socklen_t sa_len) @@ -361,6 +389,7 @@ zmq::tcp_address_t::tcp_address_t (const sockaddr *sa, socklen_t sa_len) zmq_assert(sa && sa_len > 0); memset (&address, 0, sizeof (address)); + memset (&source_address, 0, sizeof (source_address)); if (sa->sa_family == AF_INET && sa_len >= (socklen_t) sizeof (address.ipv4)) memcpy(&address.ipv4, sa, sizeof (address.ipv4)); else @@ -372,14 +401,28 @@ zmq::tcp_address_t::~tcp_address_t () { } -int zmq::tcp_address_t::resolve (const char *name_, bool local_, bool ipv6_) +int zmq::tcp_address_t::resolve (const char *name_, bool local_, bool ipv6_, bool is_src_) { + if (!is_src_) { + // Test the ';' to know if we have a source address in name_ + const char *src_delimiter = strrchr (name_, ';'); + if (src_delimiter) { + std::string src_name (name_, src_delimiter - name_); + int rc = resolve(src_name.c_str(), local_, ipv6_, true); + if (rc != 0) + return -1; + name_ = src_delimiter + 1; + _has_src_addr = true; + } + } + // Find the ':' at end that separates address from the port number. const char *delimiter = strrchr (name_, ':'); if (!delimiter) { errno = EINVAL; return -1; } + // Separate the address/port. std::string addr_str (name_, delimiter - name_); std::string port_str (delimiter + 1); @@ -406,17 +449,25 @@ int zmq::tcp_address_t::resolve (const char *name_, bool local_, bool ipv6_) // Resolve the IP address. int rc; if (local_) - rc = resolve_interface (addr_str.c_str (), ipv6_); + rc = resolve_interface (addr_str.c_str (), ipv6_, is_src_); else - rc = resolve_hostname (addr_str.c_str (), ipv6_); + rc = resolve_hostname (addr_str.c_str (), ipv6_, is_src_); if (rc != 0) return -1; // Set the port into the address structure. - if (address.generic.sa_family == AF_INET6) - address.ipv6.sin6_port = htons (port); - else - address.ipv4.sin_port = htons (port); + if (is_src_) { + if (source_address.generic.sa_family == AF_INET6) + source_address.ipv6.sin6_port = htons (port); + else + source_address.ipv4.sin_port = htons (port); + } else { + if (address.generic.sa_family == AF_INET6) + address.ipv6.sin6_port = htons (port); + else + address.ipv4.sin_port = htons (port); + } + return 0; } @@ -463,6 +514,24 @@ socklen_t zmq::tcp_address_t::addrlen () const return (socklen_t) sizeof (address.ipv4); } +const sockaddr *zmq::tcp_address_t::src_addr () const +{ + return &source_address.generic; +} + +socklen_t zmq::tcp_address_t::src_addrlen () const +{ + if (address.generic.sa_family == AF_INET6) + return (socklen_t) sizeof (source_address.ipv6); + else + return (socklen_t) sizeof (source_address.ipv4); +} + +bool zmq::tcp_address_t::has_src_addr() const +{ + return _has_src_addr; +} + #if defined ZMQ_HAVE_WINDOWS unsigned short zmq::tcp_address_t::family () const #else diff --git a/src/tcp_address.hpp b/src/tcp_address.hpp index ccd3dec2..6d24a58b 100644 --- a/src/tcp_address.hpp +++ b/src/tcp_address.hpp @@ -44,7 +44,7 @@ namespace zmq // strcuture. If 'local' is true, names are resolved as local interface // names. If it is false, names are resolved as remote hostnames. // If 'ipv6' is true, the name may resolve to IPv6 address. - int resolve (const char *name_, bool local_, bool ipv6_); + int resolve (const char *name_, bool local_, bool ipv6_, bool is_src_ = false); // The opposite to resolve() virtual int to_string (std::string &addr_); @@ -57,16 +57,27 @@ namespace zmq const sockaddr *addr () const; socklen_t addrlen () const; + const sockaddr *src_addr () const; + socklen_t src_addrlen () const; + bool has_src_addr () const; + protected: - int resolve_nic_name (const char *nic_, bool ipv6_); - int resolve_interface (const char *interface_, bool ipv6_); - int resolve_hostname (const char *hostname_, bool ipv6_); + int resolve_nic_name (const char *nic_, bool ipv6_, bool is_src_ = false); + int resolve_interface (const char *interface_, bool ipv6_, bool is_src_ = false); + int resolve_hostname (const char *hostname_, bool ipv6_, bool is_src_ = false); union { sockaddr generic; sockaddr_in ipv4; sockaddr_in6 ipv6; } address; + + union { + sockaddr generic; + sockaddr_in ipv4; + sockaddr_in6 ipv6; + } source_address; + bool _has_src_addr; }; class tcp_address_mask_t : public tcp_address_t diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index fb2828e5..0bc3b55a 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -261,6 +261,15 @@ int zmq::tcp_connecter_t::open () if (options.tos != 0) set_ip_type_of_service (s, options.tos); + // Set a source address for conversations + if (addr->resolved.tcp_addr->has_src_addr ()) { + rc = ::bind (s, addr->resolved.tcp_addr->src_addr (), addr->resolved.tcp_addr->src_addrlen ()); + + if (rc == -1) { + return -1; + } + } + // Connect to the remote peer. rc = ::connect ( s, addr->resolved.tcp_addr->addr (), diff --git a/tests/Makefile.am b/tests/Makefile.am index c3101d78..e8856fff 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -14,6 +14,7 @@ noinst_PROGRAMS = test_system \ test_invalid_rep \ test_msg_flags \ test_connect_resolve \ + test_bind_src_address \ test_immediate \ test_last_endpoint \ test_term_endpoint \ @@ -114,6 +115,7 @@ test_many_sockets_SOURCES = test_many_sockets.cpp test_ipc_wildcard_SOURCES = test_ipc_wildcard.cpp test_diffserv_SOURCES = test_diffserv.cpp test_connect_rid_SOURCES = test_connect_rid.cpp +test_bind_src_address_SOURCES = test_bind_src_address.cpp if !ON_MINGW test_shutdown_stress_SOURCES = test_shutdown_stress.cpp test_pair_ipc_SOURCES = test_pair_ipc.cpp testutil.hpp diff --git a/tests/test_bind_src_address.cpp b/tests/test_bind_src_address.cpp new file mode 100644 index 00000000..ac08e806 --- /dev/null +++ b/tests/test_bind_src_address.cpp @@ -0,0 +1,47 @@ +/* + Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "testutil.hpp" + +int main (void) +{ + setup_test_environment(); + void *ctx = zmq_ctx_new (); + assert (ctx); + + void *sock = zmq_socket (ctx, ZMQ_PUB); + assert (sock); + + int rc = zmq_connect (sock, "tcp://127.0.0.1:0;localhost:1234"); + assert (rc == 0); + + rc = zmq_connect (sock, "tcp://localhost:5555;localhost:1235"); + assert (rc == 0); + + rc = zmq_connect (sock, "tcp://lo:5555;localhost:1235"); + assert (rc == 0); + + rc = zmq_close (sock); + assert (rc == 0); + + rc = zmq_ctx_term (ctx); + assert (rc == 0); + + return 0; +}