0
0
mirror of https://github.com/zeromq/libzmq.git synced 2024-12-26 06:41:03 +08:00

Merge pull request #3751 from sigiesec/windows-domain-sockets-signaler

Use Unix domain sockets for listener when available on Windows
This commit is contained in:
Luca Boccassi 2019-12-07 00:25:21 +00:00 committed by GitHub
commit df993d113c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 229 additions and 124 deletions

View File

@ -10,6 +10,15 @@ environment:
MSVCYEAR: "vs2013"
ENABLE_DRAFTS: ON
matrix:
- platform: x64
configuration: Release
WITH_LIBSODIUM: ON
ENABLE_CURVE: ON
APPVEYOR_BUILD_WORKER_IMAGE: Visual Studio 2019
CMAKE_GENERATOR: "Visual Studio 16 2019"
MSVCVERSION: "v142"
MSVCYEAR: "vs2019"
ARTIFACT_NAME: v142-x64
- platform: Win32
configuration: Release
WITH_LIBSODIUM: OFF # unavailable build files for VS2008
@ -162,7 +171,7 @@ install:
- cmd: if "%Platform%"=="cygwin64" set PATH=C:\cygwin64\bin;%PATH%
- cmd: if "%Platform%"=="mingw64" C:\msys64\usr\bin\bash -lc "pacman -Qg"
- cmd: if "%Platform%"=="mingw64" set PATH=C:\msys64\usr\bin;%PATH%
- cmd: if "%Platform%"=="x64" set "CMAKE_GENERATOR=%CMAKE_GENERATOR% Win64"
- cmd: if "%Platform%"=="x64" (if not "%MSVCVERSION%"=="v142" set "CMAKE_GENERATOR=%CMAKE_GENERATOR% Win64")
- cmd: echo "Generator='%CMAKE_GENERATOR%'"
- cmd: echo "Platform='%Platform%'"
- cmd: if "%WITH_LIBSODIUM%"=="ON" set LIBSODIUMDIR=C:\projects\libsodium

View File

