diff --git a/src/socket_poller.cpp b/src/socket_poller.cpp index 09b295bd..a460b548 100644 --- a/src/socket_poller.cpp +++ b/src/socket_poller.cpp @@ -401,7 +401,135 @@ void zmq::socket_poller_t::rebuild () need_rebuild = false; } -int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_events_, long timeout_) +void zmq::socket_poller_t::zero_trail_events ( + zmq::socket_poller_t::event_t *events_, + int n_events_, + int found) +{ + for (int i = found; i < n_events_; ++i) { + events_[i].socket = NULL; + events_[i].fd = 0; + events_[i].user_data = NULL; + events_[i].events = 0; + } +} + +#if defined ZMQ_POLL_BASED_ON_POLL +int zmq::socket_poller_t::check_events (zmq::socket_poller_t::event_t *events_, + int n_events_) +#elif defined ZMQ_POLL_BASED_ON_SELECT +int zmq::socket_poller_t::check_events (zmq::socket_poller_t::event_t *events_, + int n_events_, + fd_set& inset, + fd_set& outset, + fd_set& errset) +#endif +{ + int found = 0; + for (items_t::iterator it = items.begin (); it != items.end () && + found < n_events_; ++it) { + + // The poll item is a 0MQ socket. Retrieve pending events + // using the ZMQ_EVENTS socket option. + if (it->socket) { + size_t events_size = sizeof (uint32_t); + uint32_t events; + if (it->socket->getsockopt (ZMQ_EVENTS, &events, &events_size) + == -1) { + return -1; + } + + if (it->events & events) { + events_[found].socket = it->socket; + events_[found].user_data = it->user_data; + events_[found].events = it->events & events; + ++found; + } + } + // Else, the poll item is a raw file descriptor, simply convert + // the events to zmq_pollitem_t-style format. + else { + +#if defined ZMQ_POLL_BASED_ON_POLL + + short revents = pollfds [it->pollfd_index].revents; + short events = 0; + + if (revents & POLLIN) + events |= ZMQ_POLLIN; + if (revents & POLLOUT) + events |= ZMQ_POLLOUT; + if (revents & POLLPRI) + events |= ZMQ_POLLPRI; + if (revents & ~(POLLIN | POLLOUT | POLLPRI)) + events |= ZMQ_POLLERR; + +#elif defined ZMQ_POLL_BASED_ON_SELECT + + short events = 0; + + if (FD_ISSET (it->fd, &inset)) + events |= ZMQ_POLLIN; + if (FD_ISSET (it->fd, &outset)) + events |= ZMQ_POLLOUT; + if (FD_ISSET (it->fd, &errset)) + events |= ZMQ_POLLERR; +#endif //POLL_SELECT + + if (events) { + events_[found].socket = NULL; + events_[found].user_data = it->user_data; + events_[found].fd = it->fd; + events_[found].events = events; + ++found; + } + } + } + + return found; +} + +//Return 0 if timeout is expired otherwise 1 +int zmq::socket_poller_t::adjust_timeout (zmq::clock_t& clock, long timeout_, + uint64_t& now, + uint64_t& end, + bool& first_pass) +{ + + // If socket_poller_t::timeout is zero, exit immediately whether there + // are events or not. + if (timeout_ == 0) + return 0; + + // At this point we are meant to wait for events but there are none. + // If timeout is infinite we can just loop until we get some events. + if (timeout_ < 0) { + if (first_pass) + first_pass = false; + return 1; + } + + // The timeout is finite and there are no events. In the first pass + // we get a timestamp of when the polling have begun. (We assume that + // first pass have taken negligible time). We also compute the time + // when the polling should time out. + now = clock.now_ms (); + if (first_pass) { + end = now + timeout_; + first_pass = false; + return 1; + } + + // Find out whether timeout have expired. + if (now >= end) + return 0; + + return 1; +} + +int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, + int n_events_, + long timeout_) { if (items.empty () && timeout_ < 0) { errno = EFAULT; @@ -411,7 +539,6 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev if (need_rebuild) rebuild (); -#if defined ZMQ_POLL_BASED_ON_POLL if (unlikely (poll_size == 0)) { // We'll report an error (timed out) as if the list was non-empty and // no event occurred within the specified timeout. Otherwise the caller @@ -432,6 +559,7 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev #endif } +#if defined ZMQ_POLL_BASED_ON_POLL zmq::clock_t clock; uint64_t now = 0; uint64_t end = 0; @@ -464,89 +592,15 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev signaler->recv (); // Check for the events. - int found = 0; - for (items_t::iterator it = items.begin (); it != items.end () && found < n_events_; ++it) { - - events_[found].socket = NULL; - events_[found].fd = 0; - events_[found].user_data = NULL; - events_[found].events = 0; - - // The poll item is a 0MQ socket. Retrieve pending events - // using the ZMQ_EVENTS socket option. - if (it->socket) { - size_t events_size = sizeof (uint32_t); - uint32_t events; - if (it->socket->getsockopt (ZMQ_EVENTS, &events, &events_size) == -1) { - return -1; - } - - if (it->events & events) { - events_[found].socket = it->socket; - events_[found].user_data = it->user_data; - events_[found].events = it->events & events; - ++found; - } - } - // Else, the poll item is a raw file descriptor, simply convert - // the events to zmq_pollitem_t-style format. - else { - short revents = pollfds [it->pollfd_index].revents; - short events = 0; - - if (revents & POLLIN) - events |= ZMQ_POLLIN; - if (revents & POLLOUT) - events |= ZMQ_POLLOUT; - if (revents & POLLPRI) - events |= ZMQ_POLLPRI; - if (revents & ~(POLLIN | POLLOUT | POLLPRI)) - events |= ZMQ_POLLERR; - - if (events) { - events_[found].socket = NULL; - events_[found].user_data = it->user_data; - events_[found].fd = it->fd; - events_[found].events = events; - ++found; - } - } - } + int found = check_events (events_, n_events_); if (found) { - for (int i = found; i < n_events_; ++i) { - events_[i].socket = NULL; - events_[i].fd = 0; - events_[i].user_data = NULL; - events_[i].events = 0; - } + if (found > 0) + zero_trail_events (events_, n_events_, found); return found; } - // If timeout is zero, exit immediately whether there are events or not. - if (timeout_ == 0) - break; - - // At this point we are meant to wait for events but there are none. - // If timeout is infinite we can just loop until we get some events. - if (timeout_ < 0) { - if (first_pass) - first_pass = false; - continue; - } - - // The timeout is finite but non-zero and there are no events. In the - // first pass, we get a timestamp of when the polling have begun. - // (We assume that first pass have taken negligible time). We also - // compute the time when the polling should time out. - now = clock.now_ms (); - if (first_pass) { - end = now + timeout_; - first_pass = false; - continue; - } - - // Find out whether timeout have expired. - if (now >= end) + // Adjust timeout or break + if (adjust_timeout (clock, timeout_, now, end, first_pass) == 0) break; } errno = EAGAIN; @@ -554,27 +608,12 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev #elif defined ZMQ_POLL_BASED_ON_SELECT - if (unlikely (poll_size == 0)) { - // We'll report an error (timed out) as if the list was non-empty and - // no event occured within the specified timeout. Otherwise the caller - // needs to check the return value AND the event to avoid using the - // nullified event data. - errno = EAGAIN; - if (timeout_ == 0) - return -1; -#if defined ZMQ_HAVE_WINDOWS - Sleep (timeout_ > 0 ? timeout_ : INFINITE); - return -1; -#else - usleep (timeout_ * 1000); - return -1; -#endif - } zmq::clock_t clock; uint64_t now = 0; uint64_t end = 0; bool first_pass = true; + fd_set inset, outset, errset; while (true) { @@ -629,81 +668,15 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev signaler->recv (); // Check for the events. - int found = 0; - for (items_t::iterator it = items.begin (); it != items.end () && found < n_events_; ++it) { - - // The poll item is a 0MQ socket. Retrieve pending events - // using the ZMQ_EVENTS socket option. - if (it->socket) { - size_t events_size = sizeof (uint32_t); - uint32_t events; - if (it->socket->getsockopt (ZMQ_EVENTS, &events, &events_size) == -1) - return -1; - - if (it->events & events) { - events_[found].socket = it->socket; - events_[found].user_data = it->user_data; - events_[found].events = it->events & events; - ++found; - } - } - // Else, the poll item is a raw file descriptor, simply convert - // the events to zmq_pollitem_t-style format. - else { - short events = 0; - - if (FD_ISSET (it->fd, &inset)) - events |= ZMQ_POLLIN; - if (FD_ISSET (it->fd, &outset)) - events |= ZMQ_POLLOUT; - if (FD_ISSET (it->fd, &errset)) - events |= ZMQ_POLLERR; - - if (events) { - events_[found].socket = NULL; - events_[found].user_data = it->user_data; - events_[found].fd = it->fd; - events_[found].events = events; - ++found; - } - } - } + int found = check_events(events_, n_events_, inset, outset, errset); if (found) { - // zero-out remaining events - for (int i = found; i < n_events_; ++i) { - events_[i].socket = NULL; - events_[i].fd = 0; - events_[i].user_data = NULL; - events_[i].events = 0; - } + if (found > 0) + zero_trail_events (events_, n_events_, found); return found; } - // If timeout is zero, exit immediately whether there are events or not. - if (timeout_ == 0) - break; - - // At this point we are meant to wait for events but there are none. - // If timeout is infinite we can just loop until we get some events. - if (timeout_ < 0) { - if (first_pass) - first_pass = false; - continue; - } - - // The timeout is finite and there are no events. In the first pass - // we get a timestamp of when the polling have begun. (We assume that - // first pass have taken negligible time). We also compute the time - // when the polling should time out. - now = clock.now_ms (); - if (first_pass) { - end = now + timeout_; - first_pass = false; - continue; - } - - // Find out whether timeout have expired. - if (now >= end) + // Adjust timeout or break + if (adjust_timeout (clock, timeout_, now, end, first_pass) == 0) break; } @@ -711,8 +684,10 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev return -1; #else + // Exotic platforms that support neither poll() nor select(). errno = ENOTSUP; return -1; + #endif } diff --git a/src/socket_poller.hpp b/src/socket_poller.hpp index b330a6ac..a774ce6a 100644 --- a/src/socket_poller.hpp +++ b/src/socket_poller.hpp @@ -81,6 +81,21 @@ namespace zmq bool check_tag (); private: + void zero_trail_events (zmq::socket_poller_t::event_t *events_, + int n_events_, + int found); +#if defined ZMQ_POLL_BASED_ON_POLL + int check_events (zmq::socket_poller_t::event_t *events_, + int n_events_); +#elif defined ZMQ_POLL_BASED_ON_SELECT + int check_events (zmq::socket_poller_t::event_t *events_, int n_events_, + fd_set& inset, + fd_set& outset, + fd_set& errset); +#endif + int adjust_timeout (zmq::clock_t& clock, long timeout_, uint64_t& now, + uint64_t& end, + bool& first_pass); void rebuild (); // Used to check whether the object is a socket_poller.