mirror of
https://github.com/zeromq/libzmq.git
synced 2024-12-27 15:41:05 +08:00
Merge pull request #2744 from msune/refactor_poller_wait
Problem: duplicated socket_poller::wait() code
This commit is contained in:
commit
5de2a82be8
@ -401,7 +401,135 @@ void zmq::socket_poller_t::rebuild ()
|
||||
need_rebuild = false;
|
||||
}
|
||||
|
||||
int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_events_, long timeout_)
|
||||
void zmq::socket_poller_t::zero_trail_events (
|
||||
zmq::socket_poller_t::event_t *events_,
|
||||
int n_events_,
|
||||
int found)
|
||||
{
|
||||
for (int i = found; i < n_events_; ++i) {
|
||||
events_[i].socket = NULL;
|
||||
events_[i].fd = 0;
|
||||
events_[i].user_data = NULL;
|
||||
events_[i].events = 0;
|
||||
}
|
||||
}
|
||||
|
||||
#if defined ZMQ_POLL_BASED_ON_POLL
|
||||
int zmq::socket_poller_t::check_events (zmq::socket_poller_t::event_t *events_,
|
||||
int n_events_)
|
||||
#elif defined ZMQ_POLL_BASED_ON_SELECT
|
||||
int zmq::socket_poller_t::check_events (zmq::socket_poller_t::event_t *events_,
|
||||
int n_events_,
|
||||
fd_set& inset,
|
||||
fd_set& outset,
|
||||
fd_set& errset)
|
||||
#endif
|
||||
{
|
||||
int found = 0;
|
||||
for (items_t::iterator it = items.begin (); it != items.end () &&
|
||||
found < n_events_; ++it) {
|
||||
|
||||
// The poll item is a 0MQ socket. Retrieve pending events
|
||||
// using the ZMQ_EVENTS socket option.
|
||||
if (it->socket) {
|
||||
size_t events_size = sizeof (uint32_t);
|
||||
uint32_t events;
|
||||
if (it->socket->getsockopt (ZMQ_EVENTS, &events, &events_size)
|
||||
== -1) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (it->events & events) {
|
||||
events_[found].socket = it->socket;
|
||||
events_[found].user_data = it->user_data;
|
||||
events_[found].events = it->events & events;
|
||||
++found;
|
||||
}
|
||||
}
|
||||
// Else, the poll item is a raw file descriptor, simply convert
|
||||
// the events to zmq_pollitem_t-style format.
|
||||
else {
|
||||
|
||||
#if defined ZMQ_POLL_BASED_ON_POLL
|
||||
|
||||
short revents = pollfds [it->pollfd_index].revents;
|
||||
short events = 0;
|
||||
|
||||
if (revents & POLLIN)
|
||||
events |= ZMQ_POLLIN;
|
||||
if (revents & POLLOUT)
|
||||
events |= ZMQ_POLLOUT;
|
||||
if (revents & POLLPRI)
|
||||
events |= ZMQ_POLLPRI;
|
||||
if (revents & ~(POLLIN | POLLOUT | POLLPRI))
|
||||
events |= ZMQ_POLLERR;
|
||||
|
||||
#elif defined ZMQ_POLL_BASED_ON_SELECT
|
||||
|
||||
short events = 0;
|
||||
|
||||
if (FD_ISSET (it->fd, &inset))
|
||||
events |= ZMQ_POLLIN;
|
||||
if (FD_ISSET (it->fd, &outset))
|
||||
events |= ZMQ_POLLOUT;
|
||||
if (FD_ISSET (it->fd, &errset))
|
||||
events |= ZMQ_POLLERR;
|
||||
#endif //POLL_SELECT
|
||||
|
||||
if (events) {
|
||||
events_[found].socket = NULL;
|
||||
events_[found].user_data = it->user_data;
|
||||
events_[found].fd = it->fd;
|
||||
events_[found].events = events;
|
||||
++found;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return found;
|
||||
}
|
||||
|
||||
//Return 0 if timeout is expired otherwise 1
|
||||
int zmq::socket_poller_t::adjust_timeout (zmq::clock_t& clock, long timeout_,
|
||||
uint64_t& now,
|
||||
uint64_t& end,
|
||||
bool& first_pass)
|
||||
{
|
||||
|
||||
// If socket_poller_t::timeout is zero, exit immediately whether there
|
||||
// are events or not.
|
||||
if (timeout_ == 0)
|
||||
return 0;
|
||||
|
||||
// At this point we are meant to wait for events but there are none.
|
||||
// If timeout is infinite we can just loop until we get some events.
|
||||
if (timeout_ < 0) {
|
||||
if (first_pass)
|
||||
first_pass = false;
|
||||
return 1;
|
||||
}
|
||||
|
||||
// The timeout is finite and there are no events. In the first pass
|
||||
// we get a timestamp of when the polling have begun. (We assume that
|
||||
// first pass have taken negligible time). We also compute the time
|
||||
// when the polling should time out.
|
||||
now = clock.now_ms ();
|
||||
if (first_pass) {
|
||||
end = now + timeout_;
|
||||
first_pass = false;
|
||||
return 1;
|
||||
}
|
||||
|
||||
// Find out whether timeout have expired.
|
||||
if (now >= end)
|
||||
return 0;
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_,
|
||||
int n_events_,
|
||||
long timeout_)
|
||||
{
|
||||
if (items.empty () && timeout_ < 0) {
|
||||
errno = EFAULT;
|
||||
@ -411,7 +539,6 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev
|
||||
if (need_rebuild)
|
||||
rebuild ();
|
||||
|
||||
#if defined ZMQ_POLL_BASED_ON_POLL
|
||||
if (unlikely (poll_size == 0)) {
|
||||
// We'll report an error (timed out) as if the list was non-empty and
|
||||
// no event occurred within the specified timeout. Otherwise the caller
|
||||
@ -432,6 +559,7 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev
|
||||
#endif
|
||||
}
|
||||
|
||||
#if defined ZMQ_POLL_BASED_ON_POLL
|
||||
zmq::clock_t clock;
|
||||
uint64_t now = 0;
|
||||
uint64_t end = 0;
|
||||
@ -464,89 +592,15 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev
|
||||
signaler->recv ();
|
||||
|
||||
// Check for the events.
|
||||
int found = 0;
|
||||
for (items_t::iterator it = items.begin (); it != items.end () && found < n_events_; ++it) {
|
||||
|
||||
events_[found].socket = NULL;
|
||||
events_[found].fd = 0;
|
||||
events_[found].user_data = NULL;
|
||||
events_[found].events = 0;
|
||||
|
||||
// The poll item is a 0MQ socket. Retrieve pending events
|
||||
// using the ZMQ_EVENTS socket option.
|
||||
if (it->socket) {
|
||||
size_t events_size = sizeof (uint32_t);
|
||||
uint32_t events;
|
||||
if (it->socket->getsockopt (ZMQ_EVENTS, &events, &events_size) == -1) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (it->events & events) {
|
||||
events_[found].socket = it->socket;
|
||||
events_[found].user_data = it->user_data;
|
||||
events_[found].events = it->events & events;
|
||||
++found;
|
||||
}
|
||||
}
|
||||
// Else, the poll item is a raw file descriptor, simply convert
|
||||
// the events to zmq_pollitem_t-style format.
|
||||
else {
|
||||
short revents = pollfds [it->pollfd_index].revents;
|
||||
short events = 0;
|
||||
|
||||
if (revents & POLLIN)
|
||||
events |= ZMQ_POLLIN;
|
||||
if (revents & POLLOUT)
|
||||
events |= ZMQ_POLLOUT;
|
||||
if (revents & POLLPRI)
|
||||
events |= ZMQ_POLLPRI;
|
||||
if (revents & ~(POLLIN | POLLOUT | POLLPRI))
|
||||
events |= ZMQ_POLLERR;
|
||||
|
||||
if (events) {
|
||||
events_[found].socket = NULL;
|
||||
events_[found].user_data = it->user_data;
|
||||
events_[found].fd = it->fd;
|
||||
events_[found].events = events;
|
||||
++found;
|
||||
}
|
||||
}
|
||||
}
|
||||
int found = check_events (events_, n_events_);
|
||||
if (found) {
|
||||
for (int i = found; i < n_events_; ++i) {
|
||||
events_[i].socket = NULL;
|
||||
events_[i].fd = 0;
|
||||
events_[i].user_data = NULL;
|
||||
events_[i].events = 0;
|
||||
}
|
||||
if (found > 0)
|
||||
zero_trail_events (events_, n_events_, found);
|
||||
return found;
|
||||
}
|
||||
|
||||
// If timeout is zero, exit immediately whether there are events or not.
|
||||
if (timeout_ == 0)
|
||||
break;
|
||||
|
||||
// At this point we are meant to wait for events but there are none.
|
||||
// If timeout is infinite we can just loop until we get some events.
|
||||
if (timeout_ < 0) {
|
||||
if (first_pass)
|
||||
first_pass = false;
|
||||
continue;
|
||||
}
|
||||
|
||||
// The timeout is finite but non-zero and there are no events. In the
|
||||
// first pass, we get a timestamp of when the polling have begun.
|
||||
// (We assume that first pass have taken negligible time). We also
|
||||
// compute the time when the polling should time out.
|
||||
now = clock.now_ms ();
|
||||
if (first_pass) {
|
||||
end = now + timeout_;
|
||||
first_pass = false;
|
||||
continue;
|
||||
}
|
||||
|
||||
// Find out whether timeout have expired.
|
||||
if (now >= end)
|
||||
// Adjust timeout or break
|
||||
if (adjust_timeout (clock, timeout_, now, end, first_pass) == 0)
|
||||
break;
|
||||
}
|
||||
errno = EAGAIN;
|
||||
@ -554,27 +608,12 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev
|
||||
|
||||
#elif defined ZMQ_POLL_BASED_ON_SELECT
|
||||
|
||||
if (unlikely (poll_size == 0)) {
|
||||
// We'll report an error (timed out) as if the list was non-empty and
|
||||
// no event occured within the specified timeout. Otherwise the caller
|
||||
// needs to check the return value AND the event to avoid using the
|
||||
// nullified event data.
|
||||
errno = EAGAIN;
|
||||
if (timeout_ == 0)
|
||||
return -1;
|
||||
#if defined ZMQ_HAVE_WINDOWS
|
||||
Sleep (timeout_ > 0 ? timeout_ : INFINITE);
|
||||
return -1;
|
||||
#else
|
||||
usleep (timeout_ * 1000);
|
||||
return -1;
|
||||
#endif
|
||||
}
|
||||
zmq::clock_t clock;
|
||||
uint64_t now = 0;
|
||||
uint64_t end = 0;
|
||||
|
||||
bool first_pass = true;
|
||||
|
||||
fd_set inset, outset, errset;
|
||||
|
||||
while (true) {
|
||||
@ -629,81 +668,15 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev
|
||||
signaler->recv ();
|
||||
|
||||
// Check for the events.
|
||||
int found = 0;
|
||||
for (items_t::iterator it = items.begin (); it != items.end () && found < n_events_; ++it) {
|
||||
|
||||
// The poll item is a 0MQ socket. Retrieve pending events
|
||||
// using the ZMQ_EVENTS socket option.
|
||||
if (it->socket) {
|
||||
size_t events_size = sizeof (uint32_t);
|
||||
uint32_t events;
|
||||
if (it->socket->getsockopt (ZMQ_EVENTS, &events, &events_size) == -1)
|
||||
return -1;
|
||||
|
||||
if (it->events & events) {
|
||||
events_[found].socket = it->socket;
|
||||
events_[found].user_data = it->user_data;
|
||||
events_[found].events = it->events & events;
|
||||
++found;
|
||||
}
|
||||
}
|
||||
// Else, the poll item is a raw file descriptor, simply convert
|
||||
// the events to zmq_pollitem_t-style format.
|
||||
else {
|
||||
short events = 0;
|
||||
|
||||
if (FD_ISSET (it->fd, &inset))
|
||||
events |= ZMQ_POLLIN;
|
||||
if (FD_ISSET (it->fd, &outset))
|
||||
events |= ZMQ_POLLOUT;
|
||||
if (FD_ISSET (it->fd, &errset))
|
||||
events |= ZMQ_POLLERR;
|
||||
|
||||
if (events) {
|
||||
events_[found].socket = NULL;
|
||||
events_[found].user_data = it->user_data;
|
||||
events_[found].fd = it->fd;
|
||||
events_[found].events = events;
|
||||
++found;
|
||||
}
|
||||
}
|
||||
}
|
||||
int found = check_events(events_, n_events_, inset, outset, errset);
|
||||
if (found) {
|
||||
// zero-out remaining events
|
||||
for (int i = found; i < n_events_; ++i) {
|
||||
events_[i].socket = NULL;
|
||||
events_[i].fd = 0;
|
||||
events_[i].user_data = NULL;
|
||||
events_[i].events = 0;
|
||||
}
|
||||
if (found > 0)
|
||||
zero_trail_events (events_, n_events_, found);
|
||||
return found;
|
||||
}
|
||||
|
||||
// If timeout is zero, exit immediately whether there are events or not.
|
||||
if (timeout_ == 0)
|
||||
break;
|
||||
|
||||
// At this point we are meant to wait for events but there are none.
|
||||
// If timeout is infinite we can just loop until we get some events.
|
||||
if (timeout_ < 0) {
|
||||
if (first_pass)
|
||||
first_pass = false;
|
||||
continue;
|
||||
}
|
||||
|
||||
// The timeout is finite and there are no events. In the first pass
|
||||
// we get a timestamp of when the polling have begun. (We assume that
|
||||
// first pass have taken negligible time). We also compute the time
|
||||
// when the polling should time out.
|
||||
now = clock.now_ms ();
|
||||
if (first_pass) {
|
||||
end = now + timeout_;
|
||||
first_pass = false;
|
||||
continue;
|
||||
}
|
||||
|
||||
// Find out whether timeout have expired.
|
||||
if (now >= end)
|
||||
// Adjust timeout or break
|
||||
if (adjust_timeout (clock, timeout_, now, end, first_pass) == 0)
|
||||
break;
|
||||
}
|
||||
|
||||
@ -711,8 +684,10 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, int n_ev
|
||||
return -1;
|
||||
|
||||
#else
|
||||
|
||||
// Exotic platforms that support neither poll() nor select().
|
||||
errno = ENOTSUP;
|
||||
return -1;
|
||||
|
||||
#endif
|
||||
}
|
||||
|
@ -81,6 +81,21 @@ namespace zmq
|
||||
bool check_tag ();
|
||||
|
||||
private:
|
||||
void zero_trail_events (zmq::socket_poller_t::event_t *events_,
|
||||
int n_events_,
|
||||
int found);
|
||||
#if defined ZMQ_POLL_BASED_ON_POLL
|
||||
int check_events (zmq::socket_poller_t::event_t *events_,
|
||||
int n_events_);
|
||||
#elif defined ZMQ_POLL_BASED_ON_SELECT
|
||||
int check_events (zmq::socket_poller_t::event_t *events_, int n_events_,
|
||||
fd_set& inset,
|
||||
fd_set& outset,
|
||||
fd_set& errset);
|
||||
#endif
|
||||
int adjust_timeout (zmq::clock_t& clock, long timeout_, uint64_t& now,
|
||||
uint64_t& end,
|
||||
bool& first_pass);
|
||||
void rebuild ();
|
||||
|
||||
// Used to check whether the object is a socket_poller.
|
||||
|
Loading…
x
Reference in New Issue
Block a user