mirror of
https://github.com/zeromq/libzmq.git
synced 2025-03-17 08:34:00 +00:00
Precise timouts in zmq_poll implemented
Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
This commit is contained in:
parent
9d96e0037a
commit
e2167cecae
131
src/zmq.cpp
131
src/zmq.cpp
@ -384,6 +384,10 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
zmq::clock_t clock;
|
||||||
|
uint64_t now = 0;
|
||||||
|
uint64_t end = 0;
|
||||||
|
|
||||||
pollfd *pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd));
|
pollfd *pollfds = (pollfd*) malloc (nitems_ * sizeof (pollfd));
|
||||||
zmq_assert (pollfds);
|
zmq_assert (pollfds);
|
||||||
|
|
||||||
@ -413,16 +417,21 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
|
|||||||
|
|
||||||
bool first_pass = true;
|
bool first_pass = true;
|
||||||
int nevents = 0;
|
int nevents = 0;
|
||||||
if (timeout_ >= 0)
|
|
||||||
timeout_ /= 1000;
|
|
||||||
else
|
|
||||||
timeout_ = -1;
|
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
|
|
||||||
|
// Compute the timeout for the subsequent poll.
|
||||||
|
int timeout;
|
||||||
|
if (first_pass)
|
||||||
|
timeout = 0;
|
||||||
|
else if (timeout_ < 0)
|
||||||
|
timeout = -1;
|
||||||
|
else
|
||||||
|
timeout = end - now;
|
||||||
|
|
||||||
// Wait for events.
|
// Wait for events.
|
||||||
while (true) {
|
while (true) {
|
||||||
int rc = poll (pollfds, nitems_, first_pass ? 0 : timeout_);
|
int rc = poll (pollfds, nitems_, timeout);
|
||||||
if (rc == -1 && errno == EINTR) {
|
if (rc == -1 && errno == EINTR) {
|
||||||
free (pollfds);
|
free (pollfds);
|
||||||
return -1;
|
return -1;
|
||||||
@ -468,22 +477,33 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
|
|||||||
nevents++;
|
nevents++;
|
||||||
}
|
}
|
||||||
|
|
||||||
// If there are no events from the first pass (the one with no
|
// If timout is zero, exit immediately whether there are events or not.
|
||||||
// timeout), do at least the second pass so that we wait.
|
if (timeout_ == 0)
|
||||||
if (first_pass && nevents == 0 && timeout_ != 0) {
|
break;
|
||||||
first_pass = false;
|
|
||||||
|
// If there are events to return, we can exit immediately.
|
||||||
|
if (nevents)
|
||||||
|
break;
|
||||||
|
|
||||||
|
// At this point we are meant to wait for events but there are none.
|
||||||
|
// If timeout is infinite we can just loop until we get some events.
|
||||||
|
if (timeout_ < 0)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
// The timeout is finite and there are no events. In the first pass
|
||||||
|
// we get a timestamp of when the polling have begun. (We assume that
|
||||||
|
// first pass have taken negligible time). We also compute the time
|
||||||
|
// when the polling should time out.
|
||||||
|
if (first_pass) {
|
||||||
|
now = clock.now_ms ();
|
||||||
|
end = now + (timeout_ / 1000);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// If timeout is set to infinite and we have to events to return
|
// Find out whether timeout have expired.
|
||||||
// we can restart the polling.
|
now = clock.now_ms ();
|
||||||
if (timeout_ == -1 && nevents == 0)
|
if (now >= end)
|
||||||
continue;
|
break;
|
||||||
|
|
||||||
// TODO: if nevents is zero recompute timeout and loop
|
|
||||||
// if it is not yet reached.
|
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
free (pollfds);
|
free (pollfds);
|
||||||
@ -491,6 +511,19 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
|
|||||||
|
|
||||||
#elif defined ZMQ_POLL_BASED_ON_SELECT
|
#elif defined ZMQ_POLL_BASED_ON_SELECT
|
||||||
|
|
||||||
|
if (!items_) {
|
||||||
|
errno = EFAULT;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
zmq::clock_t clock;
|
||||||
|
uint64_t now = 0;
|
||||||
|
uint64_t end = 0;
|
||||||
|
|
||||||
|
// Ensure we do not attempt to select () on more than FD_SETSIZE
|
||||||
|
// file descriptors.
|
||||||
|
zmq_assert (nitems_ <= FD_SETSIZE);
|
||||||
|
|
||||||
fd_set pollset_in;
|
fd_set pollset_in;
|
||||||
FD_ZERO (&pollset_in);
|
FD_ZERO (&pollset_in);
|
||||||
fd_set pollset_out;
|
fd_set pollset_out;
|
||||||
@ -500,10 +533,6 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
|
|||||||
|
|
||||||
zmq::fd_t maxfd = 0;
|
zmq::fd_t maxfd = 0;
|
||||||
|
|
||||||
// Ensure we do not attempt to select () on more than FD_SETSIZE
|
|
||||||
// file descriptors.
|
|
||||||
zmq_assert (nitems_ <= FD_SETSIZE);
|
|
||||||
|
|
||||||
// Build the fd_sets for passing to select ().
|
// Build the fd_sets for passing to select ().
|
||||||
for (int i = 0; i != nitems_; i++) {
|
for (int i = 0; i != nitems_; i++) {
|
||||||
|
|
||||||
@ -536,11 +565,25 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
|
|||||||
}
|
}
|
||||||
|
|
||||||
bool first_pass = true;
|
bool first_pass = true;
|
||||||
timeval zero_timeout = {0, 0};
|
|
||||||
timeval timeout = {timeout_ / 1000000, timeout_ % 1000000};
|
|
||||||
int nevents = 0;
|
int nevents = 0;
|
||||||
fd_set inset, outset, errset;
|
fd_set inset, outset, errset;
|
||||||
|
|
||||||
|
// Compute the timeout for the subsequent poll.
|
||||||
|
timeval timeout;
|
||||||
|
timeval *ptimeout;
|
||||||
|
if (first_pass) {
|
||||||
|
timeout.tv_sec = 0;
|
||||||
|
timeout.tv_usec = 0;
|
||||||
|
ptimeout = &timeout;
|
||||||
|
}
|
||||||
|
else if (timeout_ < 0)
|
||||||
|
ptimeout = NULL;
|
||||||
|
else {
|
||||||
|
timeout.tv_sec = (long) ((end - now) / 1000);
|
||||||
|
timeout.tv_usec = (long) ((end - now) % 1000 * 1000);
|
||||||
|
ptimeout = &timeout;
|
||||||
|
}
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
|
|
||||||
// Wait for events. Ignore interrupts if there's infinite timeout.
|
// Wait for events. Ignore interrupts if there's infinite timeout.
|
||||||
@ -548,8 +591,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
|
|||||||
memcpy (&inset, &pollset_in, sizeof (fd_set));
|
memcpy (&inset, &pollset_in, sizeof (fd_set));
|
||||||
memcpy (&outset, &pollset_out, sizeof (fd_set));
|
memcpy (&outset, &pollset_out, sizeof (fd_set));
|
||||||
memcpy (&errset, &pollset_err, sizeof (fd_set));
|
memcpy (&errset, &pollset_err, sizeof (fd_set));
|
||||||
int rc = select (maxfd + 1, &inset, &outset, &errset,
|
int rc = select (maxfd + 1, &inset, &outset, &errset, ptimeout);
|
||||||
first_pass ? &zero_timeout : (timeout_ < 0 ? NULL : &timeout));
|
|
||||||
#if defined ZMQ_HAVE_WINDOWS
|
#if defined ZMQ_HAVE_WINDOWS
|
||||||
wsa_assert (rc != SOCKET_ERROR);
|
wsa_assert (rc != SOCKET_ERROR);
|
||||||
#else
|
#else
|
||||||
@ -595,22 +637,33 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
|
|||||||
nevents++;
|
nevents++;
|
||||||
}
|
}
|
||||||
|
|
||||||
// If there are no events from the first pass (the one with no
|
// If timout is zero, exit immediately whether there are events or not.
|
||||||
// timout), do at least the second pass so that we wait.
|
if (timeout_ == 0)
|
||||||
if (first_pass && nevents == 0 && timeout_ != 0) {
|
break;
|
||||||
first_pass = false;
|
|
||||||
|
// If there are events to return, we can exit immediately.
|
||||||
|
if (nevents)
|
||||||
|
break;
|
||||||
|
|
||||||
|
// At this point we are meant to wait for events but there are none.
|
||||||
|
// If timeout is infinite we can just loop until we get some events.
|
||||||
|
if (timeout_ < 0)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
// The timeout is finite and there are no events. In the first pass
|
||||||
|
// we get a timestamp of when the polling have begun. (We assume that
|
||||||
|
// first pass have taken negligible time). We also compute the time
|
||||||
|
// when the polling should time out.
|
||||||
|
if (first_pass) {
|
||||||
|
now = clock.now_ms ();
|
||||||
|
end = now + (timeout_ / 1000);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// If timeout is set to infinite and we have to events to return
|
// Find out whether timeout have expired.
|
||||||
// we can restart the polling.
|
now = clock.now_ms ();
|
||||||
if (timeout_ < 0 && nevents == 0)
|
if (now >= end)
|
||||||
continue;
|
break;
|
||||||
|
|
||||||
// TODO: if nevents is zero recompute timeout and loop
|
|
||||||
// if it is not yet reached.
|
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nevents;
|
return nevents;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user