0
0
mirror of https://github.com/zeromq/libzmq.git synced 2024-12-26 23:01:04 +08:00

UDP engine aborts on networking-related errors from socket syscalls #2862 (#3638)

* UDP engine aborts on networking-related errors from socket syscalls #2862

* Add relicense statement
This commit is contained in:
Andrei Tomashpolskiy 2019-08-22 20:52:31 +03:00 committed by Luca Boccassi
parent e492dea80e
commit f48c86d077
7 changed files with 254 additions and 192 deletions

View File

@ -0,0 +1,15 @@
# Permission to Relicense under MPLv2 or any other OSI approved license chosen by the current ZeroMQ BDFL
This is a statement by Andrei Tomashpolskiy
that grants permission to relicense its copyrights in the libzmq C++
library (ZeroMQ) under the Mozilla Public License v2 (MPLv2) or any other
Open Source Initiative approved license chosen by the current ZeroMQ
BDFL (Benevolent Dictator for Life).
A portion of the commits made by the Github handle "atomashpolskiy", with
commit author "Andrei Tomashpolskiy <nordmann89@gmail.com>", are copyright of Andrei Tomashpolskiy .
This document hereby grants the libzmq project team to relicense libzmq,
including all past, present and future contributions of the author listed above.
Andrei Tomashpolskiy
2019/08/22

View File

@ -681,3 +681,47 @@ void zmq::make_socket_noninheritable (fd_t sock_)
LIBZMQ_UNUSED (sock_);
#endif
}
void zmq::assert_socket_tuning_error (zmq::fd_t s_, int rc_)
{
if (rc_ == 0)
return;
// Check whether an error occurred
int err = 0;
#if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS
int len = sizeof err;
#else
socklen_t len = sizeof err;
#endif
int rc = getsockopt (s_, SOL_SOCKET, SO_ERROR,
reinterpret_cast<char *> (&err), &len);
// Assert if the error was caused by 0MQ bug.
// Networking problems are OK. No need to assert.
#ifdef ZMQ_HAVE_WINDOWS
zmq_assert (rc == 0);
if (err != 0) {
wsa_assert (err == WSAECONNREFUSED || err == WSAECONNRESET
|| err == WSAECONNABORTED || err == WSAEINTR
|| err == WSAETIMEDOUT || err == WSAEHOSTUNREACH
|| err == WSAENETUNREACH || err == WSAENETDOWN
|| err == WSAENETRESET || err == WSAEACCES
|| err == WSAEINVAL || err == WSAEADDRINUSE);
}
#else
// Following code should handle both Berkeley-derived socket
// implementations and Solaris.
if (rc == -1)
err = errno;
if (err != 0) {
errno = err;
errno_assert (errno == ECONNREFUSED || errno == ECONNRESET
|| errno == ECONNABORTED || errno == EINTR
|| errno == ETIMEDOUT || errno == EHOSTUNREACH
|| errno == ENETUNREACH || errno == ENETDOWN
|| errno == ENETRESET || errno == EINVAL);
}
#endif
}

View File

@ -71,6 +71,10 @@ int make_fdpair (fd_t *r_, fd_t *w_);
// Makes a socket non-inheritable to child processes.
// Asserts on any failure.
void make_socket_noninheritable (fd_t sock_);
// Asserts that an internal error did not occur. Does not assert
// on network errors such as reset or aborted connections.
void assert_socket_tuning_error (fd_t s_, int rc_);
}
#endif

View File

