mirror of
https://github.com/zeromq/libzmq.git
synced 2024-12-27 15:41:05 +08:00
commit
de0b3e72dd
@ -468,6 +468,11 @@ message(STATUS "Using polling method in zmq_poll(er)_* API: ${API_POLLER}")
|
||||
string(TOUPPER ${API_POLLER} UPPER_API_POLLER)
|
||||
set(ZMQ_POLL_BASED_ON_${UPPER_API_POLLER} 1)
|
||||
|
||||
check_cxx_symbol_exists(pselect sys/select.h HAVE_PSELECT)
|
||||
if (NOT WIN32 AND HAVE_PSELECT)
|
||||
set(ZMQ_HAVE_PPOLL 1)
|
||||
endif()
|
||||
|
||||
# special alignment settings
|
||||
execute_process(
|
||||
COMMAND getconf LEVEL1_DCACHE_LINESIZE
|
||||
|
15
Makefile.am
15
Makefile.am
@ -1065,7 +1065,8 @@ test_apps += tests/test_poller \
|
||||
tests/test_hello_msg \
|
||||
tests/test_disconnect_msg \
|
||||
tests/test_channel \
|
||||
tests/test_hiccup_msg
|
||||
tests/test_hiccup_msg \
|
||||
tests/test_zmq_ppoll_fd
|
||||
|
||||
tests_test_poller_SOURCES = tests/test_poller.cpp
|
||||
tests_test_poller_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
|
||||
@ -1134,6 +1135,18 @@ tests_test_channel_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
|
||||
tests_test_hiccup_msg_SOURCES = tests/test_hiccup_msg.cpp
|
||||
tests_test_hiccup_msg_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
|
||||
tests_test_hiccup_msg_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
|
||||
|
||||
tests_test_zmq_ppoll_fd_SOURCES = tests/test_zmq_ppoll_fd.cpp
|
||||
tests_test_zmq_ppoll_fd_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
|
||||
tests_test_zmq_ppoll_fd_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
|
||||
|
||||
if HAVE_FORK
|
||||
test_apps += tests/test_zmq_ppoll_signals
|
||||
|
||||
tests_test_zmq_ppoll_signals_SOURCES = tests/test_zmq_ppoll_signals.cpp
|
||||
tests_test_zmq_ppoll_signals_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
|
||||
tests_test_zmq_ppoll_signals_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
|
||||
endif
|
||||
endif
|
||||
|
||||
if FUZZING_ENGINE_LIB
|
||||
|
47
acinclude.m4
47
acinclude.m4
@ -1069,6 +1069,32 @@ select(1, &t_rfds, 0, 0, &tv);
|
||||
)
|
||||
}])
|
||||
|
||||
dnl ################################################################################
|
||||
dnl # LIBZMQ_CHECK_PSELECT([action-if-found], [action-if-not-found]) #
|
||||
dnl # Checks pselect polling system #
|
||||
dnl ################################################################################
|
||||
AC_DEFUN([LIBZMQ_CHECK_PSELECT], [{
|
||||
AC_LINK_IFELSE([
|
||||
AC_LANG_PROGRAM([
|
||||
#include <sys/select.h>
|
||||
#include <signal.h>
|
||||
],[[
|
||||
fd_set t_rfds;
|
||||
struct timespec ts;
|
||||
FD_ZERO(&t_rfds);
|
||||
FD_SET(0, &t_rfds);
|
||||
ts.tv_sec = 5;
|
||||
ts.tv_nsec = 0;
|
||||
sigset_t sigmask, sigmask_without_sigterm;
|
||||
sigemptyset(&sigmask);
|
||||
sigprocmask(SIG_BLOCK, &sigmask, &sigmask_without_sigterm);
|
||||
|
||||
pselect(1, &t_rfds, 0, 0, &ts, &sigmask);
|
||||
]])],
|
||||
[$1],[$2]
|
||||
)
|
||||
}])
|
||||
|
||||
dnl ################################################################################
|
||||
dnl # LIBZMQ_CHECK_POLLER([action-if-found], [action-if-not-found]) #
|
||||
dnl # Choose polling system #
|
||||
@ -1197,6 +1223,27 @@ AC_DEFUN([LIBZMQ_CHECK_POLLER], [{
|
||||
fi
|
||||
}])
|
||||
|
||||
dnl ################################################################################
|
||||
dnl # LIBZMQ_CHECK_PPOLL([action-if-found], [action-if-not-found]) #
|
||||
dnl # Check whether zmq_ppoll can be activated, and do so if it can #
|
||||
dnl ################################################################################
|
||||
|
||||
AC_DEFUN([LIBZMQ_CHECK_PPOLL], [{
|
||||
AC_REQUIRE([AC_CANONICAL_HOST])
|
||||
|
||||
case "${host_os}" in
|
||||
*mingw*|*cygwin*|*msys*)
|
||||
# Disable ppoll by default on Windows
|
||||
AC_MSG_NOTICE([NOT building active zmq_ppoll on '$host_os']) ;;
|
||||
*)
|
||||
LIBZMQ_CHECK_PSELECT([
|
||||
AC_MSG_NOTICE([Building with zmq_ppoll])
|
||||
AC_DEFINE(ZMQ_HAVE_PPOLL, 1, [Build with zmq_ppoll])
|
||||
])
|
||||
;;
|
||||
esac
|
||||
}])
|
||||
|
||||
dnl ##############################################################################
|
||||
dnl # LIBZMQ_CHECK_CACHELINE #
|
||||
dnl # Check cacheline size for alignment purposes #
|
||||
|
@ -12,6 +12,7 @@
|
||||
#cmakedefine ZMQ_IOTHREAD_POLLER_USE_DEVPOLL
|
||||
#cmakedefine ZMQ_IOTHREAD_POLLER_USE_POLL
|
||||
#cmakedefine ZMQ_IOTHREAD_POLLER_USE_SELECT
|
||||
#cmakedefine ZMQ_HAVE_PPOLL
|
||||
|
||||
#cmakedefine ZMQ_POLL_BASED_ON_SELECT
|
||||
#cmakedefine ZMQ_POLL_BASED_ON_POLL
|
||||
|
@ -416,6 +416,9 @@ LIBZMQ_CHECK_DOC_BUILD
|
||||
# Check polling system, set appropriate macro in src/platform.hpp
|
||||
LIBZMQ_CHECK_POLLER
|
||||
|
||||
# Check for pselect to activate ppoll, set appropriate macro in src/platform.hpp
|
||||
LIBZMQ_CHECK_PPOLL
|
||||
|
||||
# Check cacheline size, set appropriate macro in src/platform.hpp
|
||||
LIBZMQ_CHECK_CACHELINE
|
||||
|
||||
|
@ -10,7 +10,7 @@ MAN3 = zmq_bind.3 zmq_unbind.3 zmq_connect.3 zmq_connect_peer.3 zmq_disconnect.3
|
||||
zmq_send.3 zmq_recv.3 zmq_send_const.3 \
|
||||
zmq_msg_get.3 zmq_msg_set.3 zmq_msg_more.3 zmq_msg_gets.3 \
|
||||
zmq_getsockopt.3 zmq_setsockopt.3 \
|
||||
zmq_socket.3 zmq_socket_monitor.3 zmq_poll.3 \
|
||||
zmq_socket.3 zmq_socket_monitor.3 zmq_poll.3 zmq_ppoll.3 \
|
||||
zmq_socket_monitor_versioned.3 \
|
||||
zmq_errno.3 zmq_strerror.3 zmq_version.3 \
|
||||
zmq_sendmsg.3 zmq_recvmsg.3 \
|
||||
|
140
doc/zmq_ppoll.txt
Normal file
140
doc/zmq_ppoll.txt
Normal file
@ -0,0 +1,140 @@
|
||||
zmq_poll(3)
|
||||
===========
|
||||
|
||||
|
||||
NAME
|
||||
----
|
||||
zmq_ppoll - input/output multiplexing with signal mask
|
||||
|
||||
|
||||
SYNOPSIS
|
||||
--------
|
||||
|
||||
*int zmq_ppoll (zmq_pollitem_t '*items', int 'nitems', long 'timeout', const sigset_t '*sigmask');*
|
||||
|
||||
|
||||
DESCRIPTION
|
||||
-----------
|
||||
The relationship between _zmq_poll()_ and _zmq_ppoll()_ is analogous to the
|
||||
relationship between poll(2) and ppoll(2) and between select(2) and
|
||||
pselect(2): _zmq_ppoll()_ allows an application to safely wait until either a
|
||||
file descriptor becomes ready or until a signal is caught.
|
||||
|
||||
When using _zmq_ppoll()_ with 'sigmask' set to NULL, its behavior is identical
|
||||
to that of _zmq_poll()_. See linkzmq:zmq_poll[3] for more on this.
|
||||
|
||||
To make full use of _zmq_ppoll()_, a non-NULL pointer to a signal mask must be
|
||||
constructed and passed to 'sigmask'. See sigprocmask(2) for more details. When
|
||||
this is done, inside the actual _ppoll()_ (or _pselect()_, see note below)
|
||||
system call, an atomic operation consisting of three steps is performed:
|
||||
1. The current signal mask is replaced by the one pointed to by 'sigmask'.
|
||||
2. The actual _poll()_ call is done.
|
||||
3. The original signal mask is restored.
|
||||
Because these operations are done atomically, there is no opportunity for race
|
||||
conditions in between the calls changing the signal mask and the poll/select
|
||||
system call. This means that only during this (atomic) call, we can unblock
|
||||
certain signals, so that they can be handled *at that time only*, not outside
|
||||
of the call. This means that effectively, we extend our poller into a function
|
||||
that not only watches sockets for changes, but also watches the "POSIX signal
|
||||
socket" for incoming signals. At other times, these signals will be blocked,
|
||||
and we will not have to deal with interruptions in system calls at these other
|
||||
times.
|
||||
|
||||
NOTE: The _zmq_ppoll()_ function may be implemented or emulated using operating
|
||||
system interfaces other than _ppoll()_, and as such may be subject to the
|
||||
limits of those interfaces in ways not defined in this documentation.
|
||||
|
||||
NOTE: There is no _ppoll_ or _pselect_ on Windows, so _zmq_ppoll()_ is not
|
||||
supported in Windows builds. It is still callable, but its 'sigmask' has void
|
||||
pointer type (because 'sigset_t' is also not available on Windows) and
|
||||
_zmq_ppoll()_ will return with an error (see error section below).
|
||||
|
||||
THREAD SAFETY
|
||||
-------------
|
||||
The *zmq_pollitem_t* array must only be used by the thread which
|
||||
will/is calling _zmq_ppoll_.
|
||||
|
||||
If a socket is contained in multiple *zmq_pollitem_t* arrays, each owned by a
|
||||
different thread, the socket itself needs to be thead-safe (Server, Client, ...).
|
||||
Otherwise, behaviour is undefined.
|
||||
|
||||
|
||||
RETURN VALUE
|
||||
------------
|
||||
Upon successful completion, the _zmq_ppoll()_ function shall return the number
|
||||
of *zmq_pollitem_t* structures with events signaled in 'revents' or `0` if no
|
||||
events have been signaled. Upon failure, _zmq_ppoll()_ shall return `-1` and set
|
||||
'errno' to one of the values defined below.
|
||||
|
||||
|
||||
ERRORS
|
||||
------
|
||||
*ETERM*::
|
||||
At least one of the members of the 'items' array refers to a 'socket' whose
|
||||
associated 0MQ 'context' was terminated.
|
||||
*EFAULT*::
|
||||
The provided 'items' was not valid (NULL).
|
||||
*EINTR*::
|
||||
The operation was interrupted by delivery of a signal before any events were
|
||||
available.
|
||||
*EINTR*::
|
||||
The operation was interrupted by delivery of a signal before any events were
|
||||
available.
|
||||
*ENOTSUP*::
|
||||
_zmq_ppoll()_ was not activated in this build.
|
||||
|
||||
|
||||
EXAMPLE
|
||||
-------
|
||||
.Polling indefinitely for input events on both a 0MQ socket and a standard socket.
|
||||
See the _example section_ of linkzmq:zmq_poll[3]. One only needs to replace
|
||||
the _zmq_poll_ call with _zmq_ppoll_ and add a _NULL_ argument for the 'sigmask'
|
||||
parameter.
|
||||
|
||||
.Handle SIGTERM during _zmq_ppoll_ (and block it otherwise).
|
||||
----
|
||||
// simple global signal handler for SIGTERM
|
||||
static bool sigterm_received = false;
|
||||
void handle_sigterm (int signum) {
|
||||
sigterm_received = true;
|
||||
}
|
||||
|
||||
// set up signal mask and install handler for SIGTERM
|
||||
sigset_t sigmask, sigmask_without_sigterm;
|
||||
sigemptyset(&sigmask);
|
||||
sigaddset(&sigmask, SIGTERM);
|
||||
sigprocmask(SIG_BLOCK, &sigmask, &sigmask_without_sigterm);
|
||||
struct sigaction sa;
|
||||
memset(&sa, '\0', sizeof(sa));
|
||||
sa.sa_handler = handle_sigterm;
|
||||
|
||||
// poll
|
||||
zmq_pollitem_t items [1];
|
||||
// Just one item, which refers to 0MQ socket 'socket' */
|
||||
items[0].socket = socket;
|
||||
items[0].events = ZMQ_POLLIN;
|
||||
// Poll for events indefinitely, but also exit on SIGTERM
|
||||
int rc = zmq_poll (items, 2, -1, &sigmask_without_sigterm);
|
||||
if (rc < 0 && errno == EINTR && sigterm_received) {
|
||||
// do your SIGTERM business
|
||||
} else {
|
||||
// do your non-SIGTERM error handling
|
||||
}
|
||||
----
|
||||
|
||||
|
||||
SEE ALSO
|
||||
--------
|
||||
linkzmq:zmq_poll[3]
|
||||
linkzmq:zmq_socket[3]
|
||||
linkzmq:zmq_send[3]
|
||||
linkzmq:zmq_recv[3]
|
||||
linkzmq:zmq[7]
|
||||
|
||||
Your operating system documentation for the _poll()_ system call.
|
||||
|
||||
|
||||
AUTHORS
|
||||
-------
|
||||
This page was written by the 0MQ community. To make a change please
|
||||
read the 0MQ Contribution Policy at <http://www.zeromq.org/docs:contributing>.
|
@ -101,6 +101,11 @@ typedef unsigned __int8 uint8_t;
|
||||
#include <stdint.h>
|
||||
#endif
|
||||
|
||||
#if !defined _WIN32
|
||||
// needed for sigset_t definition in zmq_ppoll
|
||||
#include <signal.h>
|
||||
#endif
|
||||
|
||||
// 32-bit AIX's pollfd struct members are called reqevents and rtnevents so it
|
||||
// defines compatibility macros for them. Need to include that header first to
|
||||
// stop build failures since zmq_pollset_t defines them as events and revents.
|
||||
@ -763,6 +768,19 @@ ZMQ_EXPORT int zmq_socket_monitor_versioned (
|
||||
void *s_, const char *addr_, uint64_t events_, int event_version_, int type_);
|
||||
ZMQ_EXPORT int zmq_socket_monitor_pipes_stats (void *s);
|
||||
|
||||
#if !defined _WIN32
|
||||
ZMQ_EXPORT int zmq_ppoll (zmq_pollitem_t *items_,
|
||||
int nitems_,
|
||||
long timeout_,
|
||||
const sigset_t *sigmask_);
|
||||
#else
|
||||
// Windows has no sigset_t
|
||||
ZMQ_EXPORT int zmq_ppoll (zmq_pollitem_t *items_,
|
||||
int nitems_,
|
||||
long timeout_,
|
||||
const void *sigmask_);
|
||||
#endif
|
||||
|
||||
#endif // ZMQ_BUILD_DRAFT_API
|
||||
|
||||
|
||||
|
@ -108,8 +108,9 @@ typedef int timeout_t;
|
||||
|
||||
timeout_t
|
||||
compute_timeout (bool first_pass_, long timeout_, uint64_t now_, uint64_t end_);
|
||||
|
||||
#elif defined ZMQ_POLL_BASED_ON_SELECT
|
||||
#endif
|
||||
#if (!defined ZMQ_POLL_BASED_ON_POLL && defined ZMQ_POLL_BASED_ON_SELECT) \
|
||||
|| defined ZMQ_HAVE_PPOLL
|
||||
#if defined ZMQ_HAVE_WINDOWS
|
||||
inline size_t valid_pollset_bytes (const fd_set &pollset_)
|
||||
{
|
||||
|
303
src/zmq.cpp
303
src/zmq.cpp
@ -97,6 +97,11 @@ struct iovec
|
||||
#include "ip.hpp"
|
||||
#include "address.hpp"
|
||||
|
||||
#ifdef ZMQ_HAVE_PPOLL
|
||||
#include "polling_util.hpp"
|
||||
#include <sys/select.h>
|
||||
#endif
|
||||
|
||||
#if defined ZMQ_HAVE_OPENPGM
|
||||
#define __PGM_WININT_H__
|
||||
#include <pgm/pgm.h>
|
||||
@ -1148,6 +1153,304 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
|
||||
#endif
|
||||
}
|
||||
|
||||
#ifdef ZMQ_HAVE_PPOLL
|
||||
// return values of 0 or -1 should be returned from zmq_poll; return value 1 means items passed checks
|
||||
int zmq_poll_check_items_ (zmq_pollitem_t *items_, int nitems_, long timeout_)
|
||||
{
|
||||
if (unlikely (nitems_ < 0)) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
if (unlikely (nitems_ == 0)) {
|
||||
if (timeout_ == 0)
|
||||
return 0;
|
||||
#if defined ZMQ_HAVE_WINDOWS
|
||||
Sleep (timeout_ > 0 ? timeout_ : INFINITE);
|
||||
return 0;
|
||||
#elif defined ZMQ_HAVE_VXWORKS
|
||||
struct timespec ns_;
|
||||
ns_.tv_sec = timeout_ / 1000;
|
||||
ns_.tv_nsec = timeout_ % 1000 * 1000000;
|
||||
return nanosleep (&ns_, 0);
|
||||
#else
|
||||
return usleep (timeout_ * 1000);
|
||||
#endif
|
||||
}
|
||||
if (!items_) {
|
||||
errno = EFAULT;
|
||||
return -1;
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
struct zmq_poll_select_fds_t_
|
||||
{
|
||||
explicit zmq_poll_select_fds_t_ (int nitems_) :
|
||||
pollset_in (nitems_),
|
||||
pollset_out (nitems_),
|
||||
pollset_err (nitems_),
|
||||
inset (nitems_),
|
||||
outset (nitems_),
|
||||
errset (nitems_),
|
||||
maxfd (0)
|
||||
{
|
||||
FD_ZERO (pollset_in.get ());
|
||||
FD_ZERO (pollset_out.get ());
|
||||
FD_ZERO (pollset_err.get ());
|
||||
}
|
||||
|
||||
zmq::optimized_fd_set_t pollset_in;
|
||||
zmq::optimized_fd_set_t pollset_out;
|
||||
zmq::optimized_fd_set_t pollset_err;
|
||||
zmq::optimized_fd_set_t inset;
|
||||
zmq::optimized_fd_set_t outset;
|
||||
zmq::optimized_fd_set_t errset;
|
||||
zmq::fd_t maxfd;
|
||||
};
|
||||
|
||||
zmq_poll_select_fds_t_
|
||||
zmq_poll_build_select_fds_ (zmq_pollitem_t *items_, int nitems_, int &rc)
|
||||
{
|
||||
// Ensure we do not attempt to select () on more than FD_SETSIZE
|
||||
// file descriptors.
|
||||
// TODO since this function is called by a client, we could return errno EINVAL/ENOMEM/... here
|
||||
zmq_assert (nitems_ <= FD_SETSIZE);
|
||||
|
||||
zmq_poll_select_fds_t_ fds (nitems_);
|
||||
|
||||
// Build the fd_sets for passing to select ().
|
||||
for (int i = 0; i != nitems_; i++) {
|
||||
// If the poll item is a 0MQ socket we are interested in input on the
|
||||
// notification file descriptor retrieved by the ZMQ_FD socket option.
|
||||
if (items_[i].socket) {
|
||||
size_t zmq_fd_size = sizeof (zmq::fd_t);
|
||||
zmq::fd_t notify_fd;
|
||||
if (zmq_getsockopt (items_[i].socket, ZMQ_FD, ¬ify_fd,
|
||||
&zmq_fd_size)
|
||||
== -1) {
|
||||
rc = -1;
|
||||
return fds;
|
||||
}
|
||||
if (items_[i].events) {
|
||||
FD_SET (notify_fd, fds.pollset_in.get ());
|
||||
if (fds.maxfd < notify_fd)
|
||||
fds.maxfd = notify_fd;
|
||||
}
|
||||
}
|
||||
// Else, the poll item is a raw file descriptor. Convert the poll item
|
||||
// events to the appropriate fd_sets.
|
||||
else {
|
||||
if (items_[i].events & ZMQ_POLLIN)
|
||||
FD_SET (items_[i].fd, fds.pollset_in.get ());
|
||||
if (items_[i].events & ZMQ_POLLOUT)
|
||||
FD_SET (items_[i].fd, fds.pollset_out.get ());
|
||||
if (items_[i].events & ZMQ_POLLERR)
|
||||
FD_SET (items_[i].fd, fds.pollset_err.get ());
|
||||
if (fds.maxfd < items_[i].fd)
|
||||
fds.maxfd = items_[i].fd;
|
||||
}
|
||||
}
|
||||
|
||||
rc = 0;
|
||||
return fds;
|
||||
}
|
||||
|
||||
timeval *zmq_poll_select_set_timeout_ (
|
||||
long timeout_, bool first_pass, uint64_t now, uint64_t end, timeval &timeout)
|
||||
{
|
||||
timeval *ptimeout;
|
||||
if (first_pass) {
|
||||
timeout.tv_sec = 0;
|
||||
timeout.tv_usec = 0;
|
||||
ptimeout = &timeout;
|
||||
} else if (timeout_ < 0)
|
||||
ptimeout = NULL;
|
||||
else {
|
||||
timeout.tv_sec = static_cast<long> ((end - now) / 1000);
|
||||
timeout.tv_usec = static_cast<long> ((end - now) % 1000 * 1000);
|
||||
ptimeout = &timeout;
|
||||
}
|
||||
return ptimeout;
|
||||
}
|
||||
|
||||
timespec *zmq_poll_select_set_timeout_ (
|
||||
long timeout_, bool first_pass, uint64_t now, uint64_t end, timespec &timeout)
|
||||
{
|
||||
timespec *ptimeout;
|
||||
if (first_pass) {
|
||||
timeout.tv_sec = 0;
|
||||
timeout.tv_nsec = 0;
|
||||
ptimeout = &timeout;
|
||||
} else if (timeout_ < 0)
|
||||
ptimeout = NULL;
|
||||
else {
|
||||
timeout.tv_sec = static_cast<long> ((end - now) / 1000);
|
||||
timeout.tv_nsec = static_cast<long> ((end - now) % 1000 * 1000000);
|
||||
ptimeout = &timeout;
|
||||
}
|
||||
return ptimeout;
|
||||
}
|
||||
|
||||
int zmq_poll_select_check_events_ (zmq_pollitem_t *items_,
|
||||
int nitems_,
|
||||
zmq_poll_select_fds_t_ &fds,
|
||||
int &nevents)
|
||||
{
|
||||
// Check for the events.
|
||||
for (int i = 0; i != nitems_; i++) {
|
||||
items_[i].revents = 0;
|
||||
|
||||
// The poll item is a 0MQ socket. Retrieve pending events
|
||||
// using the ZMQ_EVENTS socket option.
|
||||
if (items_[i].socket) {
|
||||
size_t zmq_events_size = sizeof (uint32_t);
|
||||
uint32_t zmq_events;
|
||||
if (zmq_getsockopt (items_[i].socket, ZMQ_EVENTS, &zmq_events,
|
||||
&zmq_events_size)
|
||||
== -1)
|
||||
return -1;
|
||||
if ((items_[i].events & ZMQ_POLLOUT) && (zmq_events & ZMQ_POLLOUT))
|
||||
items_[i].revents |= ZMQ_POLLOUT;
|
||||
if ((items_[i].events & ZMQ_POLLIN) && (zmq_events & ZMQ_POLLIN))
|
||||
items_[i].revents |= ZMQ_POLLIN;
|
||||
}
|
||||
// Else, the poll item is a raw file descriptor, simply convert
|
||||
// the events to zmq_pollitem_t-style format.
|
||||
else {
|
||||
if (FD_ISSET (items_[i].fd, fds.inset.get ()))
|
||||
items_[i].revents |= ZMQ_POLLIN;
|
||||
if (FD_ISSET (items_[i].fd, fds.outset.get ()))
|
||||
items_[i].revents |= ZMQ_POLLOUT;
|
||||
if (FD_ISSET (items_[i].fd, fds.errset.get ()))
|
||||
items_[i].revents |= ZMQ_POLLERR;
|
||||
}
|
||||
|
||||
if (items_[i].revents)
|
||||
nevents++;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
bool zmq_poll_must_break_loop_ (long timeout_,
|
||||
int nevents,
|
||||
bool &first_pass,
|
||||
zmq::clock_t &clock,
|
||||
uint64_t &now,
|
||||
uint64_t &end)
|
||||
{
|
||||
// If timeout is zero, exit immediately whether there are events or not.
|
||||
if (timeout_ == 0)
|
||||
return true;
|
||||
|
||||
// If there are events to return, we can exit immediately.
|
||||
if (nevents)
|
||||
return true;
|
||||
|
||||
// At this point we are meant to wait for events but there are none.
|
||||
// If timeout is infinite we can just loop until we get some events.
|
||||
if (timeout_ < 0) {
|
||||
if (first_pass)
|
||||
first_pass = false;
|
||||
return false;
|
||||
}
|
||||
|
||||
// The timeout is finite and there are no events. In the first pass
|
||||
// we get a timestamp of when the polling have begun. (We assume that
|
||||
// first pass have taken negligible time). We also compute the time
|
||||
// when the polling should time out.
|
||||
if (first_pass) {
|
||||
now = clock.now_ms ();
|
||||
end = now + timeout_;
|
||||
if (now == end)
|
||||
return true;
|
||||
first_pass = false;
|
||||
return false;
|
||||
}
|
||||
|
||||
// Find out whether timeout have expired.
|
||||
now = clock.now_ms ();
|
||||
if (now >= end)
|
||||
return true;
|
||||
|
||||
// finally, in all other cases, we just continue
|
||||
return false;
|
||||
}
|
||||
#endif // ZMQ_HAVE_PPOLL
|
||||
|
||||
#if !defined _WIN32
|
||||
int zmq_ppoll (zmq_pollitem_t *items_,
|
||||
int nitems_,
|
||||
long timeout_,
|
||||
const sigset_t *sigmask_)
|
||||
#else
|
||||
// Windows has no sigset_t
|
||||
int zmq_ppoll (zmq_pollitem_t *items_,
|
||||
int nitems_,
|
||||
long timeout_,
|
||||
const void *sigmask_)
|
||||
#endif
|
||||
{
|
||||
#ifdef ZMQ_HAVE_PPOLL
|
||||
int rc = zmq_poll_check_items_ (items_, nitems_, timeout_);
|
||||
if (rc <= 0) {
|
||||
return rc;
|
||||
}
|
||||
|
||||
zmq::clock_t clock;
|
||||
uint64_t now = 0;
|
||||
uint64_t end = 0;
|
||||
zmq_poll_select_fds_t_ fds =
|
||||
zmq_poll_build_select_fds_ (items_, nitems_, rc);
|
||||
if (rc == -1) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
bool first_pass = true;
|
||||
int nevents = 0;
|
||||
|
||||
while (true) {
|
||||
// Compute the timeout for the subsequent poll.
|
||||
timespec timeout;
|
||||
timespec *ptimeout = zmq_poll_select_set_timeout_ (timeout_, first_pass,
|
||||
now, end, timeout);
|
||||
|
||||
// Wait for events. Ignore interrupts if there's infinite timeout.
|
||||
while (true) {
|
||||
memcpy (fds.inset.get (), fds.pollset_in.get (),
|
||||
zmq::valid_pollset_bytes (*fds.pollset_in.get ()));
|
||||
memcpy (fds.outset.get (), fds.pollset_out.get (),
|
||||
zmq::valid_pollset_bytes (*fds.pollset_out.get ()));
|
||||
memcpy (fds.errset.get (), fds.pollset_err.get (),
|
||||
zmq::valid_pollset_bytes (*fds.pollset_err.get ()));
|
||||
int rc =
|
||||
pselect (fds.maxfd + 1, fds.inset.get (), fds.outset.get (),
|
||||
fds.errset.get (), ptimeout, sigmask_);
|
||||
if (unlikely (rc == -1)) {
|
||||
errno_assert (errno == EINTR || errno == EBADF);
|
||||
return -1;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
rc = zmq_poll_select_check_events_ (items_, nitems_, fds, nevents);
|
||||
if (rc < 0) {
|
||||
return rc;
|
||||
}
|
||||
|
||||
if (zmq_poll_must_break_loop_ (timeout_, nevents, first_pass, clock,
|
||||
now, end)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return nevents;
|
||||
#else
|
||||
errno = ENOTSUP;
|
||||
return -1;
|
||||
#endif // ZMQ_HAVE_PPOLL
|
||||
}
|
||||
|
||||
// The poller functionality
|
||||
|
||||
void *zmq_poller_new (void)
|
||||
|
@ -169,6 +169,19 @@ int zmq_socket_monitor_versioned (
|
||||
void *s_, const char *addr_, uint64_t events_, int event_version_, int type_);
|
||||
int zmq_socket_monitor_pipes_stats (void *s_);
|
||||
|
||||
#if !defined _WIN32
|
||||
int zmq_ppoll (zmq_pollitem_t *items_,
|
||||
int nitems_,
|
||||
long timeout_,
|
||||
const sigset_t *sigmask_);
|
||||
#else
|
||||
// Windows has no sigset_t
|
||||
int zmq_ppoll (zmq_pollitem_t *items_,
|
||||
int nitems_,
|
||||
long timeout_,
|
||||
const void *sigmask_);
|
||||
#endif
|
||||
|
||||
#endif // ZMQ_BUILD_DRAFT_API
|
||||
|
||||
#endif //ifndef __ZMQ_DRAFT_H_INCLUDED__
|
||||
|
@ -165,7 +165,11 @@ if(ENABLE_DRAFTS)
|
||||
test_hello_msg
|
||||
test_disconnect_msg
|
||||
test_hiccup_msg
|
||||
test_zmq_ppoll_fd
|
||||
)
|
||||
if(HAVE_FORK)
|
||||
list(APPEND tests test_zmq_ppoll_signals)
|
||||
endif()
|
||||
if(ZMQ_HAVE_BUSY_POLL)
|
||||
list(APPEND tests test_busy_poll)
|
||||
endif()
|
||||
|
90
tests/test_zmq_ppoll_fd.cpp
Normal file
90
tests/test_zmq_ppoll_fd.cpp
Normal file
@ -0,0 +1,90 @@
|
||||
/*
|
||||
Copyright (c) 2021 Contributors as noted in the AUTHORS file
|
||||
|
||||
This file is part of libzmq, the ZeroMQ core engine in C++.
|
||||
|
||||
libzmq is free software; you can redistribute it and/or modify it under
|
||||
the terms of the GNU Lesser General Public License (LGPL) as published
|
||||
by the Free Software Foundation; either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
As a special exception, the Contributors give you permission to link
|
||||
this library with independent modules to produce an executable,
|
||||
regardless of the license terms of these independent modules, and to
|
||||
copy and distribute the resulting executable under terms of your choice,
|
||||
provided that you also meet, for each linked independent module, the
|
||||
terms and conditions of the license of that module. An independent
|
||||
module is a module which is not derived from or based on this library.
|
||||
If you modify this library, you must extend this exception to your
|
||||
version of the library.
|
||||
|
||||
libzmq is distributed in the hope that it will be useful, but WITHOUT
|
||||
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
|
||||
License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "testutil.hpp"
|
||||
#include "testutil_unity.hpp"
|
||||
|
||||
#include <string.h>
|
||||
|
||||
#ifndef _WIN32
|
||||
#include <netdb.h>
|
||||
#include <unistd.h>
|
||||
#endif
|
||||
|
||||
SETUP_TEARDOWN_TESTCONTEXT
|
||||
|
||||
void test_ppoll_fd ()
|
||||
{
|
||||
#ifdef ZMQ_HAVE_PPOLL
|
||||
int recv_socket = socket (AF_INET, SOCK_DGRAM, IPPROTO_UDP);
|
||||
TEST_ASSERT_NOT_EQUAL (-1, recv_socket);
|
||||
|
||||
int flag = 1;
|
||||
TEST_ASSERT_SUCCESS_ERRNO (
|
||||
setsockopt (recv_socket, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int)));
|
||||
|
||||
struct sockaddr_in saddr = bind_bsd_socket (recv_socket);
|
||||
|
||||
void *sb = test_context_socket (ZMQ_REP);
|
||||
|
||||
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (sb, "tcp://127.0.0.1:*"));
|
||||
|
||||
zmq_pollitem_t pollitems[] = {
|
||||
{sb, 0, ZMQ_POLLIN, 0},
|
||||
{NULL, recv_socket, ZMQ_POLLIN, 0},
|
||||
};
|
||||
|
||||
int send_socket = socket (AF_INET, SOCK_DGRAM, IPPROTO_UDP);
|
||||
TEST_ASSERT_NOT_EQUAL (-1, send_socket);
|
||||
|
||||
char buf[10];
|
||||
memset (buf, 1, 10);
|
||||
|
||||
TEST_ASSERT_SUCCESS_ERRNO (sendto (
|
||||
send_socket, buf, 10, 0, (struct sockaddr *) &saddr, sizeof (saddr)));
|
||||
|
||||
TEST_ASSERT_EQUAL (1, zmq_ppoll (pollitems, 2, 1, NULL));
|
||||
TEST_ASSERT_BITS_LOW (ZMQ_POLLIN, pollitems[0].revents);
|
||||
TEST_ASSERT_BITS_HIGH (ZMQ_POLLIN, pollitems[1].revents);
|
||||
|
||||
test_context_socket_close (sb);
|
||||
|
||||
close (send_socket);
|
||||
close (recv_socket);
|
||||
#else
|
||||
TEST_IGNORE_MESSAGE ("libzmq without zmq_ppoll, ignoring test");
|
||||
#endif // ZMQ_HAVE_PPOLL
|
||||
}
|
||||
|
||||
int main ()
|
||||
{
|
||||
UNITY_BEGIN ();
|
||||
RUN_TEST (test_ppoll_fd);
|
||||
return UNITY_END ();
|
||||
}
|
189
tests/test_zmq_ppoll_signals.cpp
Normal file
189
tests/test_zmq_ppoll_signals.cpp
Normal file
@ -0,0 +1,189 @@
|
||||
/*
|
||||
Copyright (c) 2021 Contributors as noted in the AUTHORS file
|
||||
|
||||
This file is part of libzmq, the ZeroMQ core engine in C++.
|
||||
|
||||
libzmq is free software; you can redistribute it and/or modify it under
|
||||
the terms of the GNU Lesser General Public License (LGPL) as published
|
||||
by the Free Software Foundation; either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
As a special exception, the Contributors give you permission to link
|
||||
this library with independent modules to produce an executable,
|
||||
regardless of the license terms of these independent modules, and to
|
||||
copy and distribute the resulting executable under terms of your choice,
|
||||
provided that you also meet, for each linked independent module, the
|
||||
terms and conditions of the license of that module. An independent
|
||||
module is a module which is not derived from or based on this library.
|
||||
If you modify this library, you must extend this exception to your
|
||||
version of the library.
|
||||
|
||||
libzmq is distributed in the hope that it will be useful, but WITHOUT
|
||||
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
|
||||
License for more details.
|
||||
|
||||
You should have received a copy of the GNU Lesser General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
// author: E. G. Patrick Bos, Netherlands eScience Center, 2021
|
||||
|
||||
#include "testutil.hpp"
|
||||
#include "testutil_unity.hpp"
|
||||
|
||||
#include <string.h> // memset
|
||||
// types.h and wait.h for waitpid:
|
||||
#include <sys/types.h>
|
||||
#include <sys/wait.h>
|
||||
|
||||
static bool sigterm_received = false;
|
||||
|
||||
void handle_sigterm (int /*signum*/)
|
||||
{
|
||||
sigterm_received = true;
|
||||
}
|
||||
|
||||
void recv_string_expect_success_or_eagain (void *socket_,
|
||||
const char *str_,
|
||||
int flags_)
|
||||
{
|
||||
const size_t len = str_ ? strlen (str_) : 0;
|
||||
char buffer[255];
|
||||
TEST_ASSERT_LESS_OR_EQUAL_MESSAGE (sizeof (buffer), len,
|
||||
"recv_string_expect_success cannot be "
|
||||
"used for strings longer than 255 "
|
||||
"characters");
|
||||
|
||||
const int rc = zmq_recv (socket_, buffer, sizeof (buffer), flags_);
|
||||
if (rc < 0) {
|
||||
if (errno == EAGAIN) {
|
||||
printf ("got EAGAIN\n");
|
||||
return;
|
||||
} else {
|
||||
TEST_ASSERT_SUCCESS_ERRNO (rc);
|
||||
}
|
||||
} else {
|
||||
TEST_ASSERT_EQUAL_INT ((int) len, rc);
|
||||
if (str_)
|
||||
TEST_ASSERT_EQUAL_STRING_LEN (str_, buffer, len);
|
||||
}
|
||||
}
|
||||
|
||||
void test_ppoll_signals ()
|
||||
{
|
||||
#ifdef ZMQ_HAVE_PPOLL
|
||||
pid_t child_pid;
|
||||
do {
|
||||
child_pid = fork ();
|
||||
} while (child_pid == -1); // retry if fork fails
|
||||
|
||||
if (child_pid > 0) { // parent
|
||||
setup_test_context ();
|
||||
void *socket = test_context_socket (ZMQ_REQ);
|
||||
// to make sure we don't hang when the child has already exited at the end, we set a receive timeout of five seconds
|
||||
int recv_timeout = 5000;
|
||||
TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt (
|
||||
socket, ZMQ_RCVTIMEO, &recv_timeout, sizeof (recv_timeout)));
|
||||
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (socket, "tcp://*:6660"));
|
||||
// bind is on the master process to avoid zombie children to hold on to binds
|
||||
|
||||
// first send a test message to check whether the signal mask is setup in the child process
|
||||
send_string_expect_success (socket, "breaker breaker", 0);
|
||||
recv_string_expect_success (socket, "one-niner", 0);
|
||||
|
||||
// then send the signal
|
||||
kill (child_pid, SIGTERM);
|
||||
|
||||
// for good measure, and to make sure everything went as expected, close off with another handshake, which will trigger the second poll call on the other side
|
||||
send_string_expect_success (socket, "breaker breaker", 0);
|
||||
// in case the 1 second sleep was not enough on the child side, we are also fine with an EAGAIN here
|
||||
recv_string_expect_success_or_eagain (socket, "one-niner", 0);
|
||||
|
||||
// finish
|
||||
test_context_socket_close (socket);
|
||||
|
||||
// wait for child
|
||||
int status = 0;
|
||||
pid_t pid;
|
||||
do {
|
||||
pid = waitpid (child_pid, &status, 0);
|
||||
} while (-1 == pid
|
||||
&& EINTR == errno); // retry on interrupted system call
|
||||
|
||||
if (0 != status) {
|
||||
if (WIFEXITED (status)) {
|
||||
printf ("exited, status=%d\n", WEXITSTATUS (status));
|
||||
} else if (WIFSIGNALED (status)) {
|
||||
printf ("killed by signal %d\n", WTERMSIG (status));
|
||||
} else if (WIFSTOPPED (status)) {
|
||||
printf ("stopped by signal %d\n", WSTOPSIG (status));
|
||||
} else if (WIFCONTINUED (status)) {
|
||||
printf ("continued\n");
|
||||
}
|
||||
}
|
||||
|
||||
if (-1 == pid) {
|
||||
printf ("waitpid returned -1, with errno %s\n", strerror (errno));
|
||||
}
|
||||
} else { // child
|
||||
setup_test_context ();
|
||||
// set up signal mask and install handler for SIGTERM
|
||||
sigset_t sigmask, sigmask_without_sigterm;
|
||||
sigemptyset (&sigmask);
|
||||
sigaddset (&sigmask, SIGTERM);
|
||||
sigprocmask (SIG_BLOCK, &sigmask, &sigmask_without_sigterm);
|
||||
struct sigaction sa;
|
||||
memset (&sa, '\0', sizeof (sa));
|
||||
sa.sa_handler = handle_sigterm;
|
||||
TEST_ASSERT_SUCCESS_ERRNO (sigaction (SIGTERM, &sa, NULL));
|
||||
|
||||
void *socket = test_context_socket (ZMQ_REP);
|
||||
TEST_ASSERT_SUCCESS_ERRNO (
|
||||
zmq_connect (socket, "tcp://127.0.0.1:6660"));
|
||||
|
||||
zmq_pollitem_t pollitems[] = {
|
||||
{socket, 0, ZMQ_POLLIN, 0},
|
||||
};
|
||||
|
||||
// first receive test message and send back handshake
|
||||
recv_string_expect_success (socket, "breaker breaker", 0);
|
||||
send_string_expect_success (socket, "one-niner", 0);
|
||||
|
||||
// now start ppolling, which should exit with EINTR because of the SIGTERM
|
||||
TEST_ASSERT_FAILURE_ERRNO (
|
||||
EINTR, zmq_ppoll (pollitems, 1, -1, &sigmask_without_sigterm));
|
||||
TEST_ASSERT_TRUE (sigterm_received);
|
||||
|
||||
// poll again for the final handshake
|
||||
TEST_ASSERT_SUCCESS_ERRNO (
|
||||
zmq_ppoll (pollitems, 1, -1, &sigmask_without_sigterm));
|
||||
TEST_ASSERT_BITS_HIGH (ZMQ_POLLIN, pollitems[0].revents);
|
||||
// receive and send back handshake
|
||||
recv_string_expect_success (socket, "breaker breaker", 0);
|
||||
send_string_expect_success (socket, "one-niner", 0);
|
||||
|
||||
// finish
|
||||
// wait before closing socket, so that parent has time to receive
|
||||
sleep (1);
|
||||
test_context_socket_close (socket);
|
||||
_Exit (0);
|
||||
}
|
||||
#else
|
||||
TEST_IGNORE_MESSAGE ("libzmq without zmq_ppoll, ignoring test");
|
||||
#endif // ZMQ_HAVE_PPOLL
|
||||
}
|
||||
|
||||
// We note that using zmq_poll instead of zmq_ppoll in the test above, while
|
||||
// also not using the sigmask, will fail most of the time, because it is
|
||||
// impossible to predict during which call the signal will be handled. Of
|
||||
// course, every call could be surrounded with an EINTR check and a subsequent
|
||||
// check of sigterm_received's value, but even then a race condition can occur,
|
||||
// see the explanation given here: https://250bpm.com/blog:12/
|
||||
|
||||
int main ()
|
||||
{
|
||||
UNITY_BEGIN ();
|
||||
RUN_TEST (test_ppoll_signals);
|
||||
return UNITY_END ();
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user