diff --git a/src/vmci.cpp b/src/vmci.cpp index 4edd096b..b8807e35 100644 --- a/src/vmci.cpp +++ b/src/vmci.cpp @@ -28,7 +28,9 @@ */ #include "precompiled.hpp" +#include "ip.hpp" #include "vmci.hpp" +#include "vmci_address.hpp" #if defined ZMQ_HAVE_VMCI @@ -97,4 +99,23 @@ void zmq::tune_vmci_connect_timeout (ctx_t *context_, #endif } +zmq::fd_t zmq::vmci_open_socket (const char *address_, + const zmq::options_t &options_, + zmq::vmci_address_t *out_vmci_addr_) +{ + // Convert the textual address into address structure. + int rc = out_vmci_addr_->resolve (address_); + if (rc != 0) + return retired_fd; + + // Create the socket. + fd_t s = open_socket (out_vmci_addr_->family (), SOCK_STREAM, 0); + + if (s == retired_fd) { + return retired_fd; + } + + return s; +} + #endif diff --git a/src/vmci.hpp b/src/vmci.hpp index 16a0c734..a79b4bf3 100644 --- a/src/vmci.hpp +++ b/src/vmci.hpp @@ -59,6 +59,10 @@ void tune_vmci_connect_timeout (ctx_t *context_, fd_t sockfd_, struct timeval timeout_); #endif + +fd_t vmci_open_socket (const char *address_, + const options_t &options_, + vmci_address_t *out_vmci_addr_); } #endif diff --git a/src/vmci_address.cpp b/src/vmci_address.cpp index cf97d99c..7af36a02 100644 --- a/src/vmci_address.cpp +++ b/src/vmci_address.cpp @@ -39,6 +39,11 @@ #include "err.hpp" +zmq::vmci_address_t::vmci_address_t () +{ + memset (&address, 0, sizeof address); +} + zmq::vmci_address_t::vmci_address_t (ctx_t *parent_) : parent (parent_) { memset (&address, 0, sizeof address); @@ -56,10 +61,6 @@ zmq::vmci_address_t::vmci_address_t (const sockaddr *sa, memcpy (&address, sa, sa_len); } -zmq::vmci_address_t::~vmci_address_t () -{ -} - int zmq::vmci_address_t::resolve (const char *path_) { // Find the ':' at end that separates address from the port number. @@ -125,7 +126,7 @@ int zmq::vmci_address_t::resolve (const char *path_) return 0; } -int zmq::vmci_address_t::to_string (std::string &addr_) +int zmq::vmci_address_t::to_string (std::string &addr_) const { if (address.svm_family != parent->get_vmci_socket_family ()) { addr_.clear (); @@ -164,4 +165,13 @@ socklen_t zmq::vmci_address_t::addrlen () const return static_cast (sizeof address); } +#if defined ZMQ_HAVE_WINDOWS +unsigned short zmq::vmci_address_t::family () const +#else +sa_family_t zmq::vmci_address_t::family () const +#endif +{ + return parent->get_vmci_socket_family (); +} + #endif diff --git a/src/vmci_address.hpp b/src/vmci_address.hpp index 16dab4a3..ae6b39fb 100644 --- a/src/vmci_address.hpp +++ b/src/vmci_address.hpp @@ -43,16 +43,21 @@ namespace zmq class vmci_address_t { public: + vmci_address_t (); vmci_address_t (ctx_t *parent_); vmci_address_t (const sockaddr *sa, socklen_t sa_len, ctx_t *parent_); - ~vmci_address_t (); // This function sets up the address for VMCI transport. int resolve (const char *path_); // The opposite to resolve() - int to_string (std::string &addr_); + int to_string (std::string &addr_) const; +#if defined ZMQ_HAVE_WINDOWS + unsigned short family () const; +#else + sa_family_t family () const; +#endif const sockaddr *addr () const; socklen_t addrlen () const; @@ -60,8 +65,6 @@ class vmci_address_t struct sockaddr_vm address; ctx_t *parent; - vmci_address_t (); - ZMQ_NON_COPYABLE_NOR_MOVABLE (vmci_address_t) }; } diff --git a/src/vmci_connecter.cpp b/src/vmci_connecter.cpp index 5768c78e..2e0e4b4f 100644 --- a/src/vmci_connecter.cpp +++ b/src/vmci_connecter.cpp @@ -35,69 +35,41 @@ #include -#include "stream_engine.hpp" #include "io_thread.hpp" #include "platform.hpp" #include "random.hpp" #include "err.hpp" #include "ip.hpp" #include "address.hpp" -#include "session_base.hpp" #include "vmci_address.hpp" #include "vmci.hpp" +#include "session_base.hpp" zmq::vmci_connecter_t::vmci_connecter_t (class io_thread_t *io_thread_, class session_base_t *session_, const options_t &options_, - const address_t *addr_, + address_t *addr_, bool delayed_start_) : - own_t (io_thread_, options_), - io_object_t (io_thread_), - addr (addr_), - s (retired_fd), - handle_valid (false), - delayed_start (delayed_start_), - timer_started (false), - session (session_), - current_reconnect_ivl (options.reconnect_ivl) + stream_connecter_base_t ( + io_thread_, session_, options_, addr_, delayed_start_), + _connect_timer_started (false) { - zmq_assert (addr); - zmq_assert (addr->protocol == "vmci"); - addr->to_string (endpoint); - socket = session->get_socket (); + zmq_assert (_addr->protocol == protocol_name::vmci); } zmq::vmci_connecter_t::~vmci_connecter_t () { - zmq_assert (!timer_started); - zmq_assert (!handle_valid); - zmq_assert (s == retired_fd); -} - -void zmq::vmci_connecter_t::process_plug () -{ - if (delayed_start) - add_reconnect_timer (); - else - start_connecting (); + zmq_assert (!_connect_timer_started); } void zmq::vmci_connecter_t::process_term (int linger_) { - if (timer_started) { - cancel_timer (reconnect_timer_id); - timer_started = false; + if (_connect_timer_started) { + cancel_timer (connect_timer_id); + _connect_timer_started = false; } - if (handle_valid) { - rm_fd (handle); - handle_valid = false; - } - - if (s != retired_fd) - close (); - - own_t::process_term (linger_); + stream_connecter_base_t::process_term (linger_); } void zmq::vmci_connecter_t::in_event () @@ -110,9 +82,26 @@ void zmq::vmci_connecter_t::in_event () void zmq::vmci_connecter_t::out_event () { - fd_t fd = connect (); - rm_fd (handle); - handle_valid = false; + if (_connect_timer_started) { + cancel_timer (connect_timer_id); + _connect_timer_started = false; + } + + // TODO this is still very similar to (t)ipc_connecter_t, maybe the + // differences can be factored out + + rm_handle (); + + const fd_t fd = connect (); + + if (fd == retired_fd + && ((options.reconnect_stop & ZMQ_RECONNECT_STOP_CONN_REFUSED) + && errno == ECONNREFUSED)) { + send_conn_failed (_session); + close (); + terminate (); + return; + } // Handle the error condition by attempt to reconnect. if (fd == retired_fd) { @@ -135,148 +124,154 @@ void zmq::vmci_connecter_t::out_event () #endif } - // Create the engine object for this connection. - stream_engine_t *engine = new (std::nothrow) stream_engine_t ( - fd, options, make_unconnected_bind_endpoint_pair (endpoint)); - alloc_assert (engine); + create_engine ( + fd, zmq::vmci_connecter_t::get_socket_name (fd, socket_end_local)); +} - // Attach the engine to the corresponding session object. - send_attach (session, engine); +std::string +zmq::vmci_connecter_t::get_socket_name (zmq::fd_t fd_, + socket_end_t socket_end_) const +{ + struct sockaddr_storage ss; + const zmq_socklen_t sl = get_socket_address (fd_, socket_end_, &ss); + if (sl == 0) { + return std::string (); + } - // Shut the connecter down. - terminate (); - - socket->event_connected (make_unconnected_bind_endpoint_pair (endpoint), - fd); + const vmci_address_t addr (reinterpret_cast (&ss), sl, + this->get_ctx ()); + std::string address_string; + addr.to_string (address_string); + return address_string; } void zmq::vmci_connecter_t::timer_event (int id_) { - zmq_assert (id_ == reconnect_timer_id); - timer_started = false; - start_connecting (); + if (id_ == connect_timer_id) { + _connect_timer_started = false; + rm_handle (); + close (); + add_reconnect_timer (); + } else + stream_connecter_base_t::timer_event (id_); } void zmq::vmci_connecter_t::start_connecting () { // Open the connecting socket. - int rc = open (); + const int rc = open (); // Connect may succeed in synchronous manner. if (rc == 0) { - handle = add_fd (s); - handle_valid = true; + _handle = add_fd (_s); out_event (); } + // Connection establishment may be delayed. Poll for its completion. + else if (rc == -1 && errno == EINPROGRESS) { + _handle = add_fd (_s); + set_pollout (_handle); + _socket->event_connect_delayed ( + make_unconnected_connect_endpoint_pair (_endpoint), zmq_errno ()); + + // add userspace connect timeout + add_connect_timer (); + } + // Handle any other error condition by eventual reconnect. else { - if (s != retired_fd) + if (_s != retired_fd) close (); add_reconnect_timer (); } } -void zmq::vmci_connecter_t::add_reconnect_timer () +void zmq::vmci_connecter_t::add_connect_timer () { - if (options.reconnect_ivl > 0) { - int rc_ivl = get_new_reconnect_ivl (); - add_timer (rc_ivl, reconnect_timer_id); - socket->event_connect_retried ( - make_unconnected_bind_endpoint_pair (endpoint), rc_ivl); - timer_started = true; + if (options.connect_timeout > 0) { + add_timer (options.connect_timeout, connect_timer_id); + _connect_timer_started = true; } } -int zmq::vmci_connecter_t::get_new_reconnect_ivl () -{ - // The new interval is the current interval + random value. - int this_interval = - current_reconnect_ivl + (generate_random () % options.reconnect_ivl); - - // Only change the current reconnect interval if the maximum reconnect - // interval was set and if it's larger than the reconnect interval. - if (options.reconnect_ivl_max > 0 - && options.reconnect_ivl_max > options.reconnect_ivl) { - // Calculate the next interval - current_reconnect_ivl = current_reconnect_ivl * 2; - if (current_reconnect_ivl >= options.reconnect_ivl_max) { - current_reconnect_ivl = options.reconnect_ivl_max; - } - } - return this_interval; -} - int zmq::vmci_connecter_t::open () { - zmq_assert (s == retired_fd); + zmq_assert (_s == retired_fd); - int family = this->get_ctx ()->get_vmci_socket_family (); - if (family == -1) - return -1; + // Resolve the address + if (_addr->resolved.vmci_addr != NULL) { + LIBZMQ_DELETE (_addr->resolved.vmci_addr); + } - // Create the socket. - s = open_socket (family, SOCK_STREAM, 0); -#ifdef ZMQ_HAVE_WINDOWS - if (s == INVALID_SOCKET) { - errno = wsa_error_to_errno (WSAGetLastError ()); + _addr->resolved.vmci_addr = + new (std::nothrow) vmci_address_t (this->get_ctx ()); + alloc_assert (_addr->resolved.vmci_addr); + _s = vmci_open_socket (_addr->address.c_str (), options, + _addr->resolved.vmci_addr); + if (_s == retired_fd) { + // TODO we should emit some event in this case! + + LIBZMQ_DELETE (_addr->resolved.vmci_addr); return -1; } -#else - if (s == -1) - return -1; -#endif + zmq_assert (_addr->resolved.vmci_addr != NULL); + + // Set the socket to non-blocking mode so that we get async connect(). + unblock_socket (_s); + + const vmci_address_t *const vmci_addr = _addr->resolved.vmci_addr; + + int rc; // Connect to the remote peer. - int rc = ::connect (s, addr->resolved.vmci_addr->addr (), - addr->resolved.vmci_addr->addrlen ()); - - // Connect was successful immediately. - if (rc == 0) - return 0; - - // Forward the error. - return -1; -} - -void zmq::vmci_connecter_t::close () -{ - zmq_assert (s != retired_fd); -#ifdef ZMQ_HAVE_WINDOWS - const int rc = closesocket (s); - wsa_assert (rc != SOCKET_ERROR); +#if defined ZMQ_HAVE_VXWORKS + rc = ::connect (_s, (sockaddr *) vmci_addr->addr (), vmci_addr->addrlen ()); #else - const int rc = ::close (s); - errno_assert (rc == 0); + rc = ::connect (_s, vmci_addr->addr (), vmci_addr->addrlen ()); #endif - socket->event_closed (make_unconnected_bind_endpoint_pair (endpoint), s); - s = retired_fd; + // Connect was successful immediately. + if (rc == 0) { + return 0; + } + + // Translate error codes indicating asynchronous connect has been + // launched to a uniform EINPROGRESS. +#ifdef ZMQ_HAVE_WINDOWS + const int last_error = WSAGetLastError (); + if (last_error == WSAEINPROGRESS || last_error == WSAEWOULDBLOCK) + errno = EINPROGRESS; + else + errno = wsa_error_to_errno (last_error); +#else + if (errno == EINTR) + errno = EINPROGRESS; +#endif + return -1; } zmq::fd_t zmq::vmci_connecter_t::connect () { - // Following code should handle both Berkeley-derived socket - // implementations and Solaris. + // Async connect has finished. Check whether an error occurred int err = 0; -#if defined ZMQ_HAVE_HPUX - int len = sizeof (err); +#if defined ZMQ_HAVE_HPUX || defined ZMQ_HAVE_VXWORKS + int len = sizeof err; #else - socklen_t len = sizeof (err); + socklen_t len = sizeof err; #endif - int rc = getsockopt (s, SOL_SOCKET, SO_ERROR, (char *) &err, &len); + + const int rc = getsockopt (_s, SOL_SOCKET, SO_ERROR, + reinterpret_cast (&err), &len); // Assert if the error was caused by 0MQ bug. // Networking problems are OK. No need to assert. #ifdef ZMQ_HAVE_WINDOWS zmq_assert (rc == 0); if (err != 0) { - if (err != WSAECONNREFUSED && err != WSAETIMEDOUT - && err != WSAECONNABORTED && err != WSAEHOSTUNREACH - && err != WSAENETUNREACH && err != WSAENETDOWN && err != WSAEACCES - && err != WSAEINVAL && err != WSAEADDRINUSE - && err != WSAECONNRESET) { + if (err == WSAEBADF || err == WSAENOPROTOOPT || err == WSAENOTSOCK + || err == WSAENOBUFS) { wsa_assert_no (err); } + errno = wsa_error_to_errno (err); return retired_fd; } #else @@ -286,16 +281,20 @@ zmq::fd_t zmq::vmci_connecter_t::connect () err = errno; if (err != 0) { errno = err; - errno_assert (errno == ECONNREFUSED || errno == ECONNRESET - || errno == ETIMEDOUT || errno == EHOSTUNREACH - || errno == ENETUNREACH || errno == ENETDOWN - || errno == EINVAL); +#if !defined(TARGET_OS_IPHONE) || !TARGET_OS_IPHONE + errno_assert (errno != EBADF && errno != ENOPROTOOPT + && errno != ENOTSOCK && errno != ENOBUFS); +#else + errno_assert (errno != ENOPROTOOPT && errno != ENOTSOCK + && errno != ENOBUFS); +#endif return retired_fd; } #endif - fd_t result = s; - s = retired_fd; + // Return the newly connected socket. + const fd_t result = _s; + _s = retired_fd; return result; } diff --git a/src/vmci_connecter.hpp b/src/vmci_connecter.hpp index d0ebaac9..64a176a9 100644 --- a/src/vmci_connecter.hpp +++ b/src/vmci_connecter.hpp @@ -38,6 +38,7 @@ #include "own.hpp" #include "stdint.hpp" #include "io_object.hpp" +#include "stream_connecter_base.hpp" namespace zmq { @@ -45,8 +46,7 @@ class io_thread_t; class session_base_t; struct address_t; -// TODO consider refactoring this to derive from stream_connecter_base_t -class vmci_connecter_t ZMQ_FINAL : public own_t, public io_object_t +class vmci_connecter_t ZMQ_FINAL : public stream_connecter_base_t { public: // If 'delayed_start' is true connecter first waits for a while, @@ -54,19 +54,21 @@ class vmci_connecter_t ZMQ_FINAL : public own_t, public io_object_t vmci_connecter_t (zmq::io_thread_t *io_thread_, zmq::session_base_t *session_, const options_t &options_, - const address_t *addr_, + address_t *addr_, bool delayed_start_); ~vmci_connecter_t (); + protected: + std::string get_socket_name (fd_t fd_, socket_end_t socket_end_) const; + private: - // ID of the timer used to delay the reconnection. + // ID of the timer used to check the connect timeout, must be different from stream_connecter_base_t::reconnect_timer_id. enum { - reconnect_timer_id = 1 + connect_timer_id = 2 }; // Handlers for incoming commands. - void process_plug (); void process_term (int linger_); // Handlers for I/O events. @@ -77,8 +79,8 @@ class vmci_connecter_t ZMQ_FINAL : public own_t, public io_object_t // Internal function to start the actual connection establishment. void start_connecting (); - // Internal function to add a reconnect timer - void add_reconnect_timer (); + // Internal function to add a connect timer + void add_connect_timer (); // Internal function to return a reconnect backoff delay. // Will modify the current_reconnect_ivl used for next call @@ -90,43 +92,12 @@ class vmci_connecter_t ZMQ_FINAL : public own_t, public io_object_t // EAGAIN errno if async connect was launched. int open (); - // Close the connecting socket. - void close (); - // Get the file descriptor of newly created connection. Returns // retired_fd if the connection was unsuccessful. fd_t connect (); - // Address to connect to. Owned by session_base_t. - const address_t *addr; - - // Underlying socket. - fd_t s; - - // Handle corresponding to the listening socket. - handle_t handle; - - // If true file descriptor is registered with the poller and 'handle' - // contains valid value. - bool handle_valid; - - // If true, connecter is waiting a while before trying to connect. - const bool delayed_start; - // True iff a timer has been started. - bool timer_started; - - // Reference to the session we belong to. - zmq::session_base_t *session; - - // Current reconnect ivl, updated for backoff strategy - int current_reconnect_ivl; - - // String representation of endpoint to connect to - std::string endpoint; - - // Socket - zmq::socket_base_t *socket; + bool _connect_timer_started; ZMQ_NON_COPYABLE_NOR_MOVABLE (vmci_connecter_t) }; diff --git a/src/vmci_listener.cpp b/src/vmci_listener.cpp index 0ac6db34..c6b10dfb 100644 --- a/src/vmci_listener.cpp +++ b/src/vmci_listener.cpp @@ -35,7 +35,7 @@ #include -#include "stream_engine.hpp" +//#include "stream_engine.hpp" #include "vmci_address.hpp" #include "io_thread.hpp" #include "session_base.hpp" @@ -55,40 +55,18 @@ zmq::vmci_listener_t::vmci_listener_t (io_thread_t *io_thread_, socket_base_t *socket_, const options_t &options_) : - own_t (io_thread_, options_), - io_object_t (io_thread_), - s (retired_fd), - socket (socket_) + stream_listener_base_t (io_thread_, socket_, options_) { } -zmq::vmci_listener_t::~vmci_listener_t () -{ - zmq_assert (s == retired_fd); -} - -void zmq::vmci_listener_t::process_plug () -{ - // Start polling for incoming connections. - handle = add_fd (s); - set_pollin (handle); -} - -void zmq::vmci_listener_t::process_term (int linger_) -{ - rm_fd (handle); - close (); - own_t::process_term (linger_); -} - void zmq::vmci_listener_t::in_event () { fd_t fd = accept (); // If connection was reset by the peer in the meantime, just ignore it. if (fd == retired_fd) { - socket->event_accept_failed ( - make_unconnected_bind_endpoint_pair (endpoint), zmq_errno ()); + _socket->event_accept_failed ( + make_unconnected_bind_endpoint_pair (_endpoint), zmq_errno ()); return; } @@ -107,41 +85,24 @@ void zmq::vmci_listener_t::in_event () } // Create the engine object for this connection. - stream_engine_t *engine = new (std::nothrow) stream_engine_t ( - fd, options, make_unconnected_bind_endpoint_pair (endpoint)); - alloc_assert (engine); - - // Choose I/O thread to run connecter in. Given that we are already - // running in an I/O thread, there must be at least one available. - io_thread_t *io_thread = choose_io_thread (options.affinity); - zmq_assert (io_thread); - - // Create and launch a session object. - session_base_t *session = - session_base_t::create (io_thread, false, socket, options, NULL); - errno_assert (session); - session->inc_seqnum (); - launch_child (session); - send_attach (session, engine, false); - socket->event_accepted (make_unconnected_bind_endpoint_pair (endpoint), fd); + create_engine (fd); } -int zmq::vmci_listener_t::get_local_address (std::string &addr_) +std::string +zmq::vmci_listener_t::get_socket_name (zmq::fd_t fd_, + socket_end_t socket_end_) const { struct sockaddr_storage ss; -#ifdef ZMQ_HAVE_HPUX - int sl = sizeof (ss); -#else - socklen_t sl = sizeof (ss); -#endif - int rc = getsockname (s, (sockaddr *) &ss, &sl); - if (rc != 0) { - addr_.clear (); - return rc; + const zmq_socklen_t sl = get_socket_address (fd_, socket_end_, &ss); + if (sl == 0) { + return std::string (); } - vmci_address_t addr ((struct sockaddr *) &ss, sl, this->get_ctx ()); - return addr.to_string (addr_); + const vmci_address_t addr (reinterpret_cast (&ss), sl, + this->get_ctx ()); + std::string address_string; + addr.to_string (address_string); + return address_string; } int zmq::vmci_listener_t::set_local_address (const char *addr_) @@ -156,7 +117,7 @@ int zmq::vmci_listener_t::set_local_address (const char *addr_) return -1; // Create a listening socket. - s = + _s = open_socket (this->get_ctx ()->get_vmci_socket_family (), SOCK_STREAM, 0); #ifdef ZMQ_HAVE_WINDOWS if (s == INVALID_SOCKET) { @@ -165,18 +126,18 @@ int zmq::vmci_listener_t::set_local_address (const char *addr_) } #if !defined _WIN32_WCE // On Windows, preventing sockets to be inherited by child processes. - BOOL brc = SetHandleInformation ((HANDLE) s, HANDLE_FLAG_INHERIT, 0); + BOOL brc = SetHandleInformation ((HANDLE) _s, HANDLE_FLAG_INHERIT, 0); win_assert (brc); #endif #else - if (s == -1) + if (_s == -1) return -1; #endif - address.to_string (endpoint); + address.to_string (_endpoint); // Bind the socket. - rc = bind (s, address.addr (), address.addrlen ()); + rc = bind (_s, address.addr (), address.addrlen ()); #ifdef ZMQ_HAVE_WINDOWS if (rc == SOCKET_ERROR) { errno = wsa_error_to_errno (WSAGetLastError ()); @@ -188,7 +149,7 @@ int zmq::vmci_listener_t::set_local_address (const char *addr_) #endif // Listen for incoming connections. - rc = listen (s, options.backlog); + rc = listen (_s, options.backlog); #ifdef ZMQ_HAVE_WINDOWS if (rc == SOCKET_ERROR) { errno = wsa_error_to_errno (WSAGetLastError ()); @@ -199,7 +160,8 @@ int zmq::vmci_listener_t::set_local_address (const char *addr_) goto error; #endif - socket->event_listening (make_unconnected_bind_endpoint_pair (endpoint), s); + _socket->event_listening (make_unconnected_bind_endpoint_pair (_endpoint), + _s); return 0; error: @@ -209,27 +171,13 @@ error: return -1; } -void zmq::vmci_listener_t::close () -{ - zmq_assert (s != retired_fd); -#ifdef ZMQ_HAVE_WINDOWS - int rc = closesocket (s); - wsa_assert (rc != SOCKET_ERROR); -#else - int rc = ::close (s); - errno_assert (rc == 0); -#endif - socket->event_closed (make_unconnected_bind_endpoint_pair (endpoint), s); - s = retired_fd; -} - zmq::fd_t zmq::vmci_listener_t::accept () { // Accept one connection and deal with different failure modes. // The situation where connection cannot be accepted due to insufficient // resources is considered valid and treated by ignoring the connection. - zmq_assert (s != retired_fd); - fd_t sock = ::accept (s, NULL, NULL); + zmq_assert (_s != retired_fd); + fd_t sock = ::accept (_s, NULL, NULL); #ifdef ZMQ_HAVE_WINDOWS if (sock == INVALID_SOCKET) { diff --git a/src/vmci_listener.hpp b/src/vmci_listener.hpp index 4ab11842..abce0793 100644 --- a/src/vmci_listener.hpp +++ b/src/vmci_listener.hpp @@ -37,57 +37,37 @@ #include #include "fd.hpp" -#include "own.hpp" -#include "stdint.hpp" -#include "io_object.hpp" +#include "vmci_address.hpp" +#include "stream_listener_base.hpp" namespace zmq { -class io_thread_t; -class socket_base_t; - -// TODO consider refactoring this to derive from stream_listener_base_t -class vmci_listener_t ZMQ_FINAL : public own_t, public io_object_t +class vmci_listener_t ZMQ_FINAL : public stream_listener_base_t { public: vmci_listener_t (zmq::io_thread_t *io_thread_, zmq::socket_base_t *socket_, const options_t &options_); - ~vmci_listener_t (); // Set address to listen on. int set_local_address (const char *addr_); - // Get the bound address for use with wildcards - int get_local_address (std::string &addr_); + protected: + std::string get_socket_name (fd_t fd_, socket_end_t socket_end_) const; private: - // Handlers for incoming commands. - void process_plug (); - void process_term (int linger_); - // Handlers for I/O events. void in_event (); - // Close the listening socket. - void close (); - // Accept the new connection. Returns the file descriptor of the // newly created connection. The function may return retired_fd // if the connection was dropped while waiting in the listen backlog. fd_t accept (); - // Underlying socket. - fd_t s; + int create_socket (const char *addr_); - // Handle corresponding to the listening socket. - handle_t handle; - - // Socket the listerner belongs to. - zmq::socket_base_t *socket; - - // String representation of endpoint to bind to - std::string endpoint; + // Address to listen on. + vmci_address_t _address; ZMQ_NON_COPYABLE_NOR_MOVABLE (vmci_listener_t) }; diff --git a/tests/test_pair_vmci.cpp b/tests/test_pair_vmci.cpp index 721c1ba5..2e20cc0c 100644 --- a/tests/test_pair_vmci.cpp +++ b/tests/test_pair_vmci.cpp @@ -48,10 +48,10 @@ void test_pair_vmci () void *sc = test_context_socket (ZMQ_PAIR); TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sc, endpoint.c_str ())); - bounce (sb, sc); + expect_bounce_fail (sb, sc); - test_context_socket_close (sc); - test_context_socket_close (sb); + test_context_socket_close_zero_linger (sc); + test_context_socket_close_zero_linger (sb); } int main (void) diff --git a/tests/test_reqrep_vmci.cpp b/tests/test_reqrep_vmci.cpp index 0672f68d..24dd72b1 100644 --- a/tests/test_reqrep_vmci.cpp +++ b/tests/test_reqrep_vmci.cpp @@ -42,16 +42,16 @@ void test_reqrep_vmci () s << "vmci://" << VMCISock_GetLocalCID () << ":" << 5560; std::string endpoint = s.str (); - void *sb = test_context_socket (ZMQ_REP); + void *sb = test_context_socket (ZMQ_DEALER); TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (sb, endpoint.c_str ())); - void *sc = test_context_socket (ZMQ_REQ); + void *sc = test_context_socket (ZMQ_DEALER); TEST_ASSERT_SUCCESS_ERRNO (zmq_connect (sc, endpoint.c_str ())); - bounce (sb, sc); + expect_bounce_fail (sb, sc); - test_context_socket_close (sc); - test_context_socket_close (sb); + test_context_socket_close_zero_linger (sc); + test_context_socket_close_zero_linger (sb); } int main (void)