mirror of
https://github.com/zeromq/libzmq.git
synced 2025-03-09 15:26:04 +00:00
Updated handling of Unix Domain Sockets, make use of temporary directories, and cleanup afterward. Fix test_term_endpoint handling of optvallen
This commit is contained in:
parent
96c9e4aabd
commit
b6080a798c
@ -49,6 +49,7 @@
|
|||||||
#include <sys/socket.h>
|
#include <sys/socket.h>
|
||||||
#include <fcntl.h>
|
#include <fcntl.h>
|
||||||
#include <sys/un.h>
|
#include <sys/un.h>
|
||||||
|
#include <sys/stat.h>
|
||||||
|
|
||||||
#ifdef ZMQ_HAVE_LOCAL_PEERCRED
|
#ifdef ZMQ_HAVE_LOCAL_PEERCRED
|
||||||
# include <sys/types.h>
|
# include <sys/types.h>
|
||||||
@ -63,6 +64,58 @@
|
|||||||
# endif
|
# endif
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
const char *zmq::ipc_listener_t::tmp_env_vars[] = {
|
||||||
|
"TMPDIR",
|
||||||
|
"TEMPDIR",
|
||||||
|
"TMP",
|
||||||
|
0 // Sentinel
|
||||||
|
};
|
||||||
|
|
||||||
|
int zmq::ipc_listener_t::create_wildcard_address(std::string& path_)
|
||||||
|
{
|
||||||
|
std::string tmp_path;
|
||||||
|
|
||||||
|
// If TMPDIR, TEMPDIR, or TMP are available and are directories, create
|
||||||
|
// the socket directory there.
|
||||||
|
const char **tmp_env = tmp_env_vars;
|
||||||
|
while ( tmp_path.empty() && *tmp_env != 0 ) {
|
||||||
|
char *tmpdir = getenv(*tmp_env);
|
||||||
|
struct stat statbuf;
|
||||||
|
|
||||||
|
// Confirm it is actually a directory before trying to use
|
||||||
|
if ( tmpdir != 0 && ::stat(tmpdir, &statbuf) == 0 && S_ISDIR(statbuf.st_mode) ) {
|
||||||
|
tmp_path.assign(tmpdir);
|
||||||
|
if ( *(tmp_path.rbegin()) != '/' ) {
|
||||||
|
tmp_path.push_back('/');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try the next environment variable
|
||||||
|
++tmp_env;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Append a directory name
|
||||||
|
tmp_path.append("tmpXXXXXX");
|
||||||
|
|
||||||
|
// We need room for tmp_path + trailing NUL
|
||||||
|
std::vector<char> buffer(tmp_path.length()+1);
|
||||||
|
strcpy(buffer.data(), tmp_path.c_str());
|
||||||
|
|
||||||
|
// Create the directory. POSIX requires that mkdtemp() creates the
|
||||||
|
// directory with 0700 permissions, meaning the only possible race
|
||||||
|
// with socket creation could be the same user. However, since
|
||||||
|
// each socket is created in a directory created by mkdtemp(), and
|
||||||
|
// mkdtemp() guarantees a unique directory name, there will be no
|
||||||
|
// collision.
|
||||||
|
if ( mkdtemp(buffer.data()) == 0 ) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
path_.assign(buffer.data());
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
zmq::ipc_listener_t::ipc_listener_t (io_thread_t *io_thread_,
|
zmq::ipc_listener_t::ipc_listener_t (io_thread_t *io_thread_,
|
||||||
socket_base_t *socket_, const options_t &options_) :
|
socket_base_t *socket_, const options_t &options_) :
|
||||||
own_t (io_thread_, options_),
|
own_t (io_thread_, options_),
|
||||||
@ -113,7 +166,7 @@ void zmq::ipc_listener_t::in_event ()
|
|||||||
io_thread_t *io_thread = choose_io_thread (options.affinity);
|
io_thread_t *io_thread = choose_io_thread (options.affinity);
|
||||||
zmq_assert (io_thread);
|
zmq_assert (io_thread);
|
||||||
|
|
||||||
// Create and launch a session object.
|
// Create and launch a session object.
|
||||||
session_base_t *session = session_base_t::create (io_thread, false, socket,
|
session_base_t *session = session_base_t::create (io_thread, false, socket,
|
||||||
options, NULL);
|
options, NULL);
|
||||||
errno_assert (session);
|
errno_assert (session);
|
||||||
@ -148,12 +201,15 @@ int zmq::ipc_listener_t::set_address (const char *addr_)
|
|||||||
|
|
||||||
// Allow wildcard file
|
// Allow wildcard file
|
||||||
if (addr [0] == '*') {
|
if (addr [0] == '*') {
|
||||||
char buffer [12] = "2134XXXXXX";
|
std::string tmp_path;
|
||||||
int fd = mkstemp (buffer);
|
|
||||||
if (fd == -1)
|
if ( create_wildcard_address(tmp_path) < 0 ) {
|
||||||
return -1;
|
return -1;
|
||||||
addr.assign (buffer);
|
}
|
||||||
::close (fd);
|
|
||||||
|
tmp_socket_dirname.assign(tmp_path);
|
||||||
|
|
||||||
|
addr.assign (tmp_path + "/socket");
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get rid of the file associated with the UNIX domain socket that
|
// Get rid of the file associated with the UNIX domain socket that
|
||||||
@ -169,8 +225,16 @@ int zmq::ipc_listener_t::set_address (const char *addr_)
|
|||||||
// Initialise the address structure.
|
// Initialise the address structure.
|
||||||
ipc_address_t address;
|
ipc_address_t address;
|
||||||
int rc = address.resolve (addr.c_str());
|
int rc = address.resolve (addr.c_str());
|
||||||
if (rc != 0)
|
if (rc != 0) {
|
||||||
|
if ( !tmp_socket_dirname.empty() ) {
|
||||||
|
// We need to preserve errno to return to the user
|
||||||
|
int errno_ = errno;
|
||||||
|
::rmdir(tmp_socket_dirname.c_str ());
|
||||||
|
tmp_socket_dirname.clear();
|
||||||
|
errno = errno_;
|
||||||
|
}
|
||||||
return -1;
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
address.to_string (endpoint);
|
address.to_string (endpoint);
|
||||||
|
|
||||||
@ -179,8 +243,16 @@ int zmq::ipc_listener_t::set_address (const char *addr_)
|
|||||||
} else {
|
} else {
|
||||||
// Create a listening socket.
|
// Create a listening socket.
|
||||||
s = open_socket (AF_UNIX, SOCK_STREAM, 0);
|
s = open_socket (AF_UNIX, SOCK_STREAM, 0);
|
||||||
if (s == -1)
|
if (s == -1) {
|
||||||
|
if ( !tmp_socket_dirname.empty() ) {
|
||||||
|
// We need to preserve errno to return to the user
|
||||||
|
int errno_ = errno;
|
||||||
|
::rmdir(tmp_socket_dirname.c_str ());
|
||||||
|
tmp_socket_dirname.clear();
|
||||||
|
errno = errno_;
|
||||||
|
}
|
||||||
return -1;
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
// Bind the socket to the file path.
|
// Bind the socket to the file path.
|
||||||
rc = bind (s, address.addr (), address.addrlen ());
|
rc = bind (s, address.addr (), address.addrlen ());
|
||||||
@ -219,8 +291,18 @@ int zmq::ipc_listener_t::close ()
|
|||||||
// MUST NOT unlink if the FD is managed by the user, or it will stop
|
// MUST NOT unlink if the FD is managed by the user, or it will stop
|
||||||
// working after the first client connects. The user will take care of
|
// working after the first client connects. The user will take care of
|
||||||
// cleaning up the file after the service is stopped.
|
// cleaning up the file after the service is stopped.
|
||||||
if (has_file && !filename.empty () && options.use_fd == -1) {
|
if (has_file && options.use_fd == -1) {
|
||||||
rc = ::unlink(filename.c_str ());
|
rc = 0;
|
||||||
|
|
||||||
|
if ( !filename.empty () ) {
|
||||||
|
rc = ::unlink(filename.c_str ());
|
||||||
|
}
|
||||||
|
|
||||||
|
if ( rc == 0 && !tmp_socket_dirname.empty() ) {
|
||||||
|
rc = ::rmdir(tmp_socket_dirname.c_str ());
|
||||||
|
tmp_socket_dirname.clear();
|
||||||
|
}
|
||||||
|
|
||||||
if (rc != 0) {
|
if (rc != 0) {
|
||||||
socket->event_close_failed (endpoint, zmq_errno());
|
socket->event_close_failed (endpoint, zmq_errno());
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -73,6 +73,9 @@ namespace zmq
|
|||||||
// Close the listening socket.
|
// Close the listening socket.
|
||||||
int close ();
|
int close ();
|
||||||
|
|
||||||
|
// Create wildcard path address
|
||||||
|
static int create_wildcard_address(std::string& path_);
|
||||||
|
|
||||||
// Filter new connections if the OS provides a mechanism to get
|
// Filter new connections if the OS provides a mechanism to get
|
||||||
// the credentials of the peer process. Called from accept().
|
// the credentials of the peer process. Called from accept().
|
||||||
# if defined ZMQ_HAVE_SO_PEERCRED || defined ZMQ_HAVE_LOCAL_PEERCRED
|
# if defined ZMQ_HAVE_SO_PEERCRED || defined ZMQ_HAVE_LOCAL_PEERCRED
|
||||||
@ -87,6 +90,10 @@ namespace zmq
|
|||||||
// True, if the underlying file for UNIX domain socket exists.
|
// True, if the underlying file for UNIX domain socket exists.
|
||||||
bool has_file;
|
bool has_file;
|
||||||
|
|
||||||
|
// Name of the temporary directory (if any) that has the
|
||||||
|
// the UNIX domain socket
|
||||||
|
std::string tmp_socket_dirname;
|
||||||
|
|
||||||
// Name of the file associated with the UNIX domain address.
|
// Name of the file associated with the UNIX domain address.
|
||||||
std::string filename;
|
std::string filename;
|
||||||
|
|
||||||
@ -99,9 +106,12 @@ namespace zmq
|
|||||||
// Socket the listener belongs to.
|
// Socket the listener belongs to.
|
||||||
zmq::socket_base_t *socket;
|
zmq::socket_base_t *socket;
|
||||||
|
|
||||||
// String representation of endpoint to bind to
|
// String representation of endpoint to bind to
|
||||||
std::string endpoint;
|
std::string endpoint;
|
||||||
|
|
||||||
|
// Acceptable temporary directory environment variables
|
||||||
|
static const char *tmp_env_vars[];
|
||||||
|
|
||||||
ipc_listener_t (const ipc_listener_t&);
|
ipc_listener_t (const ipc_listener_t&);
|
||||||
const ipc_listener_t &operator = (const ipc_listener_t&);
|
const ipc_listener_t &operator = (const ipc_listener_t&);
|
||||||
};
|
};
|
||||||
|
@ -29,12 +29,14 @@
|
|||||||
|
|
||||||
#include "testutil.hpp"
|
#include "testutil.hpp"
|
||||||
|
|
||||||
|
#define BUF_SIZE 32
|
||||||
|
|
||||||
int main (void)
|
int main (void)
|
||||||
{
|
{
|
||||||
setup_test_environment();
|
setup_test_environment();
|
||||||
int rc;
|
int rc;
|
||||||
const size_t buf_size = 32;
|
char buf[BUF_SIZE];
|
||||||
char buf[buf_size];
|
size_t buf_size;
|
||||||
const char *ep = "tcp://127.0.0.1:5560";
|
const char *ep = "tcp://127.0.0.1:5560";
|
||||||
const char *ep_wc_tcp = "tcp://127.0.0.1:*";
|
const char *ep_wc_tcp = "tcp://127.0.0.1:*";
|
||||||
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
|
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
|
||||||
@ -139,18 +141,21 @@ int main (void)
|
|||||||
#endif
|
#endif
|
||||||
|
|
||||||
// Unbind sockets binded by wild-card address
|
// Unbind sockets binded by wild-card address
|
||||||
rc = zmq_getsockopt (push, ZMQ_LAST_ENDPOINT, buf, (size_t *)&buf_size);
|
buf_size = sizeof(buf);
|
||||||
|
rc = zmq_getsockopt (push, ZMQ_LAST_ENDPOINT, buf, &buf_size);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
rc = zmq_unbind (push, buf);
|
rc = zmq_unbind (push, buf);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
|
#if !defined ZMQ_HAVE_WINDOWS && !defined ZMQ_HAVE_OPENVMS
|
||||||
rc = zmq_getsockopt (pull, ZMQ_LAST_ENDPOINT, buf, (size_t *)&buf_size);
|
buf_size = sizeof(buf);
|
||||||
|
rc = zmq_getsockopt (pull, ZMQ_LAST_ENDPOINT, buf, &buf_size);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
rc = zmq_unbind (pull, buf);
|
rc = zmq_unbind (pull, buf);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
#endif
|
#endif
|
||||||
#if defined ZMQ_HAVE_VMCI
|
#if defined ZMQ_HAVE_VMCI
|
||||||
rc = zmq_getsockopt (req, ZMQ_LAST_ENDPOINT, buf, (size_t *)&buf_size);
|
buf_size = sizeof(buf);
|
||||||
|
rc = zmq_getsockopt (req, ZMQ_LAST_ENDPOINT, buf, &buf_size);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
rc = zmq_unbind(req, buf);
|
rc = zmq_unbind(req, buf);
|
||||||
assert (rc == 0);
|
assert (rc == 0);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user