diff --git a/RELICENSE/atomashpolskiy.md b/RELICENSE/atomashpolskiy.md new file mode 100644 index 00000000..75a8223a --- /dev/null +++ b/RELICENSE/atomashpolskiy.md @@ -0,0 +1,15 @@ +# Permission to Relicense under MPLv2 or any other OSI approved license chosen by the current ZeroMQ BDFL + +This is a statement by Andrei Tomashpolskiy +that grants permission to relicense its copyrights in the libzmq C++ +library (ZeroMQ) under the Mozilla Public License v2 (MPLv2) or any other +Open Source Initiative approved license chosen by the current ZeroMQ +BDFL (Benevolent Dictator for Life). + +A portion of the commits made by the Github handle "atomashpolskiy", with +commit author "Andrei Tomashpolskiy ", are copyright of Andrei Tomashpolskiy . +This document hereby grants the libzmq project team to relicense libzmq, +including all past, present and future contributions of the author listed above. + +Andrei Tomashpolskiy +2019/08/22 diff --git a/src/ip.cpp b/src/ip.cpp index 9b505798..ca73e4f6 100644 --- a/src/ip.cpp +++ b/src/ip.cpp @@ -681,3 +681,47 @@ void zmq::make_socket_noninheritable (fd_t sock_) LIBZMQ_UNUSED (sock_); #endif } + +void zmq::assert_socket_tuning_error (zmq::fd_t s_, int rc_) +{ + if (rc_ == 0) + return; + + // Check whether an error occurred + int err = 0; +#if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS + int len = sizeof err; +#else + socklen_t len = sizeof err; +#endif + + int rc = getsockopt (s_, SOL_SOCKET, SO_ERROR, + reinterpret_cast (&err), &len); + + // Assert if the error was caused by 0MQ bug. + // Networking problems are OK. No need to assert. +#ifdef ZMQ_HAVE_WINDOWS + zmq_assert (rc == 0); + if (err != 0) { + wsa_assert (err == WSAECONNREFUSED || err == WSAECONNRESET + || err == WSAECONNABORTED || err == WSAEINTR + || err == WSAETIMEDOUT || err == WSAEHOSTUNREACH + || err == WSAENETUNREACH || err == WSAENETDOWN + || err == WSAENETRESET || err == WSAEACCES + || err == WSAEINVAL || err == WSAEADDRINUSE); + } +#else + // Following code should handle both Berkeley-derived socket + // implementations and Solaris. + if (rc == -1) + err = errno; + if (err != 0) { + errno = err; + errno_assert (errno == ECONNREFUSED || errno == ECONNRESET + || errno == ECONNABORTED || errno == EINTR + || errno == ETIMEDOUT || errno == EHOSTUNREACH + || errno == ENETUNREACH || errno == ENETDOWN + || errno == ENETRESET || errno == EINVAL); + } +#endif +} diff --git a/src/ip.hpp b/src/ip.hpp index a6d368ff..f9be6500 100644 --- a/src/ip.hpp +++ b/src/ip.hpp @@ -71,6 +71,10 @@ int make_fdpair (fd_t *r_, fd_t *w_); // Makes a socket non-inheritable to child processes. // Asserts on any failure. void make_socket_noninheritable (fd_t sock_); + +// Asserts that an internal error did not occur. Does not assert +// on network errors such as reset or aborted connections. +void assert_socket_tuning_error (fd_t s_, int rc_); } #endif diff --git a/src/tcp.cpp b/src/tcp.cpp index e79c84d6..aa0b998c 100644 --- a/src/tcp.cpp +++ b/src/tcp.cpp @@ -62,7 +62,7 @@ int zmq::tune_tcp_socket (fd_t s_) int nodelay = 1; int rc = setsockopt (s_, IPPROTO_TCP, TCP_NODELAY, reinterpret_cast (&nodelay), sizeof (int)); - tcp_assert_tuning_error (s_, rc); + assert_socket_tuning_error (s_, rc); if (rc != 0) return rc; @@ -71,7 +71,7 @@ int zmq::tune_tcp_socket (fd_t s_) int nodelack = 1; rc = setsockopt (s_, IPPROTO_TCP, TCP_NODELACK, (char *) &nodelack, sizeof (int)); - tcp_assert_tuning_error (s_, rc); + assert_socket_tuning_error (s_, rc); #endif return rc; } @@ -81,7 +81,7 @@ int zmq::set_tcp_send_buffer (fd_t sockfd_, int bufsize_) const int rc = setsockopt (sockfd_, SOL_SOCKET, SO_SNDBUF, reinterpret_cast (&bufsize_), sizeof bufsize_); - tcp_assert_tuning_error (sockfd_, rc); + assert_socket_tuning_error (sockfd_, rc); return rc; } @@ -90,7 +90,7 @@ int zmq::set_tcp_receive_buffer (fd_t sockfd_, int bufsize_) const int rc = setsockopt (sockfd_, SOL_SOCKET, SO_RCVBUF, reinterpret_cast (&bufsize_), sizeof bufsize_); - tcp_assert_tuning_error (sockfd_, rc); + assert_socket_tuning_error (sockfd_, rc); return rc; } @@ -123,7 +123,7 @@ int zmq::tune_tcp_keepalives (fd_t s_, int rc = WSAIoctl (s_, SIO_KEEPALIVE_VALS, &keepalive_opts, sizeof (keepalive_opts), NULL, 0, &num_bytes_returned, NULL, NULL); - tcp_assert_tuning_error (s_, rc); + assert_socket_tuning_error (s_, rc); if (rc == SOCKET_ERROR) return rc; } @@ -133,7 +133,7 @@ int zmq::tune_tcp_keepalives (fd_t s_, int rc = setsockopt (s_, SOL_SOCKET, SO_KEEPALIVE, reinterpret_cast (&keepalive_), sizeof (int)); - tcp_assert_tuning_error (s_, rc); + assert_socket_tuning_error (s_, rc); if (rc != 0) return rc; @@ -141,7 +141,7 @@ int zmq::tune_tcp_keepalives (fd_t s_, if (keepalive_cnt_ != -1) { int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPCNT, &keepalive_cnt_, sizeof (int)); - tcp_assert_tuning_error (s_, rc); + assert_socket_tuning_error (s_, rc); if (rc != 0) return rc; } @@ -151,7 +151,7 @@ int zmq::tune_tcp_keepalives (fd_t s_, if (keepalive_idle_ != -1) { int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPIDLE, &keepalive_idle_, sizeof (int)); - tcp_assert_tuning_error (s_, rc); + assert_socket_tuning_error (s_, rc); if (rc != 0) return rc; } @@ -160,7 +160,7 @@ int zmq::tune_tcp_keepalives (fd_t s_, if (keepalive_idle_ != -1) { int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPALIVE, &keepalive_idle_, sizeof (int)); - tcp_assert_tuning_error (s_, rc); + assert_socket_tuning_error (s_, rc); if (rc != 0) return rc; } @@ -171,7 +171,7 @@ int zmq::tune_tcp_keepalives (fd_t s_, if (keepalive_intvl_ != -1) { int rc = setsockopt (s_, IPPROTO_TCP, TCP_KEEPINTVL, &keepalive_intvl_, sizeof (int)); - tcp_assert_tuning_error (s_, rc); + assert_socket_tuning_error (s_, rc); if (rc != 0) return rc; } @@ -196,13 +196,13 @@ int zmq::tune_tcp_maxrt (fd_t sockfd_, int timeout_) int rc = setsockopt (sockfd_, IPPROTO_TCP, TCP_MAXRT, reinterpret_cast (&timeout_), sizeof (timeout_)); - tcp_assert_tuning_error (sockfd_, rc); + assert_socket_tuning_error (sockfd_, rc); return rc; // FIXME: should be ZMQ_HAVE_TCP_USER_TIMEOUT #elif defined(TCP_USER_TIMEOUT) int rc = setsockopt (sockfd_, IPPROTO_TCP, TCP_USER_TIMEOUT, &timeout_, sizeof (timeout_)); - tcp_assert_tuning_error (sockfd_, rc); + assert_socket_tuning_error (sockfd_, rc); return rc; #else return 0; @@ -316,50 +316,6 @@ int zmq::tcp_read (fd_t s_, void *data_, size_t size_) #endif } -void zmq::tcp_assert_tuning_error (zmq::fd_t s_, int rc_) -{ - if (rc_ == 0) - return; - - // Check whether an error occurred - int err = 0; -#if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS - int len = sizeof err; -#else - socklen_t len = sizeof err; -#endif - - int rc = getsockopt (s_, SOL_SOCKET, SO_ERROR, - reinterpret_cast (&err), &len); - - // Assert if the error was caused by 0MQ bug. - // Networking problems are OK. No need to assert. -#ifdef ZMQ_HAVE_WINDOWS - zmq_assert (rc == 0); - if (err != 0) { - wsa_assert (err == WSAECONNREFUSED || err == WSAECONNRESET - || err == WSAECONNABORTED || err == WSAEINTR - || err == WSAETIMEDOUT || err == WSAEHOSTUNREACH - || err == WSAENETUNREACH || err == WSAENETDOWN - || err == WSAENETRESET || err == WSAEACCES - || err == WSAEINVAL || err == WSAEADDRINUSE); - } -#else - // Following code should handle both Berkeley-derived socket - // implementations and Solaris. - if (rc == -1) - err = errno; - if (err != 0) { - errno = err; - errno_assert (errno == ECONNREFUSED || errno == ECONNRESET - || errno == ECONNABORTED || errno == EINTR - || errno == ETIMEDOUT || errno == EHOSTUNREACH - || errno == ENETUNREACH || errno == ENETDOWN - || errno == ENETRESET || errno == EINVAL); - } -#endif -} - void zmq::tcp_tune_loopback_fast_path (const fd_t socket_) { #if defined ZMQ_HAVE_WINDOWS && defined SIO_LOOPBACK_FAST_PATH diff --git a/src/tcp.hpp b/src/tcp.hpp index fbb61af3..a3fbb71d 100644 --- a/src/tcp.hpp +++ b/src/tcp.hpp @@ -66,10 +66,6 @@ int tcp_write (fd_t s_, const void *data_, size_t size_); // Zero indicates the peer has closed the connection. int tcp_read (fd_t s_, void *data_, size_t size_); -// Asserts that an internal error did not occur. Does not assert -// on network errors such as reset or aborted connections. -void tcp_assert_tuning_error (fd_t s_, int rc_); - void tcp_tune_loopback_fast_path (const fd_t socket_); // Resolves the given address_ string, opens a socket and sets socket options diff --git a/src/udp_engine.cpp b/src/udp_engine.cpp index 88433b0d..7009b1e5 100644 --- a/src/udp_engine.cpp +++ b/src/udp_engine.cpp @@ -116,9 +116,11 @@ void zmq::udp_engine_t::plug (io_thread_t *io_thread_, session_base_t *session_) const udp_address_t *const udp_addr = _address->resolved.udp_addr; + int rc = 0; + // Bind the socket to a device if applicable if (!_options.bound_device.empty ()) - bind_to_device (_fd, _options.bound_device); + rc = rc | bind_to_device (_fd, _options.bound_device); if (_send_enabled) { if (!_options.raw_socket) { @@ -127,73 +129,18 @@ void zmq::udp_engine_t::plug (io_thread_t *io_thread_, session_base_t *session_) _out_address_len = out->sockaddr_len (); if (out->is_multicast ()) { - int level; - int optname; + bool is_ipv6 = (out->family () == AF_INET6); + rc = rc + | set_udp_multicast_loop (_fd, is_ipv6, + _options.multicast_loop); - if (out->family () == AF_INET6) { - level = IPPROTO_IPV6; - optname = IPV6_MULTICAST_LOOP; - } else { - level = IPPROTO_IP; - optname = IP_MULTICAST_LOOP; + if (_options.multicast_hops > 0) { + rc = rc + | set_udp_multicast_ttl (_fd, is_ipv6, + _options.multicast_hops); } - int loop = _options.multicast_loop; - int rc = - setsockopt (_fd, level, optname, - reinterpret_cast (&loop), sizeof (loop)); - -#ifdef ZMQ_HAVE_WINDOWS - wsa_assert (rc != SOCKET_ERROR); -#else - errno_assert (rc == 0); -#endif - - int hops = _options.multicast_hops; - - if (hops > 0) { - rc = setsockopt (_fd, level, IP_MULTICAST_TTL, - reinterpret_cast (&hops), - sizeof (hops)); - } else { - rc = 0; - } - -#ifdef ZMQ_HAVE_WINDOWS - wsa_assert (rc != SOCKET_ERROR); -#else - errno_assert (rc == 0); -#endif - if (out->family () == AF_INET6) { - int bind_if = udp_addr->bind_if (); - - if (bind_if > 0) { - // If a bind interface is provided we tell the - // kernel to use it to send multicast packets - rc = setsockopt (_fd, IPPROTO_IPV6, IPV6_MULTICAST_IF, - reinterpret_cast (&bind_if), - sizeof (bind_if)); - } else { - rc = 0; - } - } else { - struct in_addr bind_addr = - udp_addr->bind_addr ()->ipv4.sin_addr; - - if (bind_addr.s_addr != INADDR_ANY) { - rc = setsockopt (_fd, IPPROTO_IP, IP_MULTICAST_IF, - reinterpret_cast (&bind_addr), - sizeof (bind_addr)); - } else { - rc = 0; - } - } - -#ifdef ZMQ_HAVE_WINDOWS - wsa_assert (rc != SOCKET_ERROR); -#else - errno_assert (rc == 0); -#endif + rc = rc | set_udp_multicast_iface (_fd, is_ipv6, udp_addr); } } else { /// XXX fixme ? @@ -201,19 +148,10 @@ void zmq::udp_engine_t::plug (io_thread_t *io_thread_, session_base_t *session_) _out_address_len = static_cast (sizeof (sockaddr_in)); } - - set_pollout (_handle); } if (_recv_enabled) { - int on = 1; - int rc = setsockopt (_fd, SOL_SOCKET, SO_REUSEADDR, - reinterpret_cast (&on), sizeof (on)); -#ifdef ZMQ_HAVE_WINDOWS - wsa_assert (rc != SOCKET_ERROR); -#else - errno_assert (rc == 0); -#endif + rc = rc | set_udp_reuse_address (_fd, true); const ip_addr_t *bind_addr = udp_addr->bind_addr (); ip_addr_t any = ip_addr_t::any (bind_addr->family ()); @@ -225,13 +163,7 @@ void zmq::udp_engine_t::plug (io_thread_t *io_thread_, session_base_t *session_) // Multicast addresses should be allowed to bind to more than // one port as all ports should receive the message #ifdef SO_REUSEPORT - rc = setsockopt (_fd, SOL_SOCKET, SO_REUSEPORT, - reinterpret_cast (&on), sizeof (on)); -#ifdef ZMQ_HAVE_WINDOWS - wsa_assert (rc != SOCKET_ERROR); -#else - errno_assert (rc == 0); -#endif + rc = rc | set_udp_reuse_port (_fd, true); #endif // In multicast we should bind ANY and use the mreq struct to @@ -244,62 +176,161 @@ void zmq::udp_engine_t::plug (io_thread_t *io_thread_, session_base_t *session_) } #ifdef ZMQ_HAVE_VXWORKS - rc = bind (_fd, (sockaddr *) real_bind_addr->as_sockaddr (), - real_bind_addr->sockaddr_len ()); + rc = rc + | bind (_fd, (sockaddr *) real_bind_addr->as_sockaddr (), + real_bind_addr->sockaddr_len ()); #else - rc = bind (_fd, real_bind_addr->as_sockaddr (), - real_bind_addr->sockaddr_len ()); - -#endif -#ifdef ZMQ_HAVE_WINDOWS - wsa_assert (rc != SOCKET_ERROR); -#else - errno_assert (rc == 0); + rc = rc + | bind (_fd, real_bind_addr->as_sockaddr (), + real_bind_addr->sockaddr_len ()); #endif if (multicast) { - const ip_addr_t *mcast_addr = udp_addr->target_addr (); - - if (mcast_addr->family () == AF_INET) { - struct ip_mreq mreq; - mreq.imr_multiaddr = mcast_addr->ipv4.sin_addr; - mreq.imr_interface = bind_addr->ipv4.sin_addr; - - rc = - setsockopt (_fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, - reinterpret_cast (&mreq), sizeof (mreq)); - - errno_assert (rc == 0); - } else if (mcast_addr->family () == AF_INET6) { - struct ipv6_mreq mreq; - int iface = _address->resolved.udp_addr->bind_if (); - - zmq_assert (iface >= -1); - - mreq.ipv6mr_multiaddr = mcast_addr->ipv6.sin6_addr; - mreq.ipv6mr_interface = iface; - - rc = - setsockopt (_fd, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, - reinterpret_cast (&mreq), sizeof (mreq)); - - errno_assert (rc == 0); - } else { - // Shouldn't happen - abort (); - } - -#ifdef ZMQ_HAVE_WINDOWS - wsa_assert (rc != SOCKET_ERROR); -#else - errno_assert (rc == 0); -#endif + rc = rc | add_membership (_fd, udp_addr); } - set_pollin (_handle); - - // Call restart output to drop all join/leave commands - restart_output (); } + + if (rc != 0) { + error (protocol_error); + } else { + if (_send_enabled) { + set_pollout (_handle); + } + + if (_recv_enabled) { + set_pollin (_handle); + + // Call restart output to drop all join/leave commands + restart_output (); + } + } +} + +int zmq::udp_engine_t::set_udp_multicast_loop (fd_t s_, + bool is_ipv6_, + bool loop_) +{ + int level; + int optname; + + if (is_ipv6_) { + level = IPPROTO_IPV6; + optname = IPV6_MULTICAST_LOOP; + } else { + level = IPPROTO_IP; + optname = IP_MULTICAST_LOOP; + } + + int loop = loop_ ? 1 : 0; + int rc = setsockopt (s_, level, optname, reinterpret_cast (&loop), + sizeof (loop)); + assert_socket_tuning_error (s_, rc); + return rc; +} + +int zmq::udp_engine_t::set_udp_multicast_ttl (fd_t s_, bool is_ipv6_, int hops_) +{ + int level; + + if (is_ipv6_) { + level = IPPROTO_IPV6; + } else { + level = IPPROTO_IP; + } + + int rc = setsockopt (s_, level, IP_MULTICAST_TTL, + reinterpret_cast (&hops_), sizeof (hops_)); + assert_socket_tuning_error (s_, rc); + return rc; +} + +int zmq::udp_engine_t::set_udp_multicast_iface (fd_t s_, + bool is_ipv6_, + const udp_address_t *addr_) +{ + int rc = 0; + + if (is_ipv6_) { + int bind_if = addr_->bind_if (); + + if (bind_if > 0) { + // If a bind interface is provided we tell the + // kernel to use it to send multicast packets + rc = setsockopt (s_, IPPROTO_IPV6, IPV6_MULTICAST_IF, + reinterpret_cast (&bind_if), + sizeof (bind_if)); + } + } else { + struct in_addr bind_addr = addr_->bind_addr ()->ipv4.sin_addr; + + if (bind_addr.s_addr != INADDR_ANY) { + rc = setsockopt (s_, IPPROTO_IP, IP_MULTICAST_IF, + reinterpret_cast (&bind_addr), + sizeof (bind_addr)); + } + } + + assert_socket_tuning_error (s_, rc); + return rc; +} + +int zmq::udp_engine_t::set_udp_reuse_address (fd_t s_, bool on_) +{ + int on = on_ ? 1 : 0; + int rc = setsockopt (s_, SOL_SOCKET, SO_REUSEADDR, + reinterpret_cast (&on), sizeof (on)); + assert_socket_tuning_error (s_, rc); + return rc; +} + +int zmq::udp_engine_t::set_udp_reuse_port (fd_t s_, bool on_) +{ +#ifndef SO_REUSEPORT + return 0; +#else + int on = on_ ? 1 : 0; + int rc = setsockopt (s_, SOL_SOCKET, SO_REUSEPORT, + reinterpret_cast (&on), sizeof (on)); + assert_socket_tuning_error (s_, rc); + return rc; +#endif +} + +int zmq::udp_engine_t::add_membership (fd_t s_, const udp_address_t *addr_) +{ + const ip_addr_t *mcast_addr = addr_->target_addr (); + int rc = 0; + + if (mcast_addr->family () == AF_INET) { + struct ip_mreq mreq; + mreq.imr_multiaddr = mcast_addr->ipv4.sin_addr; + mreq.imr_interface = addr_->bind_addr ()->ipv4.sin_addr; + + rc = setsockopt (s_, IPPROTO_IP, IP_ADD_MEMBERSHIP, + reinterpret_cast (&mreq), sizeof (mreq)); + + } else if (mcast_addr->family () == AF_INET6) { + struct ipv6_mreq mreq; + int iface = addr_->bind_if (); + + zmq_assert (iface >= -1); + + mreq.ipv6mr_multiaddr = mcast_addr->ipv6.sin6_addr; + mreq.ipv6mr_interface = iface; + + rc = setsockopt (s_, IPPROTO_IPV6, IPV6_ADD_MEMBERSHIP, + reinterpret_cast (&mreq), sizeof (mreq)); + } + + assert_socket_tuning_error (s_, rc); + return rc; +} + +void zmq::udp_engine_t::error (error_reason_t reason_) +{ + zmq_assert (_session); + _session->engine_error (reason_); + terminate (); } void zmq::udp_engine_t::terminate () diff --git a/src/udp_engine.hpp b/src/udp_engine.hpp index dd5cdc9b..6c969b16 100644 --- a/src/udp_engine.hpp +++ b/src/udp_engine.hpp @@ -49,6 +49,22 @@ class udp_engine_t : public io_object_t, public i_engine int resolve_raw_address (char *addr_, size_t length_); void sockaddr_to_msg (zmq::msg_t *msg_, sockaddr_in *addr_); + int set_udp_reuse_address (fd_t s_, bool on_); + int set_udp_reuse_port (fd_t s_, bool on_); + // Indicate, if the multicast data being sent should be looped back + int set_udp_multicast_loop (fd_t s_, bool is_ipv6_, bool loop_); + // Set multicast TTL + int set_udp_multicast_ttl (fd_t s_, bool is_ipv6_, int hops_); + // Set multicast address/interface + int set_udp_multicast_iface (fd_t s_, + bool is_ipv6_, + const udp_address_t *addr_); + // Join a multicast group + int add_membership (fd_t s_, const udp_address_t *addr_); + + // Function to handle network issues. + void error (error_reason_t reason_); + const endpoint_uri_pair_t _empty_endpoint; bool _plugged;