diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp index 67fceb3d..031c6a04 100644 --- a/src/stream_engine.cpp +++ b/src/stream_engine.cpp @@ -83,25 +83,6 @@ zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, // Put the socket into non-blocking mode. unblock_socket (s); - // Set the socket buffer limits for the underlying socket. - if (options.sndbuf) { - rc = setsockopt (s, SOL_SOCKET, SO_SNDBUF, - (char*) &options.sndbuf, sizeof (int)); -#ifdef ZMQ_HAVE_WINDOWS - wsa_assert (rc != SOCKET_ERROR); -#else - errno_assert (rc == 0); -#endif - } - if (options.rcvbuf) { - rc = setsockopt (s, SOL_SOCKET, SO_RCVBUF, - (char*) &options.rcvbuf, sizeof (int)); -#ifdef ZMQ_HAVE_WINDOWS - wsa_assert (rc != SOCKET_ERROR); -#else - errno_assert (rc == 0); -#endif - } #ifdef SO_NOSIGPIPE // Make sure that SIGPIPE signal is not generated when writing to a diff --git a/src/tcp.cpp b/src/tcp.cpp index 767beead..fff520ea 100644 --- a/src/tcp.cpp +++ b/src/tcp.cpp @@ -59,6 +59,28 @@ void zmq::tune_tcp_socket (fd_t s_) #endif } +void zmq::set_tcp_send_buffer (fd_t sockfd_, int bufsize_) +{ + const int rc = setsockopt (sockfd_, SOL_SOCKET, SO_SNDBUF, + (char*) &bufsize_, sizeof bufsize_); +#ifdef ZMQ_HAVE_WINDOWS + wsa_assert (rc != SOCKET_ERROR); +#else + errno_assert (rc == 0); +#endif +} + +void zmq::set_tcp_receive_buffer (fd_t sockfd_, int bufsize_) +{ + const int rc = setsockopt (sockfd_, SOL_SOCKET, SO_RCVBUF, + (char*) &bufsize_, sizeof bufsize_); +#ifdef ZMQ_HAVE_WINDOWS + wsa_assert (rc != SOCKET_ERROR); +#else + errno_assert (rc == 0); +#endif +} + void zmq::tune_tcp_keepalives (fd_t s_, int keepalive_, int keepalive_cnt_, int keepalive_idle_, int keepalive_intvl_) { // These options are used only under certain #ifdefs below. diff --git a/src/tcp.hpp b/src/tcp.hpp index 40dfc7b9..28d6af77 100644 --- a/src/tcp.hpp +++ b/src/tcp.hpp @@ -28,6 +28,12 @@ namespace zmq // Tunes the supplied TCP socket for the best latency. void tune_tcp_socket (fd_t s_); + // Sets the socket send buffer size. + void set_tcp_send_buffer (fd_t sockfd_, int bufsize_); + + // Sets the socket receive buffer size. + void set_tcp_receive_buffer (fd_t sockfd_, int bufsize_); + // Tunes TCP keep-alives void tune_tcp_keepalives (fd_t s_, int keepalive_, int keepalive_cnt_, int keepalive_idle_, int keepalive_intvl_); diff --git a/src/tcp_connecter.cpp b/src/tcp_connecter.cpp index eacb105d..9e87a71d 100644 --- a/src/tcp_connecter.cpp +++ b/src/tcp_connecter.cpp @@ -226,6 +226,12 @@ int zmq::tcp_connecter_t::open () // Set the socket to non-blocking mode so that we get async connect(). unblock_socket (s); + // Set the socket buffer limits for the underlying socket. + if (options.sndbuf != 0) + set_tcp_send_buffer (s, options.sndbuf); + if (options.rcvbuf != 0) + set_tcp_receive_buffer (s, options.rcvbuf); + // Connect to the remote peer. int rc = ::connect ( s, addr->resolved.tcp_addr->addr (), diff --git a/src/tcp_listener.cpp b/src/tcp_listener.cpp index 05e9064f..89e146b6 100644 --- a/src/tcp_listener.cpp +++ b/src/tcp_listener.cpp @@ -188,6 +188,12 @@ int zmq::tcp_listener_t::set_address (const char *addr_) if (address.family () == AF_INET6) enable_ipv4_mapping (s); + // Set the socket buffer limits for the underlying socket. + if (options.sndbuf != 0) + set_tcp_send_buffer (s, options.sndbuf); + if (options.rcvbuf != 0) + set_tcp_receive_buffer (s, options.rcvbuf); + // Allow reusing of the address. int flag = 1; #ifdef ZMQ_HAVE_WINDOWS