diff --git a/include/zmq.h b/include/zmq.h
index 7206ee2d..86a52638 100644
--- a/include/zmq.h
+++ b/include/zmq.h
@@ -302,6 +302,7 @@ ZMQ_EXPORT const char *zmq_msg_gets (zmq_msg_t *msg, const char *property);
#define ZMQ_GSSAPI_PLAINTEXT 65
#define ZMQ_HANDSHAKE_IVL 66
#define ZMQ_IDENTITY_FD 67
+#define ZMQ_SOCKS_PROXY 68
/* Message options */
#define ZMQ_MORE 1
diff --git a/src/Makefile.am b/src/Makefile.am
index dae104dd..89c5f16f 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -78,6 +78,8 @@ libzmq_la_SOURCES = \
session_base.hpp \
signaler.hpp \
socket_base.hpp \
+ socks.hpp \
+ socks_connecter.hpp \
stdint.hpp \
stream.hpp \
stream_engine.hpp \
@@ -149,6 +151,8 @@ libzmq_la_SOURCES = \
session_base.cpp \
signaler.cpp \
socket_base.cpp \
+ socks.cpp \
+ socks_connecter.cpp \
stream.cpp \
stream_engine.cpp \
sub.cpp \
diff --git a/src/options.cpp b/src/options.cpp
index 959b2a55..ef945486 100644
--- a/src/options.cpp
+++ b/src/options.cpp
@@ -206,6 +206,19 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
}
break;
+ case ZMQ_SOCKS_PROXY:
+ if (optval_ == NULL && optvallen_ == 0) {
+ socks_proxy_address.clear ();
+ return 0;
+ }
+ else
+ if (optval_ != NULL && optvallen_ > 0 ) {
+ socks_proxy_address =
+ std::string ((const char *) optval_, optvallen_);
+ return 0;
+ }
+ break;
+
case ZMQ_TCP_KEEPALIVE:
if (is_int && (value == -1 || value == 0 || value == 1)) {
tcp_keepalive = value;
@@ -618,6 +631,14 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
}
break;
+ case ZMQ_SOCKS_PROXY:
+ if (*optvallen_ >= socks_proxy_address.size () + 1) {
+ memcpy (optval_, socks_proxy_address.c_str (), socks_proxy_address.size () + 1);
+ *optvallen_ = socks_proxy_address.size () + 1;
+ return 0;
+ }
+ break;
+
case ZMQ_TCP_KEEPALIVE:
if (is_int) {
*value = tcp_keepalive;
diff --git a/src/options.hpp b/src/options.hpp
index 16aa3bd2..3bacec0e 100644
--- a/src/options.hpp
+++ b/src/options.hpp
@@ -114,6 +114,9 @@ namespace zmq
// if true, router socket accepts non-zmq tcp connections
bool raw_sock;
+ // Addres of SOCKS proxy
+ std::string socks_proxy_address;
+
// TCP keep-alive settings.
// Defaults to -1 = do not change socket options
int tcp_keepalive;
diff --git a/src/session_base.cpp b/src/session_base.cpp
index 074db2a3..60c530fa 100644
--- a/src/session_base.cpp
+++ b/src/session_base.cpp
@@ -25,6 +25,7 @@
#include "tcp_connecter.hpp"
#include "ipc_connecter.hpp"
#include "tipc_connecter.hpp"
+#include "socks_connecter.hpp"
#include "pgm_sender.hpp"
#include "pgm_receiver.hpp"
#include "address.hpp"
@@ -497,10 +498,22 @@ void zmq::session_base_t::start_connecting (bool wait_)
// Create the connecter object.
if (addr->protocol == "tcp") {
- tcp_connecter_t *connecter = new (std::nothrow) tcp_connecter_t (
- io_thread, this, options, addr, wait_);
- alloc_assert (connecter);
- launch_child (connecter);
+ if (options.socks_proxy_address != "") {
+ address_t *proxy_address = new (std::nothrow)
+ address_t ("tcp", options.socks_proxy_address);
+ alloc_assert (proxy_address);
+ socks_connecter_t *connecter =
+ new (std::nothrow) socks_connecter_t (
+ io_thread, this, options, addr, proxy_address, wait_);
+ alloc_assert (connecter);
+ launch_child (connecter);
+ }
+ else {
+ tcp_connecter_t *connecter = new (std::nothrow)
+ tcp_connecter_t (io_thread, this, options, addr, wait_);
+ alloc_assert (connecter);
+ launch_child (connecter);
+ }
return;
}
diff --git a/src/socks.cpp b/src/socks.cpp
new file mode 100644
index 00000000..e5a3f560
--- /dev/null
+++ b/src/socks.cpp
@@ -0,0 +1,269 @@
+/*
+ Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see .
+*/
+
+#include
+#include
+#include
+
+#include "err.hpp"
+#include "platform.hpp"
+#include "socks.hpp"
+#include "tcp.hpp"
+
+zmq::socks_greeting_t::socks_greeting_t (uint8_t method_) :
+ num_methods (1)
+{
+ methods [0] = method_;
+}
+
+zmq::socks_greeting_t::socks_greeting_t (
+ uint8_t *methods_, size_t num_methods_)
+ : num_methods (num_methods_)
+{
+ zmq_assert (num_methods_ <= 255);
+
+ for (size_t i = 0; i < num_methods_; i++)
+ methods [i] = methods_ [i];
+}
+
+zmq::socks_greeting_encoder_t::socks_greeting_encoder_t ()
+ : bytes_encoded (0), bytes_written (0)
+{}
+
+void zmq::socks_greeting_encoder_t::encode (const socks_greeting_t &greeting_)
+{
+ uint8_t *ptr = buf;
+
+ *ptr++ = 0x05;
+ *ptr++ = greeting_.num_methods;
+ for (size_t i = 0; i < greeting_.num_methods; i++)
+ *ptr++ = greeting_.methods [i];
+
+ bytes_encoded = 2 + greeting_.num_methods;
+ bytes_written = 0;
+}
+
+int zmq::socks_greeting_encoder_t::output (fd_t fd_)
+{
+ const int rc = tcp_write (
+ fd_, buf + bytes_written, bytes_encoded - bytes_written);
+ if (rc > 0)
+ bytes_written += static_cast (rc);
+ return rc;
+}
+
+bool zmq::socks_greeting_encoder_t::has_pending_data () const
+{
+ return bytes_written < bytes_encoded;
+}
+
+void zmq::socks_greeting_encoder_t::reset ()
+{
+ bytes_encoded = bytes_written = 0;
+}
+
+zmq::socks_choice_t::socks_choice_t (unsigned char method_)
+ : method (method_)
+{}
+
+zmq::socks_choice_decoder_t::socks_choice_decoder_t ()
+ : bytes_read (0)
+{}
+
+int zmq::socks_choice_decoder_t::input (fd_t fd_)
+{
+ zmq_assert (bytes_read < 2);
+ const int rc = tcp_read (fd_, buf + bytes_read, 2 - bytes_read);
+ if (rc > 0) {
+ bytes_read += static_cast (rc);
+ if (buf [0] != 0x05)
+ return -1;
+ }
+ return rc;
+}
+
+bool zmq::socks_choice_decoder_t::message_ready () const
+{
+ return bytes_read == 2;
+}
+
+zmq::socks_choice_t zmq::socks_choice_decoder_t::decode ()
+{
+ zmq_assert (message_ready ());
+ return socks_choice_t (buf [1]);
+}
+
+void zmq::socks_choice_decoder_t::reset ()
+{
+ bytes_read = 0;
+}
+
+zmq::socks_request_t::socks_request_t (
+ uint8_t command_, std::string hostname_, uint16_t port_)
+ : command (command_), hostname (hostname_), port (port_)
+{}
+
+zmq::socks_request_encoder_t::socks_request_encoder_t ()
+ : bytes_encoded (0), bytes_written (0)
+{}
+
+void zmq::socks_request_encoder_t::encode (const socks_request_t &req)
+{
+ unsigned char *ptr = buf;
+ *ptr++ = 0x05;
+ *ptr++ = req.command;
+ *ptr++ = 0x00;
+
+#if defined ZMQ_HAVE_OPENVMS && defined __ia64 && __INITIAL_POINTER_SIZE == 64
+ __addrinfo64 hints, *res = NULL;
+#else
+ addrinfo hints, *res = NULL;
+#endif
+
+ memset (&hints, 0, sizeof hints);
+
+ // Suppress potential DNS lookups.
+ hints.ai_flags = AI_NUMERICHOST;
+
+ const int rc = getaddrinfo (req.hostname.c_str (), NULL, &hints, &res);
+ if (rc == 0 && res->ai_family == AF_INET) {
+ struct sockaddr_in *sockaddr_in =
+ reinterpret_cast (res->ai_addr);
+ *ptr++ = 0x01;
+ memcpy (ptr, &sockaddr_in->sin_addr, 4);
+ ptr += 4;
+ }
+ else
+ if (rc == 0 && res->ai_family == AF_INET6) {
+ struct sockaddr_in6 *sockaddr_in6 =
+ reinterpret_cast (res->ai_addr);
+ *ptr++ = 0x04;
+ memcpy (ptr, &sockaddr_in6->sin6_addr, 16);
+ ptr += 16;
+ }
+ else {
+ *ptr++ = 0x03;
+ *ptr++ = req.hostname.size ();
+ memcpy (ptr, req.hostname.c_str (), req.hostname.size ());
+ ptr += req.hostname.size ();
+ }
+
+ if (rc == 0)
+ freeaddrinfo (res);
+
+ *ptr++ = req.port / 256;
+ *ptr++ = req.port % 256;
+
+ bytes_encoded = ptr - buf;
+ bytes_written = 0;
+}
+
+int zmq::socks_request_encoder_t::output (fd_t fd_)
+{
+ const int rc = tcp_write (
+ fd_, buf + bytes_written, bytes_encoded - bytes_written);
+ if (rc > 0)
+ bytes_written += static_cast (rc);
+ return rc;
+}
+
+bool zmq::socks_request_encoder_t::has_pending_data () const
+{
+ return bytes_written < bytes_encoded;
+}
+
+void zmq::socks_request_encoder_t::reset ()
+{
+ bytes_encoded = bytes_written = 0;
+}
+
+zmq::socks_response_t::socks_response_t (
+ uint8_t response_code_, std::string address_, uint16_t port_)
+ : response_code (response_code_), address (address_), port (port_)
+{}
+
+zmq::socks_response_decoder_t::socks_response_decoder_t ()
+ : bytes_read (0)
+{}
+
+int zmq::socks_response_decoder_t::input (fd_t fd_)
+{
+ size_t n = 0;
+
+ if (bytes_read < 5)
+ n = 5 - bytes_read;
+ else {
+ const uint8_t atyp = buf [3];
+ zmq_assert (atyp == 0x01 || atyp == 0x03 || atyp == 0x04);
+ if (atyp == 0x01)
+ n = 3 + 2;
+ else
+ if (atyp == 0x03)
+ n = buf [4] + 2;
+ else
+ if (atyp == 0x04)
+ n = 15 + 2;
+ }
+ const int rc = tcp_read (fd_, buf + bytes_read, n);
+ if (rc > 0) {
+ bytes_read += static_cast (rc);
+ if (buf [0] != 0x05)
+ return -1;
+ if (bytes_read >= 2)
+ if (buf [1] > 0x08)
+ return -1;
+ if (bytes_read >= 3)
+ if (buf [2] != 0x00)
+ return -1;
+ if (bytes_read >= 4) {
+ const uint8_t atyp = buf [3];
+ if (atyp != 0x01 && atyp != 0x03 && atyp != 0x04)
+ return -1;
+ }
+ }
+ return rc;
+}
+
+bool zmq::socks_response_decoder_t::message_ready () const
+{
+ if (bytes_read < 4)
+ return false;
+ else {
+ const uint8_t atyp = buf [3];
+ zmq_assert (atyp == 0x01 || atyp == 0x03 || atyp == 0x04);
+ if (atyp == 0x01)
+ return bytes_read == 10;
+ else
+ if (atyp == 0x03)
+ return bytes_read > 4 && bytes_read == 4 + 1 + buf [4] + 2u;
+ else
+ return bytes_read == 22;
+ }
+}
+
+zmq::socks_response_t zmq::socks_response_decoder_t::decode ()
+{
+ zmq_assert (message_ready ());
+ return socks_response_t (buf [1], "", 0);
+}
+
+void zmq::socks_response_decoder_t::reset ()
+{
+ bytes_read = 0;
+}
diff --git a/src/socks.hpp b/src/socks.hpp
new file mode 100644
index 00000000..8fe4594b
--- /dev/null
+++ b/src/socks.hpp
@@ -0,0 +1,125 @@
+/*
+ Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see .
+*/
+
+#ifndef __ZMQ_SOCKS_HPP_INCLUDED__
+#define __ZMQ_SOCKS_HPP_INCLUDED__
+
+#include
+#include "fd.hpp"
+#include "stdint.hpp"
+
+namespace zmq
+{
+
+ struct socks_greeting_t
+ {
+ socks_greeting_t (uint8_t method);
+ socks_greeting_t (uint8_t *methods_, size_t num_methods_);
+
+ uint8_t methods [255];
+ const size_t num_methods;
+ };
+
+ class socks_greeting_encoder_t
+ {
+ public:
+ socks_greeting_encoder_t ();
+ void encode (const socks_greeting_t &greeting_);
+ int output (fd_t fd_);
+ bool has_pending_data () const;
+ void reset ();
+
+ private:
+ size_t bytes_encoded;
+ size_t bytes_written;
+ uint8_t buf [2 + 255];
+ };
+
+ struct socks_choice_t
+ {
+ socks_choice_t (uint8_t method_);
+
+ uint8_t method;
+ };
+
+ class socks_choice_decoder_t
+ {
+ public:
+ socks_choice_decoder_t ();
+ int input (fd_t fd_);
+ bool message_ready () const;
+ socks_choice_t decode ();
+ void reset ();
+
+ private:
+ unsigned char buf [2];
+ size_t bytes_read;
+ };
+
+ struct socks_request_t
+ {
+ socks_request_t (
+ uint8_t command_, std::string hostname_, uint16_t port_);
+
+ const uint8_t command;
+ const std::string hostname;
+ const uint16_t port;
+ };
+
+ class socks_request_encoder_t
+ {
+ public:
+ socks_request_encoder_t ();
+ void encode (const socks_request_t &req);
+ int output (fd_t fd_);
+ bool has_pending_data () const;
+ void reset ();
+
+ private:
+ size_t bytes_encoded;
+ size_t bytes_written;
+ uint8_t buf [4 + 256 + 2];
+ };
+
+ struct socks_response_t
+ {
+ socks_response_t (
+ uint8_t response_code_, std::string address_, uint16_t port_);
+ uint8_t response_code;
+ std::string address;
+ uint16_t port;
+ };
+
+ class socks_response_decoder_t
+ {
+ public:
+ socks_response_decoder_t ();
+ int input (fd_t fd_);
+ bool message_ready () const;
+ socks_response_t decode ();
+ void reset ();
+
+ private:
+ uint8_t buf [4 + 256 + 2];
+ size_t bytes_read;
+ };
+
+}
+
+#endif
diff --git a/src/socks_connecter.cpp b/src/socks_connecter.cpp
new file mode 100644
index 00000000..7527da48
--- /dev/null
+++ b/src/socks_connecter.cpp
@@ -0,0 +1,467 @@
+/*
+ Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see .
+*/
+
+#include
+#include
+
+#include "socks_connecter.hpp"
+#include "stream_engine.hpp"
+#include "platform.hpp"
+#include "random.hpp"
+#include "err.hpp"
+#include "ip.hpp"
+#include "tcp.hpp"
+#include "address.hpp"
+#include "tcp_address.hpp"
+#include "session_base.hpp"
+#include "socks.hpp"
+
+#ifdef ZMQ_HAVE_WINDOWS
+#include "windows.hpp"
+#else
+#include
+#include
+#include
+#endif
+
+zmq::socks_connecter_t::socks_connecter_t (class io_thread_t *io_thread_,
+ class session_base_t *session_, const options_t &options_,
+ address_t *addr_, address_t *proxy_addr_, bool delayed_start_) :
+ own_t (io_thread_, options_),
+ io_object_t (io_thread_),
+ addr (addr_),
+ proxy_addr (proxy_addr_),
+ status (unplugged),
+ s (retired_fd),
+ delayed_start (delayed_start_),
+ session (session_),
+ current_reconnect_ivl (options.reconnect_ivl)
+{
+ zmq_assert (addr);
+ zmq_assert (addr->protocol == "tcp");
+ proxy_addr->to_string (endpoint);
+ socket = session->get_socket ();
+}
+
+zmq::socks_connecter_t::~socks_connecter_t ()
+{
+ zmq_assert (s == retired_fd);
+}
+
+void zmq::socks_connecter_t::process_plug ()
+{
+ if (delayed_start)
+ start_timer ();
+ else
+ initiate_connect ();
+}
+
+void zmq::socks_connecter_t::process_term (int linger_)
+{
+ switch (status) {
+ case unplugged:
+ break;
+ case waiting_for_reconnect_time:
+ cancel_timer (reconnect_timer_id);
+ break;
+ case waiting_for_proxy_connection:
+ case sending_greeting:
+ case waiting_for_choice:
+ case sending_request:
+ case waiting_for_response:
+ rm_fd (handle);
+ if (s != -1)
+ close ();
+ break;
+ }
+
+ own_t::process_term (linger_);
+}
+
+void zmq::socks_connecter_t::in_event ()
+{
+ zmq_assert (status != unplugged
+ && status != waiting_for_reconnect_time);
+
+ if (status == waiting_for_choice) {
+ const int rc = choice_decoder.input (s);
+ if (rc == 0 || rc == -1)
+ error ();
+ else
+ if (choice_decoder.message_ready ()) {
+ const socks_choice_t choice = choice_decoder.decode ();
+ const int rc = process_server_response (choice);
+ if (rc == -1)
+ error ();
+ else {
+ std::string hostname = "";
+ uint16_t port = 0;
+ if (parse_address (addr->address, hostname, port) == -1)
+ error ();
+ else {
+ request_encoder.encode (
+ socks_request_t (1, hostname, port));
+ reset_pollin (handle);
+ set_pollout (handle);
+ status = sending_request;
+ }
+ }
+ }
+ }
+ else
+ if (status == waiting_for_response) {
+ const int rc = response_decoder.input (s);
+ if (rc == 0 || rc == -1)
+ error ();
+ else
+ if (response_decoder.message_ready ()) {
+ const socks_response_t response = response_decoder.decode ();
+ const int rc = process_server_response (response);
+ if (rc == -1)
+ error ();
+ else {
+ // Remember our fd for ZMQ_SRCFD in messages
+ socket->set_fd (s);
+
+ // Create the engine object for this connection.
+ stream_engine_t *engine = new (std::nothrow)
+ stream_engine_t (s, options, endpoint);
+ alloc_assert (engine);
+
+ // Attach the engine to the corresponding session object.
+ send_attach (session, engine);
+
+ socket->event_connected (endpoint, s);
+
+ rm_fd (handle);
+ s = -1;
+ status = unplugged;
+
+ // Shut the connecter down.
+ terminate ();
+ }
+ }
+ }
+ else
+ error ();
+}
+
+void zmq::socks_connecter_t::out_event ()
+{
+ zmq_assert (status == waiting_for_proxy_connection
+ || status == sending_greeting
+ || status == sending_request);
+
+ if (status == waiting_for_proxy_connection) {
+ const int rc = check_proxy_connection ();
+ if (rc == -1)
+ error ();
+ else {
+ greeting_encoder.encode (
+ socks_greeting_t (socks_no_auth_required));
+ status = sending_greeting;
+ }
+ }
+ else
+ if (status == sending_greeting) {
+ zmq_assert (greeting_encoder.has_pending_data ());
+ const int rc = greeting_encoder.output (s);
+ if (rc == -1 || rc == 0)
+ error ();
+ else
+ if (!greeting_encoder.has_pending_data ()) {
+ reset_pollout (handle);
+ set_pollin (handle);
+ status = waiting_for_choice;
+ }
+ }
+ else {
+ zmq_assert (request_encoder.has_pending_data ());
+ const int rc = request_encoder.output (s);
+ if (rc == -1 || rc == 0)
+ error ();
+ else
+ if (!request_encoder.has_pending_data ()) {
+ reset_pollout (handle);
+ set_pollin (handle);
+ status = waiting_for_response;
+ }
+ }
+}
+
+void zmq::socks_connecter_t::initiate_connect ()
+{
+ // Open the connecting socket.
+ const int rc = connect_to_proxy ();
+
+ // Connect may succeed in synchronous manner.
+ if (rc == 0) {
+ handle = add_fd (s);
+ set_pollout (handle);
+ status = sending_greeting;
+ }
+ // Connection establishment may be delayed. Poll for its completion.
+ else
+ if (errno == EINPROGRESS) {
+ handle = add_fd (s);
+ set_pollout (handle);
+ status = waiting_for_proxy_connection;
+ socket->event_connect_delayed (endpoint, zmq_errno ());
+ }
+ // Handle any other error condition by eventual reconnect.
+ else {
+ if (s != -1)
+ close ();
+ start_timer ();
+ }
+}
+
+int zmq::socks_connecter_t::process_server_response (
+ const socks_choice_t &response)
+{
+ // We do not support any authentication method for now.
+ return response.method == 0? 0: -1;
+}
+
+int zmq::socks_connecter_t::process_server_response (
+ const socks_response_t &response)
+{
+ return response.response_code == 0? 0: -1;
+}
+
+void zmq::socks_connecter_t::timer_event (int id_)
+{
+ zmq_assert (status == waiting_for_reconnect_time);
+ zmq_assert (id_ == reconnect_timer_id);
+ initiate_connect ();
+}
+
+void zmq::socks_connecter_t::error ()
+{
+ rm_fd (handle);
+ close ();
+ greeting_encoder.reset ();
+ choice_decoder.reset ();
+ request_encoder.reset ();
+ response_decoder.reset ();
+ start_timer ();
+}
+
+void zmq::socks_connecter_t::start_timer ()
+{
+ const int interval = get_new_reconnect_ivl ();
+ add_timer (interval, reconnect_timer_id);
+ status = waiting_for_reconnect_time;
+ socket->event_connect_retried (endpoint, interval);
+}
+
+int zmq::socks_connecter_t::get_new_reconnect_ivl ()
+{
+ // The new interval is the current interval + random value.
+ const int interval = current_reconnect_ivl +
+ generate_random () % options.reconnect_ivl;
+
+ // Only change the current reconnect interval if the maximum reconnect
+ // interval was set and if it's larger than the reconnect interval.
+ if (options.reconnect_ivl_max > 0 &&
+ options.reconnect_ivl_max > options.reconnect_ivl)
+ // Calculate the next interval
+ current_reconnect_ivl =
+ std::min (current_reconnect_ivl * 2, options.reconnect_ivl_max);
+ return interval;
+}
+
+int zmq::socks_connecter_t::connect_to_proxy ()
+{
+ zmq_assert (s == retired_fd);
+
+ // Resolve the address
+ delete proxy_addr->resolved.tcp_addr;
+ proxy_addr->resolved.tcp_addr = new (std::nothrow) tcp_address_t ();
+ alloc_assert (proxy_addr->resolved.tcp_addr);
+
+ int rc = proxy_addr->resolved.tcp_addr->resolve (
+ proxy_addr->address.c_str (), false, options.ipv6);
+ if (rc != 0) {
+ delete proxy_addr->resolved.tcp_addr;
+ proxy_addr->resolved.tcp_addr = NULL;
+ return -1;
+ }
+ zmq_assert (proxy_addr->resolved.tcp_addr != NULL);
+ const tcp_address_t *tcp_addr = proxy_addr->resolved.tcp_addr;
+
+ // Create the socket.
+ s = open_socket (tcp_addr->family (), SOCK_STREAM, IPPROTO_TCP);
+#ifdef ZMQ_HAVE_WINDOWS
+ if (s == INVALID_SOCKET)
+ return -1;
+#else
+ if (s == -1)
+ return -1;
+#endif
+
+ // On some systems, IPv4 mapping in IPv6 sockets is disabled by default.
+ // Switch it on in such cases.
+ if (tcp_addr->family () == AF_INET6)
+ enable_ipv4_mapping (s);
+
+ // Set the IP Type-Of-Service priority for this socket
+ if (options.tos != 0)
+ set_ip_type_of_service (s, options.tos);
+
+ // Set the socket to non-blocking mode so that we get async connect().
+ unblock_socket (s);
+
+ // Set the socket buffer limits for the underlying socket.
+ if (options.sndbuf != 0)
+ set_tcp_send_buffer (s, options.sndbuf);
+ if (options.rcvbuf != 0)
+ set_tcp_receive_buffer (s, options.rcvbuf);
+
+ // Set the IP Type-Of-Service for the underlying socket
+ if (options.tos != 0)
+ set_ip_type_of_service (s, options.tos);
+
+ // Set a source address for conversations
+ if (tcp_addr->has_src_addr ()) {
+ rc = ::bind (s, tcp_addr->src_addr (), tcp_addr->src_addrlen ());
+ if (rc == -1) {
+ close ();
+ return -1;
+ }
+ }
+
+ // Connect to the remote peer.
+ rc = ::connect (s, tcp_addr->addr (), tcp_addr->addrlen ());
+
+ // Connect was successfull immediately.
+ if (rc == 0)
+ return 0;
+
+ // Translate error codes indicating asynchronous connect has been
+ // launched to a uniform EINPROGRESS.
+#ifdef ZMQ_HAVE_WINDOWS
+ const int error_code = WSAGetLastError ();
+ if (error_code == WSAEINPROGRESS || error_code == WSAEWOULDBLOCK)
+ errno = EINPROGRESS;
+ else {
+ errno = wsa_error_to_errno (error_code);
+ close ();
+ }
+#else
+ if (errno == EINTR)
+ errno = EINPROGRESS;
+#endif
+ return -1;
+}
+
+zmq::fd_t zmq::socks_connecter_t::check_proxy_connection ()
+{
+ // Async connect has finished. Check whether an error occurred
+ int err = 0;
+#ifdef ZMQ_HAVE_HPUX
+ int len = sizeof err;
+#else
+ socklen_t len = sizeof err;
+#endif
+
+ const int rc = getsockopt (s, SOL_SOCKET, SO_ERROR, (char*) &err, &len);
+
+ // Assert if the error was caused by 0MQ bug.
+ // Networking problems are OK. No need to assert.
+#ifdef ZMQ_HAVE_WINDOWS
+ zmq_assert (rc == 0);
+ if (err != 0) {
+ wsa_assert (err == WSAECONNREFUSED
+ || err == WSAETIMEDOUT
+ || err == WSAECONNABORTED
+ || err == WSAEHOSTUNREACH
+ || err == WSAENETUNREACH
+ || err == WSAENETDOWN
+ || err == WSAEACCES
+ || err == WSAEINVAL
+ || err == WSAEADDRINUSE);
+ return -1;
+ }
+#else
+ // Following code should handle both Berkeley-derived socket
+ // implementations and Solaris.
+ if (rc == -1)
+ err = errno;
+ if (err != 0) {
+ errno = err;
+ errno_assert (
+ errno == ECONNREFUSED ||
+ errno == ECONNRESET ||
+ errno == ETIMEDOUT ||
+ errno == EHOSTUNREACH ||
+ errno == ENETUNREACH ||
+ errno == ENETDOWN ||
+ errno == EINVAL);
+ return -1;
+ }
+#endif
+
+ tune_tcp_socket (s);
+ tune_tcp_keepalives (s, options.tcp_keepalive, options.tcp_keepalive_cnt,
+ options.tcp_keepalive_idle, options.tcp_keepalive_intvl);
+
+ return 0;
+}
+
+void zmq::socks_connecter_t::close ()
+{
+ zmq_assert (s != retired_fd);
+#ifdef ZMQ_HAVE_WINDOWS
+ const int rc = closesocket (s);
+ wsa_assert (rc != SOCKET_ERROR);
+#else
+ const int rc = ::close (s);
+ errno_assert (rc == 0);
+#endif
+ socket->event_closed (endpoint, s);
+ s = retired_fd;
+}
+
+int zmq::socks_connecter_t::parse_address (
+ const std::string &address_, std::string &hostname_, uint16_t &port_)
+{
+ // Find the ':' at end that separates address from the port number.
+ const size_t idx = address_.rfind (':');
+ if (idx == std::string::npos) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ // Extract hostname
+ if (idx < 2 || address_ [0] != '[' || address_ [idx - 1] != ']')
+ hostname_ = address_.substr (0, idx);
+ else
+ hostname_ = address_.substr (1, idx - 2);
+
+ // Separate the hostname/port.
+ const std::string port_str = address_.substr (idx + 1);
+ // Parse the port number (0 is not a valid port).
+ port_ = (uint16_t) atoi (port_str.c_str ());
+ if (port_ == 0) {
+ errno = EINVAL;
+ return -1;
+ }
+ return 0;
+}
diff --git a/src/socks_connecter.hpp b/src/socks_connecter.hpp
new file mode 100644
index 00000000..3313289e
--- /dev/null
+++ b/src/socks_connecter.hpp
@@ -0,0 +1,153 @@
+/*
+ Copyright (c) 2007-2014 Contributors as noted in the AUTHORS file
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see .
+*/
+
+#ifndef __SOCKS_CONNECTER_HPP_INCLUDED__
+#define __SOCKS_CONNECTER_HPP_INCLUDED__
+
+#include "fd.hpp"
+#include "io_object.hpp"
+#include "own.hpp"
+#include "stdint.hpp"
+#include "../include/zmq.h"
+#include "socks.hpp"
+
+namespace zmq
+{
+
+ class io_thread_t;
+ class session_base_t;
+ struct address_t;
+
+ class socks_connecter_t : public own_t, public io_object_t
+ {
+ public:
+
+ // If 'delayed_start' is true connecter first waits for a while,
+ // then starts connection process.
+ socks_connecter_t (zmq::io_thread_t *io_thread_,
+ zmq::session_base_t *session_, const options_t &options_,
+ address_t *addr_, address_t *proxy_addr_, bool delayed_start_);
+ ~socks_connecter_t ();
+
+ private:
+ enum {
+ unplugged,
+ waiting_for_reconnect_time,
+ waiting_for_proxy_connection,
+ sending_greeting,
+ waiting_for_choice,
+ sending_request,
+ waiting_for_response
+ };
+
+ // ID of the timer used to delay the reconnection.
+ enum { reconnect_timer_id = 1 };
+
+ // Method ID
+ enum { socks_no_auth_required = 0 };
+
+ // Handlers for incoming commands.
+ virtual void process_plug ();
+ virtual void process_term (int linger_);
+
+ // Handlers for I/O events.
+ virtual void in_event ();
+ virtual void out_event ();
+ virtual void timer_event (int id_);
+
+ // Internal function to start the actual connection establishment.
+ void initiate_connect ();
+
+ int process_server_response (const socks_choice_t &response);
+ int process_server_response (const socks_response_t &response);
+
+ int parse_address (const std::string &address_,
+ std::string &hostname_, uint16_t &port_);
+
+ int connect_to_proxy ();
+
+ void error ();
+
+ // Internal function to start reconnect timer
+ void start_timer ();
+
+ // Internal function to return a reconnect backoff delay.
+ // Will modify the current_reconnect_ivl used for next call
+ // Returns the currently used interval
+ int get_new_reconnect_ivl ();
+
+ // Open TCP connecting socket. Returns -1 in case of error,
+ // 0 if connect was successfull immediately. Returns -1 with
+ // EAGAIN errno if async connect was launched.
+ int open ();
+
+ // Close the connecting socket.
+ void close ();
+
+ // Get the file descriptor of newly created connection. Returns
+ // retired_fd if the connection was unsuccessfull.
+ int check_proxy_connection ();
+
+ socks_greeting_encoder_t greeting_encoder;
+ socks_choice_decoder_t choice_decoder;
+ socks_request_encoder_t request_encoder;
+ socks_response_decoder_t response_decoder;
+
+ // Address to connect to. Owned by session_base_t.
+ address_t *addr;
+
+ address_t *proxy_addr;
+
+ int status;
+
+ // Underlying socket.
+ fd_t s;
+
+ // Handle corresponding to the listening socket.
+ handle_t handle;
+
+ // If true file descriptor is registered with the poller and 'handle'
+ // contains valid value.
+ bool handle_valid;
+
+ // If true, connecter is waiting a while before trying to connect.
+ const bool delayed_start;
+
+ // True iff a timer has been started.
+ bool timer_started;
+
+ // Reference to the session we belong to.
+ zmq::session_base_t *session;
+
+ // Current reconnect ivl, updated for backoff strategy
+ int current_reconnect_ivl;
+
+ // String representation of endpoint to connect to
+ std::string endpoint;
+
+ // Socket
+ zmq::socket_base_t *socket;
+
+ socks_connecter_t (const socks_connecter_t&);
+ const socks_connecter_t &operator = (const socks_connecter_t&);
+ };
+
+}
+
+#endif
diff --git a/src/stream_engine.cpp b/src/stream_engine.cpp
index 4c521dea..759fb051 100644
--- a/src/stream_engine.cpp
+++ b/src/stream_engine.cpp
@@ -54,6 +54,7 @@
#include "config.hpp"
#include "err.hpp"
#include "ip.hpp"
+#include "tcp.hpp"
#include "likely.hpp"
#include "wire.hpp"
@@ -272,7 +273,7 @@ void zmq::stream_engine_t::in_event ()
size_t bufsize = 0;
decoder->get_buffer (&inpos, &bufsize);
- int const rc = read (inpos, bufsize);
+ const int rc = tcp_read (s, inpos, bufsize);
if (rc == 0) {
error (connection_error);
return;
@@ -359,7 +360,7 @@ void zmq::stream_engine_t::out_event ()
// arbitrarily large. However, we assume that underlying TCP layer has
// limited transmission buffer and thus the actual number of bytes
// written should be reasonably modest.
- int nbytes = write (outpos, outsize);
+ const int nbytes = tcp_write (s, outpos, outsize);
// IO error has occurred. We stop waiting for output events.
// The engine is not terminated until we detect input error;
@@ -448,8 +449,8 @@ bool zmq::stream_engine_t::handshake ()
zmq_assert (greeting_bytes_read < greeting_size);
// Receive the greeting.
while (greeting_bytes_read < greeting_size) {
- const int n = read (greeting_recv + greeting_bytes_read,
- greeting_size - greeting_bytes_read);
+ const int n = tcp_read (s, greeting_recv + greeting_bytes_read,
+ greeting_size - greeting_bytes_read);
if (n == 0) {
error (connection_error);
return false;
@@ -892,107 +893,6 @@ void zmq::stream_engine_t::error (error_reason_t reason)
delete this;
}
-int zmq::stream_engine_t::write (const void *data_, size_t size_)
-{
-#ifdef ZMQ_HAVE_WINDOWS
-
- int nbytes = send (s, (char*) data_, (int) size_, 0);
-
- // If not a single byte can be written to the socket in non-blocking mode
- // we'll get an error (this may happen during the speculative write).
- if (nbytes == SOCKET_ERROR && WSAGetLastError () == WSAEWOULDBLOCK)
- return 0;
-
- // Signalise peer failure.
- if (nbytes == SOCKET_ERROR && (
- WSAGetLastError () == WSAENETDOWN ||
- WSAGetLastError () == WSAENETRESET ||
- WSAGetLastError () == WSAEHOSTUNREACH ||
- WSAGetLastError () == WSAECONNABORTED ||
- WSAGetLastError () == WSAETIMEDOUT ||
- WSAGetLastError () == WSAECONNRESET))
- return -1;
-
- wsa_assert (nbytes != SOCKET_ERROR);
- return nbytes;
-
-#else
- ssize_t nbytes = send (s, data_, size_, 0);
-
- // Several errors are OK. When speculative write is being done we may not
- // be able to write a single byte from the socket. Also, SIGSTOP issued
- // by a debugging tool can result in EINTR error.
- if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK ||
- errno == EINTR))
- return 0;
-
- // Signalise peer failure.
- if (nbytes == -1) {
- errno_assert (errno != EACCES
- && errno != EBADF
- && errno != EDESTADDRREQ
- && errno != EFAULT
- && errno != EINVAL
- && errno != EISCONN
- && errno != EMSGSIZE
- && errno != ENOMEM
- && errno != ENOTSOCK
- && errno != EOPNOTSUPP);
- return -1;
- }
-
- return static_cast (nbytes);
-
-#endif
-}
-
-int zmq::stream_engine_t::read (void *data_, size_t size_)
-{
-#ifdef ZMQ_HAVE_WINDOWS
-
- const int rc = recv (s, (char*) data_, (int) size_, 0);
-
- // If not a single byte can be read from the socket in non-blocking mode
- // we'll get an error (this may happen during the speculative read).
- if (rc == SOCKET_ERROR) {
- if (WSAGetLastError () == WSAEWOULDBLOCK)
- errno = EAGAIN;
- else {
- wsa_assert (WSAGetLastError () == WSAENETDOWN
- || WSAGetLastError () == WSAENETRESET
- || WSAGetLastError () == WSAECONNABORTED
- || WSAGetLastError () == WSAETIMEDOUT
- || WSAGetLastError () == WSAECONNRESET
- || WSAGetLastError () == WSAECONNREFUSED
- || WSAGetLastError () == WSAENOTCONN);
- errno = wsa_error_to_errno (WSAGetLastError ());
- }
- }
-
- return rc == SOCKET_ERROR? -1: rc;
-
-#else
-
- const ssize_t rc = recv (s, data_, size_, 0);
-
- // Several errors are OK. When speculative read is being done we may not
- // be able to read a single byte from the socket. Also, SIGSTOP issued
- // by a debugging tool can result in EINTR error.
- if (rc == -1) {
- errno_assert (errno != EBADF
- && errno != EFAULT
- && errno != EINVAL
- && errno != ENOMEM
- && errno != ENOTSOCK);
- if (errno == EWOULDBLOCK || errno == EINTR)
- errno = EAGAIN;
- }
-
- return static_cast (rc);
-
-#endif
-}
-
void zmq::stream_engine_t::set_handshake_timer ()
{
zmq_assert (!has_handshake_timer);
diff --git a/src/stream_engine.hpp b/src/stream_engine.hpp
index ccee2693..c2448aa1 100644
--- a/src/stream_engine.hpp
+++ b/src/stream_engine.hpp
@@ -92,16 +92,6 @@ namespace zmq
// Detects the protocol used by the peer.
bool handshake ();
- // Writes data to the socket. Returns the number of bytes actually
- // written (even zero is to be considered to be a success). In case
- // of error or orderly shutdown by the other peer -1 is returned.
- int write (const void *data_, size_t size_);
-
- // Reads data from the socket (up to 'size' bytes).
- // Returns the number of bytes actually read or -1 on error.
- // Zero indicates the peer has closed the connection.
- int read (void *data_, size_t size_);
-
int identity_msg (msg_t *msg_);
int process_identity_msg (msg_t *msg_);
diff --git a/src/tcp.cpp b/src/tcp.cpp
index 736669fa..3d646e42 100755
--- a/src/tcp.cpp
+++ b/src/tcp.cpp
@@ -141,3 +141,104 @@ void zmq::tune_tcp_keepalives (fd_t s_, int keepalive_, int keepalive_cnt_, int
#endif // ZMQ_HAVE_SO_KEEPALIVE
#endif // ZMQ_HAVE_WINDOWS
}
+
+int zmq::tcp_write (fd_t s_, const void *data_, size_t size_)
+{
+#ifdef ZMQ_HAVE_WINDOWS
+
+ int nbytes = send (s_, (char*) data_, (int) size_, 0);
+
+ // If not a single byte can be written to the socket in non-blocking mode
+ // we'll get an error (this may happen during the speculative write).
+ if (nbytes == SOCKET_ERROR && WSAGetLastError () == WSAEWOULDBLOCK)
+ return 0;
+
+ // Signalise peer failure.
+ if (nbytes == SOCKET_ERROR && (
+ WSAGetLastError () == WSAENETDOWN ||
+ WSAGetLastError () == WSAENETRESET ||
+ WSAGetLastError () == WSAEHOSTUNREACH ||
+ WSAGetLastError () == WSAECONNABORTED ||
+ WSAGetLastError () == WSAETIMEDOUT ||
+ WSAGetLastError () == WSAECONNRESET))
+ return -1;
+
+ wsa_assert (nbytes != SOCKET_ERROR);
+ return nbytes;
+
+#else
+ ssize_t nbytes = send (s_, data_, size_, 0);
+
+ // Several errors are OK. When speculative write is being done we may not
+ // be able to write a single byte from the socket. Also, SIGSTOP issued
+ // by a debugging tool can result in EINTR error.
+ if (nbytes == -1 && (errno == EAGAIN || errno == EWOULDBLOCK ||
+ errno == EINTR))
+ return 0;
+
+ // Signalise peer failure.
+ if (nbytes == -1) {
+ errno_assert (errno != EACCES
+ && errno != EBADF
+ && errno != EDESTADDRREQ
+ && errno != EFAULT
+ && errno != EINVAL
+ && errno != EISCONN
+ && errno != EMSGSIZE
+ && errno != ENOMEM
+ && errno != ENOTSOCK
+ && errno != EOPNOTSUPP);
+ return -1;
+ }
+
+ return static_cast (nbytes);
+
+#endif
+}
+
+int zmq::tcp_read (fd_t s_, void *data_, size_t size_)
+{
+#ifdef ZMQ_HAVE_WINDOWS
+
+ const int rc = recv (s_, (char*) data_, (int) size_, 0);
+
+ // If not a single byte can be read from the socket in non-blocking mode
+ // we'll get an error (this may happen during the speculative read).
+ if (rc == SOCKET_ERROR) {
+ if (WSAGetLastError () == WSAEWOULDBLOCK)
+ errno = EAGAIN;
+ else {
+ wsa_assert (WSAGetLastError () == WSAENETDOWN
+ || WSAGetLastError () == WSAENETRESET
+ || WSAGetLastError () == WSAECONNABORTED
+ || WSAGetLastError () == WSAETIMEDOUT
+ || WSAGetLastError () == WSAECONNRESET
+ || WSAGetLastError () == WSAECONNREFUSED
+ || WSAGetLastError () == WSAENOTCONN);
+ errno = wsa_error_to_errno (WSAGetLastError ());
+ }
+ }
+
+ return rc == SOCKET_ERROR? -1: rc;
+
+#else
+
+ const ssize_t rc = recv (s_, data_, size_, 0);
+
+ // Several errors are OK. When speculative read is being done we may not
+ // be able to read a single byte from the socket. Also, SIGSTOP issued
+ // by a debugging tool can result in EINTR error.
+ if (rc == -1) {
+ errno_assert (errno != EBADF
+ && errno != EFAULT
+ && errno != EINVAL
+ && errno != ENOMEM
+ && errno != ENOTSOCK);
+ if (errno == EWOULDBLOCK || errno == EINTR)
+ errno = EAGAIN;
+ }
+
+ return static_cast (rc);
+
+#endif
+}
diff --git a/src/tcp.hpp b/src/tcp.hpp
index e5b5daca..d2856ecf 100644
--- a/src/tcp.hpp
+++ b/src/tcp.hpp
@@ -37,6 +37,16 @@ namespace zmq
// Tunes TCP keep-alives
void tune_tcp_keepalives (fd_t s_, int keepalive_, int keepalive_cnt_, int keepalive_idle_, int keepalive_intvl_);
+ // Writes data to the socket. Returns the number of bytes actually
+ // written (even zero is to be considered to be a success). In case
+ // of error or orderly shutdown by the other peer -1 is returned.
+ int tcp_write (fd_t s_, const void *data_, size_t size_);
+
+ // Reads data from the socket (up to 'size' bytes).
+ // Returns the number of bytes actually read or -1 on error.
+ // Zero indicates the peer has closed the connection.
+ int tcp_read (fd_t s_, void *data_, size_t size_);
+
}
#endif