diff --git a/include/zmq.h b/include/zmq.h index 83673e68..32bf8cce 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -410,8 +410,16 @@ typedef struct zmq_pollitem_t #define ZMQ_POLLITEMS_DFLT 16 ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout); + +/******************************************************************************/ +/* Pollfd polling on thread safe socket */ +/******************************************************************************/ + ZMQ_EXPORT void *zmq_pollfd_new (); ZMQ_EXPORT int zmq_pollfd_close (void *p); +ZMQ_EXPORT void zmq_pollfd_recv (void *p); +ZMQ_EXPORT int zmq_pollfd_wait (void *p, int timeout_); +ZMQ_EXPORT int zmq_pollfd_poll (void *p, zmq_pollitem_t *items, int nitems, long timeout); #if defined _WIN32 ZMQ_EXPORT SOCKET zmq_pollfd_fd (void *p); diff --git a/src/zmq.cpp b/src/zmq.cpp index 94feec30..940c3b39 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -709,38 +709,11 @@ const char *zmq_msg_gets (zmq_msg_t *msg_, const char *property_) } } -// Create pollfd - -void *zmq_pollfd_new () -{ - return new zmq::signaler_t (); -} - -// Close pollfd - -int zmq_pollfd_close (void* p) -{ - zmq::signaler_t *s = (zmq::signaler_t*)p; - LIBZMQ_DELETE(s); - - return 0; -} - -// Get poller fd -#if defined _WIN32 -SOCKET zmq_pollfd_fd (void *p) -#else -int zmq_pollfd_fd (void *p) -#endif -{ - zmq::signaler_t *s = (zmq::signaler_t*)p; - return s->get_fd (); -} - // Polling. int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) { + // TODO: the function implementation can just call zmq_pollfd_poll with pollfd as NULL, however pollfd is not yet stable #if defined ZMQ_POLL_BASED_ON_POLL if (unlikely (nitems_ < 0)) { errno = EINVAL; @@ -782,36 +755,14 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) // If the poll item is a 0MQ socket, we poll on the file descriptor // retrieved by the ZMQ_FD socket option. if (items_ [i].socket) { - int thread_safe; - size_t thread_safe_size = sizeof(int); - - if (zmq_getsockopt (items_ [i].socket, ZMQ_THREAD_SAFE, &thread_safe, - &thread_safe_size) == -1) { + size_t zmq_fd_size = sizeof (zmq::fd_t); + if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &pollfds [i].fd, + &zmq_fd_size) == -1) { if (pollfds != spollfds) free (pollfds); return -1; } - - if (thread_safe) { - if (!items_ [i].fd) { - if (pollfds != spollfds) - free (pollfds); - errno = EINVAL; - return -1; - } - - pollfds [i].fd = items_ [i].fd; - } - else { - size_t zmq_fd_size = sizeof (zmq::fd_t); - if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &pollfds [i].fd, - &zmq_fd_size) == -1) { - if (pollfds != spollfds) - free (pollfds); - return -1; - } - } - pollfds [i].events = items_ [i].events ? POLLIN : 0; + pollfds [i].events = items_ [i].events ? POLLIN : 0; } // Else, the poll item is a raw file descriptor. Just convert the // events to normal POLLIN/POLLOUT for poll (). @@ -967,29 +918,11 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) // If the poll item is a 0MQ socket we are interested in input on the // notification file descriptor retrieved by the ZMQ_FD socket option. if (items_ [i].socket) { - int thread_safe; - size_t thread_safe_size = sizeof(int); - - if (zmq_getsockopt (items_ [i].socket, ZMQ_THREAD_SAFE, &thread_safe, - &thread_safe_size) == -1) - return -1; - + size_t zmq_fd_size = sizeof (zmq::fd_t); zmq::fd_t notify_fd; - - if (thread_safe) { - if (!items_ [i].fd) { - errno = EINVAL; - return -1; - } - - notify_fd = items_ [i].fd; - } - else { - size_t zmq_fd_size = sizeof (zmq::fd_t); - if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, ¬ify_fd, - &zmq_fd_size) == -1) - return -1; - } + if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, ¬ify_fd, + &zmq_fd_size) == -1) + return -1; if (items_ [i].events) { FD_SET (notify_fd, &pollset_in); if (maxfd < notify_fd) @@ -1119,6 +1052,495 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) continue; } + // Find out whether timeout have expired. + now = clock.now_ms (); + if (now >= end) + break; + } + + return nevents; + +#else + // Exotic platforms that support neither poll() nor select(). + errno = ENOTSUP; + return -1; +#endif +} + +// Create pollfd + +void *zmq_pollfd_new () +{ + return new zmq::signaler_t (); +} + +// Close pollfd + +int zmq_pollfd_close (void* p_) +{ + zmq::signaler_t *s = (zmq::signaler_t*)p_; + LIBZMQ_DELETE(s); + return 0; +} + +// Recv signal from pollfd + +void zmq_pollfd_recv(void *p_) +{ + zmq::signaler_t *s = (zmq::signaler_t*)p_; + s->recv (); +} + +// Wait until pollfd is signalled + +int zmq_pollfd_wait(void *p_, int timeout_) +{ + zmq::signaler_t *s = (zmq::signaler_t*)p_; + return s->wait (timeout_); +} + +// Get pollfd fd + +#if defined _WIN32 +SOCKET zmq_pollfd_fd (void *p_) +#else +int zmq_pollfd_fd (void *p_) +#endif +{ + zmq::signaler_t *s = (zmq::signaler_t*)p_; + return s->get_fd (); +} + +// Polling thread safe sockets version + +int zmq_pollfd_poll (void* p_, zmq_pollitem_t *items_, int nitems_, long timeout_) +{ +#if defined ZMQ_POLL_BASED_ON_POLL + if (unlikely (nitems_ < 0)) { + errno = EINVAL; + return -1; + } + if (unlikely (nitems_ == 0)) { + if (timeout_ == 0) + return 0; +#if defined ZMQ_HAVE_WINDOWS + Sleep (timeout_ > 0 ? timeout_ : INFINITE); + return 0; +#elif defined ZMQ_HAVE_ANDROID + usleep (timeout_ * 1000); + return 0; +#else + return usleep (timeout_ * 1000); +#endif + } + + if (!items_) { + errno = EFAULT; + return -1; + } + + zmq::clock_t clock; + uint64_t now = 0; + uint64_t end = 0; + pollfd spollfds[ZMQ_POLLITEMS_DFLT]; + pollfd *pollfds = spollfds; + int pollfds_size = 0; + int pollfds_index = 0; + bool use_pollfd = false; + + for (int i = 0; i != nitems_; i++) { + if (items_ [i].socket) { + int thread_safe; + size_t thread_safe_size = sizeof(int); + + if (zmq_getsockopt (items_ [i].socket, ZMQ_THREAD_SAFE, &thread_safe, + &thread_safe_size) == -1) { + return -1; + } + + // All thread safe sockets share same fd + if (thread_safe) { + + // if poll fd is not set yet and events are set for this socket + if (!use_pollfd && items_ [i].events) { + use_pollfd = true; + pollfds_size++; + } + } + else + pollfds_size++; + } + else + pollfds_size++; + } + + if (pollfds_size > ZMQ_POLLITEMS_DFLT) { + pollfds = (pollfd*) malloc (pollfds_size * sizeof (pollfd)); + alloc_assert (pollfds); + } + + // If we have at least one thread safe socket we set pollfd first + if (use_pollfd) { + pollfds [0].fd = zmq_pollfd_fd (p_); + pollfds [0].events = POLLIN; + pollfds_index = 1; + } + + // Build pollset for poll () system call. + for (int i = 0; i != nitems_; i++) { + + // If the poll item is a 0MQ socket, we poll on the file descriptor + // retrieved by the ZMQ_FD socket option. + if (items_ [i].socket) { + int thread_safe; + size_t thread_safe_size = sizeof(int); + + if (zmq_getsockopt (items_ [i].socket, ZMQ_THREAD_SAFE, &thread_safe, + &thread_safe_size) == -1) { + if (pollfds != spollfds) + free (pollfds); + return -1; + } + + // We already handled the thread safe sockets + if (!thread_safe) { + size_t zmq_fd_size = sizeof (zmq::fd_t); + if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &pollfds [pollfds_index].fd, + &zmq_fd_size) == -1) { + if (pollfds != spollfds) + free (pollfds); + return -1; + } + pollfds [pollfds_index].events = items_ [i].events ? POLLIN : 0; + pollfds_index++; + } + } + // Else, the poll item is a raw file descriptor. Just convert the + // events to normal POLLIN/POLLOUT for poll (). + else { + pollfds [pollfds_index].fd = items_ [i].fd; + pollfds [pollfds_index].events = + (items_ [i].events & ZMQ_POLLIN ? POLLIN : 0) | + (items_ [i].events & ZMQ_POLLOUT ? POLLOUT : 0) | + (items_ [i].events & ZMQ_POLLPRI ? POLLPRI : 0); + pollfds_index++; + } + } + + bool first_pass = true; + int nevents = 0; + + while (true) { + // Compute the timeout for the subsequent poll. + int timeout; + if (first_pass) + timeout = 0; + else + if (timeout_ < 0) + timeout = -1; + else + timeout = end - now; + + // Wait for events. + while (true) { + int rc = poll (pollfds, pollfds_size, timeout); + if (rc == -1 && errno == EINTR) { + if (pollfds != spollfds) + free (pollfds); + return -1; + } + errno_assert (rc >= 0); + break; + } + + // Receive the signal from pollfd + if (use_pollfd && pollfds[0].revents & POLLIN) + zmq_pollfd_recv (p_); + + // Check for the events. + for (int i = 0; i != nitems_; i++) { + + items_ [i].revents = 0; + + // The poll item is a 0MQ socket. Retrieve pending events + // using the ZMQ_EVENTS socket option. + if (items_ [i].socket) { + size_t zmq_events_size = sizeof (uint32_t); + uint32_t zmq_events; + if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events, + &zmq_events_size) == -1) { + if (pollfds != spollfds) + free (pollfds); + return -1; + } + if ((items_ [i].events & ZMQ_POLLOUT) && + (zmq_events & ZMQ_POLLOUT)) + items_ [i].revents |= ZMQ_POLLOUT; + if ((items_ [i].events & ZMQ_POLLIN) && + (zmq_events & ZMQ_POLLIN)) + items_ [i].revents |= ZMQ_POLLIN; + } + // Else, the poll item is a raw file descriptor, simply convert + // the events to zmq_pollitem_t-style format. + else { + if (pollfds [i].revents & POLLIN) + items_ [i].revents |= ZMQ_POLLIN; + if (pollfds [i].revents & POLLOUT) + items_ [i].revents |= ZMQ_POLLOUT; + if (pollfds [i].revents & POLLPRI) + items_ [i].revents |= ZMQ_POLLPRI; + if (pollfds [i].revents & ~(POLLIN | POLLOUT | POLLPRI)) + items_ [i].revents |= ZMQ_POLLERR; + } + + if (items_ [i].revents) + nevents++; + } + + // If timout is zero, exit immediately whether there are events or not. + if (timeout_ == 0) + break; + + // If there are events to return, we can exit immediately. + if (nevents) + break; + + // At this point we are meant to wait for events but there are none. + // If timeout is infinite we can just loop until we get some events. + if (timeout_ < 0) { + if (first_pass) + first_pass = false; + continue; + } + + // The timeout is finite and there are no events. In the first pass + // we get a timestamp of when the polling have begun. (We assume that + // first pass have taken negligible time). We also compute the time + // when the polling should time out. + if (first_pass) { + now = clock.now_ms (); + end = now + timeout_; + if (now == end) + break; + first_pass = false; + continue; + } + + // Find out whether timeout have expired. + now = clock.now_ms (); + if (now >= end) + break; + } + + if (pollfds != spollfds) + free (pollfds); + return nevents; + +#elif defined ZMQ_POLL_BASED_ON_SELECT + + if (unlikely (nitems_ < 0)) { + errno = EINVAL; + return -1; + } + if (unlikely (nitems_ == 0)) { + if (timeout_ == 0) + return 0; +#if defined ZMQ_HAVE_WINDOWS + Sleep (timeout_ > 0 ? timeout_ : INFINITE); + return 0; +#else + return usleep (timeout_ * 1000); +#endif + } + zmq::clock_t clock; + uint64_t now = 0; + uint64_t end = 0; + + // Ensure we do not attempt to select () on more than FD_SETSIZE + // file descriptors. + zmq_assert (nitems_ <= FD_SETSIZE); + + fd_set pollset_in; + FD_ZERO (&pollset_in); + fd_set pollset_out; + FD_ZERO (&pollset_out); + fd_set pollset_err; + FD_ZERO (&pollset_err); + + bool use_pollfd = false; + + for (int i = 0; i != nitems_; i++) { + if (items_ [i].socket) { + int thread_safe; + size_t thread_safe_size = sizeof(int); + + if (zmq_getsockopt (items_ [i].socket, ZMQ_THREAD_SAFE, &thread_safe, + &thread_safe_size) == -1) + return -1; + + if (thread_safe && items_ [i].events) { + use_pollfd = true; + FD_SET (zmq_pollfd_fd (p_), &pollset_in); + break; + } + } + } + + zmq::fd_t maxfd = 0; + + // Build the fd_sets for passing to select (). + for (int i = 0; i != nitems_; i++) { + + // If the poll item is a 0MQ socket we are interested in input on the + // notification file descriptor retrieved by the ZMQ_FD socket option. + if (items_ [i].socket) { + int thread_safe; + size_t thread_safe_size = sizeof(int); + + if (zmq_getsockopt (items_ [i].socket, ZMQ_THREAD_SAFE, &thread_safe, + &thread_safe_size) == -1) + return -1; + + if (!thread_safe) { + zmq::fd_t notify_fd; + size_t zmq_fd_size = sizeof (zmq::fd_t); + if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, ¬ify_fd, + &zmq_fd_size) == -1) + return -1; + + if (items_ [i].events) { + FD_SET (notify_fd, &pollset_in); + if (maxfd < notify_fd) + maxfd = notify_fd; + } + } + } + // Else, the poll item is a raw file descriptor. Convert the poll item + // events to the appropriate fd_sets. + else { + if (items_ [i].events & ZMQ_POLLIN) + FD_SET (items_ [i].fd, &pollset_in); + if (items_ [i].events & ZMQ_POLLOUT) + FD_SET (items_ [i].fd, &pollset_out); + if (items_ [i].events & ZMQ_POLLERR) + FD_SET (items_ [i].fd, &pollset_err); + if (maxfd < items_ [i].fd) + maxfd = items_ [i].fd; + } + } + + bool first_pass = true; + int nevents = 0; + fd_set inset, outset, errset; + + while (true) { + + // Compute the timeout for the subsequent poll. + timeval timeout; + timeval *ptimeout; + if (first_pass) { + timeout.tv_sec = 0; + timeout.tv_usec = 0; + ptimeout = &timeout; + } + else + if (timeout_ < 0) + ptimeout = NULL; + else { + timeout.tv_sec = (long) ((end - now) / 1000); + timeout.tv_usec = (long) ((end - now) % 1000 * 1000); + ptimeout = &timeout; + } + + // Wait for events. Ignore interrupts if there's infinite timeout. + while (true) { + memcpy (&inset, &pollset_in, sizeof (fd_set)); + memcpy (&outset, &pollset_out, sizeof (fd_set)); + memcpy (&errset, &pollset_err, sizeof (fd_set)); +#if defined ZMQ_HAVE_WINDOWS + int rc = select (0, &inset, &outset, &errset, ptimeout); + if (unlikely (rc == SOCKET_ERROR)) { + errno = zmq::wsa_error_to_errno (WSAGetLastError ()); + wsa_assert (errno == ENOTSOCK); + return -1; + } +#else + int rc = select (maxfd + 1, &inset, &outset, &errset, ptimeout); + if (unlikely (rc == -1)) { + errno_assert (errno == EINTR || errno == EBADF); + return -1; + } +#endif + break; + } + + if (use_pollfd && FD_ISSET (zmq_pollfd_fd (p_), &inset)) + zmq_pollfd_recv (p_); + + // Check for the events. + for (int i = 0; i != nitems_; i++) { + + items_ [i].revents = 0; + + // The poll item is a 0MQ socket. Retrieve pending events + // using the ZMQ_EVENTS socket option. + if (items_ [i].socket) { + size_t zmq_events_size = sizeof (uint32_t); + uint32_t zmq_events; + if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events, + &zmq_events_size) == -1) + return -1; + if ((items_ [i].events & ZMQ_POLLOUT) && + (zmq_events & ZMQ_POLLOUT)) + items_ [i].revents |= ZMQ_POLLOUT; + if ((items_ [i].events & ZMQ_POLLIN) && + (zmq_events & ZMQ_POLLIN)) + items_ [i].revents |= ZMQ_POLLIN; + } + // Else, the poll item is a raw file descriptor, simply convert + // the events to zmq_pollitem_t-style format. + else { + if (FD_ISSET (items_ [i].fd, &inset)) + items_ [i].revents |= ZMQ_POLLIN; + if (FD_ISSET (items_ [i].fd, &outset)) + items_ [i].revents |= ZMQ_POLLOUT; + if (FD_ISSET (items_ [i].fd, &errset)) + items_ [i].revents |= ZMQ_POLLERR; + } + + if (items_ [i].revents) + nevents++; + } + + // If timout is zero, exit immediately whether there are events or not. + if (timeout_ == 0) + break; + + // If there are events to return, we can exit immediately. + if (nevents) + break; + + // At this point we are meant to wait for events but there are none. + // If timeout is infinite we can just loop until we get some events. + if (timeout_ < 0) { + if (first_pass) + first_pass = false; + continue; + } + + // The timeout is finite and there are no events. In the first pass + // we get a timestamp of when the polling have begun. (We assume that + // first pass have taken negligible time). We also compute the time + // when the polling should time out. + if (first_pass) { + now = clock.now_ms (); + end = now + timeout_; + if (now == end) + break; + first_pass = false; + continue; + } + // Find out whether timeout have expired. now = clock.now_ms (); if (now >= end) diff --git a/tests/test_thread_safe_polling.cpp b/tests/test_thread_safe_polling.cpp index ad8d8d62..197bcced 100644 --- a/tests/test_thread_safe_polling.cpp +++ b/tests/test_thread_safe_polling.cpp @@ -50,8 +50,8 @@ int main (void) assert (rc == 0); zmq_pollitem_t items[] = { - {server, zmq_pollfd_fd(pollfd), ZMQ_POLLIN, 0}, - {server2, zmq_pollfd_fd(pollfd), ZMQ_POLLIN, 0}}; + {server, 0, ZMQ_POLLIN, 0}, + {server2, 0, ZMQ_POLLIN, 0}}; rc = zmq_bind (server, "tcp://127.0.0.1:5560"); assert (rc == 0); @@ -63,7 +63,7 @@ int main (void) assert (rc == 0); - rc = zmq_poll (items, 2, -1); + rc = zmq_pollfd_poll (pollfd, items, 2, -1); assert (rc == 1); assert (items[0].revents == ZMQ_POLLIN); @@ -74,7 +74,7 @@ int main (void) rc = zmq_msg_recv(&msg, server, ZMQ_DONTWAIT); assert (rc == 1); - rc = zmq_poll (items, 2, -1); + rc = zmq_pollfd_poll (pollfd, items, 2, -1); assert (rc == 1); assert (items[0].revents == 0); @@ -83,7 +83,7 @@ int main (void) rc = zmq_msg_recv(&msg, server2, ZMQ_DONTWAIT); assert (rc == 1); - rc = zmq_poll (items, 2, 0); + rc = zmq_pollfd_poll (pollfd, items, 2, 0); assert (rc == 0); assert (items[0].revents == 0);