@ -38,11 +38,21 @@
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <netdb.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <stdlib.h>
#include <unistd.h>
#include <vector>
#else
#include "tcp.hpp"
#ifdef ZMQ_HAVE_IPC
#include "ipc_address.hpp"
#endif
#include <direct.h>
#endif
#if defined ZMQ_HAVE_OPENVMS || defined ZMQ_HAVE_VXWORKS
@ -71,6 +81,14 @@
#include <TargetConditionals.h>
#endif
#ifndef ZMQ_HAVE_WINDOWS
// Acceptable temporary directory environment variables
static const char *tmp_env_vars[] = {
"TMPDIR", "TEMPDIR", "TMP",
0 // Sentinel
};
#endif
zmq::fd_t zmq::open_socket (int domain_, int type_, int protocol_)
{
int rc;
@ -316,29 +334,9 @@ static void tune_socket (const SOCKET socket_)
zmq::tcp_tune_loopback_fast_path (socket_);
}
#endif
int zmq::make_fdpair (fd_t *r_, fd_t *w_)
static int make_fdpair_tcpip (zmq::fd_t *r_, zmq::fd_t *w_)
{
#if defined ZMQ_HAVE_EVENTFD
int flags = 0;
#if defined ZMQ_HAVE_EVENTFD_CLOEXEC
// Setting this option result in sane behaviour when exec() functions
// are used. Old sockets are closed and don't block TCP ports, avoid
// leaks, etc.
flags |= EFD_CLOEXEC;
#endif
fd_t fd = eventfd (0, flags);
if (fd == -1) {
errno_assert (errno == ENFILE || errno == EMFILE);
*w_ = *r_ = -1;
return -1;
} else {
*w_ = *r_ = fd;
return 0;
}
#elif defined ZMQ_HAVE_WINDOWS
#if !defined _WIN32_WCE && !defined ZMQ_HAVE_WINDOWS_UWP
// Windows CE does not manage security attributes
SECURITY_DESCRIPTOR sd;
@ -367,7 +365,7 @@ int zmq::make_fdpair (fd_t *r_, fd_t *w_)
// Otherwise use Mutex implementation.
int event_signaler_port = 5905;
if (signaler_port == event_signaler_port) {
if (zmq::signaler_port == event_signaler_port) {
#if !defined _WIN32_WCE && !defined ZMQ_HAVE_WINDOWS_UWP
sync =
CreateEventW (&sa, FALSE, TRUE, L"Global\\zmq-signaler-port-sync");
@ -380,14 +378,14 @@ int zmq::make_fdpair (fd_t *r_, fd_t *w_)
L"Global\\zmq-signaler-port-sync");
win_assert (sync != NULL);
} else if (signaler_port != 0) {
} else if (zmq::signaler_port != 0) {
wchar_t mutex_name[MAX_PATH];
#ifdef __MINGW32__
_snwprintf (mutex_name, MAX_PATH, L"Global\\zmq-signaler-port-%d",
signaler_port);
#else
swprintf (mutex_name, MAX_PATH, L"Global\\zmq-signaler-port-%d",
signaler_port);
zmq::signaler_port);
#endif
#if !defined _WIN32_WCE && !defined ZMQ_HAVE_WINDOWS_UWP
@ -408,7 +406,7 @@ int zmq::make_fdpair (fd_t *r_, fd_t *w_)
// Create listening socket.
SOCKET listener;
listener = open_socket (AF_INET, SOCK_STREAM, 0);
listener = zmq::open_socket (AF_INET, SOCK_STREAM, 0);
wsa_assert (listener != INVALID_SOCKET);
// Set SO_REUSEADDR and TCP_NODELAY on listening socket.
@ -425,10 +423,10 @@ int zmq::make_fdpair (fd_t *r_, fd_t *w_)
memset (&addr, 0, sizeof addr);
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
addr.sin_port = htons (signaler_port);
addr.sin_port = htons (zmq::signaler_port);
// Create the writer socket.
*w_ = open_socket (AF_INET, SOCK_STREAM, 0);
*w_ = zmq::open_socket (AF_INET, SOCK_STREAM, 0);
wsa_assert (*w_ != INVALID_SOCKET);
if (sync != NULL) {
@ -441,7 +439,7 @@ int zmq::make_fdpair (fd_t *r_, fd_t *w_)
rc = bind (listener, reinterpret_cast<const struct sockaddr *> (&addr),
sizeof addr);
if (rc != SOCKET_ERROR && signaler_port == 0) {
if (rc != SOCKET_ERROR && zmq::signaler_port == 0) {
// Retrieve ephemeral port number
int addrlen = sizeof addr;
rc = getsockname (listener, reinterpret_cast<struct sockaddr *> (&addr),
@ -510,7 +508,7 @@ int zmq::make_fdpair (fd_t *r_, fd_t *w_)
if (sync != NULL) {
// Exit the critical section.
BOOL brc;
if (signaler_port == event_signaler_port)
if (zmq::signaler_port == event_signaler_port)
brc = SetEvent (sync);
else
brc = ReleaseMutex (sync);
@ -522,7 +520,7 @@ int zmq::make_fdpair (fd_t *r_, fd_t *w_)
}
if (*r_ != INVALID_SOCKET) {
make_socket_noninheritable (*r_);
zmq::make_socket_noninheritable (*r_);
return 0;
}
// Cleanup writer if connection failed
@ -532,10 +530,116 @@ int zmq::make_fdpair (fd_t *r_, fd_t *w_)
*w_ = INVALID_SOCKET;
}
// Set errno from saved value
errno = wsa_error_to_errno (saved_errno);
errno = zmq::wsa_error_to_errno (saved_errno);
return -1;
}
#endif
int zmq::make_fdpair (fd_t *r_, fd_t *w_)
{
#if defined ZMQ_HAVE_EVENTFD
int flags = 0;
#if defined ZMQ_HAVE_EVENTFD_CLOEXEC
// Setting this option result in sane behaviour when exec() functions
// are used. Old sockets are closed and don't block TCP ports, avoid
// leaks, etc.
flags |= EFD_CLOEXEC;
#endif
fd_t fd = eventfd (0, flags);
if (fd == -1) {
errno_assert (errno == ENFILE || errno == EMFILE);
*w_ = *r_ = -1;
return -1;
} else {
*w_ = *r_ = fd;
return 0;
}
#elif defined ZMQ_HAVE_WINDOWS
#ifdef ZMQ_HAVE_IPC
ipc_address_t address;
std::string dirname, filename;
// Create a listening socket.
SOCKET listener = open_socket (AF_UNIX, SOCK_STREAM, 0);
if (listener == retired_fd) {
// This may happen if the library was built on a system supporting AF_UNIX, but the system running doesn't support it.
goto try_tcpip;
}
create_ipc_wildcard_address (dirname, filename);
// Initialise the address structure.
int rc = address.resolve (filename.c_str ());
if (rc != 0) {
goto error_closelistener;
}
// Bind the socket to the file path.
rc = bind (listener, const_cast<sockaddr *> (address.addr ()),
address.addrlen ());
if (rc != 0) {
errno = wsa_error_to_errno (WSAGetLastError ());
goto error_closelistener;
}
// Listen for incoming connections.
rc = listen (listener, 1);
if (rc != 0) {
errno = wsa_error_to_errno (WSAGetLastError ());
goto error_closelistener;
}
sockaddr_un lcladdr;
socklen_t lcladdr_len = sizeof lcladdr;
rc = getsockname (listener, reinterpret_cast<struct sockaddr *> (&lcladdr),
&lcladdr_len);
wsa_assert (rc != -1);
// Create the client socket.
*w_ = open_socket (AF_UNIX, SOCK_STREAM, 0);
if (*w_ == -1) {
errno = wsa_error_to_errno (WSAGetLastError ());
goto error_closelistener;
}
// Connect to the remote peer.
rc = ::connect (*w_, reinterpret_cast<const struct sockaddr *> (&lcladdr),
lcladdr_len);
if (rc == -1) {
goto error_closeclient;
}
*r_ = accept (listener, NULL, NULL);
errno_assert (*r_ != -1);
// Close the listener socket, we don't need it anymore.
rc = closesocket (listener);
wsa_assert (rc == 0);
return 0;
error_closeclient:
int saved_errno = errno;
rc = closesocket (*w_);
wsa_assert (rc == 0);
errno = saved_errno;
error_closelistener:
saved_errno = errno;
rc = closesocket (listener);
wsa_assert (rc == 0);
errno = saved_errno;
return -1;
try_tcpip:
// try to fallback to TCP/IP
// TODO: maybe remember this decision permanently?
#endif
return make_fdpair_tcpip (r_, w_);
#elif defined ZMQ_HAVE_OPENVMS
// Whilst OpenVMS supports socketpair - it maps to AF_INET only. Further,
@ -727,3 +831,80 @@ void zmq::assert_success_or_recoverable (zmq::fd_t s_, int rc_)
}
#endif
}
#ifdef ZMQ_HAVE_IPC
int zmq::create_ipc_wildcard_address (std::string &path_, std::string &file_)
{
#if defined ZMQ_HAVE_WINDOWS
char buffer[MAX_PATH];
{
const errno_t rc = tmpnam_s (buffer);
errno_assert (rc == 0);
}
// TODO or use CreateDirectoryA and specify permissions?
const int rc = _mkdir (buffer);
if (rc != 0) {
return -1;
}
path_.assign (buffer);
file_ = path_ + "/socket";
#else
std::string tmp_path;
// If TMPDIR, TEMPDIR, or TMP are available and are directories, create
// the socket directory there.
const char **tmp_env = tmp_env_vars;
while (tmp_path.empty () && *tmp_env != 0) {
char *tmpdir = getenv (*tmp_env);
struct stat statbuf;
// Confirm it is actually a directory before trying to use
if (tmpdir != 0 && ::stat (tmpdir, &statbuf) == 0
&& S_ISDIR (statbuf.st_mode)) {
tmp_path.assign (tmpdir);
if (*(tmp_path.rbegin ()) != '/') {
tmp_path.push_back ('/');
}
}
// Try the next environment variable
++tmp_env;
}
// Append a directory name
tmp_path.append ("tmpXXXXXX");
// We need room for tmp_path + trailing NUL
std::vector<char> buffer (tmp_path.length () + 1);
strcpy (&buffer[0], tmp_path.c_str ());
#if defined HAVE_MKDTEMP
// Create the directory. POSIX requires that mkdtemp() creates the
// directory with 0700 permissions, meaning the only possible race
// with socket creation could be the same user. However, since
// each socket is created in a directory created by mkdtemp(), and
// mkdtemp() guarantees a unique directory name, there will be no
// collision.
if (mkdtemp (&buffer[0]) == 0) {
return -1;
}
path_.assign (&buffer[0]);
file_ = path_ + "/socket";
#else
LIBZMQ_UNUSED (path_);
int fd = mkstemp (&buffer[0]);
if (fd == -1)
return -1;
::close (fd);
file_.assign (&buffer[0]);
#endif
#endif
return 0;
}
#endif

View File

@ -76,6 +76,11 @@ void make_socket_noninheritable (fd_t sock_);
// - an internal 0MQ error did not occur,
// - and, if a socket error occured, it can be recovered from.
void assert_success_or_recoverable (fd_t s_, int rc_);
#ifdef ZMQ_HAVE_IPC
// Create an IPC wildcard path address
int create_ipc_wildcard_address (std::string &path_, std::string &file_);
#endif
}
#endif

View File

@ -52,8 +52,6 @@
#include <afunix.h>
#include <direct.h>
#define S_ISDIR(m) (((m) &S_IFMT) == S_IFDIR)
#define rmdir _rmdir
#define unlink _unlink
@ -62,7 +60,6 @@
#include <sys/socket.h>
#include <fcntl.h>
#include <sys/un.h>
#include <sys/stat.h>
#endif
#ifdef ZMQ_HAVE_LOCAL_PEERCRED
@ -78,87 +75,6 @@
#endif
#endif
const char *zmq::ipc_listener_t::tmp_env_vars[] = {
"TMPDIR", "TEMPDIR", "TMP",
0 // Sentinel
};
int zmq::ipc_listener_t::create_wildcard_address (std::string &path_,
std::string &file_)
{
#if defined ZMQ_HAVE_WINDOWS
char buffer[MAX_PATH];
{
const errno_t rc = tmpnam_s (buffer);
errno_assert (rc == 0);
}
// TODO or use CreateDirectoryA and specify permissions?
const int rc = _mkdir (buffer);
if (rc != 0) {
return -1;
}
path_.assign (buffer);
file_ = path_ + "/socket";
#else
std::string tmp_path;
// If TMPDIR, TEMPDIR, or TMP are available and are directories, create
// the socket directory there.
const char **tmp_env = tmp_env_vars;
while (tmp_path.empty () && *tmp_env != 0) {
char *tmpdir = getenv (*tmp_env);
struct stat statbuf;
// Confirm it is actually a directory before trying to use
if (tmpdir != 0 && ::stat (tmpdir, &statbuf) == 0
&& S_ISDIR (statbuf.st_mode)) {
tmp_path.assign (tmpdir);
if (*(tmp_path.rbegin ()) != '/') {
tmp_path.push_back ('/');
}
}
// Try the next environment variable
++tmp_env;
}
// Append a directory name
tmp_path.append ("tmpXXXXXX");
// We need room for tmp_path + trailing NUL
std::vector<char> buffer (tmp_path.length () + 1);
strcpy (&buffer[0], tmp_path.c_str ());
#if defined HAVE_MKDTEMP
// Create the directory. POSIX requires that mkdtemp() creates the
// directory with 0700 permissions, meaning the only possible race
// with socket creation could be the same user. However, since
// each socket is created in a directory created by mkdtemp(), and
// mkdtemp() guarantees a unique directory name, there will be no
// collision.
if (mkdtemp (&buffer[0]) == 0) {
return -1;
}
path_.assign (&buffer[0]);
file_ = path_ + "/socket";
#else
LIBZMQ_UNUSED (path_);
int fd = mkstemp (&buffer[0]);
if (fd == -1)
return -1;
::close (fd);
file_.assign (&buffer[0]);
#endif
#endif
return 0;
}
zmq::ipc_listener_t::ipc_listener_t (io_thread_t *io_thread_,
socket_base_t *socket_,
const options_t &options_) :
@ -197,7 +113,7 @@ int zmq::ipc_listener_t::set_local_address (const char *addr_)
// Allow wildcard file
if (options.use_fd == -1 && addr[0] == '*') {
if (create_wildcard_address (_tmp_socket_dirname, addr) < 0) {
if (create_ipc_wildcard_address (_tmp_socket_dirname, addr) < 0) {
return -1;
}
}

View File

@ -56,9 +56,6 @@ class ipc_listener_t : public stream_listener_base_t
// Handlers for I/O events.
void in_event ();
// Create wildcard path address
static int create_wildcard_address (std::string &path_, std::string &file_);
// Filter new connections if the OS provides a mechanism to get
// the credentials of the peer process. Called from accept().
#if defined ZMQ_HAVE_SO_PEERCRED || defined ZMQ_HAVE_LOCAL_PEERCRED
@ -82,9 +79,6 @@ class ipc_listener_t : public stream_listener_base_t
// Name of the file associated with the UNIX domain address.
std::string _filename;
// Acceptable temporary directory environment variables
static const char *tmp_env_vars[];
ipc_listener_t (const ipc_listener_t &);
const ipc_listener_t &operator= (const ipc_listener_t &);
};

View File

@ -40,7 +40,7 @@ void tearDown ()
void test_capabilities ()
{
#if !defined(ZMQ_HAVE_WINDOWS) && !defined(ZMQ_HAVE_OPENVMS)
#if defined(ZMQ_HAVE_IPC)
TEST_ASSERT_TRUE (zmq_has ("ipc"));
#else
TEST_ASSERT_TRUE (!zmq_has ("ipc"));