diff --git a/include/zmq.h b/include/zmq.h index 6de1107e..84208864 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -584,6 +584,7 @@ ZMQ_EXPORT void zmq_threadclose (void *thread); #define ZMQ_GSSAPI_SERVICE_PRINCIPAL_NAMETYPE 91 #define ZMQ_BINDTODEVICE 92 #define ZMQ_ZAP_ENFORCE_DOMAIN 93 +#define ZMQ_LOOPBACK_FASTPATH 94 /* DRAFT 0MQ socket events and monitoring */ /* Unspecified system errors during handshake. Event value is an errno. */ diff --git a/src/options.cpp b/src/options.cpp index e42fcea7..643ca6ce 100644 --- a/src/options.cpp +++ b/src/options.cpp @@ -90,7 +90,8 @@ zmq::options_t::options_t () : heartbeat_interval (0), heartbeat_timeout (-1), use_fd (-1), - zap_enforce_domain (false) + zap_enforce_domain (false), + loopback_fastpath (false) { memset (curve_public_key, 0, CURVE_KEYSIZE); memset (curve_secret_key, 0, CURVE_KEYSIZE); @@ -627,6 +628,13 @@ int zmq::options_t::setsockopt (int option_, } break; + case ZMQ_LOOPBACK_FASTPATH: + if (is_int) { + loopback_fastpath = (value != 0); + return 0; + } + break; + default: #if defined(ZMQ_ACT_MILITANT) @@ -1064,6 +1072,13 @@ int zmq::options_t::getsockopt (int option_, } break; + case ZMQ_LOOPBACK_FASTPATH: + if (is_int) { + *value = loopback_fastpath; + return 0; + } + break; + default: #if defined(ZMQ_ACT_MILITANT) malformed = false; diff --git a/src/options.hpp b/src/options.hpp index 43f43416..14a6b815 100644 --- a/src/options.hpp +++ b/src/options.hpp @@ -248,6 +248,9 @@ struct options_t // Enforce a non-empty ZAP domain requirement for PLAIN auth bool zap_enforce_domain; + + // Use of loopback fastpath. + bool loopback_fastpath; }; } diff --git a/src/signaler.cpp b/src/signaler.cpp index 01911766..7c396c23 100644 --- a/src/signaler.cpp +++ b/src/signaler.cpp @@ -65,6 +65,7 @@ #include "err.hpp" #include "fd.hpp" #include "ip.hpp" +#include "tcp.hpp" #if defined ZMQ_HAVE_EVENTFD #include @@ -392,22 +393,7 @@ static void tune_socket (const SOCKET socket) (char *) &tcp_nodelay, sizeof tcp_nodelay); wsa_assert (rc != SOCKET_ERROR); - int sio_loopback_fastpath = 1; - DWORD numberOfBytesReturned = 0; - - rc = WSAIoctl (socket, SIO_LOOPBACK_FAST_PATH, &sio_loopback_fastpath, - sizeof sio_loopback_fastpath, NULL, 0, - &numberOfBytesReturned, 0, 0); - - if (SOCKET_ERROR == rc) { - DWORD lastError = ::WSAGetLastError (); - - if (WSAEOPNOTSUPP == lastError) { - // This system is not Windows 8 or Server 2012, and the call is not supported. - } else { - wsa_assert (false); - } - } + zmq::tcp_tune_loopback_fast_path (socket); } #endif diff --git a/src/tcp.cpp b/src/tcp.cpp index aabd8ecb..94431d0f 100644 --- a/src/tcp.cpp +++ b/src/tcp.cpp @@ -332,3 +332,27 @@ void zmq::tcp_assert_tuning_error (zmq::fd_t s_, int rc_) } #endif } + +void zmq::tcp_tune_loopback_fast_path (const fd_t socket_) +{ +#if defined ZMQ_HAVE_WINDOWS + int sio_loopback_fastpath = 1; + DWORD numberOfBytesReturned = 0; + + int rc = WSAIoctl (socket_, SIO_LOOPBACK_FAST_PATH, &sio_loopback_fastpath, + sizeof sio_loopback_fastpath, NULL, 0, + &numberOfBytesReturned, 0, 0); + + if (SOCKET_ERROR == rc) { + DWORD lastError = ::WSAGetLastError (); + + if (WSAEOPNOTSUPP == lastError) { + // This system is not Windows 8 or Server 2012, and the call is not supported. + } else { + wsa_assert (false); + } + } +#else + LIBZMQ_UNUSED (socket_); +#endif +} diff --git a/src/tcp.hpp b/src/tcp.hpp index 842b0e4e..81c1a789 100644 --- a/src/tcp.hpp +++ b/src/tcp.hpp @@ -66,6 +66,8 @@ int tcp_read (fd_t s_, void *data_, size_t size_); // Asserts that an internal error did not occur. Does not assert // on network errors such as reset or aborted connections. void tcp_assert_tuning_error (fd_t s_, int rc_); + +void tcp_tune_loopback_fast_path (fd_t socket_); } #endif diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index bcaeff59..11c00307 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -302,6 +302,10 @@ int zmq::tcp_connecter_t::open () // Set the socket to non-blocking mode so that we get async connect(). unblock_socket (s); + // Set the socket to loopback fastpath if configured. + if (options.loopback_fastpath) + tcp_tune_loopback_fast_path (s); + // Set the socket buffer limits for the underlying socket. if (options.sndbuf >= 0) set_tcp_send_buffer (s, options.sndbuf); diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp index e446fa98..9a54521f 100644 --- a/src/tcp_listener.cpp +++ b/src/tcp_listener.cpp @@ -216,6 +216,10 @@ int zmq::tcp_listener_t::set_address (const char *addr_) if (options.tos != 0) set_ip_type_of_service (s, options.tos); + // Set the socket to loopback fastpath if configured. + if (options.loopback_fastpath) + tcp_tune_loopback_fast_path (s); + // Bind the socket to a device if applicable if (!options.bound_device.empty ()) bind_to_device (s, options.bound_device); diff --git a/src/zmq_draft.h b/src/zmq_draft.h index ae4494fc..5dd6201b 100644 --- a/src/zmq_draft.h +++ b/src/zmq_draft.h @@ -55,6 +55,7 @@ unsigned long zmq_stopwatch_intermediate (void *watch_); #define ZMQ_GSSAPI_SERVICE_PRINCIPAL_NAMETYPE 91 #define ZMQ_BINDTODEVICE 92 #define ZMQ_ZAP_ENFORCE_DOMAIN 93 +#define ZMQ_LOOPBACK_FASTPATH 94 /* DRAFT 0MQ socket events and monitoring */ /* Unspecified system errors during handshake. Event value is an errno. */ diff --git a/tests/test_pair_tcp.cpp b/tests/test_pair_tcp.cpp index aadd9ad8..e5c819df 100644 --- a/tests/test_pair_tcp.cpp +++ b/tests/test_pair_tcp.cpp @@ -29,9 +29,20 @@ #include "testutil.hpp" -int main (void) +typedef void (*extra_func_t) (void *socket); + +#ifdef ZMQ_BUILD_DRAFT +void set_sockopt_fastpath (void *socket) +{ + int value = 1; + int rc = + zmq_setsockopt (socket, ZMQ_LOOPBACK_FASTPATH, &value, sizeof value); + assert (rc == 0); +} +#endif + +void test_pair_tcp (extra_func_t extra_func = NULL) { - setup_test_environment (); size_t len = MAX_SOCKET_STRING; char my_endpoint[MAX_SOCKET_STRING]; void *ctx = zmq_ctx_new (); @@ -39,6 +50,10 @@ int main (void) void *sb = zmq_socket (ctx, ZMQ_PAIR); assert (sb); + + if (extra_func) + extra_func (sb); + int rc = zmq_bind (sb, "tcp://127.0.0.1:*"); assert (rc == 0); rc = zmq_getsockopt (sb, ZMQ_LAST_ENDPOINT, my_endpoint, &len); @@ -46,6 +61,9 @@ int main (void) void *sc = zmq_socket (ctx, ZMQ_PAIR); assert (sc); + if (extra_func) + extra_func (sc); + rc = zmq_connect (sc, my_endpoint); assert (rc == 0); @@ -59,6 +77,16 @@ int main (void) rc = zmq_ctx_term (ctx); assert (rc == 0); +} + +int main (void) +{ + setup_test_environment (); + + test_pair_tcp (); +#ifdef ZMQ_BUILD_DRAFT + test_pair_tcp (set_sockopt_fastpath); +#endif return 0; }