diff --git a/CMakeLists.txt b/CMakeLists.txt index 788b20d8..7f2d5d86 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -468,6 +468,11 @@ message(STATUS "Using polling method in zmq_poll(er)_* API: ${API_POLLER}") string(TOUPPER ${API_POLLER} UPPER_API_POLLER) set(ZMQ_POLL_BASED_ON_${UPPER_API_POLLER} 1) +check_cxx_symbol_exists(pselect sys/select.h HAVE_PSELECT) +if (NOT WIN32 AND HAVE_PSELECT) + set(ZMQ_HAVE_PPOLL 1) +endif() + # special alignment settings execute_process( COMMAND getconf LEVEL1_DCACHE_LINESIZE diff --git a/Makefile.am b/Makefile.am index 715204f9..b61ddf0f 100755 --- a/Makefile.am +++ b/Makefile.am @@ -1065,7 +1065,8 @@ test_apps += tests/test_poller \ tests/test_hello_msg \ tests/test_disconnect_msg \ tests/test_channel \ - tests/test_hiccup_msg + tests/test_hiccup_msg \ + tests/test_zmq_ppoll_fd tests_test_poller_SOURCES = tests/test_poller.cpp tests_test_poller_LDADD = ${TESTUTIL_LIBS} src/libzmq.la @@ -1134,6 +1135,18 @@ tests_test_channel_CPPFLAGS = ${TESTUTIL_CPPFLAGS} tests_test_hiccup_msg_SOURCES = tests/test_hiccup_msg.cpp tests_test_hiccup_msg_LDADD = ${TESTUTIL_LIBS} src/libzmq.la tests_test_hiccup_msg_CPPFLAGS = ${TESTUTIL_CPPFLAGS} + +tests_test_zmq_ppoll_fd_SOURCES = tests/test_zmq_ppoll_fd.cpp +tests_test_zmq_ppoll_fd_LDADD = ${TESTUTIL_LIBS} src/libzmq.la +tests_test_zmq_ppoll_fd_CPPFLAGS = ${TESTUTIL_CPPFLAGS} + +if HAVE_FORK +test_apps += tests/test_zmq_ppoll_signals + +tests_test_zmq_ppoll_signals_SOURCES = tests/test_zmq_ppoll_signals.cpp +tests_test_zmq_ppoll_signals_LDADD = ${TESTUTIL_LIBS} src/libzmq.la +tests_test_zmq_ppoll_signals_CPPFLAGS = ${TESTUTIL_CPPFLAGS} +endif endif if FUZZING_ENGINE_LIB diff --git a/acinclude.m4 b/acinclude.m4 index ab8a3670..ac55776e 100644 --- a/acinclude.m4 +++ b/acinclude.m4 @@ -1069,6 +1069,32 @@ select(1, &t_rfds, 0, 0, &tv); ) }]) +dnl ################################################################################ +dnl # LIBZMQ_CHECK_PSELECT([action-if-found], [action-if-not-found]) # +dnl # Checks pselect polling system # +dnl ################################################################################ +AC_DEFUN([LIBZMQ_CHECK_PSELECT], [{ + AC_LINK_IFELSE([ + AC_LANG_PROGRAM([ +#include +#include + ],[[ +fd_set t_rfds; +struct timespec ts; +FD_ZERO(&t_rfds); +FD_SET(0, &t_rfds); +ts.tv_sec = 5; +ts.tv_nsec = 0; +sigset_t sigmask, sigmask_without_sigterm; +sigemptyset(&sigmask); +sigprocmask(SIG_BLOCK, &sigmask, &sigmask_without_sigterm); + +pselect(1, &t_rfds, 0, 0, &ts, &sigmask); + ]])], + [$1],[$2] + ) +}]) + dnl ################################################################################ dnl # LIBZMQ_CHECK_POLLER([action-if-found], [action-if-not-found]) # dnl # Choose polling system # @@ -1197,6 +1223,27 @@ AC_DEFUN([LIBZMQ_CHECK_POLLER], [{ fi }]) +dnl ################################################################################ +dnl # LIBZMQ_CHECK_PPOLL([action-if-found], [action-if-not-found]) # +dnl # Check whether zmq_ppoll can be activated, and do so if it can # +dnl ################################################################################ + +AC_DEFUN([LIBZMQ_CHECK_PPOLL], [{ + AC_REQUIRE([AC_CANONICAL_HOST]) + + case "${host_os}" in + *mingw*|*cygwin*|*msys*) + # Disable ppoll by default on Windows + AC_MSG_NOTICE([NOT building active zmq_ppoll on '$host_os']) ;; + *) + LIBZMQ_CHECK_PSELECT([ + AC_MSG_NOTICE([Building with zmq_ppoll]) + AC_DEFINE(ZMQ_HAVE_PPOLL, 1, [Build with zmq_ppoll]) + ]) + ;; + esac +}]) + dnl ############################################################################## dnl # LIBZMQ_CHECK_CACHELINE # dnl # Check cacheline size for alignment purposes # diff --git a/builds/cmake/platform.hpp.in b/builds/cmake/platform.hpp.in index 013dca1b..2d23e8b2 100644 --- a/builds/cmake/platform.hpp.in +++ b/builds/cmake/platform.hpp.in @@ -12,6 +12,7 @@ #cmakedefine ZMQ_IOTHREAD_POLLER_USE_DEVPOLL #cmakedefine ZMQ_IOTHREAD_POLLER_USE_POLL #cmakedefine ZMQ_IOTHREAD_POLLER_USE_SELECT +#cmakedefine ZMQ_HAVE_PPOLL #cmakedefine ZMQ_POLL_BASED_ON_SELECT #cmakedefine ZMQ_POLL_BASED_ON_POLL diff --git a/configure.ac b/configure.ac index 6af4cadc..5454f2a4 100644 --- a/configure.ac +++ b/configure.ac @@ -416,6 +416,9 @@ LIBZMQ_CHECK_DOC_BUILD # Check polling system, set appropriate macro in src/platform.hpp LIBZMQ_CHECK_POLLER +# Check for pselect to activate ppoll, set appropriate macro in src/platform.hpp +LIBZMQ_CHECK_PPOLL + # Check cacheline size, set appropriate macro in src/platform.hpp LIBZMQ_CHECK_CACHELINE diff --git a/doc/Makefile.am b/doc/Makefile.am index 4fa53818..67eb55e8 100644 --- a/doc/Makefile.am +++ b/doc/Makefile.am @@ -10,7 +10,7 @@ MAN3 = zmq_bind.3 zmq_unbind.3 zmq_connect.3 zmq_connect_peer.3 zmq_disconnect.3 zmq_send.3 zmq_recv.3 zmq_send_const.3 \ zmq_msg_get.3 zmq_msg_set.3 zmq_msg_more.3 zmq_msg_gets.3 \ zmq_getsockopt.3 zmq_setsockopt.3 \ - zmq_socket.3 zmq_socket_monitor.3 zmq_poll.3 \ + zmq_socket.3 zmq_socket_monitor.3 zmq_poll.3 zmq_ppoll.3 \ zmq_socket_monitor_versioned.3 \ zmq_errno.3 zmq_strerror.3 zmq_version.3 \ zmq_sendmsg.3 zmq_recvmsg.3 \ diff --git a/doc/zmq_ppoll.txt b/doc/zmq_ppoll.txt new file mode 100644 index 00000000..9f9a93fb --- /dev/null +++ b/doc/zmq_ppoll.txt @@ -0,0 +1,140 @@ +zmq_poll(3) +=========== + + +NAME +---- +zmq_ppoll - input/output multiplexing with signal mask + + +SYNOPSIS +-------- + +*int zmq_ppoll (zmq_pollitem_t '*items', int 'nitems', long 'timeout', const sigset_t '*sigmask');* + + +DESCRIPTION +----------- +The relationship between _zmq_poll()_ and _zmq_ppoll()_ is analogous to the +relationship between poll(2) and ppoll(2) and between select(2) and +pselect(2): _zmq_ppoll()_ allows an application to safely wait until either a +file descriptor becomes ready or until a signal is caught. + +When using _zmq_ppoll()_ with 'sigmask' set to NULL, its behavior is identical +to that of _zmq_poll()_. See linkzmq:zmq_poll[3] for more on this. + +To make full use of _zmq_ppoll()_, a non-NULL pointer to a signal mask must be +constructed and passed to 'sigmask'. See sigprocmask(2) for more details. When +this is done, inside the actual _ppoll()_ (or _pselect()_, see note below) +system call, an atomic operation consisting of three steps is performed: + 1. The current signal mask is replaced by the one pointed to by 'sigmask'. + 2. The actual _poll()_ call is done. + 3. The original signal mask is restored. +Because these operations are done atomically, there is no opportunity for race +conditions in between the calls changing the signal mask and the poll/select +system call. This means that only during this (atomic) call, we can unblock +certain signals, so that they can be handled *at that time only*, not outside +of the call. This means that effectively, we extend our poller into a function +that not only watches sockets for changes, but also watches the "POSIX signal +socket" for incoming signals. At other times, these signals will be blocked, +and we will not have to deal with interruptions in system calls at these other +times. + +NOTE: The _zmq_ppoll()_ function may be implemented or emulated using operating +system interfaces other than _ppoll()_, and as such may be subject to the +limits of those interfaces in ways not defined in this documentation. + +NOTE: There is no _ppoll_ or _pselect_ on Windows, so _zmq_ppoll()_ is not +supported in Windows builds. It is still callable, but its 'sigmask' has void +pointer type (because 'sigset_t' is also not available on Windows) and +_zmq_ppoll()_ will return with an error (see error section below). + +THREAD SAFETY +------------- +The *zmq_pollitem_t* array must only be used by the thread which +will/is calling _zmq_ppoll_. + +If a socket is contained in multiple *zmq_pollitem_t* arrays, each owned by a +different thread, the socket itself needs to be thead-safe (Server, Client, ...). +Otherwise, behaviour is undefined. + + +RETURN VALUE +------------ +Upon successful completion, the _zmq_ppoll()_ function shall return the number +of *zmq_pollitem_t* structures with events signaled in 'revents' or `0` if no +events have been signaled. Upon failure, _zmq_ppoll()_ shall return `-1` and set +'errno' to one of the values defined below. + + +ERRORS +------ +*ETERM*:: +At least one of the members of the 'items' array refers to a 'socket' whose +associated 0MQ 'context' was terminated. +*EFAULT*:: +The provided 'items' was not valid (NULL). +*EINTR*:: +The operation was interrupted by delivery of a signal before any events were +available. +*EINTR*:: +The operation was interrupted by delivery of a signal before any events were +available. +*ENOTSUP*:: +_zmq_ppoll()_ was not activated in this build. + + +EXAMPLE +------- +.Polling indefinitely for input events on both a 0MQ socket and a standard socket. +See the _example section_ of linkzmq:zmq_poll[3]. One only needs to replace +the _zmq_poll_ call with _zmq_ppoll_ and add a _NULL_ argument for the 'sigmask' +parameter. + +.Handle SIGTERM during _zmq_ppoll_ (and block it otherwise). +---- +// simple global signal handler for SIGTERM +static bool sigterm_received = false; +void handle_sigterm (int signum) { + sigterm_received = true; +} + +// set up signal mask and install handler for SIGTERM +sigset_t sigmask, sigmask_without_sigterm; +sigemptyset(&sigmask); +sigaddset(&sigmask, SIGTERM); +sigprocmask(SIG_BLOCK, &sigmask, &sigmask_without_sigterm); +struct sigaction sa; +memset(&sa, '\0', sizeof(sa)); +sa.sa_handler = handle_sigterm; + +// poll +zmq_pollitem_t items [1]; +// Just one item, which refers to 0MQ socket 'socket' */ +items[0].socket = socket; +items[0].events = ZMQ_POLLIN; +// Poll for events indefinitely, but also exit on SIGTERM +int rc = zmq_poll (items, 2, -1, &sigmask_without_sigterm); +if (rc < 0 && errno == EINTR && sigterm_received) { + // do your SIGTERM business +} else { + // do your non-SIGTERM error handling +} +---- + + +SEE ALSO +-------- +linkzmq:zmq_poll[3] +linkzmq:zmq_socket[3] +linkzmq:zmq_send[3] +linkzmq:zmq_recv[3] +linkzmq:zmq[7] + +Your operating system documentation for the _poll()_ system call. + + +AUTHORS +------- +This page was written by the 0MQ community. To make a change please +read the 0MQ Contribution Policy at . diff --git a/include/zmq.h b/include/zmq.h index e3313ce5..2612c664 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -101,6 +101,11 @@ typedef unsigned __int8 uint8_t; #include #endif +#if !defined _WIN32 +// needed for sigset_t definition in zmq_ppoll +#include +#endif + // 32-bit AIX's pollfd struct members are called reqevents and rtnevents so it // defines compatibility macros for them. Need to include that header first to // stop build failures since zmq_pollset_t defines them as events and revents. @@ -763,6 +768,19 @@ ZMQ_EXPORT int zmq_socket_monitor_versioned ( void *s_, const char *addr_, uint64_t events_, int event_version_, int type_); ZMQ_EXPORT int zmq_socket_monitor_pipes_stats (void *s); +#if !defined _WIN32 +ZMQ_EXPORT int zmq_ppoll (zmq_pollitem_t *items_, + int nitems_, + long timeout_, + const sigset_t *sigmask_); +#else +// Windows has no sigset_t +ZMQ_EXPORT int zmq_ppoll (zmq_pollitem_t *items_, + int nitems_, + long timeout_, + const void *sigmask_); +#endif + #endif // ZMQ_BUILD_DRAFT_API diff --git a/src/polling_util.hpp b/src/polling_util.hpp index a73ec747..6c11d645 100644 --- a/src/polling_util.hpp +++ b/src/polling_util.hpp @@ -108,8 +108,9 @@ typedef int timeout_t; timeout_t compute_timeout (bool first_pass_, long timeout_, uint64_t now_, uint64_t end_); - -#elif defined ZMQ_POLL_BASED_ON_SELECT +#endif +#if (!defined ZMQ_POLL_BASED_ON_POLL && defined ZMQ_POLL_BASED_ON_SELECT) \ + || defined ZMQ_HAVE_PPOLL #if defined ZMQ_HAVE_WINDOWS inline size_t valid_pollset_bytes (const fd_set &pollset_) { diff --git a/src/zmq.cpp b/src/zmq.cpp index 2c2e3d52..1733cafc 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -97,6 +97,11 @@ struct iovec #include "ip.hpp" #include "address.hpp" +#ifdef ZMQ_HAVE_PPOLL +#include "polling_util.hpp" +#include +#endif + #if defined ZMQ_HAVE_OPENPGM #define __PGM_WININT_H__ #include @@ -1148,6 +1153,304 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) #endif } +#ifdef ZMQ_HAVE_PPOLL +// return values of 0 or -1 should be returned from zmq_poll; return value 1 means items passed checks +int zmq_poll_check_items_ (zmq_pollitem_t *items_, int nitems_, long timeout_) +{ + if (unlikely (nitems_ < 0)) { + errno = EINVAL; + return -1; + } + if (unlikely (nitems_ == 0)) { + if (timeout_ == 0) + return 0; +#if defined ZMQ_HAVE_WINDOWS + Sleep (timeout_ > 0 ? timeout_ : INFINITE); + return 0; +#elif defined ZMQ_HAVE_VXWORKS + struct timespec ns_; + ns_.tv_sec = timeout_ / 1000; + ns_.tv_nsec = timeout_ % 1000 * 1000000; + return nanosleep (&ns_, 0); +#else + return usleep (timeout_ * 1000); +#endif + } + if (!items_) { + errno = EFAULT; + return -1; + } + return 1; +} + +struct zmq_poll_select_fds_t_ +{ + explicit zmq_poll_select_fds_t_ (int nitems_) : + pollset_in (nitems_), + pollset_out (nitems_), + pollset_err (nitems_), + inset (nitems_), + outset (nitems_), + errset (nitems_), + maxfd (0) + { + FD_ZERO (pollset_in.get ()); + FD_ZERO (pollset_out.get ()); + FD_ZERO (pollset_err.get ()); + } + + zmq::optimized_fd_set_t pollset_in; + zmq::optimized_fd_set_t pollset_out; + zmq::optimized_fd_set_t pollset_err; + zmq::optimized_fd_set_t inset; + zmq::optimized_fd_set_t outset; + zmq::optimized_fd_set_t errset; + zmq::fd_t maxfd; +}; + +zmq_poll_select_fds_t_ +zmq_poll_build_select_fds_ (zmq_pollitem_t *items_, int nitems_, int &rc) +{ + // Ensure we do not attempt to select () on more than FD_SETSIZE + // file descriptors. + // TODO since this function is called by a client, we could return errno EINVAL/ENOMEM/... here + zmq_assert (nitems_ <= FD_SETSIZE); + + zmq_poll_select_fds_t_ fds (nitems_); + + // Build the fd_sets for passing to select (). + for (int i = 0; i != nitems_; i++) { + // If the poll item is a 0MQ socket we are interested in input on the + // notification file descriptor retrieved by the ZMQ_FD socket option. + if (items_[i].socket) { + size_t zmq_fd_size = sizeof (zmq::fd_t); + zmq::fd_t notify_fd; + if (zmq_getsockopt (items_[i].socket, ZMQ_FD, ¬ify_fd, + &zmq_fd_size) + == -1) { + rc = -1; + return fds; + } + if (items_[i].events) { + FD_SET (notify_fd, fds.pollset_in.get ()); + if (fds.maxfd < notify_fd) + fds.maxfd = notify_fd; + } + } + // Else, the poll item is a raw file descriptor. Convert the poll item + // events to the appropriate fd_sets. + else { + if (items_[i].events & ZMQ_POLLIN) + FD_SET (items_[i].fd, fds.pollset_in.get ()); + if (items_[i].events & ZMQ_POLLOUT) + FD_SET (items_[i].fd, fds.pollset_out.get ()); + if (items_[i].events & ZMQ_POLLERR) + FD_SET (items_[i].fd, fds.pollset_err.get ()); + if (fds.maxfd < items_[i].fd) + fds.maxfd = items_[i].fd; + } + } + + rc = 0; + return fds; +} + +timeval *zmq_poll_select_set_timeout_ ( + long timeout_, bool first_pass, uint64_t now, uint64_t end, timeval &timeout) +{ + timeval *ptimeout; + if (first_pass) { + timeout.tv_sec = 0; + timeout.tv_usec = 0; + ptimeout = &timeout; + } else if (timeout_ < 0) + ptimeout = NULL; + else { + timeout.tv_sec = static_cast ((end - now) / 1000); + timeout.tv_usec = static_cast ((end - now) % 1000 * 1000); + ptimeout = &timeout; + } + return ptimeout; +} + +timespec *zmq_poll_select_set_timeout_ ( + long timeout_, bool first_pass, uint64_t now, uint64_t end, timespec &timeout) +{ + timespec *ptimeout; + if (first_pass) { + timeout.tv_sec = 0; + timeout.tv_nsec = 0; + ptimeout = &timeout; + } else if (timeout_ < 0) + ptimeout = NULL; + else { + timeout.tv_sec = static_cast ((end - now) / 1000); + timeout.tv_nsec = static_cast ((end - now) % 1000 * 1000000); + ptimeout = &timeout; + } + return ptimeout; +} + +int zmq_poll_select_check_events_ (zmq_pollitem_t *items_, + int nitems_, + zmq_poll_select_fds_t_ &fds, + int &nevents) +{ + // Check for the events. + for (int i = 0; i != nitems_; i++) { + items_[i].revents = 0; + + // The poll item is a 0MQ socket. Retrieve pending events + // using the ZMQ_EVENTS socket option. + if (items_[i].socket) { + size_t zmq_events_size = sizeof (uint32_t); + uint32_t zmq_events; + if (zmq_getsockopt (items_[i].socket, ZMQ_EVENTS, &zmq_events, + &zmq_events_size) + == -1) + return -1; + if ((items_[i].events & ZMQ_POLLOUT) && (zmq_events & ZMQ_POLLOUT)) + items_[i].revents |= ZMQ_POLLOUT; + if ((items_[i].events & ZMQ_POLLIN) && (zmq_events & ZMQ_POLLIN)) + items_[i].revents |= ZMQ_POLLIN; + } + // Else, the poll item is a raw file descriptor, simply convert + // the events to zmq_pollitem_t-style format. + else { + if (FD_ISSET (items_[i].fd, fds.inset.get ())) + items_[i].revents |= ZMQ_POLLIN; + if (FD_ISSET (items_[i].fd, fds.outset.get ())) + items_[i].revents |= ZMQ_POLLOUT; + if (FD_ISSET (items_[i].fd, fds.errset.get ())) + items_[i].revents |= ZMQ_POLLERR; + } + + if (items_[i].revents) + nevents++; + } + + return 0; +} + +bool zmq_poll_must_break_loop_ (long timeout_, + int nevents, + bool &first_pass, + zmq::clock_t &clock, + uint64_t &now, + uint64_t &end) +{ + // If timeout is zero, exit immediately whether there are events or not. + if (timeout_ == 0) + return true; + + // If there are events to return, we can exit immediately. + if (nevents) + return true; + + // At this point we are meant to wait for events but there are none. + // If timeout is infinite we can just loop until we get some events. + if (timeout_ < 0) { + if (first_pass) + first_pass = false; + return false; + } + + // The timeout is finite and there are no events. In the first pass + // we get a timestamp of when the polling have begun. (We assume that + // first pass have taken negligible time). We also compute the time + // when the polling should time out. + if (first_pass) { + now = clock.now_ms (); + end = now + timeout_; + if (now == end) + return true; + first_pass = false; + return false; + } + + // Find out whether timeout have expired. + now = clock.now_ms (); + if (now >= end) + return true; + + // finally, in all other cases, we just continue + return false; +} +#endif // ZMQ_HAVE_PPOLL + +#if !defined _WIN32 +int zmq_ppoll (zmq_pollitem_t *items_, + int nitems_, + long timeout_, + const sigset_t *sigmask_) +#else +// Windows has no sigset_t +int zmq_ppoll (zmq_pollitem_t *items_, + int nitems_, + long timeout_, + const void *sigmask_) +#endif +{ +#ifdef ZMQ_HAVE_PPOLL + int rc = zmq_poll_check_items_ (items_, nitems_, timeout_); + if (rc <= 0) { + return rc; + } + + zmq::clock_t clock; + uint64_t now = 0; + uint64_t end = 0; + zmq_poll_select_fds_t_ fds = + zmq_poll_build_select_fds_ (items_, nitems_, rc); + if (rc == -1) { + return -1; + } + + bool first_pass = true; + int nevents = 0; + + while (true) { + // Compute the timeout for the subsequent poll. + timespec timeout; + timespec *ptimeout = zmq_poll_select_set_timeout_ (timeout_, first_pass, + now, end, timeout); + + // Wait for events. Ignore interrupts if there's infinite timeout. + while (true) { + memcpy (fds.inset.get (), fds.pollset_in.get (), + zmq::valid_pollset_bytes (*fds.pollset_in.get ())); + memcpy (fds.outset.get (), fds.pollset_out.get (), + zmq::valid_pollset_bytes (*fds.pollset_out.get ())); + memcpy (fds.errset.get (), fds.pollset_err.get (), + zmq::valid_pollset_bytes (*fds.pollset_err.get ())); + int rc = + pselect (fds.maxfd + 1, fds.inset.get (), fds.outset.get (), + fds.errset.get (), ptimeout, sigmask_); + if (unlikely (rc == -1)) { + errno_assert (errno == EINTR || errno == EBADF); + return -1; + } + break; + } + + rc = zmq_poll_select_check_events_ (items_, nitems_, fds, nevents); + if (rc < 0) { + return rc; + } + + if (zmq_poll_must_break_loop_ (timeout_, nevents, first_pass, clock, + now, end)) { + break; + } + } + + return nevents; +#else + errno = ENOTSUP; + return -1; +#endif // ZMQ_HAVE_PPOLL +} + // The poller functionality void *zmq_poller_new (void) diff --git a/src/zmq_draft.h b/src/zmq_draft.h index 59be7932..5a1b17ab 100644 --- a/src/zmq_draft.h +++ b/src/zmq_draft.h @@ -169,6 +169,19 @@ int zmq_socket_monitor_versioned ( void *s_, const char *addr_, uint64_t events_, int event_version_, int type_); int zmq_socket_monitor_pipes_stats (void *s_); +#if !defined _WIN32 +int zmq_ppoll (zmq_pollitem_t *items_, + int nitems_, + long timeout_, + const sigset_t *sigmask_); +#else +// Windows has no sigset_t +int zmq_ppoll (zmq_pollitem_t *items_, + int nitems_, + long timeout_, + const void *sigmask_); +#endif + #endif // ZMQ_BUILD_DRAFT_API #endif //ifndef __ZMQ_DRAFT_H_INCLUDED__ diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index e3a3e753..77591784 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -165,7 +165,11 @@ if(ENABLE_DRAFTS) test_hello_msg test_disconnect_msg test_hiccup_msg + test_zmq_ppoll_fd ) + if(HAVE_FORK) + list(APPEND tests test_zmq_ppoll_signals) + endif() if(ZMQ_HAVE_BUSY_POLL) list(APPEND tests test_busy_poll) endif() diff --git a/tests/test_zmq_ppoll_fd.cpp b/tests/test_zmq_ppoll_fd.cpp new file mode 100644 index 00000000..65b1c07c --- /dev/null +++ b/tests/test_zmq_ppoll_fd.cpp @@ -0,0 +1,90 @@ +/* + Copyright (c) 2021 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +#include "testutil.hpp" +#include "testutil_unity.hpp" + +#include + +#ifndef _WIN32 +#include +#include +#endif + +SETUP_TEARDOWN_TESTCONTEXT + +void test_ppoll_fd () +{ +#ifdef ZMQ_HAVE_PPOLL + int recv_socket = socket (AF_INET, SOCK_DGRAM, IPPROTO_UDP); + TEST_ASSERT_NOT_EQUAL (-1, recv_socket); + + int flag = 1; + TEST_ASSERT_SUCCESS_ERRNO ( + setsockopt (recv_socket, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof (int))); + + struct sockaddr_in saddr = bind_bsd_socket (recv_socket); + + void *sb = test_context_socket (ZMQ_REP); + + TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (sb, "tcp://127.0.0.1:*")); + + zmq_pollitem_t pollitems[] = { + {sb, 0, ZMQ_POLLIN, 0}, + {NULL, recv_socket, ZMQ_POLLIN, 0}, + }; + + int send_socket = socket (AF_INET, SOCK_DGRAM, IPPROTO_UDP); + TEST_ASSERT_NOT_EQUAL (-1, send_socket); + + char buf[10]; + memset (buf, 1, 10); + + TEST_ASSERT_SUCCESS_ERRNO (sendto ( + send_socket, buf, 10, 0, (struct sockaddr *) &saddr, sizeof (saddr))); + + TEST_ASSERT_EQUAL (1, zmq_ppoll (pollitems, 2, 1, NULL)); + TEST_ASSERT_BITS_LOW (ZMQ_POLLIN, pollitems[0].revents); + TEST_ASSERT_BITS_HIGH (ZMQ_POLLIN, pollitems[1].revents); + + test_context_socket_close (sb); + + close (send_socket); + close (recv_socket); +#else + TEST_IGNORE_MESSAGE ("libzmq without zmq_ppoll, ignoring test"); +#endif // ZMQ_HAVE_PPOLL +} + +int main () +{ + UNITY_BEGIN (); + RUN_TEST (test_ppoll_fd); + return UNITY_END (); +} diff --git a/tests/test_zmq_ppoll_signals.cpp b/tests/test_zmq_ppoll_signals.cpp new file mode 100644 index 00000000..44d90b0b --- /dev/null +++ b/tests/test_zmq_ppoll_signals.cpp @@ -0,0 +1,189 @@ +/* + Copyright (c) 2021 Contributors as noted in the AUTHORS file + + This file is part of libzmq, the ZeroMQ core engine in C++. + + libzmq is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License (LGPL) as published + by the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + As a special exception, the Contributors give you permission to link + this library with independent modules to produce an executable, + regardless of the license terms of these independent modules, and to + copy and distribute the resulting executable under terms of your choice, + provided that you also meet, for each linked independent module, the + terms and conditions of the license of that module. An independent + module is a module which is not derived from or based on this library. + If you modify this library, you must extend this exception to your + version of the library. + + libzmq is distributed in the hope that it will be useful, but WITHOUT + ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see . +*/ + +// author: E. G. Patrick Bos, Netherlands eScience Center, 2021 + +#include "testutil.hpp" +#include "testutil_unity.hpp" + +#include // memset +// types.h and wait.h for waitpid: +#include +#include + +static bool sigterm_received = false; + +void handle_sigterm (int /*signum*/) +{ + sigterm_received = true; +} + +void recv_string_expect_success_or_eagain (void *socket_, + const char *str_, + int flags_) +{ + const size_t len = str_ ? strlen (str_) : 0; + char buffer[255]; + TEST_ASSERT_LESS_OR_EQUAL_MESSAGE (sizeof (buffer), len, + "recv_string_expect_success cannot be " + "used for strings longer than 255 " + "characters"); + + const int rc = zmq_recv (socket_, buffer, sizeof (buffer), flags_); + if (rc < 0) { + if (errno == EAGAIN) { + printf ("got EAGAIN\n"); + return; + } else { + TEST_ASSERT_SUCCESS_ERRNO (rc); + } + } else { + TEST_ASSERT_EQUAL_INT ((int) len, rc); + if (str_) + TEST_ASSERT_EQUAL_STRING_LEN (str_, buffer, len); + } +} + +void test_ppoll_signals () +{ +#ifdef ZMQ_HAVE_PPOLL + pid_t child_pid; + do { + child_pid = fork (); + } while (child_pid == -1); // retry if fork fails + + if (child_pid > 0) { // parent + setup_test_context (); + void *socket = test_context_socket (ZMQ_REQ); + // to make sure we don't hang when the child has already exited at the end, we set a receive timeout of five seconds + int recv_timeout = 5000; + TEST_ASSERT_SUCCESS_ERRNO (zmq_setsockopt ( + socket, ZMQ_RCVTIMEO, &recv_timeout, sizeof (recv_timeout))); + TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (socket, "tcp://*:6660")); + // bind is on the master process to avoid zombie children to hold on to binds + + // first send a test message to check whether the signal mask is setup in the child process + send_string_expect_success (socket, "breaker breaker", 0); + recv_string_expect_success (socket, "one-niner", 0); + + // then send the signal + kill (child_pid, SIGTERM); + + // for good measure, and to make sure everything went as expected, close off with another handshake, which will trigger the second poll call on the other side + send_string_expect_success (socket, "breaker breaker", 0); + // in case the 1 second sleep was not enough on the child side, we are also fine with an EAGAIN here + recv_string_expect_success_or_eagain (socket, "one-niner", 0); + + // finish + test_context_socket_close (socket); + + // wait for child + int status = 0; + pid_t pid; + do { + pid = waitpid (child_pid, &status, 0); + } while (-1 == pid + && EINTR == errno); // retry on interrupted system call + + if (0 != status) { + if (WIFEXITED (status)) { + printf ("exited, status=%d\n", WEXITSTATUS (status)); + } else if (WIFSIGNALED (status)) { + printf ("killed by signal %d\n", WTERMSIG (status)); + } else if (WIFSTOPPED (status)) { + printf ("stopped by signal %d\n", WSTOPSIG (status)); + } else if (WIFCONTINUED (status)) { + printf ("continued\n"); + } + } + + if (-1 == pid) { + printf ("waitpid returned -1, with errno %s\n", strerror (errno)); + } + } else { // child + setup_test_context (); + // set up signal mask and install handler for SIGTERM + sigset_t sigmask, sigmask_without_sigterm; + sigemptyset (&sigmask); + sigaddset (&sigmask, SIGTERM); + sigprocmask (SIG_BLOCK, &sigmask, &sigmask_without_sigterm); + struct sigaction sa; + memset (&sa, '\0', sizeof (sa)); + sa.sa_handler = handle_sigterm; + TEST_ASSERT_SUCCESS_ERRNO (sigaction (SIGTERM, &sa, NULL)); + + void *socket = test_context_socket (ZMQ_REP); + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_connect (socket, "tcp://127.0.0.1:6660")); + + zmq_pollitem_t pollitems[] = { + {socket, 0, ZMQ_POLLIN, 0}, + }; + + // first receive test message and send back handshake + recv_string_expect_success (socket, "breaker breaker", 0); + send_string_expect_success (socket, "one-niner", 0); + + // now start ppolling, which should exit with EINTR because of the SIGTERM + TEST_ASSERT_FAILURE_ERRNO ( + EINTR, zmq_ppoll (pollitems, 1, -1, &sigmask_without_sigterm)); + TEST_ASSERT_TRUE (sigterm_received); + + // poll again for the final handshake + TEST_ASSERT_SUCCESS_ERRNO ( + zmq_ppoll (pollitems, 1, -1, &sigmask_without_sigterm)); + TEST_ASSERT_BITS_HIGH (ZMQ_POLLIN, pollitems[0].revents); + // receive and send back handshake + recv_string_expect_success (socket, "breaker breaker", 0); + send_string_expect_success (socket, "one-niner", 0); + + // finish + // wait before closing socket, so that parent has time to receive + sleep (1); + test_context_socket_close (socket); + _Exit (0); + } +#else + TEST_IGNORE_MESSAGE ("libzmq without zmq_ppoll, ignoring test"); +#endif // ZMQ_HAVE_PPOLL +} + +// We note that using zmq_poll instead of zmq_ppoll in the test above, while +// also not using the sigmask, will fail most of the time, because it is +// impossible to predict during which call the signal will be handled. Of +// course, every call could be surrounded with an EINTR check and a subsequent +// check of sigterm_received's value, but even then a race condition can occur, +// see the explanation given here: https://250bpm.com/blog:12/ + +int main () +{ + UNITY_BEGIN (); + RUN_TEST (test_ppoll_signals); + return UNITY_END (); +}