mirror of
https://github.com/zeromq/libzmq.git
synced 2025-01-17 04:50:57 +08:00
Merge pull request #921 from mbarbisan/master
Allow TCP addresses to be re-resolved whenever there is a reconnection a...
This commit is contained in:
commit
49e035fb40
@ -240,7 +240,7 @@ int zmq::req_t::recv_reply_pipe (msg_t *msg_)
|
|||||||
|
|
||||||
zmq::req_session_t::req_session_t (io_thread_t *io_thread_, bool connect_,
|
zmq::req_session_t::req_session_t (io_thread_t *io_thread_, bool connect_,
|
||||||
socket_base_t *socket_, const options_t &options_,
|
socket_base_t *socket_, const options_t &options_,
|
||||||
const address_t *addr_) :
|
address_t *addr_) :
|
||||||
session_base_t (io_thread_, connect_, socket_, options_, addr_),
|
session_base_t (io_thread_, connect_, socket_, options_, addr_),
|
||||||
state (bottom)
|
state (bottom)
|
||||||
{
|
{
|
||||||
|
@ -87,7 +87,7 @@ namespace zmq
|
|||||||
|
|
||||||
req_session_t (zmq::io_thread_t *io_thread_, bool connect_,
|
req_session_t (zmq::io_thread_t *io_thread_, bool connect_,
|
||||||
zmq::socket_base_t *socket_, const options_t &options_,
|
zmq::socket_base_t *socket_, const options_t &options_,
|
||||||
const address_t *addr_);
|
address_t *addr_);
|
||||||
~req_session_t ();
|
~req_session_t ();
|
||||||
|
|
||||||
// Overrides of the functions from session_base_t.
|
// Overrides of the functions from session_base_t.
|
||||||
|
@ -34,7 +34,7 @@
|
|||||||
|
|
||||||
zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
|
zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
|
||||||
bool active_, class socket_base_t *socket_, const options_t &options_,
|
bool active_, class socket_base_t *socket_, const options_t &options_,
|
||||||
const address_t *addr_)
|
address_t *addr_)
|
||||||
{
|
{
|
||||||
|
|
||||||
session_base_t *s = NULL;
|
session_base_t *s = NULL;
|
||||||
@ -67,7 +67,7 @@ zmq::session_base_t *zmq::session_base_t::create (class io_thread_t *io_thread_,
|
|||||||
|
|
||||||
zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
|
zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
|
||||||
bool active_, class socket_base_t *socket_, const options_t &options_,
|
bool active_, class socket_base_t *socket_, const options_t &options_,
|
||||||
const address_t *addr_) :
|
address_t *addr_) :
|
||||||
own_t (io_thread_, options_),
|
own_t (io_thread_, options_),
|
||||||
io_object_t (io_thread_),
|
io_object_t (io_thread_),
|
||||||
active (active_),
|
active (active_),
|
||||||
|
@ -47,7 +47,7 @@ namespace zmq
|
|||||||
// Create a session of the particular type.
|
// Create a session of the particular type.
|
||||||
static session_base_t *create (zmq::io_thread_t *io_thread_,
|
static session_base_t *create (zmq::io_thread_t *io_thread_,
|
||||||
bool active_, zmq::socket_base_t *socket_,
|
bool active_, zmq::socket_base_t *socket_,
|
||||||
const options_t &options_, const address_t *addr_);
|
const options_t &options_, address_t *addr_);
|
||||||
|
|
||||||
// To be used once only, when creating the session.
|
// To be used once only, when creating the session.
|
||||||
void attach_pipe (zmq::pipe_t *pipe_);
|
void attach_pipe (zmq::pipe_t *pipe_);
|
||||||
@ -90,7 +90,7 @@ namespace zmq
|
|||||||
|
|
||||||
session_base_t (zmq::io_thread_t *io_thread_, bool active_,
|
session_base_t (zmq::io_thread_t *io_thread_, bool active_,
|
||||||
zmq::socket_base_t *socket_, const options_t &options_,
|
zmq::socket_base_t *socket_, const options_t &options_,
|
||||||
const address_t *addr_);
|
address_t *addr_);
|
||||||
virtual ~session_base_t ();
|
virtual ~session_base_t ();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
@ -152,7 +152,7 @@ namespace zmq
|
|||||||
bool has_linger_timer;
|
bool has_linger_timer;
|
||||||
|
|
||||||
// Protocol and address to use when connecting.
|
// Protocol and address to use when connecting.
|
||||||
const address_t *addr;
|
address_t *addr;
|
||||||
|
|
||||||
session_base_t (const session_base_t&);
|
session_base_t (const session_base_t&);
|
||||||
const session_base_t &operator = (const session_base_t&);
|
const session_base_t &operator = (const session_base_t&);
|
||||||
|
@ -585,14 +585,8 @@ int zmq::socket_base_t::connect (const char *addr_)
|
|||||||
|
|
||||||
// Resolve address (if needed by the protocol)
|
// Resolve address (if needed by the protocol)
|
||||||
if (protocol == "tcp") {
|
if (protocol == "tcp") {
|
||||||
paddr->resolved.tcp_addr = new (std::nothrow) tcp_address_t ();
|
// Defer resolution until a socket is opened
|
||||||
alloc_assert (paddr->resolved.tcp_addr);
|
paddr->resolved.tcp_addr = NULL;
|
||||||
int rc = paddr->resolved.tcp_addr->resolve (
|
|
||||||
address.c_str (), false, options.ipv6);
|
|
||||||
if (rc != 0) {
|
|
||||||
delete paddr;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
|
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
|
||||||
else
|
else
|
||||||
|
@ -50,7 +50,7 @@
|
|||||||
|
|
||||||
zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_,
|
zmq::tcp_connecter_t::tcp_connecter_t (class io_thread_t *io_thread_,
|
||||||
class session_base_t *session_, const options_t &options_,
|
class session_base_t *session_, const options_t &options_,
|
||||||
const address_t *addr_, bool delayed_start_) :
|
address_t *addr_, bool delayed_start_) :
|
||||||
own_t (io_thread_, options_),
|
own_t (io_thread_, options_),
|
||||||
io_object_t (io_thread_),
|
io_object_t (io_thread_),
|
||||||
addr (addr_),
|
addr (addr_),
|
||||||
@ -209,6 +209,24 @@ int zmq::tcp_connecter_t::open ()
|
|||||||
{
|
{
|
||||||
zmq_assert (s == retired_fd);
|
zmq_assert (s == retired_fd);
|
||||||
|
|
||||||
|
// Resolve the address
|
||||||
|
if (addr->resolved.tcp_addr != NULL) {
|
||||||
|
delete addr->resolved.tcp_addr;
|
||||||
|
addr->resolved.tcp_addr = NULL;
|
||||||
|
}
|
||||||
|
zmq_assert (addr->resolved.tcp_addr == NULL);
|
||||||
|
|
||||||
|
addr->resolved.tcp_addr = new (std::nothrow) tcp_address_t ();
|
||||||
|
alloc_assert (addr->resolved.tcp_addr);
|
||||||
|
int rc = addr->resolved.tcp_addr->resolve (
|
||||||
|
addr->address.c_str (), false, options.ipv6);
|
||||||
|
if (rc != 0) {
|
||||||
|
delete addr->resolved.tcp_addr;
|
||||||
|
addr->resolved.tcp_addr = NULL;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
zmq_assert (addr->resolved.tcp_addr != NULL);
|
||||||
|
|
||||||
// Create the socket.
|
// Create the socket.
|
||||||
s = open_socket (addr->resolved.tcp_addr->family (), SOCK_STREAM, IPPROTO_TCP);
|
s = open_socket (addr->resolved.tcp_addr->family (), SOCK_STREAM, IPPROTO_TCP);
|
||||||
#ifdef ZMQ_HAVE_WINDOWS
|
#ifdef ZMQ_HAVE_WINDOWS
|
||||||
@ -244,7 +262,7 @@ int zmq::tcp_connecter_t::open ()
|
|||||||
set_ip_type_of_service (s, options.tos);
|
set_ip_type_of_service (s, options.tos);
|
||||||
|
|
||||||
// Connect to the remote peer.
|
// Connect to the remote peer.
|
||||||
int rc = ::connect (
|
rc = ::connect (
|
||||||
s, addr->resolved.tcp_addr->addr (),
|
s, addr->resolved.tcp_addr->addr (),
|
||||||
addr->resolved.tcp_addr->addrlen ());
|
addr->resolved.tcp_addr->addrlen ());
|
||||||
|
|
||||||
|
@ -41,7 +41,7 @@ namespace zmq
|
|||||||
// then starts connection process.
|
// then starts connection process.
|
||||||
tcp_connecter_t (zmq::io_thread_t *io_thread_,
|
tcp_connecter_t (zmq::io_thread_t *io_thread_,
|
||||||
zmq::session_base_t *session_, const options_t &options_,
|
zmq::session_base_t *session_, const options_t &options_,
|
||||||
const address_t *addr_, bool delayed_start_);
|
address_t *addr_, bool delayed_start_);
|
||||||
~tcp_connecter_t ();
|
~tcp_connecter_t ();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
@ -82,7 +82,7 @@ namespace zmq
|
|||||||
fd_t connect ();
|
fd_t connect ();
|
||||||
|
|
||||||
// Address to connect to. Owned by session_base_t.
|
// Address to connect to. Owned by session_base_t.
|
||||||
const address_t *addr;
|
address_t *addr;
|
||||||
|
|
||||||
// Underlying socket.
|
// Underlying socket.
|
||||||
fd_t s;
|
fd_t s;
|
||||||
|
@ -30,15 +30,15 @@ int main (void)
|
|||||||
|
|
||||||
int rc = zmq_connect (sock, "tcp://localhost:1234");
|
int rc = zmq_connect (sock, "tcp://localhost:1234");
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
|
|
||||||
|
// Because of lazy resolution of TCP names, this will succeed
|
||||||
rc = zmq_connect (sock, "tcp://localhost:invalid");
|
rc = zmq_connect (sock, "tcp://localhost:invalid");
|
||||||
assert (rc == -1);
|
assert (rc == 0);
|
||||||
assert (errno == EINVAL);
|
|
||||||
|
|
||||||
|
// Because of lazy resolution of TCP names, this will succeed
|
||||||
rc = zmq_connect (sock, "tcp://in val id:1234");
|
rc = zmq_connect (sock, "tcp://in val id:1234");
|
||||||
assert (rc == -1);
|
assert (rc == 0);
|
||||||
assert (errno == EINVAL);
|
|
||||||
|
|
||||||
rc = zmq_connect (sock, "invalid://localhost:1234");
|
rc = zmq_connect (sock, "invalid://localhost:1234");
|
||||||
assert (rc == -1);
|
assert (rc == -1);
|
||||||
assert (errno == EPROTONOSUPPORT);
|
assert (errno == EPROTONOSUPPORT);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user