From 538e5d47421f45d88a8b3b005ed9ee4026623a7b Mon Sep 17 00:00:00 2001 From: Ilya Kulakov Date: Wed, 3 Feb 2016 19:12:11 +0600 Subject: [PATCH] Make VMCI to work on Windows. select was improved to support multiple service providers on Windows. it should be slightly faster because of optimized iteration over selected sockets. --- src/select.cpp | 404 ++++++++++++++++++++++++++++++++--------- src/select.hpp | 79 +++++--- src/vmci.cpp | 8 +- src/vmci_connecter.cpp | 65 +++++-- src/vmci_connecter.hpp | 4 +- src/vmci_listener.cpp | 47 ++++- 6 files changed, 481 insertions(+), 126 deletions(-) diff --git a/src/select.cpp b/src/select.cpp index ec0d2e82..3984146b 100644 --- a/src/select.cpp +++ b/src/select.cpp @@ -31,6 +31,7 @@ #if defined ZMQ_USE_SELECT #include "platform.hpp" + #if defined ZMQ_HAVE_WINDOWS #include "windows.hpp" #elif defined ZMQ_HAVE_HPUX @@ -44,23 +45,21 @@ #include #endif -#include -#include - #include "err.hpp" #include "config.hpp" #include "i_poll_events.hpp" zmq::select_t::select_t (const zmq::ctx_t &ctx_) : - ctx(ctx_), - maxfd (retired_fd), + ctx (ctx_), +#if defined ZMQ_HAVE_WINDOWS + // Fine as long as map is not cleared. + current_family_entry_it (family_entries.end ()), +#else retired (false), + maxfd (retired_fd), +#endif stopping (false) { - // Clear file descriptor sets. - FD_ZERO (&source_set_in); - FD_ZERO (&source_set_out); - FD_ZERO (&source_set_err); } zmq::select_t::~select_t () @@ -70,22 +69,24 @@ zmq::select_t::~select_t () zmq::select_t::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_) { - // Store the file descriptor. - fd_entry_t entry = {fd_, events_}; - fds.push_back (entry); + fd_entry_t fd_entry; + fd_entry.fd = fd_; + fd_entry.events = events_; - // Ensure we do not attempt to select () on more than FD_SETSIZE - // file descriptors. 
- zmq_assert (fds.size () <= FD_SETSIZE); +#if defined ZMQ_HAVE_WINDOWS + u_short family = get_fd_family (fd_); + wsa_assert (family != AF_UNSPEC); + family_entry_t& family_entry = family_entries [family]; + family_entry.fd_entries.push_back (fd_entry); + FD_SET (fd_, &family_entry.fds_set.error); +#else + fd_entries.push_back (fd_entry); + FD_SET (fd_, &fds_set.error); - // Start polling on errors. - FD_SET (fd_, &source_set_err); - - // Adjust maxfd if necessary. if (fd_ > maxfd) maxfd = fd_; +#endif - // Increase the load metric of the thread. adjust_load (1); return fd_; @@ -93,56 +94,106 @@ zmq::select_t::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_) void zmq::select_t::rm_fd (handle_t handle_) { - // Mark the descriptor as retired. - fd_set_t::iterator it; - for (it = fds.begin (); it != fds.end (); ++it) - if (it->fd == handle_) +#if defined ZMQ_HAVE_WINDOWS + u_short family = get_fd_family (handle_); + wsa_assert (family != AF_UNSPEC); + + family_entries_t::iterator family_entry_it = family_entries.find (family); + family_entry_t& family_entry = family_entry_it->second; + + if (family_entry_it != current_family_entry_it) { + // Family is not currently being iterated and can be safely + // modified in place. So later it can be skipped without re-verifying + // its content. + fd_entries_t::iterator fd_entry_it; + for (fd_entry_it = family_entry.fd_entries.begin (); + fd_entry_it != family_entry.fd_entries.end (); ++fd_entry_it) + if (fd_entry_it->fd == handle_) + break; + zmq_assert (fd_entry_it != family_entry.fd_entries.end ()); + + family_entry.fd_entries.erase (fd_entry_it); + family_entry.fds_set.remove_fd (handle_); + } else { + // Otherwise mark removed entries as retired. It will be cleaned up + // at the end of the iteration.
See zmq::select_t::loop + fd_entries_t::iterator fd_entry_it; + for (fd_entry_it = family_entry.fd_entries.begin (); + fd_entry_it != family_entry.fd_entries.end (); ++fd_entry_it) + if (fd_entry_it->fd == handle_) + break; + zmq_assert (fd_entry_it != family_entry.fd_entries.end ()); + + fd_entry_it->fd = retired_fd; + family_entry.fds_set.remove_fd (handle_); + family_entry.retired = true; + } +#else + fd_entries_t::iterator fd_entry_it; + for (fd_entry_it = fd_entries.begin (); + fd_entry_it != fd_entries.end (); ++fd_entry_it) + if (fd_entry_it->fd == handle_) break; - zmq_assert (it != fds.end ()); - it->fd = retired_fd; - retired = true; + zmq_assert (fd_entry_it != fd_entries.end ()); - // Stop polling on the descriptor. - FD_CLR (handle_, &source_set_in); - FD_CLR (handle_, &source_set_out); - FD_CLR (handle_, &source_set_err); + fd_entry_it->fd = retired_fd; + fds_set.remove_fd (handle_); - // Discard all events generated on this file descriptor. - FD_CLR (handle_, &readfds); - FD_CLR (handle_, &writefds); - FD_CLR (handle_, &exceptfds); - - // Adjust the maxfd attribute if we have removed the - // highest-numbered file descriptor. if (handle_ == maxfd) { maxfd = retired_fd; - for (fd_set_t::iterator it = fds.begin (); it != fds.end (); ++it) - if (it->fd > maxfd) - maxfd = it->fd; + for (fd_entry_it = fd_entries.begin (); fd_entry_it != fd_entries.end (); + ++fd_entry_it) + if (fd_entry_it->fd > maxfd) + maxfd = fd_entry_it->fd; } - // Decrease the load metric of the thread. 
+ retired = true; +#endif adjust_load (-1); } void zmq::select_t::set_pollin (handle_t handle_) { - FD_SET (handle_, &source_set_in); +#if defined ZMQ_HAVE_WINDOWS + u_short family = get_fd_family (handle_); + wsa_assert (family != AF_UNSPEC); + FD_SET (handle_, &family_entries [family].fds_set.read); +#else + FD_SET (handle_, &fds_set.read); +#endif } void zmq::select_t::reset_pollin (handle_t handle_) { - FD_CLR (handle_, &source_set_in); +#if defined ZMQ_HAVE_WINDOWS + u_short family = get_fd_family (handle_); + wsa_assert (family != AF_UNSPEC); + FD_CLR (handle_, &family_entries [family].fds_set.read); +#else + FD_CLR (handle_, &fds_set.read); +#endif } void zmq::select_t::set_pollout (handle_t handle_) { - FD_SET (handle_, &source_set_out); +#if defined ZMQ_HAVE_WINDOWS + u_short family = get_fd_family (handle_); + wsa_assert (family != AF_UNSPEC); + FD_SET (handle_, &family_entries [family].fds_set.write); +#else + FD_SET (handle_, &fds_set.write); +#endif } void zmq::select_t::reset_pollout (handle_t handle_) { - FD_CLR (handle_, &source_set_out); +#if defined ZMQ_HAVE_WINDOWS + u_short family = get_fd_family (handle_); + wsa_assert (family != AF_UNSPEC); + FD_CLR (handle_, &family_entries [family].fds_set.write); +#else + FD_CLR (handle_, &fds_set.write); +#endif } void zmq::select_t::start () @@ -163,61 +214,181 @@ int zmq::select_t::max_fds () void zmq::select_t::loop () { while (!stopping) { - // Execute any due timers. int timeout = (int) execute_timers (); - // Intialise the pollsets. - memcpy (&readfds, &source_set_in, sizeof source_set_in); - memcpy (&writefds, &source_set_out, sizeof source_set_out); - memcpy (&exceptfds, &source_set_err, sizeof source_set_err); - - // Wait for events. 
-#ifdef ZMQ_HAVE_OSX - struct timeval tv = {(long) (timeout / 1000), timeout % 1000 * 1000}; +#if defined ZMQ_HAVE_OSX + struct timeval tv = { (long) (timeout / 1000), timeout % 1000 * 1000 }; #else - struct timeval tv = {(long) (timeout / 1000), - (long) (timeout % 1000 * 1000)}; + struct timeval tv = { (long) (timeout / 1000), (long) (timeout % 1000 * 1000) }; #endif -#ifdef ZMQ_HAVE_WINDOWS - int rc = select (0, &readfds, &writefds, &exceptfds, - timeout ? &tv : NULL); - wsa_assert (rc != SOCKET_ERROR); + + int rc = 0; + +#if defined ZMQ_HAVE_WINDOWS + /* + On Windows select does not allow mixing descriptors from different + service providers. It seems to work for AF_INET and AF_INET6, + but fails for AF_INET and VMCI. The workaround is to use + WSAEventSelect and WSAWaitForMultipleEvents to wait, then use + select to find out what actually changed. WSAWaitForMultipleEvents + cannot be used alone, because it does not support more than 64 events + which is not enough. + + To reduce unnecessary overhead, WSA is only used when there is more + than one family. Moreover, AF_INET and AF_INET6 are considered the same + family because Windows seems to handle them properly. + See get_fd_family for details. + */ + wsa_events_t wsa_events; + + // If there is just one family, there is no reason to use WSA events.
+ if (family_entries.size () > 1) { + for (family_entries_t::iterator family_entry_it = family_entries.begin (); + family_entry_it != family_entries.end (); ++family_entry_it) { + family_entry_t& family_entry = family_entry_it->second; + + for (fd_entries_t::iterator fd_entry_it = family_entry.fd_entries.begin (); + fd_entry_it != family_entry.fd_entries.end (); ++fd_entry_it) { + fd_t fd = fd_entry_it->fd; + + // http://stackoverflow.com/q/35043420/188530 + if (FD_ISSET (fd, &family_entry.fds_set.read) && + FD_ISSET (fd, &family_entry.fds_set.write)) + rc = WSAEventSelect (fd, wsa_events.events [3], + FD_READ | FD_ACCEPT | FD_CLOSE | FD_WRITE | FD_CONNECT | FD_OOB); + else if (FD_ISSET (fd, &family_entry.fds_set.read)) + rc = WSAEventSelect (fd, wsa_events.events [0], + FD_READ | FD_ACCEPT | FD_CLOSE | FD_OOB); + else if (FD_ISSET (fd, &family_entry.fds_set.write)) + rc = WSAEventSelect (fd, wsa_events.events [1], + FD_WRITE | FD_CONNECT | FD_OOB); + else if (FD_ISSET (fd, &family_entry.fds_set.error)) + rc = WSAEventSelect (fd, wsa_events.events [2], + FD_OOB); + else + rc = 0; + + wsa_assert (rc != SOCKET_ERROR); + } + } + } +#endif + +#if defined ZMQ_HAVE_WINDOWS + if (family_entries.size () > 1) { + rc = WSAWaitForMultipleEvents (4, wsa_events.events, FALSE, + timeout ? timeout : INFINITE, FALSE); + wsa_assert (rc != WSA_WAIT_FAILED); + zmq_assert (rc != WSA_WAIT_IO_COMPLETION); + + if (rc == WSA_WAIT_TIMEOUT) + continue; + } + + for (current_family_entry_it = family_entries.begin (); + current_family_entry_it != family_entries.end (); ++current_family_entry_it) { + family_entry_t& family_entry = current_family_entry_it->second; + + // select will fail when run with empty sets. + if (family_entry.fd_entries.empty ()) + continue; + + fds_set_t local_fds_set = family_entry.fds_set; + + if (family_entries.size () > 1) { + // There is no reason to wait again after WSAWaitForMultipleEvents. + // Simply collect what is ready. 
+ struct timeval tv_nodelay = { 0, 0 }; + rc = select (0, &local_fds_set.read, &local_fds_set.write, &local_fds_set.error, + &tv_nodelay); + } + else + rc = select (0, &local_fds_set.read, &local_fds_set.write, + &local_fds_set.error, timeout > 0 ? &tv : NULL); + + wsa_assert (rc != SOCKET_ERROR); + + // Size is cached to avoid iteration through recently added descriptors. + for (fd_entries_t::size_type i = 0, size = family_entry.fd_entries.size (); i < size && rc > 0; ++i) { + fd_entry_t& fd_entry = family_entry.fd_entries [i]; + + if (fd_entry.fd == retired_fd) + continue; + + if (FD_ISSET (fd_entry.fd, &local_fds_set.read)) { + fd_entry.events->in_event (); + --rc; + } + + if (fd_entry.fd == retired_fd || rc == 0) + continue; + + if (FD_ISSET (fd_entry.fd, &local_fds_set.write)) { + fd_entry.events->out_event (); + --rc; + } + + if (fd_entry.fd == retired_fd || rc == 0) + continue; + + if (FD_ISSET (fd_entry.fd, &local_fds_set.error)) { + fd_entry.events->in_event (); + --rc; + } + } + + if (family_entry.retired) { + family_entry.retired = false; + family_entry.fd_entries.erase (std::remove_if (family_entry.fd_entries.begin (), + family_entry.fd_entries.end (), is_retired_fd), family_entry.fd_entries.end ()); + } + } #else - int rc = select (maxfd + 1, &readfds, &writefds, &exceptfds, - timeout ? &tv : NULL); + fds_set_t local_fds_set = fds_set; + rc = select (maxfd + 1, &local_fds_set.read, &local_fds_set.write, + &local_fds_set.error, timeout ? &tv : NULL); + if (rc == -1) { errno_assert (errno == EINTR); continue; } -#endif - // If there are no events (i.e. it's a timeout) there's no point - // in checking the pollset. - if (rc == 0) - continue; + // Size is cached to avoid iteration through just added descriptors. 
+ for (fd_entries_t::size_type i = 0, size = fd_entries.size (); i < size && rc > 0; ++i) { + fd_entry_t& fd_entry = fd_entries [i]; - for (fd_set_t::size_type i = 0; i < fds.size (); i ++) { - if (fds [i].fd == retired_fd) + if (fd_entry.fd == retired_fd) continue; - if (FD_ISSET (fds [i].fd, &exceptfds)) - fds [i].events->in_event (); - if (fds [i].fd == retired_fd) + + if (FD_ISSET (fd_entry.fd, &local_fds_set.read)) { + fd_entry.events->in_event (); + --rc; + } + + if (fd_entry.fd == retired_fd || rc == 0) continue; - if (FD_ISSET (fds [i].fd, &writefds)) - fds [i].events->out_event (); - if (fds [i].fd == retired_fd) + + if (FD_ISSET (fd_entry.fd, &local_fds_set.write)) { + fd_entry.events->out_event (); + --rc; + } + + if (fd_entry.fd == retired_fd || rc == 0) continue; - if (FD_ISSET (fds [i].fd, &readfds)) - fds [i].events->in_event (); + + if (FD_ISSET (fd_entry.fd, &local_fds_set.error)) { + fd_entry.events->in_event (); + --rc; + } } - // Destroy retired event sources. if (retired) { - fds.erase (std::remove_if (fds.begin (), fds.end (), - zmq::select_t::is_retired_fd), fds.end ()); retired = false; + fd_entries.erase (std::remove_if (fd_entries.begin (), fd_entries.end (), + is_retired_fd), fd_entries.end ()); } +#endif } } @@ -226,9 +397,80 @@ void zmq::select_t::worker_routine (void *arg_) ((select_t*) arg_)->loop (); } +zmq::select_t::fds_set_t::fds_set_t () +{ + FD_ZERO (&read); + FD_ZERO (&write); + FD_ZERO (&error); +} + +zmq::select_t::fds_set_t::fds_set_t (const fds_set_t& other_) +{ + memcpy (&read, &other_.read, sizeof other_.read); + memcpy (&write, &other_.write, sizeof other_.write); + memcpy (&error, &other_.error, sizeof other_.error); +} + +zmq::select_t::fds_set_t& zmq::select_t::fds_set_t::operator= (const fds_set_t& other_) +{ + memcpy (&read, &other_.read, sizeof other_.read); + memcpy (&write, &other_.write, sizeof other_.write); + memcpy (&error, &other_.error, sizeof other_.error); + return *this; +} + +void 
zmq::select_t::fds_set_t::remove_fd (const fd_t& fd_) +{ + FD_CLR (fd_, &read); + FD_CLR (fd_, &write); + FD_CLR (fd_, &error); +} + bool zmq::select_t::is_retired_fd (const fd_entry_t &entry) { return (entry.fd == retired_fd); } +#if defined ZMQ_HAVE_WINDOWS +u_short zmq::select_t::get_fd_family (fd_t fd_) +{ + sockaddr addr{ 0 }; + int addr_size = sizeof addr; + int rc = getsockname (fd_, &addr, &addr_size); + + // AF_INET and AF_INET6 can be mixed in select + // TODO: If proven otherwise, should simply return addr.sa_family + if (rc != SOCKET_ERROR) + return addr.sa_family == AF_INET6 ? AF_INET : addr.sa_family; + else + return AF_UNSPEC; +} + +zmq::select_t::family_entry_t::family_entry_t () : + retired (false) +{ +} + + +zmq::select_t::wsa_events_t::wsa_events_t () +{ + events [0] = WSACreateEvent (); + wsa_assert (events [0] != WSA_INVALID_EVENT); + events [1] = WSACreateEvent (); + wsa_assert (events [1] != WSA_INVALID_EVENT); + events [2] = WSACreateEvent (); + wsa_assert (events [2] != WSA_INVALID_EVENT); + events [3] = WSACreateEvent (); + wsa_assert (events [3] != WSA_INVALID_EVENT); +} + +zmq::select_t::wsa_events_t::~wsa_events_t () +{ + wsa_assert (WSACloseEvent (events [0])); + wsa_assert (WSACloseEvent (events [1])); + wsa_assert (WSACloseEvent (events [2])); + wsa_assert (WSACloseEvent (events [3])); +} +#endif + #endif diff --git a/src/select.hpp b/src/select.hpp index 6e0dc24e..d69069d8 100644 --- a/src/select.hpp +++ b/src/select.hpp @@ -38,9 +38,10 @@ #include #include +#include -#ifdef ZMQ_HAVE_WINDOWS -#include +#if defined ZMQ_HAVE_WINDOWS +#include "windows.hpp" #elif defined ZMQ_HAVE_OPENVMS #include #include @@ -90,37 +91,69 @@ namespace zmq // Main event loop. void loop (); - // Reference to ZMQ context. + // Reference to ZMQ context. const ctx_t &ctx; + // Internal state. 
+ struct fds_set_t + { + fds_set_t (); + fds_set_t (const fds_set_t& other_); + fds_set_t& operator=(const fds_set_t& other_); + // Convenient method to remove a descriptor from all sets. + void remove_fd (const fd_t& fd_); + + fd_set read; + fd_set write; + fd_set error; + }; + struct fd_entry_t { fd_t fd; - zmq::i_poll_events *events; + zmq::i_poll_events* events; }; + typedef std::vector fd_entries_t; +#if defined ZMQ_HAVE_WINDOWS + struct family_entry_t + { + family_entry_t (); + + fd_entries_t fd_entries; + fds_set_t fds_set; + bool retired; + }; + typedef std::map family_entries_t; + + struct wsa_events_t + { + wsa_events_t (); + ~wsa_events_t (); + + // read, write, error and readwrite + WSAEVENT events [4]; + }; +#endif + +#if defined ZMQ_HAVE_WINDOWS + family_entries_t family_entries; + // See loop for details. + family_entries_t::iterator current_family_entry_it; +#else + fd_entries_t fd_entries; + fds_set_t fds_set; + fd_t maxfd; + bool retired; +#endif + +#if defined ZMQ_HAVE_WINDOWS + // Socket's family or AF_UNSPEC on error. + static u_short get_fd_family (fd_t fd_); +#endif // Checks if an fd_entry_t is retired. static bool is_retired_fd (const fd_entry_t &entry); - // Set of file descriptors that are used to retrieve - // information for fd_set. - typedef std::vector fd_set_t; - fd_set_t fds; - - fd_set source_set_in; - fd_set source_set_out; - fd_set source_set_err; - - fd_set readfds; - fd_set writefds; - fd_set exceptfds; - - // Maximum file descriptor. - fd_t maxfd; - - // If true, at least one file descriptor has retired. - bool retired; - // If true, thread is shutting down.
bool stopping; diff --git a/src/vmci.cpp b/src/vmci.cpp index c06e09bb..b4823382 100644 --- a/src/vmci.cpp +++ b/src/vmci.cpp @@ -40,7 +40,7 @@ void zmq::tune_vmci_buffer_size (ctx_t *context_, fd_t sockfd_, uint64_t default assert (family != -1); if (default_size_ != 0) { - int rc = setsockopt (sockfd_, family, SO_VMCI_BUFFER_SIZE, &default_size_, sizeof default_size_); + int rc = setsockopt (sockfd_, family, SO_VMCI_BUFFER_SIZE, (char*) &default_size_, sizeof default_size_); #if defined ZMQ_HAVE_WINDOWS wsa_assert (rc != SOCKET_ERROR); #else @@ -49,7 +49,7 @@ void zmq::tune_vmci_buffer_size (ctx_t *context_, fd_t sockfd_, uint64_t default } if (min_size_ != 0) { - int rc = setsockopt (sockfd_, family, SO_VMCI_BUFFER_SIZE, &min_size_, sizeof min_size_); + int rc = setsockopt (sockfd_, family, SO_VMCI_BUFFER_SIZE, (char*) &min_size_, sizeof min_size_); #if defined ZMQ_HAVE_WINDOWS wsa_assert (rc != SOCKET_ERROR); #else @@ -58,7 +58,7 @@ void zmq::tune_vmci_buffer_size (ctx_t *context_, fd_t sockfd_, uint64_t default } if (max_size_ != 0) { - int rc = setsockopt (sockfd_, family, SO_VMCI_BUFFER_SIZE, &max_size_, sizeof max_size_); + int rc = setsockopt (sockfd_, family, SO_VMCI_BUFFER_SIZE, (char*) &max_size_, sizeof max_size_); #if defined ZMQ_HAVE_WINDOWS wsa_assert (rc != SOCKET_ERROR); #else @@ -76,7 +76,7 @@ void zmq::tune_vmci_connect_timeout (ctx_t *context_, fd_t sockfd_, struct timev int family = context_->get_vmci_socket_family (); assert (family != -1); - int rc = setsockopt (sockfd_, family, SO_VMCI_CONNECT_TIMEOUT, &timeout_, sizeof timeout_); + int rc = setsockopt (sockfd_, family, SO_VMCI_CONNECT_TIMEOUT, (char*) &timeout_, sizeof timeout_); #if defined ZMQ_HAVE_WINDOWS wsa_assert (rc != SOCKET_ERROR); #else diff --git a/src/vmci_connecter.cpp b/src/vmci_connecter.cpp index f7c149b8..9a974c74 100644 --- a/src/vmci_connecter.cpp +++ b/src/vmci_connecter.cpp @@ -32,7 +32,6 @@ #if defined ZMQ_HAVE_VMCI #include -#include #include "stream_engine.hpp" 
#include "io_thread.hpp" @@ -118,7 +117,8 @@ void zmq::vmci_connecter_t::out_event () return; } - tune_vmci_buffer_size (this->get_ctx (), fd, options.vmci_buffer_size, options.vmci_buffer_min_size, options.vmci_buffer_max_size); + tune_vmci_buffer_size (this->get_ctx (), fd, options.vmci_buffer_size, + options.vmci_buffer_min_size, options.vmci_buffer_max_size); if (options.vmci_connect_timeout > 0) { @@ -218,8 +218,15 @@ int zmq::vmci_connecter_t::open () // Create the socket. s = open_socket (family, SOCK_STREAM, 0); +#ifdef ZMQ_HAVE_WINDOWS + if (s == INVALID_SOCKET) { + errno = wsa_error_to_errno(WSAGetLastError()); + return -1; + } +#else if (s == -1) return -1; +#endif // Set the non-blocking flag. unblock_socket (s); @@ -233,12 +240,18 @@ int zmq::vmci_connecter_t::open () if (rc == 0) return 0; - // Translate other error codes indicating asynchronous connect has been + // Translate error codes indicating asynchronous connect has been // launched to a uniform EINPROGRESS. - if (rc == -1 && errno == EINTR) { +#ifdef ZMQ_HAVE_WINDOWS + const int error_code = WSAGetLastError(); + if (error_code == WSAEINPROGRESS || error_code == WSAEWOULDBLOCK) errno = EINPROGRESS; - return -1; - } + else + errno = wsa_error_to_errno(error_code); +#else + if (errno == EINTR) + errno = EINPROGRESS; +#endif // Forward the error. return -1; @@ -269,19 +282,45 @@ zmq::fd_t zmq::vmci_connecter_t::connect () socklen_t len = sizeof (err); #endif int rc = getsockopt (s, SOL_SOCKET, SO_ERROR, (char*) &err, &len); + + // Assert if the error was caused by 0MQ bug. + // Networking problems are OK. No need to assert. 
+#ifdef ZMQ_HAVE_WINDOWS + zmq_assert(rc == 0); + if (err != 0) { + if (err != WSAECONNREFUSED + && err != WSAETIMEDOUT + && err != WSAECONNABORTED + && err != WSAEHOSTUNREACH + && err != WSAENETUNREACH + && err != WSAENETDOWN + && err != WSAEACCES + && err != WSAEINVAL + && err != WSAEADDRINUSE + && err != WSAECONNRESET) + { + wsa_assert_no(err); + } + return retired_fd; + } +#else + // Following code should handle both Berkeley-derived socket + // implementations and Solaris. if (rc == -1) err = errno; if (err != 0) { - - // Assert if the error was caused by 0MQ bug. - // Networking problems are OK. No need to assert. errno = err; - errno_assert (errno == ECONNREFUSED || errno == ECONNRESET || - errno == ETIMEDOUT || errno == EHOSTUNREACH || - errno == ENETUNREACH || errno == ENETDOWN); - + errno_assert( + errno == ECONNREFUSED || + errno == ECONNRESET || + errno == ETIMEDOUT || + errno == EHOSTUNREACH || + errno == ENETUNREACH || + errno == ENETDOWN || + errno == EINVAL); return retired_fd; } +#endif fd_t result = s; s = retired_fd; diff --git a/src/vmci_connecter.hpp b/src/vmci_connecter.hpp index 4a5b190c..1636aa16 100644 --- a/src/vmci_connecter.hpp +++ b/src/vmci_connecter.hpp @@ -27,8 +27,8 @@ along with this program. If not, see . */ -#ifndef __VMCI_CONNECTER_HPP_INCLUDED__ -#define __VMCI_CONNECTER_HPP_INCLUDED__ +#ifndef __ZMQ_VMCI_CONNECTER_HPP_INCLUDED__ +#define __ZMQ_VMCI_CONNECTER_HPP_INCLUDED__ #include "platform.hpp" diff --git a/src/vmci_listener.cpp b/src/vmci_listener.cpp index e3c1de3f..047ebadc 100644 --- a/src/vmci_listener.cpp +++ b/src/vmci_listener.cpp @@ -33,8 +33,6 @@ #include -#include - #include "stream_engine.hpp" #include "vmci_address.hpp" #include "io_thread.hpp" @@ -153,20 +151,46 @@ int zmq::vmci_listener_t::set_address (const char *addr_) // Create a listening socket. 
s = open_socket (this->get_ctx ()->get_vmci_socket_family (), SOCK_STREAM, 0); +#ifdef ZMQ_HAVE_WINDOWS + if (s == INVALID_SOCKET) { + errno = wsa_error_to_errno(WSAGetLastError()); + return -1; + } +#if !defined _WIN32_WCE + // On Windows, preventing sockets to be inherited by child processes. + BOOL brc = SetHandleInformation((HANDLE)s, HANDLE_FLAG_INHERIT, 0); + win_assert(brc); +#endif +#else if (s == -1) return -1; +#endif address.to_string (endpoint); // Bind the socket. rc = bind (s, address.addr (), address.addrlen ()); +#ifdef ZMQ_HAVE_WINDOWS + if (rc == SOCKET_ERROR) { + errno = wsa_error_to_errno(WSAGetLastError()); + goto error; + } +#else if (rc != 0) goto error; +#endif // Listen for incoming connections. rc = listen (s, options.backlog); +#ifdef ZMQ_HAVE_WINDOWS + if (rc == SOCKET_ERROR) { + errno = wsa_error_to_errno(WSAGetLastError()); + goto error; + } +#else if (rc != 0) goto error; +#endif socket->event_listening (endpoint, s); return 0; @@ -199,12 +223,29 @@ zmq::fd_t zmq::vmci_listener_t::accept () // resources is considered valid and treated by ignoring the connection. zmq_assert (s != retired_fd); fd_t sock = ::accept (s, NULL, NULL); + +#ifdef ZMQ_HAVE_WINDOWS + if (sock == INVALID_SOCKET) { + wsa_assert(WSAGetLastError() == WSAEWOULDBLOCK || + WSAGetLastError() == WSAECONNRESET || + WSAGetLastError() == WSAEMFILE || + WSAGetLastError() == WSAENOBUFS); + return retired_fd; + } +#if !defined _WIN32_WCE + // On Windows, preventing sockets to be inherited by child processes. 
+ BOOL brc = SetHandleInformation((HANDLE)sock, HANDLE_FLAG_INHERIT, 0); + win_assert(brc); +#endif +#else if (sock == -1) { - errno_assert (errno == EAGAIN || errno == EWOULDBLOCK || + errno_assert(errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR || errno == ECONNABORTED || errno == EPROTO || + errno == ENOBUFS || errno == ENOMEM || errno == EMFILE || errno == ENFILE); return retired_fd; } +#endif // Race condition can cause socket not to be closed (if fork happens // between accept and this point).