From 36e4c9b474e2ed4f702b89541f3c2653a876a716 Mon Sep 17 00:00:00 2001 From: "E. G. Patrick Bos" Date: Thu, 16 Sep 2021 17:20:24 +0200 Subject: [PATCH] add zmq_ppoll zmq_ppoll mostly mimics zmq_poll behavior, except for the added feature of being able to specify a signal mask. Signals in this mask will be blocked during execution of zmq_ppoll. Switching of the process' active signal mask happens atomically with the actual poll call, so that no race conditions can occur. This behavior is useful when one wants to gracefully handle POSIX signals without race conditions. See e.g. the discussion below https://250bpm.com/blog:12/ for an explanation. Also includes two new tests: 1. test_zmq_ppoll_fd does the same thing as test_zmq_poll_fd, demonstrating backwards compatibility with zmq_poll when used with a default signal mask. 2. test_zmq_ppoll_signals demonstrates the use of zmq_ppoll with a signal mask, blocking out SIGTERM everywhere except in zmq_ppoll, allowing to handle the signal in one place without having to worry about race conditions. --- CMakeLists.txt | 5 + Makefile.am | 15 +- acinclude.m4 | 47 +++++ builds/cmake/platform.hpp.in | 1 + configure.ac | 3 + doc/Makefile.am | 2 +- doc/zmq_ppoll.txt | 140 ++++++++++++++ include/zmq.h | 18 ++ src/polling_util.hpp | 5 +- src/zmq.cpp | 303 +++++++++++++++++++++++++++++++ src/zmq_draft.h | 13 ++ tests/CMakeLists.txt | 4 + tests/test_zmq_ppoll_fd.cpp | 90 +++++++++ tests/test_zmq_ppoll_signals.cpp | 189 +++++++++++++++++++ 14 files changed, 831 insertions(+), 4 deletions(-) create mode 100644 doc/zmq_ppoll.txt create mode 100644 tests/test_zmq_ppoll_fd.cpp create mode 100644 tests/test_zmq_ppoll_signals.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 788b20d8..7f2d5d86 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -468,6 +468,11 @@ message(STATUS "Using polling method in zmq_poll(er)_* API: ${API_POLLER}") string(TOUPPER ${API_POLLER} UPPER_API_POLLER) set(ZMQ_POLL_BASED_ON_${UPPER_API_POLLER} 1) +check_cxx_symbol_exists(pselect sys/select.h HAVE_PSELECT) +if (NOT WIN32 AND HAVE_PSELECT) + set(ZMQ_HAVE_PPOLL 1) +endif() + # special alignment settings execute_process( COMMAND getconf LEVEL1_DCACHE_LINESIZE diff --git a/Makefile.am b/Makefile.am index 715204f9..b61ddf0f 100755 --- a/Makefile.am +++ b/Makefile.am @@ -1065,7 +1065,8 @@ test_apps += tests/test_poller \ tests/test_hello_msg \ tests/test_disconnect_msg \ tests/test_channel \ - tests/test_hiccup_msg + tests/test_hiccup_msg \ + tests/test_zmq_ppoll_fd tests_test_poller_SOURCES = tests/test_poller.cpp tests_test_poller_LDADD = ${TESTUTIL_LIBS} src/libzmq.la @@ -1134,6 +1135,18 @@ tests_test_channel_CPPFLAGS = ${TESTUTIL_CPPFLAGS} tests_test_hiccup_msg_SOURCES = tests/test_hiccup_msg.cpp tests_test_hiccup_msg_LDADD = ${TESTUTIL_LIBS} src/libzmq.la tests_test_hiccup_msg_CPPFLAGS = ${TESTUTIL_CPPFLAGS} + +tests_test_zmq_ppoll_fd_SOURCES = tests/test_zmq_ppoll_fd.cpp +tests_test_zmq_ppoll_fd_LDADD = ${TESTUTIL_LIBS} src/libzmq.la +tests_test_zmq_ppoll_fd_CPPFLAGS = ${TESTUTIL_CPPFLAGS} + +if HAVE_FORK +test_apps += tests/test_zmq_ppoll_signals + +tests_test_zmq_ppoll_signals_SOURCES = tests/test_zmq_ppoll_signals.cpp +tests_test_zmq_ppoll_signals_LDADD = ${TESTUTIL_LIBS} src/libzmq.la +tests_test_zmq_ppoll_signals_CPPFLAGS = ${TESTUTIL_CPPFLAGS} +endif endif if FUZZING_ENGINE_LIB diff --git a/acinclude.m4 b/acinclude.m4 index ab8a3670..ac55776e 100644 --- a/acinclude.m4 +++ b/acinclude.m4 @@ -1069,6 +1069,32 @@ select(1, &t_rfds, 0, 0, &tv); ) }]) +dnl ################################################################################ +dnl # LIBZMQ_CHECK_PSELECT([action-if-found], [action-if-not-found]) # +dnl # Checks pselect polling system # +dnl ################################################################################ +AC_DEFUN([LIBZMQ_CHECK_PSELECT], [{ + AC_LINK_IFELSE([ + AC_LANG_PROGRAM([ +#include +#include + ],[[ +fd_set t_rfds; +struct timespec ts; +FD_ZERO(&t_rfds); +FD_SET(0, &t_rfds); +ts.tv_sec = 5; +ts.tv_nsec = 0; +sigset_t sigmask, sigmask_without_sigterm; +sigemptyset(&sigmask); +sigprocmask(SIG_BLOCK, &sigmask, &sigmask_without_sigterm); + +pselect(1, &t_rfds, 0, 0, &ts, &sigmask); + ]])], + [$1],[$2] + ) +}]) + dnl ################################################################################ dnl # LIBZMQ_CHECK_POLLER([action-if-found], [action-if-not-found]) # dnl # Choose polling system # @@ -1197,6 +1223,27 @@ AC_DEFUN([LIBZMQ_CHECK_POLLER], [{ fi }]) +dnl ################################################################################ +dnl # LIBZMQ_CHECK_PPOLL([action-if-found], [action-if-not-found]) # +dnl # Check whether zmq_ppoll can be activated, and do so if it can # +dnl ################################################################################ + +AC_DEFUN([LIBZMQ_CHECK_PPOLL], [{ + AC_REQUIRE([AC_CANONICAL_HOST]) + + case "${host_os}" in + *mingw*|*cygwin*|*msys*) + # Disable ppoll by default on Windows + AC_MSG_NOTICE([NOT building active zmq_ppoll on '$host_os']) ;; + *) + LIBZMQ_CHECK_PSELECT([ + AC_MSG_NOTICE([Building with zmq_ppoll]) + AC_DEFINE(ZMQ_HAVE_PPOLL, 1, [Build with zmq_ppoll]) + ]) + ;; + esac +}]) + dnl ############################################################################## dnl # LIBZMQ_CHECK_CACHELINE # dnl # Check cacheline size for alignment purposes # diff --git a/builds/cmake/platform.hpp.in b/builds/cmake/platform.hpp.in index 013dca1b..2d23e8b2 100644 --- a/builds/cmake/platform.hpp.in +++ b/builds/cmake/platform.hpp.in @@ -12,6 +12,7 @@ #cmakedefine ZMQ_IOTHREAD_POLLER_USE_DEVPOLL #cmakedefine ZMQ_IOTHREAD_POLLER_USE_POLL #cmakedefine ZMQ_IOTHREAD_POLLER_USE_SELECT +#cmakedefine ZMQ_HAVE_PPOLL #cmakedefine ZMQ_POLL_BASED_ON_SELECT #cmakedefine ZMQ_POLL_BASED_ON_POLL diff --git a/configure.ac b/configure.ac index 6af4cadc..5454f2a4 100644 --- a/configure.ac +++ b/configure.ac @@ -416,6 +416,9 @@ LIBZMQ_CHECK_DOC_BUILD # Check polling system, set appropriate macro in src/platform.hpp LIBZMQ_CHECK_POLLER +# Check for pselect to activate ppoll, set appropriate macro in src/platform.hpp +LIBZMQ_CHECK_PPOLL + # Check cacheline size, set appropriate macro in src/platform.hpp LIBZMQ_CHECK_CACHELINE diff --git a/doc/Makefile.am b/doc/Makefile.am index 4fa53818..67eb55e8 100644 --- a/doc/Makefile.am +++ b/doc/Makefile.am @@ -10,7 +10,7 @@ MAN3 = zmq_bind.3 zmq_unbind.3 zmq_connect.3 zmq_connect_peer.3 zmq_disconnect.3 zmq_send.3 zmq_recv.3 zmq_send_const.3 \ zmq_msg_get.3 zmq_msg_set.3 zmq_msg_more.3 zmq_msg_gets.3 \ zmq_getsockopt.3 zmq_setsockopt.3 \ - zmq_socket.3 zmq_socket_monitor.3 zmq_poll.3 \ + zmq_socket.3 zmq_socket_monitor.3 zmq_poll.3 zmq_ppoll.3 \ zmq_socket_monitor_versioned.3 \ zmq_errno.3 zmq_strerror.3 zmq_version.3 \ zmq_sendmsg.3 zmq_recvmsg.3 \ diff --git a/doc/zmq_ppoll.txt b/doc/zmq_ppoll.txt new file mode 100644 index 00000000..9f9a93fb --- /dev/null +++ b/doc/zmq_ppoll.txt @@ -0,0 +1,140 @@ +zmq_poll(3) +=========== + + +NAME +---- +zmq_ppoll - input/output multiplexing with signal mask + + +SYNOPSIS +-------- + +*int zmq_ppoll (zmq_pollitem_t '*items', int 'nitems', long 'timeout', const sigset_t '*sigmask');* + + +DESCRIPTION +----------- +The relationship between _zmq_poll()_ and _zmq_ppoll()_ is analogous to the +relationship between poll(2) and ppoll(2) and between select(2) and +pselect(2): _zmq_ppoll()_ allows an application to safely wait until either a +file descriptor becomes ready or until a signal is caught. + +When using _zmq_ppoll()_ with 'sigmask' set to NULL, its behavior is identical +to that of _zmq_poll()_. See linkzmq:zmq_poll[3] for more on this. + +To make full use of _zmq_ppoll()_, a non-NULL pointer to a signal mask must be +constructed and passed to 'sigmask'. See sigprocmask(2) for more details. When +this is done, inside the actual _ppoll()_ (or _pselect()_, see note below) +system call, an atomic operation consisting of three steps is performed: + 1. The current signal mask is replaced by the one pointed to by 'sigmask'. + 2. The actual _poll()_ call is done. + 3. The original signal mask is restored. +Because these operations are done atomically, there is no opportunity for race +conditions in between the calls changing the signal mask and the poll/select +system call. This means that only during this (atomic) call, we can unblock +certain signals, so that they can be handled *at that time only*, not outside +of the call. This means that effectively, we extend our poller into a function +that not only watches sockets for changes, but also watches the "POSIX signal +socket" for incoming signals. At other times, these signals will be blocked, +and we will not have to deal with interruptions in system calls at these other +times. + +NOTE: The _zmq_ppoll()_ function may be implemented or emulated using operating +system interfaces other than _ppoll()_, and as such may be subject to the +limits of those interfaces in ways not defined in this documentation. + +NOTE: There is no _ppoll_ or _pselect_ on Windows, so _zmq_ppoll()_ is not +supported in Windows builds. It is still callable, but its 'sigmask' has void +pointer type (because 'sigset_t' is also not available on Windows) and +_zmq_ppoll()_ will return with an error (see error section below). + +THREAD SAFETY +------------- +The *zmq_pollitem_t* array must only be used by the thread which +will/is calling _zmq_ppoll_. + +If a socket is contained in multiple *zmq_pollitem_t* arrays, each owned by a +different thread, the socket itself needs to be thead-safe (Server, Client, ...). +Otherwise, behaviour is undefined. + + +RETURN VALUE +------------ +Upon successful completion, the _zmq_ppoll()_ function shall return the number +of *zmq_pollitem_t* structures with events signaled in 'revents' or `0` if no +events have been signaled. Upon failure, _zmq_ppoll()_ shall return `-1` and set +'errno' to one of the values defined below. + + +ERRORS +------ +*ETERM*:: +At least one of the members of the 'items' array refers to a 'socket' whose +associated 0MQ 'context' was terminated. +*EFAULT*:: +The provided 'items' was not valid (NULL). +*EINTR*:: +The operation was interrupted by delivery of a signal before any events were +available. +*EINTR*:: +The operation was interrupted by delivery of a signal before any events were +available. +*ENOTSUP*:: +_zmq_ppoll()_ was not activated in this build. + + +EXAMPLE +------- +.Polling indefinitely for input events on both a 0MQ socket and a standard socket. +See the _example section_ of linkzmq:zmq_poll[3]. One only needs to replace +the _zmq_poll_ call with _zmq_ppoll_ and add a _NULL_ argument for the 'sigmask' +parameter. + +.Handle SIGTERM during _zmq_ppoll_ (and block it otherwise). +---- +// simple global signal handler for SIGTERM +static bool sigterm_received = false; +void handle_sigterm (int signum) { + sigterm_received = true; +} + +// set up signal mask and install handler for SIGTERM +sigset_t sigmask, sigmask_without_sigterm; +sigemptyset(&sigmask); +sigaddset(&sigmask, SIGTERM); +sigprocmask(SIG_BLOCK, &sigmask, &sigmask_without_sigterm); +struct sigaction sa; +memset(&sa, '\0', sizeof(sa)); +sa.sa_handler = handle_sigterm; + +// poll +zmq_pollitem_t items [1]; +// Just one item, which refers to 0MQ socket 'socket' */ +items[0].socket = socket; +items[0].events = ZMQ_POLLIN; +// Poll for events indefinitely, but also exit on SIGTERM +int rc = zmq_poll (items, 2, -1, &sigmask_without_sigterm); +if (rc < 0 && errno == EINTR && sigterm_received) { + // do your SIGTERM business +} else { + // do your non-SIGTERM error handling +} +---- + + +SEE ALSO +-------- +linkzmq:zmq_poll[3] +linkzmq:zmq_socket[3] +linkzmq:zmq_send[3] +linkzmq:zmq_recv[3] +linkzmq:zmq[7] + +Your operating system documentation for the _poll()_ system call. + + +AUTHORS +------- +This page was written by the 0MQ community. To make a change please +read the 0MQ Contribution Policy at . diff --git a/include/zmq.h b/include/zmq.h index e3313ce5..2612c664 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -101,6 +101,11 @@ typedef unsigned __int8 uint8_t; #include #endif +#if !defined _WIN32 +// needed for sigset_t definition in zmq_ppoll +#include +#endif + // 32-bit AIX's pollfd struct members are called reqevents and rtnevents so it // defines compatibility macros for them. Need to include that header first to // stop build failures since zmq_pollset_t defines them as events and revents. @@ -763,6 +768,19 @@ ZMQ_EXPORT int zmq_socket_monitor_versioned ( void *s_, const char *addr_, uint64_t events_, int event_version_, int type_); ZMQ_EXPORT int zmq_socket_monitor_pipes_stats (void *s); +#if !defined _WIN32 +ZMQ_EXPORT int zmq_ppoll (zmq_pollitem_t *items_, + int nitems_, + long timeout_, + const sigset_t *sigmask_); +#else +// Windows has no sigset_t +ZMQ_EXPORT int zmq_ppoll (zmq_pollitem_t *items_, + int nitems_, + long timeout_, + const void *sigmask_); +#endif + #endif // ZMQ_BUILD_DRAFT_API diff --git a/src/polling_util.hpp b/src/polling_util.hpp index a73ec747..6c11d645 100644 --- a/src/polling_util.hpp +++ b/src/polling_util.hpp @@ -108,8 +108,9 @@ typedef int timeout_t; timeout_t compute_timeout (bool first_pass_, long timeout_, uint64_t now_, uint64_t end_); - -#elif defined ZMQ_POLL_BASED_ON_SELECT +#endif +#if (!defined ZMQ_POLL_BASED_ON_POLL && defined ZMQ_POLL_BASED_ON_SELECT) \ + || defined ZMQ_HAVE_PPOLL #if defined ZMQ_HAVE_WINDOWS inline size_t valid_pollset_bytes (const fd_set &pollset_) { diff --git a/src/zmq.cpp b/src/zmq.cpp index 2c2e3d52..1733cafc 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -97,6 +97,11 @@ struct iovec #include "ip.hpp" #include "address.hpp" +#ifdef ZMQ_HAVE_PPOLL +#include "polling_util.hpp" +#include +#endif + #if defined ZMQ_HAVE_OPENPGM #define __PGM_WININT_H__ #include @@ -1148,6 +1153,304 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) #endif } +#ifdef ZMQ_HAVE_PPOLL +// return values of 0 or -1 should be returned from zmq_poll; return value 1 means items passed checks +int zmq_poll_check_items_ (zmq_pollitem_t *items_, int nitems_, long timeout_) +{ + if (unlikely (nitems_ < 0)) { + errno = EINVAL; + return -1; + } + if (unlikely (nitems_ == 0)) { + if (timeout_ == 0) + return 0; +#if defined ZMQ_HAVE_WINDOWS + Sleep (timeout_ > 0 ? timeout_ : INFINITE); + return 0; +#elif defined ZMQ_HAVE_VXWORKS + struct timespec ns_; + ns_.tv_sec = timeout_ / 1000; + ns_.tv_nsec = timeout_ % 1000 * 1000000; + return nanosleep (&ns_, 0); +#else + return usleep (timeout_ * 1000); +#endif + } + if (!items_) { + errno = EFAULT; + return -1; + } + return 1; +} + +struct zmq_poll_select_fds_t_ +{ + explicit zmq_poll_select_fds_t_ (int nitems_) : + pollset_in (nitems_), + pollset_out (nitems_), + pollset_err (nitems_), + inset (nitems_), + outset (nitems_), + errset (nitems_), + maxfd (0) + { + FD_ZERO (pollset_in.get ()); + FD_ZERO (pollset_out.get ()); + FD_ZERO (pollset_err.get ()); + } + + zmq::optimized_fd_set_t pollset_in; + zmq::optimized_fd_set_t pollset_out; + zmq::optimized_fd_set_t pollset_err; + zmq::optimized_fd_set_t inset; + zmq::optimized_fd_set_t outset; + zmq::optimized_fd_set_t errset; + zmq::fd_t maxfd; +}; + +zmq_poll_select_fds_t_ +zmq_poll_build_select_fds_ (zmq_pollitem_t *items_, int nitems_, int &rc) +{ + // Ensure we do not attempt to select () on more than FD_SETSIZE + // file descriptors. + // TODO since this function is called by a client, we could return errno EINVAL/ENOMEM/... here + zmq_assert (nitems_ <= FD_SETSIZE); + + zmq_poll_select_fds_t_ fds (nitems_); + + // Build the fd_sets for passing to select (). + for (int i = 0; i != nitems_; i++) { + // If the poll item is a 0MQ socket we are interested in input on the + // notification file descriptor retrieved by the ZMQ_FD socket option. + if (items_[i].socket) { + size_t zmq_fd_size = sizeof (zmq::fd_t); + zmq::fd_t notify_fd; + if (zmq_getsockopt (items_[i].socket, ZMQ_FD, ¬ify_fd, + &zmq_fd_size) + == -1) { + rc = -1; + return fds; + } + if (items_[i].events) { + FD_SET (notify_fd, fds.pollset_in.get ()); + if (fds.maxfd < notify_fd) + fds.maxfd = notify_fd; + } + } + // Else, the poll item is a raw file descriptor. Convert the poll item + // events to the appropriate fd_sets. + else { + if (items_[i].events & ZMQ_POLLIN) + FD_SET (items_[i].fd, fds.pollset_in.get ()); + if (items_[i].events & ZMQ_POLLOUT) + FD_SET (items_[i].fd, fds.pollset_out.get ()); + if (items_[i].events & ZMQ_POLLERR) + FD_SET (items_[i].fd, fds.pollset_err.get ()); + if (fds.maxfd < items_[i].fd) + fds.maxfd = items_[i].fd; + } + } + + rc = 0; + return fds; +} + +timeval *zmq_poll_select_set_timeout_ ( + long timeout_, bool first_pass, uint64_t now, uint64_t end, timeval &timeout) +{ + timeval *ptimeout; + if (first_pass) { + timeout.tv_sec = 0; + timeout.tv_usec = 0; + ptimeout = &timeout; + } else if (timeout_ < 0) + ptimeout = NULL; + else { + timeout.tv_sec = static_cast ((end - now) / 1000); + timeout.tv_usec = static_cast ((end - now) % 1000 * 1000); + ptimeout = &timeout; + } + return ptimeout; +} + +timespec *zmq_poll_select_set_timeout_ ( + long timeout_, bool first_pass, uint64_t now, uint64_t end, timespec &timeout) +{ + timespec *ptimeout; + if (first_pass) { + timeout.tv_sec = 0; + timeout.tv_nsec = 0; + ptimeout = &timeout; + } else if (timeout_ < 0) + ptimeout = NULL; + else { + timeout.tv_sec = static_cast ((end - now) / 1000); + timeout.tv_nsec = static_cast ((end - now) % 1000 * 1000000); + ptimeout = &timeout; + } + return ptimeout; +} + +int zmq_poll_select_check_events_ (zmq_pollitem_t *items_, + int nitems_, + zmq_poll_select_fds_t_ &fds, + int &nevents) +{ + // Check for the events. + for (int i = 0; i != nitems_; i++) { + items_[i].revents = 0; + + // The poll item is a 0MQ socket. Retrieve pending events + // using the ZMQ_EVENTS socket option. + if (items_[i].socket) { + size_t zmq_events_size = sizeof (uint32_t); + uint32_t zmq_events; + if (zmq_getsockopt (items_[i].socket, ZMQ_EVENTS, &zmq_events, + &zmq_events_size) + == -1) + return -1; + if ((items_[i].events & ZMQ_POLLOUT) && (zmq_events & ZMQ_POLLOUT)) + items_[i].revents |= ZMQ_POLLOUT; + if ((items_[i].events & ZMQ_POLLIN) && (zmq_events & ZMQ_POLLIN)) + items_[i].revents |= ZMQ_POLLIN; + } + // Else, the poll item is a raw file descriptor, simply convert + // the events to zmq_pollitem_t-style format. + else { + if (FD_ISSET (items_[i].fd, fds.inset.get ())) + items_[i].revents |= ZMQ_POLLIN; + if (FD_ISSET (items_[i].fd, fds.outset.get ())) + items_[i].revents |= ZMQ_POLLOUT; + if (FD_ISSET (items_[i].fd, fds.errset.get ())) + items_[i].revents |= ZMQ_POLLERR; + } + + if (items_[i].revents) + nevents++; + } + + return 0; +} + +bool zmq_poll_must_break_loop_ (long timeout_, + int nevents, + bool &first_pass, + zmq::clock_t &clock, + uint64_t &now, + uint64_t &end) +{ + // If timeout is zero, exit immediately whether there are events or not. + if (timeout_ == 0) + return true; + + // If there are events to return, we can exit immediately. + if (nevents) + return true; + + // At this point we are meant to wait for events but there are none. + // If timeout is infinite we can just loop until we get some events. + if (timeout_ < 0) { + if (first_pass) + first_pass = false; + return false; + } + + // The timeout is finite and there are no events. In the first pass + // we get a timestamp of when the polling have begun. (We assume that + // first pass have taken negligible time). We also compute the time + // when the polling should time out. + if (first_pass) { + now = clock.now_ms (); + end = now + timeout_; + if (now == end) + return true; + first_pass = false; + return false; + } + + // Find out whether timeout have expired. + now = clock.now_ms (); + if (now >= end) + return true; + + // finally, in all other cases, we just continue + return false; +} +#endif // ZMQ_HAVE_PPOLL + +#if !defined _WIN32 +int zmq_ppoll (zmq_pollitem_t *items_, + int nitems_, + long timeout_, + const sigset_t *sigmask_) +#else +// Windows has no sigset_t +int zmq_ppoll (zmq_pollitem_t *items_, + int nitems_, + long timeout_, + const void *sigmask_) +#endif +{ +#ifdef ZMQ_HAVE_PPOLL + int rc = zmq_poll_check_items_ (items_, nitems_, timeout_); + if (rc <= 0) { + return rc; + } + + zmq::clock_t clock; + uint64_t now = 0; + uint64_t end = 0; + zmq_poll_select_fds_t_ fds = + zmq_poll_build_select_fds_ (items_, nitems_, rc); + if (rc == -1) { + return -1; + } + + bool first_pass = true; + int nevents = 0; + + while (true) { + // Compute the timeout for the subsequent poll. + timespec timeout; + timespec *ptimeout = zmq_poll_select_set_timeout_ (timeout_, first_pass, + now, end, timeout); + + // Wait for events. Ignore interrupts if there's infinite timeout. + while (true) { + memcpy (fds.inset.get (), fds.pollset_in.get (), + zmq::valid_pollset_bytes (*fds.pollset_in.get ())); + memcpy (fds.outset.get (), fds.pollset_out.get (), + zmq::valid_pollset_bytes (*fds.pollset_out.get ())); + memcpy (fds.errset.get (), fds.pollset_err.get (), + zmq::valid_pollset_bytes (*fds.pollset_err.get ())); + int rc = + pselect (fds.maxfd + 1, fds.inset.get (), fds.outset.get (), + fds.errset.get (), ptimeout, sigmask_); + if (unlikely (rc == -1)) { + errno_assert (errno == EINTR || errno == EBADF); + return -1; + } + break; + } + + rc = zmq_poll_select_check_events_ (items_, nitems_, fds, nevents); + if (rc < 0) { + return rc; + } + + if (zmq_poll_must_break_loop_ (timeout_, nevents, first_pass, clock, + now, end)) { + break; + } + } + + return nevents; +#else + errno = ENOTSUP; + return -1; +#endif // ZMQ_HAVE_PPOLL +} + // The poller functionality void *zmq_poller_new (void) diff --git a/src/zmq_draft.h b/src/zmq_draft.h index 59be7932..5a1b17ab 100644 --- a/src/zmq_draft.h +++ b/src/zmq_draft.h @@ -169,6 +169,19 @@ int zmq_socket_monitor_versioned ( void *s_, const char *addr_, uint64_t events_, int event_version_, int type_); int zmq_socket_monitor_pipes_stats (void *s_); +#if !defined _WIN32 +int zmq_ppoll (zmq_pollitem_t *items_, + int nitems_, + long timeout_, + const sigset_t *sigmask_); +#else +// Windows has no sigset_t +int zmq_ppoll (zmq_pollitem_t *items_, + int nitems_, + long timeout_, + const void *sigmask_); +#endif + #endif // ZMQ_BUILD_DRAFT_API #endif //ifndef __ZMQ_DRAFT_H_INCLUDED__ diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index e3a3e753..77591784 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -165,7 +165,11 @@ if(ENABLE_DRAFTS) test_hello_msg test_disconnect_msg test_hiccup_msg + test_zmq_ppoll_fd ) + if(HAVE_FORK) + list(APPEND tests test_zmq_ppoll_signals) + endif() if(ZMQ_HAVE_BUSY_POLL) list(APPEND tests test_busy_poll) endif() diff --git a/tests/test_zmq_ppoll_fd.cpp b/tests/test_zmq_ppoll_fd.cpp new file mode 100644 index 00000000..65b1c07c --- /dev/null +++ b/tests/test_zmq_ppoll_fd.cpp @@ -0,0 +1,90 @@ +/* + Copyright (c) 2021 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "testutil.hpp" +#include "testutil_unity.hpp" + +#include + +#ifndef _WIN32 +#include +#include +#endif + +SETUP_TEARDOWN_TESTCONTEXT + +void test_ppoll_fd () +{ +#ifdef ZMQ_HAVE_PPOLL + int recv_socket = socket (AF_INET, SOCK_DGRAM, IPPROTO_UDP); + TEST_ASSERT_NOT_EQUAL (-1, recv_socket); + + int flag = 1; + TEST_ASSERT_SUCCESS_ERRNO ( + setsockopt (recv_socket, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int))); + + struct sockaddr_in saddr = bind_bsd_socket (recv_socket); + + void *sb = test_context_socket (ZMQ_REP); + + TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (sb, "tcp://127.0.0.1:*")); + + zmq_pollitem_t pollitems[] = { + {sb, 0, ZMQ_POLLIN, 0}, + {NULL, recv_socket, ZMQ_POLLIN, 0}, + }; + + int send_socket = socket (AF_INET, SOCK_DGRAM, IPPROTO_UDP); + TEST_ASSERT_NOT_EQUAL (-1, send_socket); + + char buf[10]; + memset (buf, 1, 10); + + TEST_ASSERT_SUCCESS_ERRNO (sendto ( + send_socket, buf, 10, 0, (struct sockaddr *) &saddr, sizeof (saddr))); + + TEST_ASSERT_EQUAL (1, zmq_ppoll (pollitems, 2, 1, NULL)); + TEST_ASSERT_BITS_LOW (ZMQ_POLLIN, pollitems[0].revents); + TEST_ASSERT_BITS_HIGH (ZMQ_POLLIN, pollitems[1].revents); + + test_context_socket_close (sb); + + close (send_socket); + close (recv_socket); +#else + TEST_IGNORE_MESSAGE ("libzmq without zmq_ppoll, ignoring test"); +#endif // ZMQ_HAVE_PPOLL +} + +int main () +{ + UNITY_BEGIN (); + RUN_TEST (test_ppoll_fd); + return UNITY_END (); +} diff --git a/tests/test_zmq_ppoll_signals.cpp b/tests/test_zmq_ppoll_signals.cpp new file mode 100644 index 00000000..44d90b0b --- /dev/null +++ b/tests/test_zmq_ppoll_signals.cpp @@ -0,0 +1,189 @@ +/* + Copyright (c) 2021 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +// author: E. G. Patrick Bos, Netherlands eScience Center, 2021 + +#include "testutil.hpp" +#include "testutil_unity.hpp" + +#include // memset +// types.h and wait.h for waitpid: +#include +#include + +static bool sigterm_received = false; + +void handle_sigterm (int /*signum*/) +{ + sigterm_received = true; +} + +void recv_string_expect_success_or_eagain (void *socket_, + const char *str_, + int flags_) +{ + const size_t len = str_ ? strlen (str_) : 0; + char buffer[255]; + TEST_ASSERT_LESS_OR_EQUAL_MESSAGE (sizeof (buffer), len, + "recv_string_expect_success cannot be " + "used for strings longer than 255 " + "characters"); + + const int rc = zmq_recv (socket_, buffer, sizeof (buffer), flags_); + if (rc < 0) { + if (errno == EAGAIN) { + printf ("got EAGAIN\n"); + return; + } else { + TEST_ASSERT_SUCCESS_ERRNO (rc); + } + } else { + TEST_ASSERT_EQUAL_INT ((int) len, rc); + if (str_) + TEST_ASSERT_EQUAL_STRING_LEN (str_, buffer, len); + } +} + +void test_ppoll_signals () +{ +#ifdef ZMQ_HAVE_PPOLL + pid_t child_pid; + do { + child_pid = fork (); + } while (child_pid == -1); // retry if fork fails + + if (child_pid > 0) { // parent + setup_test_context (); + void *socket = test_context_socket (ZMQ_REQ); + // to make sure we don't hang when the child has already exited at the end, we set a receive timeout of five seconds + int recv_timeout = 5000; + TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt ( + socket, ZMQ_RCVTIMEO, &recv_timeout, sizeof (recv_timeout))); + TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (socket, "tcp://*:6660")); + // bind is on the master process to avoid zombie children to hold on to binds + + // first send a test message to check whether the signal mask is setup in the child process + send_string_expect_success (socket, "breaker breaker", 0); + recv_string_expect_success (socket, "one-niner", 0); + + // then send the signal + kill (child_pid, SIGTERM); + + // for good measure, and to make sure everything went as expected, close off with another handshake, which will trigger the second poll call on the other side + send_string_expect_success (socket, "breaker breaker", 0); + // in case the 1 second sleep was not enough on the child side, we are also fine with an EAGAIN here + recv_string_expect_success_or_eagain (socket, "one-niner", 0); + + // finish + test_context_socket_close (socket); + + // wait for child + int status = 0; + pid_t pid; + do { + pid = waitpid (child_pid, &status, 0); + } while (-1 == pid + && EINTR == errno); // retry on interrupted system call + + if (0 != status) { + if (WIFEXITED (status)) { + printf ("exited, status=%d\n", WEXITSTATUS (status)); + } else if (WIFSIGNALED (status)) { + printf ("killed by signal %d\n", WTERMSIG (status)); + } else if (WIFSTOPPED (status)) { + printf ("stopped by signal %d\n", WSTOPSIG (status)); + } else if (WIFCONTINUED (status)) { + printf ("continued\n"); + } + } + + if (-1 == pid) { + printf ("waitpid returned -1, with errno %s\n", strerror (errno)); + } + } else { // child + setup_test_context (); + // set up signal mask and install handler for SIGTERM + sigset_t sigmask, sigmask_without_sigterm; + sigemptyset (&sigmask); + sigaddset (&sigmask, SIGTERM); + sigprocmask (SIG_BLOCK, &sigmask, &sigmask_without_sigterm); + struct sigaction sa; + memset (&sa, '\0', sizeof (sa)); + sa.sa_handler = handle_sigterm; + TEST_ASSERT_SUCCESS_ERRNO (sigaction (SIGTERM, &sa, NULL)); + + void *socket = test_context_socket (ZMQ_REP); + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_connect (socket, "tcp://127.0.0.1:6660")); + + zmq_pollitem_t pollitems[] = { + {socket, 0, ZMQ_POLLIN, 0}, + }; + + // first receive test message and send back handshake + recv_string_expect_success (socket, "breaker breaker", 0); + send_string_expect_success (socket, "one-niner", 0); + + // now start ppolling, which should exit with EINTR because of the SIGTERM + TEST_ASSERT_FAILURE_ERRNO ( + EINTR, zmq_ppoll (pollitems, 1, -1, &sigmask_without_sigterm)); + TEST_ASSERT_TRUE (sigterm_received); + + // poll again for the final handshake + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_ppoll (pollitems, 1, -1, &sigmask_without_sigterm)); + TEST_ASSERT_BITS_HIGH (ZMQ_POLLIN, pollitems[0].revents); + // receive and send back handshake + recv_string_expect_success (socket, "breaker breaker", 0); + send_string_expect_success (socket, "one-niner", 0); + + // finish + // wait before closing socket, so that parent has time to receive + sleep (1); + test_context_socket_close (socket); + _Exit (0); + } +#else + TEST_IGNORE_MESSAGE ("libzmq without zmq_ppoll, ignoring test"); +#endif // ZMQ_HAVE_PPOLL +} + +// We note that using zmq_poll instead of zmq_ppoll in the test above, while +// also not using the sigmask, will fail most of the time, because it is +// impossible to predict during which call the signal will be handled. Of +// course, every call could be surrounded with an EINTR check and a subsequent +// check of sigterm_received's value, but even then a race condition can occur, +// see the explanation given here: https://250bpm.com/blog:12/ + +int main () +{ + UNITY_BEGIN (); + RUN_TEST (test_ppoll_signals); + return UNITY_END (); +}