@ -62,7 +62,7 @@ int zmq::tune_tcp_socket (fd_t s_)
int nodelay = 1;
int rc = setsockopt (s_, IPPROTO_TCP, TCP_NODELAY,
reinterpret_cast<char *> (&nodelay), sizeof (int));
tcp_assert_tuning_error (s_, rc);
assert_socket_tuning_error (s_, rc);
if (rc != 0)
return rc;
@ -71,7 +71,7 @@ int zmq::tune_tcp_socket (fd_t s_)
int nodelack = 1;
rc = setsockopt (s_, IPPROTO_TCP, TCP_NODELACK, (char *) &nodelack,
sizeof (int));
tcp_assert_tuning_error (s_, rc);
assert_socket_tuning_error (s_, rc);
#endif
return rc;
}
@ -81,7 +81,7 @@ int zmq::set_tcp_send_buffer (fd_t sockfd_, int bufsize_)
const int rc =
setsockopt (sockfd_, SOL_SOCKET, SO_SNDBUF,
reinterpret_cast<char *> (&bufsize_), sizeof bufsize_);
tcp_assert_tuning_error (sockfd_, rc);
assert_socket_tuning_error (sockfd_, rc);
return rc;
}
@ -90,7 +90,7 @@ int zmq::set_tcp_receive_buffer (fd_t sockfd_, int bufsize_)
const int rc =
setsockopt (sockfd_, SOL_SOCKET, SO_RCVBUF,
reinterpret_cast<char *> (&bufsize_), sizeof bufsize_);
tcp_assert_tuning_error (sockfd_, rc);
assert_socket_tuning_error (sockfd_, rc);
return rc;
}
@ -123,7 +123,7 @@ int zmq::tune_tcp_keepalives (fd_t s_,
int rc = WSAIoctl (s_, SIO_KEEPALIVE_VALS, &keepalive_opts,
sizeof (keepalive_opts), NULL, 0,
&num_bytes_returned, NULL, NULL);
tcp_assert_tuning_error (s_, rc);
assert_socket_tuning_error (s_, rc);
if (rc == SOCKET_ERROR)
return rc;
}
@ -133,7 +133,7 @@ int zmq::tune_tcp_keepalives (fd_t s_,
int rc =
setsockopt (s_, SOL_SOCKET, SO_KEEPALIVE,
reinterpret_cast<char *> (&keepalive_), sizeof (int));
tcp_assert_tuning_error (s_, rc);
assert_socket_tuning_error (s_, rc);
if (rc != 0)
return rc;
@ -141,7 +141,7 @@ int zmq::tune_tcp_keepalives (fd_t s_,
if (keepalive_cnt_ != -1) {
int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPCNT, &keepalive_cnt_,
sizeof (int));
tcp_assert_tuning_error (s_, rc);
assert_socket_tuning_error (s_, rc);
if (rc != 0)
return rc;
}
@ -151,7 +151,7 @@ int zmq::tune_tcp_keepalives (fd_t s_,
if (keepalive_idle_ != -1) {
int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPIDLE,
&keepalive_idle_, sizeof (int));
tcp_assert_tuning_error (s_, rc);
assert_socket_tuning_error (s_, rc);
if (rc != 0)
return rc;
}
@ -160,7 +160,7 @@ int zmq::tune_tcp_keepalives (fd_t s_,
if (keepalive_idle_ != -1) {
int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPALIVE,
&keepalive_idle_, sizeof (int));
tcp_assert_tuning_error (s_, rc);
assert_socket_tuning_error (s_, rc);
if (rc != 0)
return rc;
}
@ -171,7 +171,7 @@ int zmq::tune_tcp_keepalives (fd_t s_,
if (keepalive_intvl_ != -1) {
int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPINTVL,
&keepalive_intvl_, sizeof (int));
tcp_assert_tuning_error (s_, rc);
assert_socket_tuning_error (s_, rc);
if (rc != 0)
return rc;
}
@ -196,13 +196,13 @@ int zmq::tune_tcp_maxrt (fd_t sockfd_, int timeout_)
int rc =
setsockopt (sockfd_, IPPROTO_TCP, TCP_MAXRT,
reinterpret_cast<char *> (&timeout_), sizeof (timeout_));
tcp_assert_tuning_error (sockfd_, rc);
assert_socket_tuning_error (sockfd_, rc);
return rc;
// FIXME: should be ZMQ_HAVE_TCP_USER_TIMEOUT
#elif defined(TCP_USER_TIMEOUT)
int rc = setsockopt (sockfd_, IPPROTO_TCP, TCP_USER_TIMEOUT, &timeout_,
sizeof (timeout_));
tcp_assert_tuning_error (sockfd_, rc);
assert_socket_tuning_error (sockfd_, rc);
return rc;
#else
return 0;
@ -316,50 +316,6 @@ int zmq::tcp_read (fd_t s_, void *data_, size_t size_)
#endif
}
void zmq::tcp_assert_tuning_error (zmq::fd_t s_, int rc_)
{
if (rc_ == 0)
return;
// Check whether an error occurred
int err = 0;
#if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS
int len = sizeof err;
#else
socklen_t len = sizeof err;
#endif
int rc = getsockopt (s_, SOL_SOCKET, SO_ERROR,
reinterpret_cast<char *> (&err), &len);
// Assert if the error was caused by 0MQ bug.
// Networking problems are OK. No need to assert.
#ifdef ZMQ_HAVE_WINDOWS
zmq_assert (rc == 0);
if (err != 0) {
wsa_assert (err == WSAECONNREFUSED || err == WSAECONNRESET
|| err == WSAECONNABORTED || err == WSAEINTR
|| err == WSAETIMEDOUT || err == WSAEHOSTUNREACH
|| err == WSAENETUNREACH || err == WSAENETDOWN
|| err == WSAENETRESET || err == WSAEACCES
|| err == WSAEINVAL || err == WSAEADDRINUSE);
}
#else
// Following code should handle both Berkeley-derived socket
// implementations and Solaris.
if (rc == -1)
err = errno;
if (err != 0) {
errno = err;
errno_assert (errno == ECONNREFUSED || errno == ECONNRESET
|| errno == ECONNABORTED || errno == EINTR
|| errno == ETIMEDOUT || errno == EHOSTUNREACH
|| errno == ENETUNREACH || errno == ENETDOWN
|| errno == ENETRESET || errno == EINVAL);
}
#endif
}
void zmq::tcp_tune_loopback_fast_path (const fd_t socket_)
{
#if defined ZMQ_HAVE_WINDOWS && defined SIO_LOOPBACK_FAST_PATH

View File

@ -66,10 +66,6 @@ int tcp_write (fd_t s_, const void *data_, size_t size_);
// Zero indicates the peer has closed the connection.
int tcp_read (fd_t s_, void *data_, size_t size_);
// Asserts that an internal error did not occur. Does not assert
// on network errors such as reset or aborted connections.
void tcp_assert_tuning_error (fd_t s_, int rc_);
void tcp_tune_loopback_fast_path (const fd_t socket_);
// Resolves the given address_ string, opens a socket and sets socket options

View File

@ -116,9 +116,11 @@ void zmq::udp_engine_t::plug (io_thread_t *io_thread_, session_base_t *session_)
const udp_address_t *const udp_addr = _address->resolved.udp_addr;
int rc = 0;
// Bind the socket to a device if applicable
if (!_options.bound_device.empty ())
bind_to_device (_fd, _options.bound_device);
rc = rc | bind_to_device (_fd, _options.bound_device);
if (_send_enabled) {
if (!_options.raw_socket) {
@ -127,73 +129,18 @@ void zmq::udp_engine_t::plug (io_thread_t *io_thread_, session_base_t *session_)
_out_address_len = out->sockaddr_len ();
if (out->is_multicast ()) {
int level;
int optname;
bool is_ipv6 = (out->family () == AF_INET6);
rc = rc
| set_udp_multicast_loop (_fd, is_ipv6,
_options.multicast_loop);
if (out->family () == AF_INET6) {
level = IPPROTO_IPV6;
optname = IPV6_MULTICAST_LOOP;
} else {
level = IPPROTO_IP;
optname = IP_MULTICAST_LOOP;
if (_options.multicast_hops > 0) {
rc = rc
| set_udp_multicast_ttl (_fd, is_ipv6,
_options.multicast_hops);
}
int loop = _options.multicast_loop;
int rc =
setsockopt (_fd, level, optname,
reinterpret_cast<char *> (&loop), sizeof (loop));
#ifdef ZMQ_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR);
#else
errno_assert (rc == 0);
#endif
int hops = _options.multicast_hops;
if (hops > 0) {
rc = setsockopt (_fd, level, IP_MULTICAST_TTL,
reinterpret_cast<char *> (&hops),
sizeof (hops));
} else {
rc = 0;
}
#ifdef ZMQ_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR);
#else
errno_assert (rc == 0);
#endif
if (out->family () == AF_INET6) {
int bind_if = udp_addr->bind_if ();
if (bind_if > 0) {
// If a bind interface is provided we tell the
// kernel to use it to send multicast packets
rc = setsockopt (_fd, IPPROTO_IPV6, IPV6_MULTICAST_IF,
reinterpret_cast<char *> (&bind_if),
sizeof (bind_if));
} else {
rc = 0;
}
} else {
struct in_addr bind_addr =
udp_addr->bind_addr ()->ipv4.sin_addr;
if (bind_addr.s_addr != INADDR_ANY) {
rc = setsockopt (_fd, IPPROTO_IP, IP_MULTICAST_IF,
reinterpret_cast<char *> (&bind_addr),
sizeof (bind_addr));
} else {
rc = 0;
}
}
#ifdef ZMQ_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR);
#else
errno_assert (rc == 0);
#endif
rc = rc | set_udp_multicast_iface (_fd, is_ipv6, udp_addr);
}
} else {
/// XXX fixme ?
@ -201,19 +148,10 @@ void zmq::udp_engine_t::plug (io_thread_t *io_thread_, session_base_t *session_)
_out_address_len =
static_cast<zmq_socklen_t> (sizeof (sockaddr_in));
}
set_pollout (_handle);
}
if (_recv_enabled) {
int on = 1;
int rc = setsockopt (_fd, SOL_SOCKET, SO_REUSEADDR,
reinterpret_cast<char *> (&on), sizeof (on));
#ifdef ZMQ_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR);
#else
errno_assert (rc == 0);
#endif
rc = rc | set_udp_reuse_address (_fd, true);
const ip_addr_t *bind_addr = udp_addr->bind_addr ();
ip_addr_t any = ip_addr_t::any (bind_addr->family ());
@ -225,13 +163,7 @@ void zmq::udp_engine_t::plug (io_thread_t *io_thread_, session_base_t *session_)
// Multicast addresses should be allowed to bind to more than
// one port as all ports should receive the message
#ifdef SO_REUSEPORT
rc = setsockopt (_fd, SOL_SOCKET, SO_REUSEPORT,
reinterpret_cast<char *> (&on), sizeof (on));
#ifdef ZMQ_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR);
#else
errno_assert (rc == 0);
#endif
rc = rc | set_udp_reuse_port (_fd, true);
#endif
// In multicast we should bind ANY and use the mreq struct to
@ -244,62 +176,161 @@ void zmq::udp_engine_t::plug (io_thread_t *io_thread_, session_base_t *session_)
}
#ifdef ZMQ_HAVE_VXWORKS
rc = bind (_fd, (sockaddr *) real_bind_addr->as_sockaddr (),
real_bind_addr->sockaddr_len ());
rc = rc
| bind (_fd, (sockaddr *) real_bind_addr->as_sockaddr (),
real_bind_addr->sockaddr_len ());
#else
rc = bind (_fd, real_bind_addr->as_sockaddr (),
real_bind_addr->sockaddr_len ());
#endif
#ifdef ZMQ_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR);
#else
errno_assert (rc == 0);
rc = rc
| bind (_fd, real_bind_addr->as_sockaddr (),
real_bind_addr->sockaddr_len ());
#endif
if (multicast) {
const ip_addr_t *mcast_addr = udp_addr->target_addr ();
if (mcast_addr->family () == AF_INET) {
struct ip_mreq mreq;
mreq.imr_multiaddr = mcast_addr->ipv4.sin_addr;
mreq.imr_interface = bind_addr->ipv4.sin_addr;
rc =
setsockopt (_fd, IPPROTO_IP, IP_ADD_MEMBERSHIP,
reinterpret_cast<char *> (&mreq), sizeof (mreq));
errno_assert (rc == 0);
} else if (mcast_addr->family () == AF_INET6) {
struct ipv6_mreq mreq;
int iface = _address->resolved.udp_addr->bind_if ();
zmq_assert (iface >= -1);
mreq.ipv6mr_multiaddr = mcast_addr->ipv6.sin6_addr;
mreq.ipv6mr_interface = iface;
rc =
setsockopt (_fd, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP,
reinterpret_cast<char *> (&mreq), sizeof (mreq));
errno_assert (rc == 0);
} else {
// Shouldn't happen
abort ();
}
#ifdef ZMQ_HAVE_WINDOWS
wsa_assert (rc != SOCKET_ERROR);
#else
errno_assert (rc == 0);
#endif
rc = rc | add_membership (_fd, udp_addr);
}
set_pollin (_handle);
// Call restart output to drop all join/leave commands
restart_output ();
}
if (rc != 0) {
error (protocol_error);
} else {
if (_send_enabled) {
set_pollout (_handle);
}
if (_recv_enabled) {
set_pollin (_handle);
// Call restart output to drop all join/leave commands
restart_output ();
}
}
}
int zmq::udp_engine_t::set_udp_multicast_loop (fd_t s_,
bool is_ipv6_,
bool loop_)
{
int level;
int optname;
if (is_ipv6_) {
level = IPPROTO_IPV6;
optname = IPV6_MULTICAST_LOOP;
} else {
level = IPPROTO_IP;
optname = IP_MULTICAST_LOOP;
}
int loop = loop_ ? 1 : 0;
int rc = setsockopt (s_, level, optname, reinterpret_cast<char *> (&loop),
sizeof (loop));
assert_socket_tuning_error (s_, rc);
return rc;
}
int zmq::udp_engine_t::set_udp_multicast_ttl (fd_t s_, bool is_ipv6_, int hops_)
{
int level;
if (is_ipv6_) {
level = IPPROTO_IPV6;
} else {
level = IPPROTO_IP;
}
int rc = setsockopt (s_, level, IP_MULTICAST_TTL,
reinterpret_cast<char *> (&hops_), sizeof (hops_));
assert_socket_tuning_error (s_, rc);
return rc;
}
int zmq::udp_engine_t::set_udp_multicast_iface (fd_t s_,
bool is_ipv6_,
const udp_address_t *addr_)
{
int rc = 0;
if (is_ipv6_) {
int bind_if = addr_->bind_if ();
if (bind_if > 0) {
// If a bind interface is provided we tell the
// kernel to use it to send multicast packets
rc = setsockopt (s_, IPPROTO_IPV6, IPV6_MULTICAST_IF,
reinterpret_cast<char *> (&bind_if),
sizeof (bind_if));
}
} else {
struct in_addr bind_addr = addr_->bind_addr ()->ipv4.sin_addr;
if (bind_addr.s_addr != INADDR_ANY) {
rc = setsockopt (s_, IPPROTO_IP, IP_MULTICAST_IF,
reinterpret_cast<char *> (&bind_addr),
sizeof (bind_addr));
}
}
assert_socket_tuning_error (s_, rc);
return rc;
}
int zmq::udp_engine_t::set_udp_reuse_address (fd_t s_, bool on_)
{
int on = on_ ? 1 : 0;
int rc = setsockopt (s_, SOL_SOCKET, SO_REUSEADDR,
reinterpret_cast<char *> (&on), sizeof (on));
assert_socket_tuning_error (s_, rc);
return rc;
}
int zmq::udp_engine_t::set_udp_reuse_port (fd_t s_, bool on_)
{
#ifndef SO_REUSEPORT
return 0;
#else
int on = on_ ? 1 : 0;
int rc = setsockopt (s_, SOL_SOCKET, SO_REUSEPORT,
reinterpret_cast<char *> (&on), sizeof (on));
assert_socket_tuning_error (s_, rc);
return rc;
#endif
}
int zmq::udp_engine_t::add_membership (fd_t s_, const udp_address_t *addr_)
{
const ip_addr_t *mcast_addr = addr_->target_addr ();
int rc = 0;
if (mcast_addr->family () == AF_INET) {
struct ip_mreq mreq;
mreq.imr_multiaddr = mcast_addr->ipv4.sin_addr;
mreq.imr_interface = addr_->bind_addr ()->ipv4.sin_addr;
rc = setsockopt (s_, IPPROTO_IP, IP_ADD_MEMBERSHIP,
reinterpret_cast<char *> (&mreq), sizeof (mreq));
} else if (mcast_addr->family () == AF_INET6) {
struct ipv6_mreq mreq;
int iface = addr_->bind_if ();
zmq_assert (iface >= -1);
mreq.ipv6mr_multiaddr = mcast_addr->ipv6.sin6_addr;
mreq.ipv6mr_interface = iface;
rc = setsockopt (s_, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP,
reinterpret_cast<char *> (&mreq), sizeof (mreq));
}
assert_socket_tuning_error (s_, rc);
return rc;
}
void zmq::udp_engine_t::error (error_reason_t reason_)
{
zmq_assert (_session);
_session->engine_error (reason_);
terminate ();
}
void zmq::udp_engine_t::terminate ()

View File

@ -49,6 +49,22 @@ class udp_engine_t : public io_object_t, public i_engine
int resolve_raw_address (char *addr_, size_t length_);
void sockaddr_to_msg (zmq::msg_t *msg_, sockaddr_in *addr_);
int set_udp_reuse_address (fd_t s_, bool on_);
int set_udp_reuse_port (fd_t s_, bool on_);
// Indicate, if the multicast data being sent should be looped back
int set_udp_multicast_loop (fd_t s_, bool is_ipv6_, bool loop_);
// Set multicast TTL
int set_udp_multicast_ttl (fd_t s_, bool is_ipv6_, int hops_);
// Set multicast address/interface
int set_udp_multicast_iface (fd_t s_,
bool is_ipv6_,
const udp_address_t *addr_);
// Join a multicast group
int add_membership (fd_t s_, const udp_address_t *addr_);
// Function to handle network issues.
void error (error_reason_t reason_);
const endpoint_uri_pair_t _empty_endpoint;
bool _plugged;