mirror of
https://github.com/zeromq/libzmq.git
synced 2025-01-14 01:37:56 +08:00
add zmq_ppoll
zmq_ppoll mostly mimics zmq_poll behavior, except for the added feature of being able to specify a signal mask. Signals in this mask will be blocked during execution of zmq_ppoll. Switching of the process' active signal mask happens atomically with the actual poll call, so that no race conditions can occur. This behavior is useful when one wants to gracefully handle POSIX signals without race conditions. See e.g. the discussion below https://250bpm.com/blog:12/ for an explanation. Also includes two new tests: 1. test_zmq_ppoll_fd does the same thing as test_zmq_poll_fd, demonstrating backwards compatibility with zmq_poll when used with a default signal mask. 2. test_zmq_ppoll_signals demonstrates the use of zmq_ppoll with a signal mask, blocking out SIGTERM everywhere except in zmq_ppoll, allowing to handle the signal in one place without having to worry about race conditions.
This commit is contained in:
parent
cf8afcc411
commit
36e4c9b474
@ -468,6 +468,11 @@ message(STATUS "Using polling method in zmq_poll(er)_* API: ${API_POLLER}")
|
||||
string(TOUPPER ${API_POLLER} UPPER_API_POLLER)
|
||||
set(ZMQ_POLL_BASED_ON_${UPPER_API_POLLER} 1)
|
||||
|
||||
check_cxx_symbol_exists(pselect sys/select.h HAVE_PSELECT)
|
||||
if (NOT WIN32 AND HAVE_PSELECT)
|
||||
set(ZMQ_HAVE_PPOLL 1)
|
||||
endif()
|
||||
|
||||
# special alignment settings
|
||||
execute_process(
|
||||
COMMAND getconf LEVEL1_DCACHE_LINESIZE
|
||||
|
15
Makefile.am
15
Makefile.am
@ -1065,7 +1065,8 @@ test_apps += tests/test_poller \
|
||||
tests/test_hello_msg \
|
||||
tests/test_disconnect_msg \
|
||||
tests/test_channel \
|
||||
tests/test_hiccup_msg
|
||||
tests/test_hiccup_msg \
|
||||
tests/test_zmq_ppoll_fd
|
||||
|
||||
tests_test_poller_SOURCES = tests/test_poller.cpp
|
||||
tests_test_poller_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
|
||||
@ -1134,6 +1135,18 @@ tests_test_channel_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
|
||||
tests_test_hiccup_msg_SOURCES = tests/test_hiccup_msg.cpp
|
||||
tests_test_hiccup_msg_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
|
||||
tests_test_hiccup_msg_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
|
||||
|
||||
tests_test_zmq_ppoll_fd_SOURCES = tests/test_zmq_ppoll_fd.cpp
|
||||
tests_test_zmq_ppoll_fd_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
|
||||
tests_test_zmq_ppoll_fd_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
|
||||
|
||||
if HAVE_FORK
|
||||
test_apps += tests/test_zmq_ppoll_signals
|
||||
|
||||
tests_test_zmq_ppoll_signals_SOURCES = tests/test_zmq_ppoll_signals.cpp
|
||||
tests_test_zmq_ppoll_signals_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
|
||||
tests_test_zmq_ppoll_signals_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
|
||||
endif
|
||||
endif
|
||||
|
||||
if FUZZING_ENGINE_LIB
|
||||
|
47
acinclude.m4
47
acinclude.m4
@ -1069,6 +1069,32 @@ select(1, &t_rfds, 0, 0, &tv);
|
||||
)
|
||||
}])
|
||||
|
||||
dnl ################################################################################
|
||||
dnl # LIBZMQ_CHECK_PSELECT([action-if-found], [action-if-not-found]) #
|
||||
dnl # Checks pselect polling system #
|
||||
dnl ################################################################################
|
||||
AC_DEFUN([LIBZMQ_CHECK_PSELECT], [{
|
||||
AC_LINK_IFELSE([
|
||||
AC_LANG_PROGRAM([
|
||||
#include <sys/select.h>
|
||||
#include <signal.h>
|
||||
],[[
|
||||
fd_set t_rfds;
|
||||
struct timespec ts;
|
||||
FD_ZERO(&t_rfds);
|
||||
FD_SET(0, &t_rfds);
|
||||
ts.tv_sec = 5;
|
||||
ts.tv_nsec = 0;
|
||||
sigset_t sigmask, sigmask_without_sigterm;
|
||||
sigemptyset(&sigmask);
|
||||
sigprocmask(SIG_BLOCK, &sigmask, &sigmask_without_sigterm);
|
||||
|
||||
pselect(1, &t_rfds, 0, 0, &ts, &sigmask);
|
||||
]])],
|
||||
[$1],[$2]
|
||||
)
|
||||
}])
|
||||
|
||||
dnl ################################################################################
|
||||
dnl # LIBZMQ_CHECK_POLLER([action-if-found], [action-if-not-found]) #
|
||||
dnl # Choose polling system #
|
||||
@ -1197,6 +1223,27 @@ AC_DEFUN([LIBZMQ_CHECK_POLLER], [{
|
||||
fi
|
||||
}])
|
||||
|
||||
dnl ################################################################################
|
||||
dnl # LIBZMQ_CHECK_PPOLL([action-if-found], [action-if-not-found]) #
|
||||
dnl # Check whether zmq_ppoll can be activated, and do so if it can #
|
||||
dnl ################################################################################
|
||||
|
||||
AC_DEFUN([LIBZMQ_CHECK_PPOLL], [{
|
||||
AC_REQUIRE([AC_CANONICAL_HOST])
|
||||
|
||||
case "${host_os}" in
|
||||
*mingw*|*cygwin*|*msys*)
|
||||
# Disable ppoll by default on Windows
|
||||
AC_MSG_NOTICE([NOT building active zmq_ppoll on '$host_os']) ;;
|
||||
*)
|
||||
LIBZMQ_CHECK_PSELECT([
|
||||
AC_MSG_NOTICE([Building with zmq_ppoll])
|
||||
AC_DEFINE(ZMQ_HAVE_PPOLL, 1, [Build with zmq_ppoll])
|
||||
])
|
||||
;;
|
||||
esac
|
||||
}])
|
||||
|
||||
dnl ##############################################################################
|
||||
dnl # LIBZMQ_CHECK_CACHELINE #
|
||||
dnl # Check cacheline size for alignment purposes #
|
||||
|
@ -12,6 +12,7 @@
|
||||
#cmakedefine ZMQ_IOTHREAD_POLLER_USE_DEVPOLL
|
||||
#cmakedefine ZMQ_IOTHREAD_POLLER_USE_POLL
|
||||
#cmakedefine ZMQ_IOTHREAD_POLLER_USE_SELECT
|
||||
#cmakedefine ZMQ_HAVE_PPOLL
|
||||
|
||||
#cmakedefine ZMQ_POLL_BASED_ON_SELECT
|
||||
#cmakedefine ZMQ_POLL_BASED_ON_POLL
|
||||
|
@ -416,6 +416,9 @@ LIBZMQ_CHECK_DOC_BUILD
|
||||
# Check polling system, set appropriate macro in src/platform.hpp
|
||||
LIBZMQ_CHECK_POLLER
|
||||
|
||||
# Check for pselect to activate ppoll, set appropriate macro in src/platform.hpp
|
||||
LIBZMQ_CHECK_PPOLL
|
||||
|
||||
# Check cacheline size, set appropriate macro in src/platform.hpp
|
||||
LIBZMQ_CHECK_CACHELINE
|
||||
|
||||
|
@ -10,7 +10,7 @@ MAN3 = zmq_bind.3 zmq_unbind.3 zmq_connect.3 zmq_connect_peer.3 zmq_disconnect.3
|
||||
zmq_send.3 zmq_recv.3 zmq_send_const.3 \
|
||||
zmq_msg_get.3 zmq_msg_set.3 zmq_msg_more.3 zmq_msg_gets.3 \
|
||||
zmq_getsockopt.3 zmq_setsockopt.3 \
|
||||
zmq_socket.3 zmq_socket_monitor.3 zmq_poll.3 \
|
||||
zmq_socket.3 zmq_socket_monitor.3 zmq_poll.3 zmq_ppoll.3 \
|
||||
zmq_socket_monitor_versioned.3 \
|
||||
zmq_errno.3 zmq_strerror.3 zmq_version.3 \
|
||||
zmq_sendmsg.3 zmq_recvmsg.3 \
|
||||
|
140
doc/zmq_ppoll.txt
Normal file
140
doc/zmq_ppoll.txt
Normal file
@ -0,0 +1,140 @@
|
||||
zmq_poll(3)
|
||||
===========
|
||||
|
||||
|
||||
NAME
|
||||
----
|
||||
zmq_ppoll - input/output multiplexing with signal mask
|
||||
|
||||
|
||||
SYNOPSIS
|
||||
--------
|
||||
|
||||
*int zmq_ppoll (zmq_pollitem_t '*items', int 'nitems', long 'timeout', const sigset_t '*sigmask');*
|
||||
|
||||
|
||||
DESCRIPTION
|
||||
-----------
|
||||
The relationship between _zmq_poll()_ and _zmq_ppoll()_ is analogous to the
|
||||
relationship between poll(2) and ppoll(2) and between select(2) and
|
||||
pselect(2): _zmq_ppoll()_ allows an application to safely wait until either a
|
||||
file descriptor becomes ready or until a signal is caught.
|
||||
|
||||
When using _zmq_ppoll()_ with 'sigmask' set to NULL, its behavior is identical
|
||||
to that of _zmq_poll()_. See linkzmq:zmq_poll[3] for more on this.
|
||||
|
||||
To make full use of _zmq_ppoll()_, a non-NULL pointer to a signal mask must be
|
||||
constructed and passed to 'sigmask'. See sigprocmask(2) for more details. When
|
||||
this is done, inside the actual _ppoll()_ (or _pselect()_, see note below)
|
||||
system call, an atomic operation consisting of three steps is performed:
|
||||
1. The current signal mask is replaced by the one pointed to by 'sigmask'.
|
||||
2. The actual _poll()_ call is done.
|
||||
3. The original signal mask is restored.
|
||||
Because these operations are done atomically, there is no opportunity for race
|
||||
conditions in between the calls changing the signal mask and the poll/select
|
||||
system call. This means that only during this (atomic) call, we can unblock
|
||||
certain signals, so that they can be handled *at that time only*, not outside
|
||||
of the call. This means that effectively, we extend our poller into a function
|
||||
that not only watches sockets for changes, but also watches the "POSIX signal
|
||||
socket" for incoming signals. At other times, these signals will be blocked,
|
||||
and we will not have to deal with interruptions in system calls at these other
|
||||
times.
|
||||
|
||||
NOTE: The _zmq_ppoll()_ function may be implemented or emulated using operating
|
||||
system interfaces other than _ppoll()_, and as such may be subject to the
|
||||
limits of those interfaces in ways not defined in this documentation.
|
||||
|
||||
NOTE: There is no _ppoll_ or _pselect_ on Windows, so _zmq_ppoll()_ is not
|
||||
supported in Windows builds. It is still callable, but its 'sigmask' has void
|
||||
pointer type (because 'sigset_t' is also not available on Windows) and
|
||||
_zmq_ppoll()_ will return with an error (see error section below).
|
||||
|
||||
THREAD SAFETY
|
||||
-------------
|
||||
The *zmq_pollitem_t* array must only be used by the thread which
|
||||
will/is calling _zmq_ppoll_.
|
||||
|
||||
If a socket is contained in multiple *zmq_pollitem_t* arrays, each owned by a
|
||||
different thread, the socket itself needs to be thead-safe (Server, Client, ...).
|
||||
Otherwise, behaviour is undefined.
|
||||
|
||||
|
||||
RETURN VALUE
|
||||
------------
|
||||
Upon successful completion, the _zmq_ppoll()_ function shall return the number
|
||||
of *zmq_pollitem_t* structures with events signaled in 'revents' or `0` if no
|
||||
events have been signaled. Upon failure, _zmq_ppoll()_ shall return `-1` and set
|
||||
'errno' to one of the values defined below.
|
||||
|
||||
|
||||
ERRORS
|
||||
------
|
||||
*ETERM*::
|
||||
At least one of the members of the 'items' array refers to a 'socket' whose
|
||||
associated 0MQ 'context' was terminated.
|
||||
*EFAULT*::
|
||||
The provided 'items' was not valid (NULL).
|
||||
*EINTR*::
|
||||
The operation was interrupted by delivery of a signal before any events were
|
||||
available.
|
||||
*EINTR*::
|
||||
The operation was interrupted by delivery of a signal before any events were
|
||||
available.
|
||||
*ENOTSUP*::
|
||||
_zmq_ppoll()_ was not activated in this build.
|
||||
|
||||
|
||||
EXAMPLE
|
||||
-------
|
||||
.Polling indefinitely for input events on both a 0MQ socket and a standard socket.
|
||||
See the _example section_ of linkzmq:zmq_poll[3]. One only needs to replace
|
||||
the _zmq_poll_ call with _zmq_ppoll_ and add a _NULL_ argument for the 'sigmask'
|
||||
parameter.
|
||||
|
||||
.Handle SIGTERM during _zmq_ppoll_ (and block it otherwise).
|
||||
----
|
||||
// simple global signal handler for SIGTERM
|
||||
static bool sigterm_received = false;
|
||||
void handle_sigterm (int signum) {
|
||||
sigterm_received = true;
|
||||
}
|
||||
|
||||
// set up signal mask and install handler for SIGTERM
|
||||
sigset_t sigmask, sigmask_without_sigterm;
|
||||
sigemptyset(&sigmask);
|
||||
sigaddset(&sigmask, SIGTERM);
|
||||
sigprocmask(SIG_BLOCK, &sigmask, &sigmask_without_sigterm);
|
||||
struct sigaction sa;
|
||||
memset(&sa, '\0', sizeof(sa));
|
||||
sa.sa_handler = handle_sigterm;
|
||||
|
||||
// poll
|
||||
zmq_pollitem_t items [1];
|
||||
// Just one item, which refers to 0MQ socket 'socket' */
|
||||
items[0].socket = socket;
|
||||
items[0].events = ZMQ_POLLIN;
|
||||
// Poll for events indefinitely, but also exit on SIGTERM
|
||||
int rc = zmq_poll (items, 2, -1, &sigmask_without_sigterm);
|
||||
if (rc < 0 && errno == EINTR && sigterm_received) {
|
||||
// do your SIGTERM business
|
||||
} else {
|
||||
// do your non-SIGTERM error handling
|
||||
}
|
||||
----
|
||||
|
||||
|
||||
SEE ALSO
|
||||
--------
|
||||
linkzmq:zmq_poll[3]
|
||||
linkzmq:zmq_socket[3]
|
||||
linkzmq:zmq_send[3]
|
||||
linkzmq:zmq_recv[3]
|
||||
linkzmq:zmq[7]
|
||||
|
||||
Your operating system documentation for the _poll()_ system call.
|
||||
|
||||
|
||||
AUTHORS
|
||||
-------
|
||||
This page was written by the 0MQ community. To make a change please
|
||||
read the 0MQ Contribution Policy at <http://www.zeromq.org/docs:contributing>.
|
@ -101,6 +101,11 @@ typedef unsigned __int8 uint8_t;
|
||||
#include <stdint.h>
|
||||
#endif
|
||||
|
||||
#if !defined _WIN32
|
||||
// needed for sigset_t definition in zmq_ppoll
|
||||
#include <signal.h>
|
||||
#endif
|
||||
|
||||
// 32-bit AIX's pollfd struct members are called reqevents and rtnevents so it
|
||||
// defines compatibility macros for them. Need to include that header first to
|
||||
// stop build failures since zmq_pollset_t defines them as events and revents.
|
||||
@ -763,6 +768,19 @@ ZMQ_EXPORT int zmq_socket_monitor_versioned (
|
||||
void *s_, const char *addr_, uint64_t events_, int event_version_, int type_);
|
||||
ZMQ_EXPORT int zmq_socket_monitor_pipes_stats (void *s);
|
||||
|
||||
#if !defined _WIN32
|
||||
ZMQ_EXPORT int zmq_ppoll (zmq_pollitem_t *items_,
|
||||
int nitems_,
|
||||
long timeout_,
|
||||
const sigset_t *sigmask_);
|
||||
#else
|
||||
// Windows has no sigset_t
|
||||
ZMQ_EXPORT int zmq_ppoll (zmq_pollitem_t *items_,
|
||||
int nitems_,
|
||||
long timeout_,
|
||||
const void *sigmask_);
|
||||
#endif
|
||||
|
||||
#endif // ZMQ_BUILD_DRAFT_API
|
||||
|
||||
|
||||
|
@ -108,8 +108,9 @@ typedef int timeout_t;
|
||||
|
||||
timeout_t
|
||||
compute_timeout (bool first_pass_, long timeout_, uint64_t now_, uint64_t end_);
|
||||
|
||||
#elif defined ZMQ_POLL_BASED_ON_SELECT
|
||||
#endif
|
||||
#if (!defined ZMQ_POLL_BASED_ON_POLL && defined ZMQ_POLL_BASED_ON_SELECT) \
|
||||
|| defined ZMQ_HAVE_PPOLL
|
||||
#if defined ZMQ_HAVE_WINDOWS
|
||||
inline size_t valid_pollset_bytes (const fd_set &pollset_)
|
||||
{
|
||||
|
303
src/zmq.cpp
303
src/zmq.cpp
@ -97,6 +97,11 @@ struct iovec
|
||||
#include "ip.hpp"
|
||||
#include "address.hpp"
|
||||
|
||||
#ifdef ZMQ_HAVE_PPOLL
|
||||
#include "polling_util.hpp"
|
||||
#include <sys/select.h>
|
||||
#endif
|
||||
|
||||
#if defined ZMQ_HAVE_OPENPGM
|
||||
#define __PGM_WININT_H__
|
||||
#include <pgm/pgm.h>
|
||||
@ -1148,6 +1153,304 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
|
||||
#endif
|
||||
}
|
||||
|
||||
#ifdef ZMQ_HAVE_PPOLL
|
||||
// return values of 0 or -1 should be returned from zmq_poll; return value 1 means items passed checks
|
||||
int zmq_poll_check_items_ (zmq_pollitem_t *items_, int nitems_, long timeout_)
|
||||
{
|
||||
if (unlikely (nitems_ < 0)) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
if (unlikely (nitems_ == 0)) {
|
||||
if (timeout_ == 0)
|
||||
return 0;
|
||||
#if defined ZMQ_HAVE_WINDOWS
|
||||
Sleep (timeout_ > 0 ? timeout_ : INFINITE);
|
||||
return 0;
|
||||
#elif defined ZMQ_HAVE_VXWORKS
|
||||
struct timespec ns_;
|
||||
ns_.tv_sec = timeout_ / 1000;
|
||||
ns_.tv_nsec = timeout_ % 1000 * 1000000;
|
||||
return nanosleep (&ns_, 0);
|
||||
#else
|
||||
return usleep (timeout_ * 1000);
|
||||
#endif
|
||||
}
|
||||
if (!items_) {
|
||||
errno = EFAULT;
|
||||
return -1;
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
struct zmq_poll_select_fds_t_
|
||||
{
|
||||
explicit zmq_poll_select_fds_t_ (int nitems_) :
|
||||
pollset_in (nitems_),
|
||||
pollset_out (nitems_),
|
||||
pollset_err (nitems_),
|
||||
inset (nitems_),
|
||||
outset (nitems_),
|
||||
errset (nitems_),
|
||||
maxfd (0)
|
||||
{
|
||||
FD_ZERO (pollset_in.get ());
|
||||
FD_ZERO (pollset_out.get ());
|
||||
FD_ZERO (pollset_err.get ());
|
||||
}
|
||||
|
||||
zmq::optimized_fd_set_t pollset_in;
|
||||
zmq::optimized_fd_set_t pollset_out;
|
||||
zmq::optimized_fd_set_t pollset_err;
|
||||
zmq::optimized_fd_set_t inset;
|
||||
zmq::optimized_fd_set_t outset;
|
||||
zmq::optimized_fd_set_t errset;
|
||||
zmq::fd_t maxfd;
|
||||
};
|
||||
|
||||
zmq_poll_select_fds_t_
|
||||
zmq_poll_build_select_fds_ (zmq_pollitem_t *items_, int nitems_, int &rc)
|
||||
{
|
||||
// Ensure we do not attempt to select () on more than FD_SETSIZE
|
||||
// file descriptors.
|
||||
// TODO since this function is called by a client, we could return errno EINVAL/ENOMEM/... here
|
||||
zmq_assert (nitems_ <= FD_SETSIZE);
|
||||
|
||||
zmq_poll_select_fds_t_ fds (nitems_);
|
||||
|
||||
// Build the fd_sets for passing to select ().
|
||||
for (int i = 0; i != nitems_; i++) {
|
||||
// If the poll item is a 0MQ socket we are interested in input on the
|
||||
// notification file descriptor retrieved by the ZMQ_FD socket option.
|
||||
if (items_[i].socket) {
|
||||
size_t zmq_fd_size = sizeof (zmq::fd_t);
|
||||
zmq::fd_t notify_fd;
|
||||
if (zmq_getsockopt (items_[i].socket, ZMQ_FD, ¬ify_fd,
|
||||
&zmq_fd_size)
|
||||
== -1) {
|
||||
rc = -1;
|
||||
return fds;
|
||||
}
|
||||
if (items_[i].events) {
|
||||
FD_SET (notify_fd, fds.pollset_in.get ());
|
||||
if (fds.maxfd < notify_fd)
|
||||
fds.maxfd = notify_fd;
|
||||
}
|
||||
}
|
||||
// Else, the poll item is a raw file descriptor. Convert the poll item
|
||||
// events to the appropriate fd_sets.
|
||||
else {
|
||||
if (items_[i].events & ZMQ_POLLIN)
|
||||
FD_SET (items_[i].fd, fds.pollset_in.get ());
|
||||
if (items_[i].events & ZMQ_POLLOUT)
|
||||
FD_SET (items_[i].fd, fds.pollset_out.get ());
|
||||
if (items_[i].events & ZMQ_POLLERR)
|
||||
FD_SET (items_[i].fd, fds.pollset_err.get ());
|
||||
if (fds.maxfd < items_[i].fd)
|
||||
fds.maxfd = items_[i].fd;
|
||||
}
|
||||
}
|
||||
|
||||
rc = 0;
|
||||
return fds;
|
||||
}
|
||||
|
||||
timeval *zmq_poll_select_set_timeout_ (
|
||||
long timeout_, bool first_pass, uint64_t now, uint64_t end, timeval &timeout)
|
||||
{
|
||||
timeval *ptimeout;
|
||||
if (first_pass) {
|
||||
timeout.tv_sec = 0;
|
||||
timeout.tv_usec = 0;
|
||||
ptimeout = &timeout;
|
||||
} else if (timeout_ < 0)
|
||||
ptimeout = NULL;
|
||||
else {
|
||||
timeout.tv_sec = static_cast<long> ((end - now) / 1000);
|
||||
timeout.tv_usec = static_cast<long> ((end - now) % 1000 * 1000);
|
||||
ptimeout = &timeout;
|
||||
}
|
||||
return ptimeout;
|
||||
}
|
||||
|
||||
timespec *zmq_poll_select_set_timeout_ (
|
||||
long timeout_, bool first_pass, uint64_t now, uint64_t end, timespec &timeout)
|
||||
{
|
||||
timespec *ptimeout;
|
||||
if (first_pass) {
|
||||
timeout.tv_sec = 0;
|
||||
timeout.tv_nsec = 0;
|
||||
ptimeout = &timeout;
|
||||
} else if (timeout_ < 0)
|
||||
ptimeout = NULL;
|
||||
else {
|
||||
timeout.tv_sec = static_cast<long> ((end - now) / 1000);
|
||||
timeout.tv_nsec = static_cast<long> ((end - now) % 1000 * 1000000);
|
||||
ptimeout = &timeout;
|
||||
}
|
||||
return ptimeout;
|
||||
}
|
||||
|
||||
int zmq_poll_select_check_events_ (zmq_pollitem_t *items_,
|
||||
int nitems_,
|
||||
zmq_poll_select_fds_t_ &fds,
|
||||
int &nevents)
|
||||
{
|
||||
// Check for the events.
|
||||
for (int i = 0; i != nitems_; i++) {
|
||||
items_[i].revents = 0;
|
||||
|
||||
// The poll item is a 0MQ socket. Retrieve pending events
|
||||
// using the ZMQ_EVENTS socket option.
|
||||
if (items_[i].socket) {
|
||||
size_t zmq_events_size = sizeof (uint32_t);
|
||||
uint32_t zmq_events;
|
||||
if (zmq_getsockopt (items_[i].socket, ZMQ_EVENTS, &zmq_events,
|
||||
&zmq_events_size)
|
||||
== -1)
|
||||
return -1;
|
||||
if ((items_[i].events & ZMQ_POLLOUT) && (zmq_events & ZMQ_POLLOUT))
|
||||
items_[i].revents |= ZMQ_POLLOUT;
|
||||
if ((items_[i].events & ZMQ_POLLIN) && (zmq_events & ZMQ_POLLIN))
|
||||
items_[i].revents |= ZMQ_POLLIN;
|
||||
}
|
||||
// Else, the poll item is a raw file descriptor, simply convert
|
||||
// the events to zmq_pollitem_t-style format.
|
||||
else {
|
||||
if (FD_ISSET (items_[i].fd, fds.inset.get ()))
|
||||
items_[i].revents |= ZMQ_POLLIN;
|
||||
if (FD_ISSET (items_[i].fd, fds.outset.get ()))
|
||||
items_[i].revents |= ZMQ_POLLOUT;
|
||||
if (FD_ISSET (items_[i].fd, fds.errset.get ()))
|
||||
items_[i].revents |= ZMQ_POLLERR;
|
||||
}
|
||||
|
||||
if (items_[i].revents)
|
||||
nevents++;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
bool zmq_poll_must_break_loop_ (long timeout_,
|
||||
int nevents,
|
||||
bool &first_pass,
|
||||
zmq::clock_t &clock,
|
||||
uint64_t &now,
|
||||
uint64_t &end)
|
||||
{
|
||||
// If timeout is zero, exit immediately whether there are events or not.
|
||||
if (timeout_ == 0)
|
||||
return true;
|
||||
|
||||
// If there are events to return, we can exit immediately.
|
||||
if (nevents)
|
||||
return true;
|
||||
|
||||
// At this point we are meant to wait for events but there are none.
|
||||
// If timeout is infinite we can just loop until we get some events.
|
||||
if (timeout_ < 0) {
|
||||
if (first_pass)
|
||||
first_pass = false;
|
||||
return false;
|
||||
}
|
||||
|
||||
// The timeout is finite and there are no events. In the first pass
|
||||
// we get a timestamp of when the polling have begun. (We assume that
|
||||
// first pass have taken negligible time). We also compute the time
|
||||
// when the polling should time out.
|
||||
if (first_pass) {
|
||||
now = clock.now_ms ();
|
||||
end = now + timeout_;
|
||||
if (now == end)
|
||||
return true;
|
||||
first_pass = false;
|
||||
return false;
|
||||
}
|
||||
|
||||
// Find out whether timeout have expired.
|
||||
now = clock.now_ms ();
|
||||
if (now >= end)
|
||||
return true;
|
||||
|
||||
// finally, in all other cases, we just continue
|
||||
return false;
|
||||
}
|
||||
#endif // ZMQ_HAVE_PPOLL
|
||||
|
||||
#if !defined _WIN32
|
||||
int zmq_ppoll (zmq_pollitem_t *items_,
|
||||
int nitems_,
|
||||
long timeout_,
|
||||
const sigset_t *sigmask_)
|
||||
#else
|
||||
// Windows has no sigset_t
|
||||
int zmq_ppoll (zmq_pollitem_t *items_,
|
||||
int nitems_,
|
||||
long timeout_,
|
||||
const void *sigmask_)
|
||||
#endif
|
||||
{
|
||||
#ifdef ZMQ_HAVE_PPOLL
|
||||
int rc = zmq_poll_check_items_ (items_, nitems_, timeout_);
|
||||
if (rc <= 0) {
|
||||
return rc;
|
||||
}
|
||||
|
||||
zmq::clock_t clock;
|
||||
uint64_t now = 0;
|
||||
uint64_t end = 0;
|
||||
zmq_poll_select_fds_t_ fds =
|
||||
zmq_poll_build_select_fds_ (items_, nitems_, rc);
|
||||
if (rc == -1) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
bool first_pass = true;
|
||||
int nevents = 0;
|
||||
|
||||
while (true) {
|
||||
// Compute the timeout for the subsequent poll.
|
||||
timespec timeout;
|
||||
timespec *ptimeout = zmq_poll_select_set_timeout_ (timeout_, first_pass,
|
||||
now, end, timeout);
|
||||
|
||||
// Wait for events. Ignore interrupts if there's infinite timeout.
|
||||
while (true) {
|
||||
memcpy (fds.inset.get (), fds.pollset_in.get (),
|
||||
zmq::valid_pollset_bytes (*fds.pollset_in.get ()));
|
||||
memcpy (fds.outset.get (), fds.pollset_out.get (),
|
||||
zmq::valid_pollset_bytes (*fds.pollset_out.get ()));
|
||||
memcpy (fds.errset.get (), fds.pollset_err.get (),
|
||||
zmq::valid_pollset_bytes (*fds.pollset_err.get ()));
|
||||
int rc =
|
||||
pselect (fds.maxfd + 1, fds.inset.get (), fds.outset.get (),
|
||||
fds.errset.get (), ptimeout, sigmask_);
|
||||
if (unlikely (rc == -1)) {
|
||||
errno_assert (errno == EINTR || errno == EBADF);
|
||||
return -1;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
rc = zmq_poll_select_check_events_ (items_, nitems_, fds, nevents);
|
||||
if (rc < 0) {
|
||||
return rc;
|
||||
}
|
||||
|
||||
if (zmq_poll_must_break_loop_ (timeout_, nevents, first_pass, clock,
|
||||
now, end)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return nevents;
|
||||
#else
|
||||
errno = ENOTSUP;
|
||||
return -1;
|
||||
#endif // ZMQ_HAVE_PPOLL
|
||||
}
|
||||
|
||||
// The poller functionality
|
||||
|
||||
void *zmq_poller_new (void)
|
||||
|
@ -169,6 +169,19 @@ int zmq_socket_monitor_versioned (
|
||||
void *s_, const char *addr_, uint64_t events_, int event_version_, int type_);
|
||||
int zmq_socket_monitor_pipes_stats (void *s_);
|
||||
|
||||
#if !defined _WIN32
|
||||
int zmq_ppoll (zmq_pollitem_t *items_,
|
||||
int nitems_,
|
||||
long timeout_,
|
||||
const sigset_t *sigmask_);
|
||||
#else
|
||||
// Windows has no sigset_t
|
||||
int zmq_ppoll (zmq_pollitem_t *items_,
|
||||
int nitems_,
|
||||
long timeout_,
|
||||
const void *sigmask_);
|
||||
#endif
|
||||
|
||||
#endif // ZMQ_BUILD_DRAFT_API
|
||||
|
||||
#endif //ifndef __ZMQ_DRAFT_H_INCLUDED__
|
||||
|
@ -165,7 +165,11 @@ if(ENABLE_DRAFTS)
|
||||
test_hello_msg
|
||||
test_disconnect_msg
|
||||
test_hiccup_msg
|
||||
test_zmq_ppoll_fd
|
||||
)
|
||||
if(HAVE_FORK)
|
||||
list(APPEND tests test_zmq_ppoll_signals)
|
||||
endif()
|
||||
if(ZMQ_HAVE_BUSY_POLL)
|
||||
list(APPEND tests test_busy_poll)
|
||||
endif()
|
||||
|
90
tests/test_zmq_ppoll_fd.cpp
Normal file
90
tests/test_zmq_ppoll_fd.cpp
Normal file
@ -0,0 +1,90 @@
|
||||
/*
|
||||
Copyright (c) 2021 Contributors as noted in the AUTHORS file
|
||||
|
||||
This file is part of libzmq, the ZeroMQ core engine in C++.
|
||||
|
||||
libzmq is free software; you can redistribute it and/or modify it under
|
||||
the terms of the GNU Lesser General Public License (LGPL) as published
|
||||
by the Free Software Foundation; either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
As a special exception, the Contributors give you permission to link
|
||||
this library with independent modules to produce an executable,
|
||||
regardless of the license terms of these independent modules, and to
|
||||
copy and distribute the resulting executable under terms of your choice,
|
||||
provided that you also meet, for each linked independent module, the
|
||||
terms and conditions of the license of that module. An independent
|
||||
module is a module which is not derived from or based on this library.
|
||||
If you modify this library, you must extend this exception to your
|
||||
version of the library.
|
||||
|
||||
libzmq is distributed in the hope that it will be useful, but WITHOUT
|
||||
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
|
||||
License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "testutil.hpp"
|
||||
#include "testutil_unity.hpp"
|
||||
|
||||
#include <string.h>
|
||||
|
||||
#ifndef _WIN32
|
||||
#include <netdb.h>
|
||||
#include <unistd.h>
|
||||
#endif
|
||||
|
||||
SETUP_TEARDOWN_TESTCONTEXT
|
||||
|
||||
void test_ppoll_fd ()
|
||||
{
|
||||
#ifdef ZMQ_HAVE_PPOLL
|
||||
int recv_socket = socket (AF_INET, SOCK_DGRAM, IPPROTO_UDP);
|
||||
TEST_ASSERT_NOT_EQUAL (-1, recv_socket);
|
||||
|
||||
int flag = 1;
|
||||
TEST_ASSERT_SUCCESS_ERRNO (
|
||||
setsockopt (recv_socket, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int)));
|
||||
|
||||
struct sockaddr_in saddr = bind_bsd_socket (recv_socket);
|
||||
|
||||
void *sb = test_context_socket (ZMQ_REP);
|
||||
|
||||
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (sb, "tcp://127.0.0.1:*"));
|
||||
|
||||
zmq_pollitem_t pollitems[] = {
|
||||
{sb, 0, ZMQ_POLLIN, 0},
|
||||
{NULL, recv_socket, ZMQ_POLLIN, 0},
|
||||
};
|
||||
|
||||
int send_socket = socket (AF_INET, SOCK_DGRAM, IPPROTO_UDP);
|
||||
TEST_ASSERT_NOT_EQUAL (-1, send_socket);
|
||||
|
||||
char buf[10];
|
||||
memset (buf, 1, 10);
|
||||
|
||||
TEST_ASSERT_SUCCESS_ERRNO (sendto (
|
||||
send_socket, buf, 10, 0, (struct sockaddr *) &saddr, sizeof (saddr)));
|
||||
|
||||
TEST_ASSERT_EQUAL (1, zmq_ppoll (pollitems, 2, 1, NULL));
|
||||
TEST_ASSERT_BITS_LOW (ZMQ_POLLIN, pollitems[0].revents);
|
||||
TEST_ASSERT_BITS_HIGH (ZMQ_POLLIN, pollitems[1].revents);
|
||||
|
||||
test_context_socket_close (sb);
|
||||
|
||||
close (send_socket);
|
||||
close (recv_socket);
|
||||
#else
|
||||
TEST_IGNORE_MESSAGE ("libzmq without zmq_ppoll, ignoring test");
|
||||
#endif // ZMQ_HAVE_PPOLL
|
||||
}
|
||||
|
||||
int main ()
|
||||
{
|
||||
UNITY_BEGIN ();
|
||||
RUN_TEST (test_ppoll_fd);
|
||||
return UNITY_END ();
|
||||
}
|
189
tests/test_zmq_ppoll_signals.cpp
Normal file
189
tests/test_zmq_ppoll_signals.cpp
Normal file
@ -0,0 +1,189 @@
|
||||
/*
|
||||
Copyright (c) 2021 Contributors as noted in the AUTHORS file
|
||||
|
||||
This file is part of libzmq, the ZeroMQ core engine in C++.
|
||||
|
||||
libzmq is free software; you can redistribute it and/or modify it under
|
||||
the terms of the GNU Lesser General Public License (LGPL) as published
|
||||
by the Free Software Foundation; either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
As a special exception, the Contributors give you permission to link
|
||||
this library with independent modules to produce an executable,
|
||||
regardless of the license terms of these independent modules, and to
|
||||
copy and distribute the resulting executable under terms of your choice,
|
||||
provided that you also meet, for each linked independent module, the
|
||||
terms and conditions of the license of that module. An independent
|
||||
module is a module which is not derived from or based on this library.
|
||||
If you modify this library, you must extend this exception to your
|
||||
version of the library.
|
||||
|
||||
libzmq is distributed in the hope that it will be useful, but WITHOUT
|
||||
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
|
||||
License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
// author: E. G. Patrick Bos, Netherlands eScience Center, 2021
|
||||
|
||||
#include "testutil.hpp"
|
||||
#include "testutil_unity.hpp"
|
||||
|
||||
#include <string.h> // memset
|
||||
// types.h and wait.h for waitpid:
|
||||
#include <sys/types.h>
|
||||
#include <sys/wait.h>
|
||||
|
||||
static bool sigterm_received = false;
|
||||
|
||||
void handle_sigterm (int /*signum*/)
|
||||
{
|
||||
sigterm_received = true;
|
||||
}
|
||||
|
||||
void recv_string_expect_success_or_eagain (void *socket_,
|
||||
const char *str_,
|
||||
int flags_)
|
||||
{
|
||||
const size_t len = str_ ? strlen (str_) : 0;
|
||||
char buffer[255];
|
||||
TEST_ASSERT_LESS_OR_EQUAL_MESSAGE (sizeof (buffer), len,
|
||||
"recv_string_expect_success cannot be "
|
||||
"used for strings longer than 255 "
|
||||
"characters");
|
||||
|
||||
const int rc = zmq_recv (socket_, buffer, sizeof (buffer), flags_);
|
||||
if (rc < 0) {
|
||||
if (errno == EAGAIN) {
|
||||
printf ("got EAGAIN\n");
|
||||
return;
|
||||
} else {
|
||||
TEST_ASSERT_SUCCESS_ERRNO (rc);
|
||||
}
|
||||
} else {
|
||||
TEST_ASSERT_EQUAL_INT ((int) len, rc);
|
||||
if (str_)
|
||||
TEST_ASSERT_EQUAL_STRING_LEN (str_, buffer, len);
|
||||
}
|
||||
}
|
||||
|
||||
void test_ppoll_signals ()
|
||||
{
|
||||
#ifdef ZMQ_HAVE_PPOLL
|
||||
pid_t child_pid;
|
||||
do {
|
||||
child_pid = fork ();
|
||||
} while (child_pid == -1); // retry if fork fails
|
||||
|
||||
if (child_pid > 0) { // parent
|
||||
setup_test_context ();
|
||||
void *socket = test_context_socket (ZMQ_REQ);
|
||||
// to make sure we don't hang when the child has already exited at the end, we set a receive timeout of five seconds
|
||||
int recv_timeout = 5000;
|
||||
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
|
||||
socket, ZMQ_RCVTIMEO, &recv_timeout, sizeof (recv_timeout)));
|
||||
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (socket, "tcp://*:6660"));
|
||||
// bind is on the master process to avoid zombie children to hold on to binds
|
||||
|
||||
// first send a test message to check whether the signal mask is setup in the child process
|
||||
send_string_expect_success (socket, "breaker breaker", 0);
|
||||
recv_string_expect_success (socket, "one-niner", 0);
|
||||
|
||||
// then send the signal
|
||||
kill (child_pid, SIGTERM);
|
||||
|
||||
// for good measure, and to make sure everything went as expected, close off with another handshake, which will trigger the second poll call on the other side
|
||||
send_string_expect_success (socket, "breaker breaker", 0);
|
||||
// in case the 1 second sleep was not enough on the child side, we are also fine with an EAGAIN here
|
||||
recv_string_expect_success_or_eagain (socket, "one-niner", 0);
|
||||
|
||||
// finish
|
||||
test_context_socket_close (socket);
|
||||
|
||||
// wait for child
|
||||
int status = 0;
|
||||
pid_t pid;
|
||||
do {
|
||||
pid = waitpid (child_pid, &status, 0);
|
||||
} while (-1 == pid
|
||||
&& EINTR == errno); // retry on interrupted system call
|
||||
|
||||
if (0 != status) {
|
||||
if (WIFEXITED (status)) {
|
||||
printf ("exited, status=%d\n", WEXITSTATUS (status));
|
||||
} else if (WIFSIGNALED (status)) {
|
||||
printf ("killed by signal %d\n", WTERMSIG (status));
|
||||
} else if (WIFSTOPPED (status)) {
|
||||
printf ("stopped by signal %d\n", WSTOPSIG (status));
|
||||
} else if (WIFCONTINUED (status)) {
|
||||
printf ("continued\n");
|
||||
}
|
||||
}
|
||||
|
||||
if (-1 == pid) {
|
||||
printf ("waitpid returned -1, with errno %s\n", strerror (errno));
|
||||
}
|
||||
} else { // child
|
||||
setup_test_context ();
|
||||
// set up signal mask and install handler for SIGTERM
|
||||
sigset_t sigmask, sigmask_without_sigterm;
|
||||
sigemptyset (&sigmask);
|
||||
sigaddset (&sigmask, SIGTERM);
|
||||
sigprocmask (SIG_BLOCK, &sigmask, &sigmask_without_sigterm);
|
||||
struct sigaction sa;
|
||||
memset (&sa, '\0', sizeof (sa));
|
||||
sa.sa_handler = handle_sigterm;
|
||||
TEST_ASSERT_SUCCESS_ERRNO (sigaction (SIGTERM, &sa, NULL));
|
||||
|
||||
void *socket = test_context_socket (ZMQ_REP);
|
||||
TEST_ASSERT_SUCCESS_ERRNO (
|
||||
zmq_connect (socket, "tcp://127.0.0.1:6660"));
|
||||
|
||||
zmq_pollitem_t pollitems[] = {
|
||||
{socket, 0, ZMQ_POLLIN, 0},
|
||||
};
|
||||
|
||||
// first receive test message and send back handshake
|
||||
recv_string_expect_success (socket, "breaker breaker", 0);
|
||||
send_string_expect_success (socket, "one-niner", 0);
|
||||
|
||||
// now start ppolling, which should exit with EINTR because of the SIGTERM
|
||||
TEST_ASSERT_FAILURE_ERRNO (
|
||||
EINTR, zmq_ppoll (pollitems, 1, -1, &sigmask_without_sigterm));
|
||||
TEST_ASSERT_TRUE (sigterm_received);
|
||||
|
||||
// poll again for the final handshake
|
||||
TEST_ASSERT_SUCCESS_ERRNO (
|
||||
zmq_ppoll (pollitems, 1, -1, &sigmask_without_sigterm));
|
||||
TEST_ASSERT_BITS_HIGH (ZMQ_POLLIN, pollitems[0].revents);
|
||||
// receive and send back handshake
|
||||
recv_string_expect_success (socket, "breaker breaker", 0);
|
||||
send_string_expect_success (socket, "one-niner", 0);
|
||||
|
||||
// finish
|
||||
// wait before closing socket, so that parent has time to receive
|
||||
sleep (1);
|
||||
test_context_socket_close (socket);
|
||||
_Exit (0);
|
||||
}
|
||||
#else
|
||||
TEST_IGNORE_MESSAGE ("libzmq without zmq_ppoll, ignoring test");
|
||||
#endif // ZMQ_HAVE_PPOLL
|
||||
}
|
||||
|
||||
// We note that using zmq_poll instead of zmq_ppoll in the test above, while
|
||||
// also not using the sigmask, will fail most of the time, because it is
|
||||
// impossible to predict during which call the signal will be handled. Of
|
||||
// course, every call could be surrounded with an EINTR check and a subsequent
|
||||
// check of sigterm_received's value, but even then a race condition can occur,
|
||||
// see the explanation given here: https://250bpm.com/blog:12/
|
||||
|
||||
int main ()
|
||||
{
|
||||
UNITY_BEGIN ();
|
||||
RUN_TEST (test_ppoll_signals);
|
||||
return UNITY_END ();
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user