From 596d6e5b1c2d060b2b5b09d01f7ebd207041791d Mon Sep 17 00:00:00 2001 From: KIU Shueng Chuan Date: Tue, 29 Sep 2015 09:14:02 +0800 Subject: [PATCH] create signaler::recv_failable() In real world usage, there have been reported signaler failures where the eventfd read() or socket recv() system call in signaler::recv() fails, despite having made a prior successful signaler::wait() call. this patch creates a signaler::recv_failable() method that allows unreadable eventfd / socket to return an error without asserting. --- src/mailbox.cpp | 8 +++-- src/signaler.cpp | 77 ++++++++++++++++++++++++++++++++++++++---------- src/signaler.hpp | 1 + 3 files changed, 69 insertions(+), 17 deletions(-) diff --git a/src/mailbox.cpp b/src/mailbox.cpp index e3c2e8a1..a354a4b8 100644 --- a/src/mailbox.cpp +++ b/src/mailbox.cpp @@ -77,14 +77,18 @@ int zmq::mailbox_t::recv (command_t *cmd_, int timeout_) } // Wait for signal from the command sender. - const int rc = signaler.wait (timeout_); + int rc = signaler.wait (timeout_); if (rc == -1) { errno_assert (errno == EAGAIN || errno == EINTR); return -1; } // Receive the signal. - signaler.recv (); + rc = signaler.recv_failable (); + if (rc == -1) { + errno_assert (errno == EAGAIN); + return -1; + } // Switch into active state. active = true; diff --git a/src/signaler.cpp b/src/signaler.cpp index 008e6548..b298fda9 100644 --- a/src/signaler.cpp +++ b/src/signaler.cpp @@ -289,23 +289,18 @@ void zmq::signaler_t::recv () #if defined ZMQ_HAVE_EVENTFD uint64_t dummy; ssize_t sz = read (r, &dummy, sizeof (dummy)); - if (sz == -1) { - errno_assert (errno == EAGAIN); - } - else { - errno_assert (sz == sizeof (dummy)); + errno_assert (sz == sizeof (dummy)); - // If we accidentally grabbed the next signal(s) along with the current - // one, return it back to the eventfd object. - if (unlikely (dummy > 1)) { - const uint64_t inc = dummy - 1; - ssize_t sz2 = write (w, &inc, sizeof (inc)); - errno_assert (sz2 == sizeof (inc)); - return; - } - - zmq_assert (dummy == 1); + // If we accidentally grabbed the next signal(s) along with the current + // one, return it back to the eventfd object. + if (unlikely (dummy > 1)) { + const uint64_t inc = dummy - 1; + ssize_t sz2 = write (w, &inc, sizeof (inc)); + errno_assert (sz2 == sizeof (inc)); + return; } + + zmq_assert (dummy == 1); #else unsigned char dummy; #if defined ZMQ_HAVE_WINDOWS @@ -320,6 +315,58 @@ void zmq::signaler_t::recv () #endif } +int zmq::signaler_t::recv_failable () +{ + // Attempt to read a signal. +#if defined ZMQ_HAVE_EVENTFD + uint64_t dummy; + ssize_t sz = read (r, &dummy, sizeof (dummy)); + if (sz == -1) { + errno_assert (errno == EAGAIN); + return -1; + } + else { + errno_assert (sz == sizeof (dummy)); + + // If we accidentally grabbed the next signal(s) along with the current + // one, return it back to the eventfd object. + if (unlikely (dummy > 1)) { + const uint64_t inc = dummy - 1; + ssize_t sz2 = write (w, &inc, sizeof (inc)); + errno_assert (sz2 == sizeof (inc)); + return 0; + } + + zmq_assert (dummy == 1); + } +#else + unsigned char dummy; +#if defined ZMQ_HAVE_WINDOWS + int nbytes = ::recv (r, (char*) &dummy, sizeof (dummy), 0); + if (nbytes == SOCKET_ERROR) { + const int last_error = WSAGetLastError(); + if (last_error == WSAEWOULDBLOCK) { + errno = EAGAIN; + return -1; + } + wsa_assert (last_error == WSAEWOULDBLOCK); + } +#else + ssize_t nbytes = ::recv (r, &dummy, sizeof (dummy), 0); + if (nbytes == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR) { + errno = EAGAIN; + return -1; + } + errno_assert (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR); + } +#endif + zmq_assert (nbytes == sizeof (dummy)); + zmq_assert (dummy == 0); +#endif + return 0; +} + #ifdef HAVE_FORK void zmq::signaler_t::forked () { diff --git a/src/signaler.hpp b/src/signaler.hpp index c31b0a48..0ac0b7d9 100644 --- a/src/signaler.hpp +++ b/src/signaler.hpp @@ -55,6 +55,7 @@ namespace zmq void send (); int wait (int timeout_); void recv (); + int recv_failable (); #ifdef HAVE_FORK // close the file descriptors in a forked child process so that they