mirror of
https://github.com/zeromq/libzmq.git
synced 2025-01-14 01:37:56 +08:00
Problem: VMCI build broken
Solution: refactor it
This commit is contained in:
parent
38bb82b0c8
commit
8fe5b54b8a
21
src/vmci.cpp
21
src/vmci.cpp
@ -28,7 +28,9 @@
|
|||||||
*/
|
*/
|
||||||
#include "precompiled.hpp"
|
#include "precompiled.hpp"
|
||||||
|
|
||||||
|
#include "ip.hpp"
|
||||||
#include "vmci.hpp"
|
#include "vmci.hpp"
|
||||||
|
#include "vmci_address.hpp"
|
||||||
|
|
||||||
#if defined ZMQ_HAVE_VMCI
|
#if defined ZMQ_HAVE_VMCI
|
||||||
|
|
||||||
@ -97,4 +99,23 @@ void zmq::tune_vmci_connect_timeout (ctx_t *context_,
|
|||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
zmq::fd_t zmq::vmci_open_socket (const char *address_,
|
||||||
|
const zmq::options_t &options_,
|
||||||
|
zmq::vmci_address_t *out_vmci_addr_)
|
||||||
|
{
|
||||||
|
// Convert the textual address into address structure.
|
||||||
|
int rc = out_vmci_addr_->resolve (address_);
|
||||||
|
if (rc != 0)
|
||||||
|
return retired_fd;
|
||||||
|
|
||||||
|
// Create the socket.
|
||||||
|
fd_t s = open_socket (out_vmci_addr_->family (), SOCK_STREAM, 0);
|
||||||
|
|
||||||
|
if (s == retired_fd) {
|
||||||
|
return retired_fd;
|
||||||
|
}
|
||||||
|
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
@ -59,6 +59,10 @@ void tune_vmci_connect_timeout (ctx_t *context_,
|
|||||||
fd_t sockfd_,
|
fd_t sockfd_,
|
||||||
struct timeval timeout_);
|
struct timeval timeout_);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
fd_t vmci_open_socket (const char *address_,
|
||||||
|
const options_t &options_,
|
||||||
|
vmci_address_t *out_vmci_addr_);
|
||||||
}
|
}
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
@ -39,6 +39,11 @@
|
|||||||
|
|
||||||
#include "err.hpp"
|
#include "err.hpp"
|
||||||
|
|
||||||
|
zmq::vmci_address_t::vmci_address_t ()
|
||||||
|
{
|
||||||
|
memset (&address, 0, sizeof address);
|
||||||
|
}
|
||||||
|
|
||||||
zmq::vmci_address_t::vmci_address_t (ctx_t *parent_) : parent (parent_)
|
zmq::vmci_address_t::vmci_address_t (ctx_t *parent_) : parent (parent_)
|
||||||
{
|
{
|
||||||
memset (&address, 0, sizeof address);
|
memset (&address, 0, sizeof address);
|
||||||
@ -56,10 +61,6 @@ zmq::vmci_address_t::vmci_address_t (const sockaddr *sa,
|
|||||||
memcpy (&address, sa, sa_len);
|
memcpy (&address, sa, sa_len);
|
||||||
}
|
}
|
||||||
|
|
||||||
zmq::vmci_address_t::~vmci_address_t ()
|
|
||||||
{
|
|
||||||
}
|
|
||||||
|
|
||||||
int zmq::vmci_address_t::resolve (const char *path_)
|
int zmq::vmci_address_t::resolve (const char *path_)
|
||||||
{
|
{
|
||||||
// Find the ':' at end that separates address from the port number.
|
// Find the ':' at end that separates address from the port number.
|
||||||
@ -125,7 +126,7 @@ int zmq::vmci_address_t::resolve (const char *path_)
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int zmq::vmci_address_t::to_string (std::string &addr_)
|
int zmq::vmci_address_t::to_string (std::string &addr_) const
|
||||||
{
|
{
|
||||||
if (address.svm_family != parent->get_vmci_socket_family ()) {
|
if (address.svm_family != parent->get_vmci_socket_family ()) {
|
||||||
addr_.clear ();
|
addr_.clear ();
|
||||||
@ -164,4 +165,13 @@ socklen_t zmq::vmci_address_t::addrlen () const
|
|||||||
return static_cast<socklen_t> (sizeof address);
|
return static_cast<socklen_t> (sizeof address);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if defined ZMQ_HAVE_WINDOWS
|
||||||
|
unsigned short zmq::vmci_address_t::family () const
|
||||||
|
#else
|
||||||
|
sa_family_t zmq::vmci_address_t::family () const
|
||||||
|
#endif
|
||||||
|
{
|
||||||
|
return parent->get_vmci_socket_family ();
|
||||||
|
}
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
@ -43,16 +43,21 @@ namespace zmq
|
|||||||
class vmci_address_t
|
class vmci_address_t
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
|
vmci_address_t ();
|
||||||
vmci_address_t (ctx_t *parent_);
|
vmci_address_t (ctx_t *parent_);
|
||||||
vmci_address_t (const sockaddr *sa, socklen_t sa_len, ctx_t *parent_);
|
vmci_address_t (const sockaddr *sa, socklen_t sa_len, ctx_t *parent_);
|
||||||
~vmci_address_t ();
|
|
||||||
|
|
||||||
// This function sets up the address for VMCI transport.
|
// This function sets up the address for VMCI transport.
|
||||||
int resolve (const char *path_);
|
int resolve (const char *path_);
|
||||||
|
|
||||||
// The opposite to resolve()
|
// The opposite to resolve()
|
||||||
int to_string (std::string &addr_);
|
int to_string (std::string &addr_) const;
|
||||||
|
|
||||||
|
#if defined ZMQ_HAVE_WINDOWS
|
||||||
|
unsigned short family () const;
|
||||||
|
#else
|
||||||
|
sa_family_t family () const;
|
||||||
|
#endif
|
||||||
const sockaddr *addr () const;
|
const sockaddr *addr () const;
|
||||||
socklen_t addrlen () const;
|
socklen_t addrlen () const;
|
||||||
|
|
||||||
@ -60,8 +65,6 @@ class vmci_address_t
|
|||||||
struct sockaddr_vm address;
|
struct sockaddr_vm address;
|
||||||
ctx_t *parent;
|
ctx_t *parent;
|
||||||
|
|
||||||
vmci_address_t ();
|
|
||||||
|
|
||||||
ZMQ_NON_COPYABLE_NOR_MOVABLE (vmci_address_t)
|
ZMQ_NON_COPYABLE_NOR_MOVABLE (vmci_address_t)
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
@ -35,69 +35,41 @@
|
|||||||
|
|
||||||
#include <new>
|
#include <new>
|
||||||
|
|
||||||
#include "stream_engine.hpp"
|
|
||||||
#include "io_thread.hpp"
|
#include "io_thread.hpp"
|
||||||
#include "platform.hpp"
|
#include "platform.hpp"
|
||||||
#include "random.hpp"
|
#include "random.hpp"
|
||||||
#include "err.hpp"
|
#include "err.hpp"
|
||||||
#include "ip.hpp"
|
#include "ip.hpp"
|
||||||
#include "address.hpp"
|
#include "address.hpp"
|
||||||
#include "session_base.hpp"
|
|
||||||
#include "vmci_address.hpp"
|
#include "vmci_address.hpp"
|
||||||
#include "vmci.hpp"
|
#include "vmci.hpp"
|
||||||
|
#include "session_base.hpp"
|
||||||
|
|
||||||
zmq::vmci_connecter_t::vmci_connecter_t (class io_thread_t *io_thread_,
|
zmq::vmci_connecter_t::vmci_connecter_t (class io_thread_t *io_thread_,
|
||||||
class session_base_t *session_,
|
class session_base_t *session_,
|
||||||
const options_t &options_,
|
const options_t &options_,
|
||||||
const address_t *addr_,
|
address_t *addr_,
|
||||||
bool delayed_start_) :
|
bool delayed_start_) :
|
||||||
own_t (io_thread_, options_),
|
stream_connecter_base_t (
|
||||||
io_object_t (io_thread_),
|
io_thread_, session_, options_, addr_, delayed_start_),
|
||||||
addr (addr_),
|
_connect_timer_started (false)
|
||||||
s (retired_fd),
|
|
||||||
handle_valid (false),
|
|
||||||
delayed_start (delayed_start_),
|
|
||||||
timer_started (false),
|
|
||||||
session (session_),
|
|
||||||
current_reconnect_ivl (options.reconnect_ivl)
|
|
||||||
{
|
{
|
||||||
zmq_assert (addr);
|
zmq_assert (_addr->protocol == protocol_name::vmci);
|
||||||
zmq_assert (addr->protocol == "vmci");
|
|
||||||
addr->to_string (endpoint);
|
|
||||||
socket = session->get_socket ();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
zmq::vmci_connecter_t::~vmci_connecter_t ()
|
zmq::vmci_connecter_t::~vmci_connecter_t ()
|
||||||
{
|
{
|
||||||
zmq_assert (!timer_started);
|
zmq_assert (!_connect_timer_started);
|
||||||
zmq_assert (!handle_valid);
|
|
||||||
zmq_assert (s == retired_fd);
|
|
||||||
}
|
|
||||||
|
|
||||||
void zmq::vmci_connecter_t::process_plug ()
|
|
||||||
{
|
|
||||||
if (delayed_start)
|
|
||||||
add_reconnect_timer ();
|
|
||||||
else
|
|
||||||
start_connecting ();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::vmci_connecter_t::process_term (int linger_)
|
void zmq::vmci_connecter_t::process_term (int linger_)
|
||||||
{
|
{
|
||||||
if (timer_started) {
|
if (_connect_timer_started) {
|
||||||
cancel_timer (reconnect_timer_id);
|
cancel_timer (connect_timer_id);
|
||||||
timer_started = false;
|
_connect_timer_started = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (handle_valid) {
|
stream_connecter_base_t::process_term (linger_);
|
||||||
rm_fd (handle);
|
|
||||||
handle_valid = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (s != retired_fd)
|
|
||||||
close ();
|
|
||||||
|
|
||||||
own_t::process_term (linger_);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::vmci_connecter_t::in_event ()
|
void zmq::vmci_connecter_t::in_event ()
|
||||||
@ -110,9 +82,26 @@ void zmq::vmci_connecter_t::in_event ()
|
|||||||
|
|
||||||
void zmq::vmci_connecter_t::out_event ()
|
void zmq::vmci_connecter_t::out_event ()
|
||||||
{
|
{
|
||||||
fd_t fd = connect ();
|
if (_connect_timer_started) {
|
||||||
rm_fd (handle);
|
cancel_timer (connect_timer_id);
|
||||||
handle_valid = false;
|
_connect_timer_started = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO this is still very similar to (t)ipc_connecter_t, maybe the
|
||||||
|
// differences can be factored out
|
||||||
|
|
||||||
|
rm_handle ();
|
||||||
|
|
||||||
|
const fd_t fd = connect ();
|
||||||
|
|
||||||
|
if (fd == retired_fd
|
||||||
|
&& ((options.reconnect_stop & ZMQ_RECONNECT_STOP_CONN_REFUSED)
|
||||||
|
&& errno == ECONNREFUSED)) {
|
||||||
|
send_conn_failed (_session);
|
||||||
|
close ();
|
||||||
|
terminate ();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// Handle the error condition by attempt to reconnect.
|
// Handle the error condition by attempt to reconnect.
|
||||||
if (fd == retired_fd) {
|
if (fd == retired_fd) {
|
||||||
@ -135,148 +124,154 @@ void zmq::vmci_connecter_t::out_event ()
|
|||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create the engine object for this connection.
|
create_engine (
|
||||||
stream_engine_t *engine = new (std::nothrow) stream_engine_t (
|
fd, zmq::vmci_connecter_t::get_socket_name (fd, socket_end_local));
|
||||||
fd, options, make_unconnected_bind_endpoint_pair (endpoint));
|
}
|
||||||
alloc_assert (engine);
|
|
||||||
|
|
||||||
// Attach the engine to the corresponding session object.
|
std::string
|
||||||
send_attach (session, engine);
|
zmq::vmci_connecter_t::get_socket_name (zmq::fd_t fd_,
|
||||||
|
socket_end_t socket_end_) const
|
||||||
|
{
|
||||||
|
struct sockaddr_storage ss;
|
||||||
|
const zmq_socklen_t sl = get_socket_address (fd_, socket_end_, &ss);
|
||||||
|
if (sl == 0) {
|
||||||
|
return std::string ();
|
||||||
|
}
|
||||||
|
|
||||||
// Shut the connecter down.
|
const vmci_address_t addr (reinterpret_cast<struct sockaddr *> (&ss), sl,
|
||||||
terminate ();
|
this->get_ctx ());
|
||||||
|
std::string address_string;
|
||||||
socket->event_connected (make_unconnected_bind_endpoint_pair (endpoint),
|
addr.to_string (address_string);
|
||||||
fd);
|
return address_string;
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::vmci_connecter_t::timer_event (int id_)
|
void zmq::vmci_connecter_t::timer_event (int id_)
|
||||||
{
|
{
|
||||||
zmq_assert (id_ == reconnect_timer_id);
|
if (id_ == connect_timer_id) {
|
||||||
timer_started = false;
|
_connect_timer_started = false;
|
||||||
start_connecting ();
|
rm_handle ();
|
||||||
|
close ();
|
||||||
|
add_reconnect_timer ();
|
||||||
|
} else
|
||||||
|
stream_connecter_base_t::timer_event (id_);
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::vmci_connecter_t::start_connecting ()
|
void zmq::vmci_connecter_t::start_connecting ()
|
||||||
{
|
{
|
||||||
// Open the connecting socket.
|
// Open the connecting socket.
|
||||||
int rc = open ();
|
const int rc = open ();
|
||||||
|
|
||||||
// Connect may succeed in synchronous manner.
|
// Connect may succeed in synchronous manner.
|
||||||
if (rc == 0) {
|
if (rc == 0) {
|
||||||
handle = add_fd (s);
|
_handle = add_fd (_s);
|
||||||
handle_valid = true;
|
|
||||||
out_event ();
|
out_event ();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Connection establishment may be delayed. Poll for its completion.
|
||||||
|
else if (rc == -1 && errno == EINPROGRESS) {
|
||||||
|
_handle = add_fd (_s);
|
||||||
|
set_pollout (_handle);
|
||||||
|
_socket->event_connect_delayed (
|
||||||
|
make_unconnected_connect_endpoint_pair (_endpoint), zmq_errno ());
|
||||||
|
|
||||||
|
// add userspace connect timeout
|
||||||
|
add_connect_timer ();
|
||||||
|
}
|
||||||
|
|
||||||
// Handle any other error condition by eventual reconnect.
|
// Handle any other error condition by eventual reconnect.
|
||||||
else {
|
else {
|
||||||
if (s != retired_fd)
|
if (_s != retired_fd)
|
||||||
close ();
|
close ();
|
||||||
add_reconnect_timer ();
|
add_reconnect_timer ();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::vmci_connecter_t::add_reconnect_timer ()
|
void zmq::vmci_connecter_t::add_connect_timer ()
|
||||||
{
|
{
|
||||||
if (options.reconnect_ivl > 0) {
|
if (options.connect_timeout > 0) {
|
||||||
int rc_ivl = get_new_reconnect_ivl ();
|
add_timer (options.connect_timeout, connect_timer_id);
|
||||||
add_timer (rc_ivl, reconnect_timer_id);
|
_connect_timer_started = true;
|
||||||
socket->event_connect_retried (
|
|
||||||
make_unconnected_bind_endpoint_pair (endpoint), rc_ivl);
|
|
||||||
timer_started = true;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int zmq::vmci_connecter_t::get_new_reconnect_ivl ()
|
|
||||||
{
|
|
||||||
// The new interval is the current interval + random value.
|
|
||||||
int this_interval =
|
|
||||||
current_reconnect_ivl + (generate_random () % options.reconnect_ivl);
|
|
||||||
|
|
||||||
// Only change the current reconnect interval if the maximum reconnect
|
|
||||||
// interval was set and if it's larger than the reconnect interval.
|
|
||||||
if (options.reconnect_ivl_max > 0
|
|
||||||
&& options.reconnect_ivl_max > options.reconnect_ivl) {
|
|
||||||
// Calculate the next interval
|
|
||||||
current_reconnect_ivl = current_reconnect_ivl * 2;
|
|
||||||
if (current_reconnect_ivl >= options.reconnect_ivl_max) {
|
|
||||||
current_reconnect_ivl = options.reconnect_ivl_max;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return this_interval;
|
|
||||||
}
|
|
||||||
|
|
||||||
int zmq::vmci_connecter_t::open ()
|
int zmq::vmci_connecter_t::open ()
|
||||||
{
|
{
|
||||||
zmq_assert (s == retired_fd);
|
zmq_assert (_s == retired_fd);
|
||||||
|
|
||||||
int family = this->get_ctx ()->get_vmci_socket_family ();
|
// Resolve the address
|
||||||
if (family == -1)
|
if (_addr->resolved.vmci_addr != NULL) {
|
||||||
return -1;
|
LIBZMQ_DELETE (_addr->resolved.vmci_addr);
|
||||||
|
}
|
||||||
|
|
||||||
// Create the socket.
|
_addr->resolved.vmci_addr =
|
||||||
s = open_socket (family, SOCK_STREAM, 0);
|
new (std::nothrow) vmci_address_t (this->get_ctx ());
|
||||||
#ifdef ZMQ_HAVE_WINDOWS
|
alloc_assert (_addr->resolved.vmci_addr);
|
||||||
if (s == INVALID_SOCKET) {
|
_s = vmci_open_socket (_addr->address.c_str (), options,
|
||||||
errno = wsa_error_to_errno (WSAGetLastError ());
|
_addr->resolved.vmci_addr);
|
||||||
|
if (_s == retired_fd) {
|
||||||
|
// TODO we should emit some event in this case!
|
||||||
|
|
||||||
|
LIBZMQ_DELETE (_addr->resolved.vmci_addr);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
#else
|
zmq_assert (_addr->resolved.vmci_addr != NULL);
|
||||||
if (s == -1)
|
|
||||||
return -1;
|
// Set the socket to non-blocking mode so that we get async connect().
|
||||||
#endif
|
unblock_socket (_s);
|
||||||
|
|
||||||
|
const vmci_address_t *const vmci_addr = _addr->resolved.vmci_addr;
|
||||||
|
|
||||||
|
int rc;
|
||||||
|
|
||||||
// Connect to the remote peer.
|
// Connect to the remote peer.
|
||||||
int rc = ::connect (s, addr->resolved.vmci_addr->addr (),
|
#if defined ZMQ_HAVE_VXWORKS
|
||||||
addr->resolved.vmci_addr->addrlen ());
|
rc = ::connect (_s, (sockaddr *) vmci_addr->addr (), vmci_addr->addrlen ());
|
||||||
|
|
||||||
// Connect was successful immediately.
|
|
||||||
if (rc == 0)
|
|
||||||
return 0;
|
|
||||||
|
|
||||||
// Forward the error.
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
void zmq::vmci_connecter_t::close ()
|
|
||||||
{
|
|
||||||
zmq_assert (s != retired_fd);
|
|
||||||
#ifdef ZMQ_HAVE_WINDOWS
|
|
||||||
const int rc = closesocket (s);
|
|
||||||
wsa_assert (rc != SOCKET_ERROR);
|
|
||||||
#else
|
#else
|
||||||
const int rc = ::close (s);
|
rc = ::connect (_s, vmci_addr->addr (), vmci_addr->addrlen ());
|
||||||
errno_assert (rc == 0);
|
|
||||||
#endif
|
#endif
|
||||||
socket->event_closed (make_unconnected_bind_endpoint_pair (endpoint), s);
|
// Connect was successful immediately.
|
||||||
s = retired_fd;
|
if (rc == 0) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Translate error codes indicating asynchronous connect has been
|
||||||
|
// launched to a uniform EINPROGRESS.
|
||||||
|
#ifdef ZMQ_HAVE_WINDOWS
|
||||||
|
const int last_error = WSAGetLastError ();
|
||||||
|
if (last_error == WSAEINPROGRESS || last_error == WSAEWOULDBLOCK)
|
||||||
|
errno = EINPROGRESS;
|
||||||
|
else
|
||||||
|
errno = wsa_error_to_errno (last_error);
|
||||||
|
#else
|
||||||
|
if (errno == EINTR)
|
||||||
|
errno = EINPROGRESS;
|
||||||
|
#endif
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
zmq::fd_t zmq::vmci_connecter_t::connect ()
|
zmq::fd_t zmq::vmci_connecter_t::connect ()
|
||||||
{
|
{
|
||||||
// Following code should handle both Berkeley-derived socket
|
// Async connect has finished. Check whether an error occurred
|
||||||
// implementations and Solaris.
|
|
||||||
int err = 0;
|
int err = 0;
|
||||||
#if defined ZMQ_HAVE_HPUX
|
#if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS
|
||||||
int len = sizeof (err);
|
int len = sizeof err;
|
||||||
#else
|
#else
|
||||||
socklen_t len = sizeof (err);
|
socklen_t len = sizeof err;
|
||||||
#endif
|
#endif
|
||||||
int rc = getsockopt (s, SOL_SOCKET, SO_ERROR, (char *) &err, &len);
|
|
||||||
|
const int rc = getsockopt (_s, SOL_SOCKET, SO_ERROR,
|
||||||
|
reinterpret_cast<char *> (&err), &len);
|
||||||
|
|
||||||
// Assert if the error was caused by 0MQ bug.
|
// Assert if the error was caused by 0MQ bug.
|
||||||
// Networking problems are OK. No need to assert.
|
// Networking problems are OK. No need to assert.
|
||||||
#ifdef ZMQ_HAVE_WINDOWS
|
#ifdef ZMQ_HAVE_WINDOWS
|
||||||
zmq_assert (rc == 0);
|
zmq_assert (rc == 0);
|
||||||
if (err != 0) {
|
if (err != 0) {
|
||||||
if (err != WSAECONNREFUSED && err != WSAETIMEDOUT
|
if (err == WSAEBADF || err == WSAENOPROTOOPT || err == WSAENOTSOCK
|
||||||
&& err != WSAECONNABORTED && err != WSAEHOSTUNREACH
|
|| err == WSAENOBUFS) {
|
||||||
&& err != WSAENETUNREACH && err != WSAENETDOWN && err != WSAEACCES
|
|
||||||
&& err != WSAEINVAL && err != WSAEADDRINUSE
|
|
||||||
&& err != WSAECONNRESET) {
|
|
||||||
wsa_assert_no (err);
|
wsa_assert_no (err);
|
||||||
}
|
}
|
||||||
|
errno = wsa_error_to_errno (err);
|
||||||
return retired_fd;
|
return retired_fd;
|
||||||
}
|
}
|
||||||
#else
|
#else
|
||||||
@ -286,16 +281,20 @@ zmq::fd_t zmq::vmci_connecter_t::connect ()
|
|||||||
err = errno;
|
err = errno;
|
||||||
if (err != 0) {
|
if (err != 0) {
|
||||||
errno = err;
|
errno = err;
|
||||||
errno_assert (errno == ECONNREFUSED || errno == ECONNRESET
|
#if !defined(TARGET_OS_IPHONE) || !TARGET_OS_IPHONE
|
||||||
|| errno == ETIMEDOUT || errno == EHOSTUNREACH
|
errno_assert (errno != EBADF && errno != ENOPROTOOPT
|
||||||
|| errno == ENETUNREACH || errno == ENETDOWN
|
&& errno != ENOTSOCK && errno != ENOBUFS);
|
||||||
|| errno == EINVAL);
|
#else
|
||||||
|
errno_assert (errno != ENOPROTOOPT && errno != ENOTSOCK
|
||||||
|
&& errno != ENOBUFS);
|
||||||
|
#endif
|
||||||
return retired_fd;
|
return retired_fd;
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
fd_t result = s;
|
// Return the newly connected socket.
|
||||||
s = retired_fd;
|
const fd_t result = _s;
|
||||||
|
_s = retired_fd;
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -38,6 +38,7 @@
|
|||||||
#include "own.hpp"
|
#include "own.hpp"
|
||||||
#include "stdint.hpp"
|
#include "stdint.hpp"
|
||||||
#include "io_object.hpp"
|
#include "io_object.hpp"
|
||||||
|
#include "stream_connecter_base.hpp"
|
||||||
|
|
||||||
namespace zmq
|
namespace zmq
|
||||||
{
|
{
|
||||||
@ -45,8 +46,7 @@ class io_thread_t;
|
|||||||
class session_base_t;
|
class session_base_t;
|
||||||
struct address_t;
|
struct address_t;
|
||||||
|
|
||||||
// TODO consider refactoring this to derive from stream_connecter_base_t
|
class vmci_connecter_t ZMQ_FINAL : public stream_connecter_base_t
|
||||||
class vmci_connecter_t ZMQ_FINAL : public own_t, public io_object_t
|
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
// If 'delayed_start' is true connecter first waits for a while,
|
// If 'delayed_start' is true connecter first waits for a while,
|
||||||
@ -54,19 +54,21 @@ class vmci_connecter_t ZMQ_FINAL : public own_t, public io_object_t
|
|||||||
vmci_connecter_t (zmq::io_thread_t *io_thread_,
|
vmci_connecter_t (zmq::io_thread_t *io_thread_,
|
||||||
zmq::session_base_t *session_,
|
zmq::session_base_t *session_,
|
||||||
const options_t &options_,
|
const options_t &options_,
|
||||||
const address_t *addr_,
|
address_t *addr_,
|
||||||
bool delayed_start_);
|
bool delayed_start_);
|
||||||
~vmci_connecter_t ();
|
~vmci_connecter_t ();
|
||||||
|
|
||||||
|
protected:
|
||||||
|
std::string get_socket_name (fd_t fd_, socket_end_t socket_end_) const;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
// ID of the timer used to delay the reconnection.
|
// ID of the timer used to check the connect timeout, must be different from stream_connecter_base_t::reconnect_timer_id.
|
||||||
enum
|
enum
|
||||||
{
|
{
|
||||||
reconnect_timer_id = 1
|
connect_timer_id = 2
|
||||||
};
|
};
|
||||||
|
|
||||||
// Handlers for incoming commands.
|
// Handlers for incoming commands.
|
||||||
void process_plug ();
|
|
||||||
void process_term (int linger_);
|
void process_term (int linger_);
|
||||||
|
|
||||||
// Handlers for I/O events.
|
// Handlers for I/O events.
|
||||||
@ -77,8 +79,8 @@ class vmci_connecter_t ZMQ_FINAL : public own_t, public io_object_t
|
|||||||
// Internal function to start the actual connection establishment.
|
// Internal function to start the actual connection establishment.
|
||||||
void start_connecting ();
|
void start_connecting ();
|
||||||
|
|
||||||
// Internal function to add a reconnect timer
|
// Internal function to add a connect timer
|
||||||
void add_reconnect_timer ();
|
void add_connect_timer ();
|
||||||
|
|
||||||
// Internal function to return a reconnect backoff delay.
|
// Internal function to return a reconnect backoff delay.
|
||||||
// Will modify the current_reconnect_ivl used for next call
|
// Will modify the current_reconnect_ivl used for next call
|
||||||
@ -90,43 +92,12 @@ class vmci_connecter_t ZMQ_FINAL : public own_t, public io_object_t
|
|||||||
// EAGAIN errno if async connect was launched.
|
// EAGAIN errno if async connect was launched.
|
||||||
int open ();
|
int open ();
|
||||||
|
|
||||||
// Close the connecting socket.
|
|
||||||
void close ();
|
|
||||||
|
|
||||||
// Get the file descriptor of newly created connection. Returns
|
// Get the file descriptor of newly created connection. Returns
|
||||||
// retired_fd if the connection was unsuccessful.
|
// retired_fd if the connection was unsuccessful.
|
||||||
fd_t connect ();
|
fd_t connect ();
|
||||||
|
|
||||||
// Address to connect to. Owned by session_base_t.
|
|
||||||
const address_t *addr;
|
|
||||||
|
|
||||||
// Underlying socket.
|
|
||||||
fd_t s;
|
|
||||||
|
|
||||||
// Handle corresponding to the listening socket.
|
|
||||||
handle_t handle;
|
|
||||||
|
|
||||||
// If true file descriptor is registered with the poller and 'handle'
|
|
||||||
// contains valid value.
|
|
||||||
bool handle_valid;
|
|
||||||
|
|
||||||
// If true, connecter is waiting a while before trying to connect.
|
|
||||||
const bool delayed_start;
|
|
||||||
|
|
||||||
// True iff a timer has been started.
|
// True iff a timer has been started.
|
||||||
bool timer_started;
|
bool _connect_timer_started;
|
||||||
|
|
||||||
// Reference to the session we belong to.
|
|
||||||
zmq::session_base_t *session;
|
|
||||||
|
|
||||||
// Current reconnect ivl, updated for backoff strategy
|
|
||||||
int current_reconnect_ivl;
|
|
||||||
|
|
||||||
// String representation of endpoint to connect to
|
|
||||||
std::string endpoint;
|
|
||||||
|
|
||||||
// Socket
|
|
||||||
zmq::socket_base_t *socket;
|
|
||||||
|
|
||||||
ZMQ_NON_COPYABLE_NOR_MOVABLE (vmci_connecter_t)
|
ZMQ_NON_COPYABLE_NOR_MOVABLE (vmci_connecter_t)
|
||||||
};
|
};
|
||||||
|
@ -35,7 +35,7 @@
|
|||||||
|
|
||||||
#include <new>
|
#include <new>
|
||||||
|
|
||||||
#include "stream_engine.hpp"
|
//#include "stream_engine.hpp"
|
||||||
#include "vmci_address.hpp"
|
#include "vmci_address.hpp"
|
||||||
#include "io_thread.hpp"
|
#include "io_thread.hpp"
|
||||||
#include "session_base.hpp"
|
#include "session_base.hpp"
|
||||||
@ -55,40 +55,18 @@
|
|||||||
zmq::vmci_listener_t::vmci_listener_t (io_thread_t *io_thread_,
|
zmq::vmci_listener_t::vmci_listener_t (io_thread_t *io_thread_,
|
||||||
socket_base_t *socket_,
|
socket_base_t *socket_,
|
||||||
const options_t &options_) :
|
const options_t &options_) :
|
||||||
own_t (io_thread_, options_),
|
stream_listener_base_t (io_thread_, socket_, options_)
|
||||||
io_object_t (io_thread_),
|
|
||||||
s (retired_fd),
|
|
||||||
socket (socket_)
|
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
zmq::vmci_listener_t::~vmci_listener_t ()
|
|
||||||
{
|
|
||||||
zmq_assert (s == retired_fd);
|
|
||||||
}
|
|
||||||
|
|
||||||
void zmq::vmci_listener_t::process_plug ()
|
|
||||||
{
|
|
||||||
// Start polling for incoming connections.
|
|
||||||
handle = add_fd (s);
|
|
||||||
set_pollin (handle);
|
|
||||||
}
|
|
||||||
|
|
||||||
void zmq::vmci_listener_t::process_term (int linger_)
|
|
||||||
{
|
|
||||||
rm_fd (handle);
|
|
||||||
close ();
|
|
||||||
own_t::process_term (linger_);
|
|
||||||
}
|
|
||||||
|
|
||||||
void zmq::vmci_listener_t::in_event ()
|
void zmq::vmci_listener_t::in_event ()
|
||||||
{
|
{
|
||||||
fd_t fd = accept ();
|
fd_t fd = accept ();
|
||||||
|
|
||||||
// If connection was reset by the peer in the meantime, just ignore it.
|
// If connection was reset by the peer in the meantime, just ignore it.
|
||||||
if (fd == retired_fd) {
|
if (fd == retired_fd) {
|
||||||
socket->event_accept_failed (
|
_socket->event_accept_failed (
|
||||||
make_unconnected_bind_endpoint_pair (endpoint), zmq_errno ());
|
make_unconnected_bind_endpoint_pair (_endpoint), zmq_errno ());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -107,41 +85,24 @@ void zmq::vmci_listener_t::in_event ()
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Create the engine object for this connection.
|
// Create the engine object for this connection.
|
||||||
stream_engine_t *engine = new (std::nothrow) stream_engine_t (
|
create_engine (fd);
|
||||||
fd, options, make_unconnected_bind_endpoint_pair (endpoint));
|
|
||||||
alloc_assert (engine);
|
|
||||||
|
|
||||||
// Choose I/O thread to run connecter in. Given that we are already
|
|
||||||
// running in an I/O thread, there must be at least one available.
|
|
||||||
io_thread_t *io_thread = choose_io_thread (options.affinity);
|
|
||||||
zmq_assert (io_thread);
|
|
||||||
|
|
||||||
// Create and launch a session object.
|
|
||||||
session_base_t *session =
|
|
||||||
session_base_t::create (io_thread, false, socket, options, NULL);
|
|
||||||
errno_assert (session);
|
|
||||||
session->inc_seqnum ();
|
|
||||||
launch_child (session);
|
|
||||||
send_attach (session, engine, false);
|
|
||||||
socket->event_accepted (make_unconnected_bind_endpoint_pair (endpoint), fd);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int zmq::vmci_listener_t::get_local_address (std::string &addr_)
|
std::string
|
||||||
|
zmq::vmci_listener_t::get_socket_name (zmq::fd_t fd_,
|
||||||
|
socket_end_t socket_end_) const
|
||||||
{
|
{
|
||||||
struct sockaddr_storage ss;
|
struct sockaddr_storage ss;
|
||||||
#ifdef ZMQ_HAVE_HPUX
|
const zmq_socklen_t sl = get_socket_address (fd_, socket_end_, &ss);
|
||||||
int sl = sizeof (ss);
|
if (sl == 0) {
|
||||||
#else
|
return std::string ();
|
||||||
socklen_t sl = sizeof (ss);
|
|
||||||
#endif
|
|
||||||
int rc = getsockname (s, (sockaddr *) &ss, &sl);
|
|
||||||
if (rc != 0) {
|
|
||||||
addr_.clear ();
|
|
||||||
return rc;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
vmci_address_t addr ((struct sockaddr *) &ss, sl, this->get_ctx ());
|
const vmci_address_t addr (reinterpret_cast<struct sockaddr *> (&ss), sl,
|
||||||
return addr.to_string (addr_);
|
this->get_ctx ());
|
||||||
|
std::string address_string;
|
||||||
|
addr.to_string (address_string);
|
||||||
|
return address_string;
|
||||||
}
|
}
|
||||||
|
|
||||||
int zmq::vmci_listener_t::set_local_address (const char *addr_)
|
int zmq::vmci_listener_t::set_local_address (const char *addr_)
|
||||||
@ -156,7 +117,7 @@ int zmq::vmci_listener_t::set_local_address (const char *addr_)
|
|||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
// Create a listening socket.
|
// Create a listening socket.
|
||||||
s =
|
_s =
|
||||||
open_socket (this->get_ctx ()->get_vmci_socket_family (), SOCK_STREAM, 0);
|
open_socket (this->get_ctx ()->get_vmci_socket_family (), SOCK_STREAM, 0);
|
||||||
#ifdef ZMQ_HAVE_WINDOWS
|
#ifdef ZMQ_HAVE_WINDOWS
|
||||||
if (s == INVALID_SOCKET) {
|
if (s == INVALID_SOCKET) {
|
||||||
@ -165,18 +126,18 @@ int zmq::vmci_listener_t::set_local_address (const char *addr_)
|
|||||||
}
|
}
|
||||||
#if !defined _WIN32_WCE
|
#if !defined _WIN32_WCE
|
||||||
// On Windows, preventing sockets to be inherited by child processes.
|
// On Windows, preventing sockets to be inherited by child processes.
|
||||||
BOOL brc = SetHandleInformation ((HANDLE) s, HANDLE_FLAG_INHERIT, 0);
|
BOOL brc = SetHandleInformation ((HANDLE) _s, HANDLE_FLAG_INHERIT, 0);
|
||||||
win_assert (brc);
|
win_assert (brc);
|
||||||
#endif
|
#endif
|
||||||
#else
|
#else
|
||||||
if (s == -1)
|
if (_s == -1)
|
||||||
return -1;
|
return -1;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
address.to_string (endpoint);
|
address.to_string (_endpoint);
|
||||||
|
|
||||||
// Bind the socket.
|
// Bind the socket.
|
||||||
rc = bind (s, address.addr (), address.addrlen ());
|
rc = bind (_s, address.addr (), address.addrlen ());
|
||||||
#ifdef ZMQ_HAVE_WINDOWS
|
#ifdef ZMQ_HAVE_WINDOWS
|
||||||
if (rc == SOCKET_ERROR) {
|
if (rc == SOCKET_ERROR) {
|
||||||
errno = wsa_error_to_errno (WSAGetLastError ());
|
errno = wsa_error_to_errno (WSAGetLastError ());
|
||||||
@ -188,7 +149,7 @@ int zmq::vmci_listener_t::set_local_address (const char *addr_)
|
|||||||
#endif
|
#endif
|
||||||
|
|
||||||
// Listen for incoming connections.
|
// Listen for incoming connections.
|
||||||
rc = listen (s, options.backlog);
|
rc = listen (_s, options.backlog);
|
||||||
#ifdef ZMQ_HAVE_WINDOWS
|
#ifdef ZMQ_HAVE_WINDOWS
|
||||||
if (rc == SOCKET_ERROR) {
|
if (rc == SOCKET_ERROR) {
|
||||||
errno = wsa_error_to_errno (WSAGetLastError ());
|
errno = wsa_error_to_errno (WSAGetLastError ());
|
||||||
@ -199,7 +160,8 @@ int zmq::vmci_listener_t::set_local_address (const char *addr_)
|
|||||||
goto error;
|
goto error;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
socket->event_listening (make_unconnected_bind_endpoint_pair (endpoint), s);
|
_socket->event_listening (make_unconnected_bind_endpoint_pair (_endpoint),
|
||||||
|
_s);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
error:
|
error:
|
||||||
@ -209,27 +171,13 @@ error:
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
void zmq::vmci_listener_t::close ()
|
|
||||||
{
|
|
||||||
zmq_assert (s != retired_fd);
|
|
||||||
#ifdef ZMQ_HAVE_WINDOWS
|
|
||||||
int rc = closesocket (s);
|
|
||||||
wsa_assert (rc != SOCKET_ERROR);
|
|
||||||
#else
|
|
||||||
int rc = ::close (s);
|
|
||||||
errno_assert (rc == 0);
|
|
||||||
#endif
|
|
||||||
socket->event_closed (make_unconnected_bind_endpoint_pair (endpoint), s);
|
|
||||||
s = retired_fd;
|
|
||||||
}
|
|
||||||
|
|
||||||
zmq::fd_t zmq::vmci_listener_t::accept ()
|
zmq::fd_t zmq::vmci_listener_t::accept ()
|
||||||
{
|
{
|
||||||
// Accept one connection and deal with different failure modes.
|
// Accept one connection and deal with different failure modes.
|
||||||
// The situation where connection cannot be accepted due to insufficient
|
// The situation where connection cannot be accepted due to insufficient
|
||||||
// resources is considered valid and treated by ignoring the connection.
|
// resources is considered valid and treated by ignoring the connection.
|
||||||
zmq_assert (s != retired_fd);
|
zmq_assert (_s != retired_fd);
|
||||||
fd_t sock = ::accept (s, NULL, NULL);
|
fd_t sock = ::accept (_s, NULL, NULL);
|
||||||
|
|
||||||
#ifdef ZMQ_HAVE_WINDOWS
|
#ifdef ZMQ_HAVE_WINDOWS
|
||||||
if (sock == INVALID_SOCKET) {
|
if (sock == INVALID_SOCKET) {
|
||||||
|
@ -37,57 +37,37 @@
|
|||||||
#include <string>
|
#include <string>
|
||||||
|
|
||||||
#include "fd.hpp"
|
#include "fd.hpp"
|
||||||
#include "own.hpp"
|
#include "vmci_address.hpp"
|
||||||
#include "stdint.hpp"
|
#include "stream_listener_base.hpp"
|
||||||
#include "io_object.hpp"
|
|
||||||
|
|
||||||
namespace zmq
|
namespace zmq
|
||||||
{
|
{
|
||||||
class io_thread_t;
|
class vmci_listener_t ZMQ_FINAL : public stream_listener_base_t
|
||||||
class socket_base_t;
|
|
||||||
|
|
||||||
// TODO consider refactoring this to derive from stream_listener_base_t
|
|
||||||
class vmci_listener_t ZMQ_FINAL : public own_t, public io_object_t
|
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
vmci_listener_t (zmq::io_thread_t *io_thread_,
|
vmci_listener_t (zmq::io_thread_t *io_thread_,
|
||||||
zmq::socket_base_t *socket_,
|
zmq::socket_base_t *socket_,
|
||||||
const options_t &options_);
|
const options_t &options_);
|
||||||
~vmci_listener_t ();
|
|
||||||
|
|
||||||
// Set address to listen on.
|
// Set address to listen on.
|
||||||
int set_local_address (const char *addr_);
|
int set_local_address (const char *addr_);
|
||||||
|
|
||||||
// Get the bound address for use with wildcards
|
protected:
|
||||||
int get_local_address (std::string &addr_);
|
std::string get_socket_name (fd_t fd_, socket_end_t socket_end_) const;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
// Handlers for incoming commands.
|
|
||||||
void process_plug ();
|
|
||||||
void process_term (int linger_);
|
|
||||||
|
|
||||||
// Handlers for I/O events.
|
// Handlers for I/O events.
|
||||||
void in_event ();
|
void in_event ();
|
||||||
|
|
||||||
// Close the listening socket.
|
|
||||||
void close ();
|
|
||||||
|
|
||||||
// Accept the new connection. Returns the file descriptor of the
|
// Accept the new connection. Returns the file descriptor of the
|
||||||
// newly created connection. The function may return retired_fd
|
// newly created connection. The function may return retired_fd
|
||||||
// if the connection was dropped while waiting in the listen backlog.
|
// if the connection was dropped while waiting in the listen backlog.
|
||||||
fd_t accept ();
|
fd_t accept ();
|
||||||
|
|
||||||
// Underlying socket.
|
int create_socket (const char *addr_);
|
||||||
fd_t s;
|
|
||||||
|
|
||||||
// Handle corresponding to the listening socket.
|
// Address to listen on.
|
||||||
handle_t handle;
|
vmci_address_t _address;
|
||||||
|
|
||||||
// Socket the listerner belongs to.
|
|
||||||
zmq::socket_base_t *socket;
|
|
||||||
|
|
||||||
// String representation of endpoint to bind to
|
|
||||||
std::string endpoint;
|
|
||||||
|
|
||||||
ZMQ_NON_COPYABLE_NOR_MOVABLE (vmci_listener_t)
|
ZMQ_NON_COPYABLE_NOR_MOVABLE (vmci_listener_t)
|
||||||
};
|
};
|
||||||
|
@ -48,10 +48,10 @@ void test_pair_vmci ()
|
|||||||
void *sc = test_context_socket (ZMQ_PAIR);
|
void *sc = test_context_socket (ZMQ_PAIR);
|
||||||
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sc, endpoint.c_str ()));
|
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sc, endpoint.c_str ()));
|
||||||
|
|
||||||
bounce (sb, sc);
|
expect_bounce_fail (sb, sc);
|
||||||
|
|
||||||
test_context_socket_close (sc);
|
test_context_socket_close_zero_linger (sc);
|
||||||
test_context_socket_close (sb);
|
test_context_socket_close_zero_linger (sb);
|
||||||
}
|
}
|
||||||
|
|
||||||
int main (void)
|
int main (void)
|
||||||
|
@ -42,16 +42,16 @@ void test_reqrep_vmci ()
|
|||||||
s << "vmci://" << VMCISock_GetLocalCID () << ":" << 5560;
|
s << "vmci://" << VMCISock_GetLocalCID () << ":" << 5560;
|
||||||
std::string endpoint = s.str ();
|
std::string endpoint = s.str ();
|
||||||
|
|
||||||
void *sb = test_context_socket (ZMQ_REP);
|
void *sb = test_context_socket (ZMQ_DEALER);
|
||||||
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (sb, endpoint.c_str ()));
|
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (sb, endpoint.c_str ()));
|
||||||
|
|
||||||
void *sc = test_context_socket (ZMQ_REQ);
|
void *sc = test_context_socket (ZMQ_DEALER);
|
||||||
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sc, endpoint.c_str ()));
|
TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sc, endpoint.c_str ()));
|
||||||
|
|
||||||
bounce (sb, sc);
|
expect_bounce_fail (sb, sc);
|
||||||
|
|
||||||
test_context_socket_close (sc);
|
test_context_socket_close_zero_linger (sc);
|
||||||
test_context_socket_close (sb);
|
test_context_socket_close_zero_linger (sb);
|
||||||
}
|
}
|
||||||
|
|
||||||
int main (void)
|
int main (void)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user