0
0
mirror of https://github.com/zeromq/libzmq.git synced 2024-12-27 07:31:03 +08:00

Removing zmq_pollfd as it is replaced by zmq_poller

This commit is contained in:
somdoron 2015-10-22 11:12:04 +03:00
parent 0650b59b10
commit da2bc60abe
8 changed files with 563 additions and 853 deletions

View File

@ -367,7 +367,6 @@ test_apps = \
tests/test_socketopt_hwm \
tests/test_heartbeats \
tests/test_stream_exceeds_buffer \
tests/test_thread_safe_polling \
tests/test_poller
tests_test_system_SOURCES = tests/test_system.cpp
@ -573,9 +572,6 @@ tests_test_heartbeats_LDADD = src/libzmq.la
tests_test_stream_exceeds_buffer_SOURCES = tests/test_stream_exceeds_buffer.cpp
tests_test_stream_exceeds_buffer_LDADD = src/libzmq.la
tests_test_thread_safe_polling_SOURCES = tests/test_thread_safe_polling.cpp
tests_test_thread_safe_polling_LDADD = src/libzmq.la
tests_test_poller_SOURCES = tests/test_poller.cpp
tests_test_poller_LDADD = src/libzmq.la

View File

@ -383,8 +383,6 @@ ZMQ_EXPORT int zmq_send (void *s, const void *buf, size_t len, int flags);
ZMQ_EXPORT int zmq_send_const (void *s, const void *buf, size_t len, int flags);
ZMQ_EXPORT int zmq_recv (void *s, void *buf, size_t len, int flags);
ZMQ_EXPORT int zmq_socket_monitor (void *s, const char *addr, int events);
ZMQ_EXPORT int zmq_add_pollfd (void *s, void *p);
ZMQ_EXPORT int zmq_remove_pollfd (void *s, void *p);
/******************************************************************************/
/* I/O multiplexing. */
@ -411,22 +409,6 @@ typedef struct zmq_pollitem_t
ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout);
/******************************************************************************/
/* Pollfd polling on thread safe socket */
/******************************************************************************/
ZMQ_EXPORT void *zmq_pollfd_new ();
ZMQ_EXPORT int zmq_pollfd_close (void *p);
ZMQ_EXPORT void zmq_pollfd_recv (void *p);
ZMQ_EXPORT int zmq_pollfd_wait (void *p, int timeout_);
ZMQ_EXPORT int zmq_pollfd_poll (void *p, zmq_pollitem_t *items, int nitems, long timeout);
#if defined _WIN32
ZMQ_EXPORT SOCKET zmq_pollfd_fd (void *p);
#else
ZMQ_EXPORT int zmq_pollfd_fd (void *p);
#endif
/******************************************************************************/
/* Poller polling on sockets,fd and threaf safe sockets */
/******************************************************************************/

View File

@ -32,10 +32,13 @@
zmq::socket_poller_t::socket_poller_t () :
tag (0xCAFEBABE),
poll_set (NULL),
poll_events (NULL)
need_rebuild (true),
use_signaler (false)
#if defined ZMQ_POLL_BASED_ON_POLL
,
pollfds (NULL)
#endif
{
pollfd = zmq_pollfd_new ();
}
zmq::socket_poller_t::~socket_poller_t ()
@ -43,27 +46,22 @@ zmq::socket_poller_t::~socket_poller_t ()
// Mark the socket_poller as dead
tag = 0xdeadbeef;
for (events_t::iterator it = events.begin(); it != events.end(); ++it) {
for (items_t::iterator it = items.begin(); it != items.end(); ++it) {
if (it->socket) {
int thread_safe;
size_t thread_safe_size = sizeof(int);
if (zmq_getsockopt(it->socket, ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == 0 && thread_safe)
zmq_remove_pollfd(it->socket, pollfd);
if (it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == 0 && thread_safe)
it->socket->remove_signaler (&signaler);
}
}
zmq_pollfd_close (pollfd);
if (poll_set) {
free (poll_set);
poll_set = NULL;
}
if (poll_events) {
free (poll_events);
poll_events = NULL;
#if defined ZMQ_POLL_BASED_ON_POLL
if (pollfds) {
free (pollfds);
pollfds = NULL;
}
#endif
}
bool zmq::socket_poller_t::check_tag ()
@ -71,9 +69,9 @@ bool zmq::socket_poller_t::check_tag ()
return tag == 0xCAFEBABE;
}
int zmq::socket_poller_t::add (void *socket_, void* user_data_, short events_)
int zmq::socket_poller_t::add (socket_base_t *socket_, void* user_data_, short events_)
{
for (events_t::iterator it = events.begin (); it != events.end (); ++it) {
for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
if (it->socket == socket_) {
errno = EINVAL;
return -1;
@ -83,51 +81,47 @@ int zmq::socket_poller_t::add (void *socket_, void* user_data_, short events_)
int thread_safe;
size_t thread_safe_size = sizeof(int);
if (zmq_getsockopt (socket_, ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1)
if (socket_->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1)
return -1;
if (thread_safe) {
if (zmq_add_pollfd (socket_, pollfd) == -1)
if (socket_->add_signaler (&signaler) == -1)
return -1;
}
event_t event = {socket_, 0, user_data_, events_};
events.push_back (event);
item_t item = {socket_, 0, user_data_, events_};
items.push_back (item);
need_rebuild = true;
return 0;
}
#if defined _WIN32
int zmq::socket_poller_t::add_fd (SOCKET fd_, void *user_data_, short events_)
#else
int zmq::socket_poller_t::add_fd (int fd_, void *user_data_, short events_)
#endif
int zmq::socket_poller_t::add_fd (fd_t fd_, void *user_data_, short events_)
{
for (events_t::iterator it = events.begin (); it != events.end (); ++it) {
for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
if (!it->socket && it->fd == fd_) {
errno = EINVAL;
return -1;
}
}
event_t event = {NULL, fd_, user_data_, events_};
events.push_back (event);
item_t item = {NULL, fd_, user_data_, events_};
items.push_back (item);
need_rebuild = true;
return 0;
}
int zmq::socket_poller_t::modify (void *socket_, short events_)
int zmq::socket_poller_t::modify (socket_base_t *socket_, short events_)
{
events_t::iterator it;
items_t::iterator it;
for (it = events.begin (); it != events.end (); ++it) {
for (it = items.begin (); it != items.end (); ++it) {
if (it->socket == socket_)
break;
}
if (it == events.end()) {
if (it == items.end()) {
errno = EINVAL;
return -1;
}
@ -139,20 +133,16 @@ int zmq::socket_poller_t::modify (void *socket_, short events_)
}
#if defined _WIN32
int zmq::socket_poller_t::modify_fd (SOCKET fd_, short events_)
#else
int zmq::socket_poller_t::modify_fd (int fd_, short events_)
#endif
int zmq::socket_poller_t::modify_fd (fd_t fd_, short events_)
{
events_t::iterator it;
items_t::iterator it;
for (it = events.begin (); it != events.end (); ++it) {
for (it = items.begin (); it != items.end (); ++it) {
if (!it->socket && it->fd == fd_)
break;
}
if (it == events.end()) {
if (it == items.end()) {
errno = EINVAL;
return -1;
}
@ -164,16 +154,16 @@ int zmq::socket_poller_t::modify_fd (int fd_, short events_)
}
int zmq::socket_poller_t::remove (void* socket_)
int zmq::socket_poller_t::remove (socket_base_t *socket_)
{
events_t::iterator it;
items_t::iterator it;
for (it = events.begin (); it != events.end (); ++it) {
for (it = items.begin (); it != items.end (); ++it) {
if (it->socket == socket_)
break;
}
if (it == events.end()) {
if (it == items.end()) {
errno = EINVAL;
return -1;
}
@ -181,101 +171,482 @@ int zmq::socket_poller_t::remove (void* socket_)
int thread_safe;
size_t thread_safe_size = sizeof(int);
if (zmq_getsockopt (socket_, ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1)
if (socket_->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1)
return -1;
if (thread_safe) {
if (zmq_remove_pollfd (socket_, pollfd) == -1)
if (socket_->remove_signaler (&signaler) == -1)
return -1;
}
events.erase (it);
items.erase (it);
need_rebuild = true;
return 0;
}
#if defined _WIN32
int zmq::socket_poller_t::remove_fd (SOCKET fd_)
#else
int zmq::socket_poller_t::remove_fd (int fd_)
#endif
int zmq::socket_poller_t::remove_fd (fd_t fd_)
{
events_t::iterator it;
items_t::iterator it;
for (it = events.begin (); it != events.end (); ++it) {
for (it = items.begin (); it != items.end (); ++it) {
if (!it->socket && it->fd == fd_)
break;
}
if (it == events.end()) {
if (it == items.end()) {
errno = EINVAL;
return -1;
}
events.erase (it);
items.erase (it);
need_rebuild = true;
return 0;
}
int zmq::socket_poller_t::rebuild ()
{
#if defined ZMQ_POLL_BASED_ON_POLL
if (pollfds) {
free (pollfds);
pollfds = NULL;
}
use_signaler = false;
poll_size = 0;
for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
if (it->events) {
if (it->socket) {
int thread_safe;
size_t thread_safe_size = sizeof(int);
if (it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1)
return -1;
if (thread_safe) {
if (!use_signaler) {
use_signaler = true;
poll_size++;
}
}
else
poll_size++;
}
else
poll_size++;
}
}
if (poll_size == 0)
return 0;
pollfds = (pollfd*) malloc (poll_size * sizeof (pollfd));
alloc_assert (pollfds);
int item_nbr = 0;
if (use_signaler) {
item_nbr = 1;
pollfds[0].fd = signaler.get_fd();
pollfds[0].events = POLLIN;
}
for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
if (it->events) {
if (it->socket) {
int thread_safe;
size_t thread_safe_size = sizeof(int);
if (it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1)
return -1;
if (!thread_safe) {
size_t fd_size = sizeof (zmq::fd_t);
if (it->socket->getsockopt (ZMQ_FD, &pollfds [item_nbr].fd, &fd_size) == -1) {
return -1;
}
pollfds [item_nbr].events = POLLIN;
item_nbr++;
}
}
else {
pollfds [item_nbr].fd = it->fd;
pollfds [item_nbr].events =
(it->events & ZMQ_POLLIN ? POLLIN : 0) |
(it->events & ZMQ_POLLOUT ? POLLOUT : 0) |
(it->events & ZMQ_POLLPRI ? POLLPRI : 0);
it->pollfd_index = item_nbr;
item_nbr++;
}
}
}
#elif defined ZMQ_POLL_BASED_ON_SELECT
FD_ZERO (&pollset_in);
FD_ZERO (&pollset_out);
FD_ZERO (&pollset_err);
// Ensure we do not attempt to select () on more than FD_SETSIZE
// file descriptors.
zmq_assert (items.size () <= FD_SETSIZE);
poll_size = 0;
use_signaler = false;
for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
if (it->socket) {
int thread_safe;
size_t thread_safe_size = sizeof(int);
if (it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1)
return -1;
if (thread_safe && it->events) {
use_signaler = true;
FD_SET (signaler.get_fd (), &pollset_in);
poll_size = 1;
break;
}
}
}
maxfd = 0;
// Build the fd_sets for passing to select ().
for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
if (it->events) {
// If the poll item is a 0MQ socket we are interested in input on the
// notification file descriptor retrieved by the ZMQ_FD socket option.
if (it->socket) {
int thread_safe;
size_t thread_safe_size = sizeof(int);
if (it->socket->getsockopt (ZMQ_THREAD_SAFE, &thread_safe, &thread_safe_size) == -1)
return -1;
if (!thread_safe) {
zmq::fd_t notify_fd;
size_t fd_size = sizeof (zmq::fd_t);
if (it->socket->getsockopt (ZMQ_FD, &notify_fd, &fd_size) == -1)
return -1;
FD_SET (notify_fd, &pollset_in);
if (maxfd < notify_fd)
maxfd = notify_fd;
poll_size++;
}
}
// Else, the poll item is a raw file descriptor. Convert the poll item
// events to the appropriate fd_sets.
else {
if (it->events & ZMQ_POLLIN)
FD_SET (it->fd, &pollset_in);
if (it->events & ZMQ_POLLOUT)
FD_SET (it->fd, &pollset_out);
if (it->events & ZMQ_POLLERR)
FD_SET (it->fd, &pollset_err);
if (maxfd < it->fd)
maxfd = it->fd;
poll_size++;
}
}
}
#endif
need_rebuild = false;
return 0;
}
int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long timeout_)
{
if (need_rebuild)
rebuild ();
if (rebuild () == -1)
return -1;
int rc = zmq_pollfd_poll (pollfd, poll_set, poll_size, timeout_);
if (rc == -1) {
return rc;
#if defined ZMQ_POLL_BASED_ON_POLL
if (unlikely (poll_size == 0)) {
if (timeout_ == 0)
return 0;
#if defined ZMQ_HAVE_WINDOWS
Sleep (timeout_ > 0 ? timeout_ : INFINITE);
return 0;
#elif defined ZMQ_HAVE_ANDROID
usleep (timeout_ * 1000);
return 0;
#else
return usleep (timeout_ * 1000);
#endif
}
if (rc == 0) {
errno = ETIMEDOUT;
zmq::clock_t clock;
uint64_t now = 0;
uint64_t end = 0;
bool first_pass = true;
while (true) {
// Compute the timeout for the subsequent poll.
int timeout;
if (first_pass)
timeout = 0;
else
if (timeout_ < 0)
timeout = -1;
else
timeout = end - now;
// Wait for events.
while (true) {
int rc = poll (pollfds, poll_size, timeout);
if (rc == -1 && errno == EINTR) {
return -1;
}
errno_assert (rc >= 0);
break;
}
// Receive the signal from pollfd
if (use_signaler && pollfds[0].revents & POLLIN)
signaler.recv ();
// Check for the events.
for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
// The poll item is a 0MQ socket. Retrieve pending events
// using the ZMQ_EVENTS socket option.
if (it->socket) {
size_t events_size = sizeof (uint32_t);
uint32_t events;
if (it->socket->getsockopt (ZMQ_EVENTS, &events, &events_size) == -1) {
return -1;
}
for (int i = 0; i < poll_size; i++) {
if ((poll_set [i].revents & poll_events [i].events) != 0) {
*event_ = poll_events[i];
event_->events = poll_set [i].revents & poll_events [i].events;
if (it->events & events) {
event_->socket = it->socket;
event_->user_data = it->user_data;
event_->events = it->events & events;
// If there is event to return, we can exit immediately.
return 0;
}
}
// Else, the poll item is a raw file descriptor, simply convert
// the events to zmq_pollitem_t-style format.
else {
short revents = pollfds [it->pollfd_index].revents;
short events = 0;
if (revents & POLLIN)
events |= ZMQ_POLLIN;
if (revents & POLLOUT)
events |= ZMQ_POLLOUT;
if (revents & POLLPRI)
events |= ZMQ_POLLPRI;
if (revents & ~(POLLIN | POLLOUT | POLLPRI))
events |= ZMQ_POLLERR;
if (events) {
event_->socket = NULL;
event_->user_data = it->user_data;
event_->fd = it->fd;
event_->events = events;
// If there is event to return, we can exit immediately.
return 0;
}
}
}
// If timeout is zero, exit immediately whether there are events or not.
if (timeout_ == 0)
break;
// At this point we are meant to wait for events but there are none.
// If timeout is infinite we can just loop until we get some events.
if (timeout_ < 0) {
if (first_pass)
first_pass = false;
continue;
}
// The timeout is finite and there are no events. In the first pass
// we get a timestamp of when the polling have begun. (We assume that
// first pass have taken negligible time). We also compute the time
// when the polling should time out.
if (first_pass) {
now = clock.now_ms ();
end = now + timeout_;
if (now == end)
break;
first_pass = false;
continue;
}
// Find out whether timeout have expired.
now = clock.now_ms ();
if (now >= end)
break;
}
}
errno = ETIMEDOUT;
return -1;
#elif defined ZMQ_POLL_BASED_ON_SELECT
if (unlikely (poll_size == 0)) {
if (timeout_ == 0)
return 0;
#if defined ZMQ_HAVE_WINDOWS
Sleep (timeout_ > 0 ? timeout_ : INFINITE);
return 0;
#else
return usleep (timeout_ * 1000);
#endif
}
zmq::clock_t clock;
uint64_t now = 0;
uint64_t end = 0;
bool first_pass = true;
fd_set inset, outset, errset;
while (true) {
// Compute the timeout for the subsequent poll.
timeval timeout;
timeval *ptimeout;
if (first_pass) {
timeout.tv_sec = 0;
timeout.tv_usec = 0;
ptimeout = &timeout;
}
else
if (timeout_ < 0)
ptimeout = NULL;
else {
timeout.tv_sec = (long) ((end - now) / 1000);
timeout.tv_usec = (long) ((end - now) % 1000 * 1000);
ptimeout = &timeout;
}
// Wait for events. Ignore interrupts if there's infinite timeout.
while (true) {
memcpy (&inset, &pollset_in, sizeof (fd_set));
memcpy (&outset, &pollset_out, sizeof (fd_set));
memcpy (&errset, &pollset_err, sizeof (fd_set));
#if defined ZMQ_HAVE_WINDOWS
int rc = select (0, &inset, &outset, &errset, ptimeout);
if (unlikely (rc == SOCKET_ERROR)) {
errno = zmq::wsa_error_to_errno (WSAGetLastError ());
wsa_assert (errno == ENOTSOCK);
return -1;
}
#else
int rc = select (maxfd + 1, &inset, &outset, &errset, ptimeout);
if (unlikely (rc == -1)) {
errno_assert (errno == EINTR || errno == EBADF);
return -1;
}
#endif
break;
}
if (use_signaler && FD_ISSET (signaler.get_fd (), &inset))
signaler.recv ();
// Check for the events.
for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
// The poll item is a 0MQ socket. Retrieve pending events
// using the ZMQ_EVENTS socket option.
if (it->socket) {
size_t events_size = sizeof (uint32_t);
uint32_t events;
if (it->socket->getsockopt (ZMQ_EVENTS, &events, &events_size) == -1)
return -1;
if (it->events & events) {
event_->socket = it->socket;
event_->user_data = it->user_data;
event_->events = it->events & events;
// If there is event to return, we can exit immediately.
return 0;
}
}
// Else, the poll item is a raw file descriptor, simply convert
// the events to zmq_pollitem_t-style format.
else {
short events = 0;
if (FD_ISSET (it->fd, &inset))
events |= ZMQ_POLLIN;
if (FD_ISSET (it->fd, &outset))
events |= ZMQ_POLLOUT;
if (FD_ISSET (it->fd, &errset))
events |= ZMQ_POLLERR;
if (events) {
event_->socket = NULL;
event_->user_data = it->user_data;
event_->fd = it->fd;
event_->events = events;
// If there is event to return, we can exit immediately.
return 0;
}
}
}
// If timeout is zero, exit immediately whether there are events or not.
if (timeout_ == 0)
break;
// At this point we are meant to wait for events but there are none.
// If timeout is infinite we can just loop until we get some events.
if (timeout_ < 0) {
if (first_pass)
first_pass = false;
continue;
}
// The timeout is finite and there are no events. In the first pass
// we get a timestamp of when the polling have begun. (We assume that
// first pass have taken negligible time). We also compute the time
// when the polling should time out.
if (first_pass) {
now = clock.now_ms ();
end = now + timeout_;
if (now == end)
break;
first_pass = false;
continue;
}
// Find out whether timeout have expired.
now = clock.now_ms ();
if (now >= end)
break;
}
errno = ETIMEDOUT;
return -1;
#else
// Exotic platforms that support neither poll() nor select().
errno = ENOTSUP;
return -1;
#endif
}
void zmq::socket_poller_t::rebuild ()
{
if (poll_set) {
free (poll_set);
poll_set = NULL;
}
if (poll_events) {
free (poll_events);
poll_events = NULL;
}
poll_size = events.size ();
poll_set = (zmq_pollitem_t*) malloc (poll_size * sizeof (zmq_pollitem_t));
alloc_assert (poll_set);
poll_events = (event_t*) malloc (poll_size * sizeof (event_t));
int event_nbr = 0;
for (events_t::iterator it = events.begin (); it != events.end (); ++it, event_nbr++) {
poll_set [event_nbr].socket = it->socket;
if (!it->socket)
poll_set [event_nbr].fd = it->fd;
poll_set [event_nbr].events = it->events;
poll_events [event_nbr] = *it;
}
need_rebuild = false;
}

View File

@ -30,10 +30,23 @@
#ifndef __ZMQ_SOCKET_POLLER_HPP_INCLUDED__
#define __ZMQ_SOCKET_POLLER_HPP_INCLUDED__
#include "poller.hpp"
#if defined ZMQ_POLL_BASED_ON_POLL
#include <poll.h>
#endif
#if defined ZMQ_HAVE_WINDOWS
#include "windows.hpp"
#else
#include <unistd.h>
#endif
#include <vector>
#include <algorithm>
#include "../include/zmq.h"
#include "socket_base.hpp"
#include "signaler.hpp"
namespace zmq
{
@ -46,28 +59,19 @@ namespace zmq
typedef struct event_t
{
void *socket;
#if defined _WIN32
SOCKET fd;
#else
int fd;
#endif
socket_base_t *socket;
fd_t fd;
void *user_data;
short events;
} event_t;
int add (void *socket, void *user_data, short events);
int modify (void *socket, short events);
int remove (void *socket);
#if defined _WIN32
int add_fd (SOCKET fd, void *user_data, short events);
int modify_fd (SOCKET fd, short events);
int remove_fd (SOCKET fd);
#else
int add_fd (int fd, void *user_data, short events);
int modify_fd (int fd, short events);
int remove_fd (int fd);
#endif
int add (socket_base_t *socket, void *user_data, short events);
int modify (socket_base_t *socket, short events);
int remove (socket_base_t *socket);
int add_fd (fd_t fd, void *user_data, short events);
int modify_fd (fd_t fd, short events);
int remove_fd (fd_t fd);
int wait (event_t *event, long timeout);
@ -75,29 +79,45 @@ namespace zmq
bool check_tag ();
private:
void rebuild ();
int rebuild ();
// Used to check whether the object is a socket_poller.
uint32_t tag;
// Pollfd used for thread safe sockets polling
void *pollfd;
// Signaler used for thread safe sockets polling
signaler_t signaler;
typedef struct item_t {
socket_base_t *socket;
fd_t fd;
void *user_data;
short events;
#if defined ZMQ_POLL_BASED_ON_POLL
int pollfd_index;
#endif
} item_t;
// List of sockets
typedef std::vector <event_t> events_t;
events_t events;
typedef std::vector <item_t> items_t;
items_t items;
// Current zmq_poll set
zmq_pollitem_t *poll_set;
// Does the pollset needs rebuilding?
bool need_rebuild;
// Matching set to events
event_t *poll_events;
// Should the signaler be used for the thread safe polling?
bool use_signaler;
// Size of the pollset
int poll_size;
// Does the pollset needs rebuilding?
bool need_rebuild;
#if defined ZMQ_POLL_BASED_ON_POLL
pollfd *pollfds;
#elif defined ZMQ_POLL_BASED_ON_SELECT
fd_set pollset_in;
fd_set pollset_out;
fd_set pollset_err;
zmq::fd_t maxfd;
#endif
socket_poller_t (const socket_poller_t&);
const socket_poller_t &operator = (const socket_poller_t&);

View File

@ -565,34 +565,6 @@ int zmq_recviov (void *s_, iovec *a_, size_t *count_, int flags_)
return nread;
}
// Add/remove pollfd from a socket
int zmq_add_pollfd (void *s_, void *p_)
{
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
zmq::signaler_t *p = (zmq::signaler_t *) p_;
return s->add_signaler(p);
}
int zmq_remove_pollfd (void *s_, void *p_)
{
if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
zmq::signaler_t *p = (zmq::signaler_t *) p_;
return s->remove_signaler(p);
}
// Message manipulators.
int zmq_msg_init (zmq_msg_t *msg_)
@ -1070,495 +1042,6 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
#endif
}
// Create pollfd
void *zmq_pollfd_new ()
{
return new zmq::signaler_t ();
}
// Close pollfd
int zmq_pollfd_close (void* p_)
{
zmq::signaler_t *s = (zmq::signaler_t*)p_;
LIBZMQ_DELETE(s);
return 0;
}
// Recv signal from pollfd
void zmq_pollfd_recv(void *p_)
{
zmq::signaler_t *s = (zmq::signaler_t*)p_;
s->recv ();
}
// Wait until pollfd is signalled
int zmq_pollfd_wait(void *p_, int timeout_)
{
zmq::signaler_t *s = (zmq::signaler_t*)p_;
return s->wait (timeout_);
}
// Get pollfd fd
#if defined _WIN32
SOCKET zmq_pollfd_fd (void *p_)
#else
int zmq_pollfd_fd (void *p_)
#endif
{
zmq::signaler_t *s = (zmq::signaler_t*)p_;
return s->get_fd ();
}
// Polling thread safe sockets version
int zmq_pollfd_poll (void* p_, zmq_pollitem_t *items_, int nitems_, long timeout_)
{
#if defined ZMQ_POLL_BASED_ON_POLL
if (unlikely (nitems_ < 0)) {
errno = EINVAL;
return -1;
}
if (unlikely (nitems_ == 0)) {
if (timeout_ == 0)
return 0;
#if defined ZMQ_HAVE_WINDOWS
Sleep (timeout_ > 0 ? timeout_ : INFINITE);
return 0;
#elif defined ZMQ_HAVE_ANDROID
usleep (timeout_ * 1000);
return 0;
#else
return usleep (timeout_ * 1000);
#endif
}
if (!items_) {
errno = EFAULT;
return -1;
}
zmq::clock_t clock;
uint64_t now = 0;
uint64_t end = 0;
pollfd spollfds[ZMQ_POLLITEMS_DFLT];
pollfd *pollfds = spollfds;
int pollfds_size = 0;
int pollfds_index = 0;
bool use_pollfd = false;
for (int i = 0; i != nitems_; i++) {
if (items_ [i].socket) {
int thread_safe;
size_t thread_safe_size = sizeof(int);
if (zmq_getsockopt (items_ [i].socket, ZMQ_THREAD_SAFE, &thread_safe,
&thread_safe_size) == -1) {
return -1;
}
// All thread safe sockets share same fd
if (thread_safe) {
// if poll fd is not set yet and events are set for this socket
if (!use_pollfd && items_ [i].events) {
use_pollfd = true;
pollfds_size++;
}
}
else
pollfds_size++;
}
else
pollfds_size++;
}
if (pollfds_size > ZMQ_POLLITEMS_DFLT) {
pollfds = (pollfd*) malloc (pollfds_size * sizeof (pollfd));
alloc_assert (pollfds);
}
// If we have at least one thread safe socket we set pollfd first
if (use_pollfd) {
pollfds [0].fd = zmq_pollfd_fd (p_);
pollfds [0].events = POLLIN;
pollfds_index = 1;
}
// Build pollset for poll () system call.
for (int i = 0; i != nitems_; i++) {
// If the poll item is a 0MQ socket, we poll on the file descriptor
// retrieved by the ZMQ_FD socket option.
if (items_ [i].socket) {
int thread_safe;
size_t thread_safe_size = sizeof(int);
if (zmq_getsockopt (items_ [i].socket, ZMQ_THREAD_SAFE, &thread_safe,
&thread_safe_size) == -1) {
if (pollfds != spollfds)
free (pollfds);
return -1;
}
// We already handled the thread safe sockets
if (!thread_safe) {
size_t zmq_fd_size = sizeof (zmq::fd_t);
if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &pollfds [pollfds_index].fd,
&zmq_fd_size) == -1) {
if (pollfds != spollfds)
free (pollfds);
return -1;
}
pollfds [pollfds_index].events = items_ [i].events ? POLLIN : 0;
pollfds_index++;
}
}
// Else, the poll item is a raw file descriptor. Just convert the
// events to normal POLLIN/POLLOUT for poll ().
else {
pollfds [pollfds_index].fd = items_ [i].fd;
pollfds [pollfds_index].events =
(items_ [i].events & ZMQ_POLLIN ? POLLIN : 0) |
(items_ [i].events & ZMQ_POLLOUT ? POLLOUT : 0) |
(items_ [i].events & ZMQ_POLLPRI ? POLLPRI : 0);
pollfds_index++;
}
}
bool first_pass = true;
int nevents = 0;
while (true) {
// Compute the timeout for the subsequent poll.
int timeout;
if (first_pass)
timeout = 0;
else
if (timeout_ < 0)
timeout = -1;
else
timeout = end - now;
// Wait for events.
while (true) {
int rc = poll (pollfds, pollfds_size, timeout);
if (rc == -1 && errno == EINTR) {
if (pollfds != spollfds)
free (pollfds);
return -1;
}
errno_assert (rc >= 0);
break;
}
// Receive the signal from pollfd
if (use_pollfd && pollfds[0].revents & POLLIN)
zmq_pollfd_recv (p_);
// Check for the events.
for (int i = 0; i != nitems_; i++) {
items_ [i].revents = 0;
// The poll item is a 0MQ socket. Retrieve pending events
// using the ZMQ_EVENTS socket option.
if (items_ [i].socket) {
size_t zmq_events_size = sizeof (uint32_t);
uint32_t zmq_events;
if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
&zmq_events_size) == -1) {
if (pollfds != spollfds)
free (pollfds);
return -1;
}
if ((items_ [i].events & ZMQ_POLLOUT) &&
(zmq_events & ZMQ_POLLOUT))
items_ [i].revents |= ZMQ_POLLOUT;
if ((items_ [i].events & ZMQ_POLLIN) &&
(zmq_events & ZMQ_POLLIN))
items_ [i].revents |= ZMQ_POLLIN;
}
// Else, the poll item is a raw file descriptor, simply convert
// the events to zmq_pollitem_t-style format.
else {
if (pollfds [i].revents & POLLIN)
items_ [i].revents |= ZMQ_POLLIN;
if (pollfds [i].revents & POLLOUT)
items_ [i].revents |= ZMQ_POLLOUT;
if (pollfds [i].revents & POLLPRI)
items_ [i].revents |= ZMQ_POLLPRI;
if (pollfds [i].revents & ~(POLLIN | POLLOUT | POLLPRI))
items_ [i].revents |= ZMQ_POLLERR;
}
if (items_ [i].revents)
nevents++;
}
// If timeout is zero, exit immediately whether there are events or not.
if (timeout_ == 0)
break;
// If there are events to return, we can exit immediately.
if (nevents)
break;
// At this point we are meant to wait for events but there are none.
// If timeout is infinite we can just loop until we get some events.
if (timeout_ < 0) {
if (first_pass)
first_pass = false;
continue;
}
// The timeout is finite and there are no events. In the first pass
// we get a timestamp of when the polling have begun. (We assume that
// first pass have taken negligible time). We also compute the time
// when the polling should time out.
if (first_pass) {
now = clock.now_ms ();
end = now + timeout_;
if (now == end)
break;
first_pass = false;
continue;
}
// Find out whether timeout have expired.
now = clock.now_ms ();
if (now >= end)
break;
}
if (pollfds != spollfds)
free (pollfds);
return nevents;
#elif defined ZMQ_POLL_BASED_ON_SELECT
if (unlikely (nitems_ < 0)) {
errno = EINVAL;
return -1;
}
if (unlikely (nitems_ == 0)) {
if (timeout_ == 0)
return 0;
#if defined ZMQ_HAVE_WINDOWS
Sleep (timeout_ > 0 ? timeout_ : INFINITE);
return 0;
#else
return usleep (timeout_ * 1000);
#endif
}
zmq::clock_t clock;
uint64_t now = 0;
uint64_t end = 0;
// Ensure we do not attempt to select () on more than FD_SETSIZE
// file descriptors.
zmq_assert (nitems_ <= FD_SETSIZE);
fd_set pollset_in;
FD_ZERO (&pollset_in);
fd_set pollset_out;
FD_ZERO (&pollset_out);
fd_set pollset_err;
FD_ZERO (&pollset_err);
bool use_pollfd = false;
for (int i = 0; i != nitems_; i++) {
if (items_ [i].socket) {
int thread_safe;
size_t thread_safe_size = sizeof(int);
if (zmq_getsockopt (items_ [i].socket, ZMQ_THREAD_SAFE, &thread_safe,
&thread_safe_size) == -1)
return -1;
if (thread_safe && items_ [i].events) {
use_pollfd = true;
FD_SET (zmq_pollfd_fd (p_), &pollset_in);
break;
}
}
}
zmq::fd_t maxfd = 0;
// Build the fd_sets for passing to select ().
for (int i = 0; i != nitems_; i++) {
// If the poll item is a 0MQ socket we are interested in input on the
// notification file descriptor retrieved by the ZMQ_FD socket option.
if (items_ [i].socket) {
int thread_safe;
size_t thread_safe_size = sizeof(int);
if (zmq_getsockopt (items_ [i].socket, ZMQ_THREAD_SAFE, &thread_safe,
&thread_safe_size) == -1)
return -1;
if (!thread_safe) {
zmq::fd_t notify_fd;
size_t zmq_fd_size = sizeof (zmq::fd_t);
if (zmq_getsockopt (items_ [i].socket, ZMQ_FD, &notify_fd,
&zmq_fd_size) == -1)
return -1;
if (items_ [i].events) {
FD_SET (notify_fd, &pollset_in);
if (maxfd < notify_fd)
maxfd = notify_fd;
}
}
}
// Else, the poll item is a raw file descriptor. Convert the poll item
// events to the appropriate fd_sets.
else {
if (items_ [i].events & ZMQ_POLLIN)
FD_SET (items_ [i].fd, &pollset_in);
if (items_ [i].events & ZMQ_POLLOUT)
FD_SET (items_ [i].fd, &pollset_out);
if (items_ [i].events & ZMQ_POLLERR)
FD_SET (items_ [i].fd, &pollset_err);
if (maxfd < items_ [i].fd)
maxfd = items_ [i].fd;
}
}
bool first_pass = true;
int nevents = 0;
fd_set inset, outset, errset;
while (true) {
// Compute the timeout for the subsequent poll.
timeval timeout;
timeval *ptimeout;
if (first_pass) {
timeout.tv_sec = 0;
timeout.tv_usec = 0;
ptimeout = &timeout;
}
else
if (timeout_ < 0)
ptimeout = NULL;
else {
timeout.tv_sec = (long) ((end - now) / 1000);
timeout.tv_usec = (long) ((end - now) % 1000 * 1000);
ptimeout = &timeout;
}
// Wait for events. Ignore interrupts if there's infinite timeout.
while (true) {
memcpy (&inset, &pollset_in, sizeof (fd_set));
memcpy (&outset, &pollset_out, sizeof (fd_set));
memcpy (&errset, &pollset_err, sizeof (fd_set));
#if defined ZMQ_HAVE_WINDOWS
int rc = select (0, &inset, &outset, &errset, ptimeout);
if (unlikely (rc == SOCKET_ERROR)) {
errno = zmq::wsa_error_to_errno (WSAGetLastError ());
wsa_assert (errno == ENOTSOCK);
return -1;
}
#else
int rc = select (maxfd + 1, &inset, &outset, &errset, ptimeout);
if (unlikely (rc == -1)) {
errno_assert (errno == EINTR || errno == EBADF);
return -1;
}
#endif
break;
}
if (use_pollfd && FD_ISSET (zmq_pollfd_fd (p_), &inset))
zmq_pollfd_recv (p_);
// Check for the events.
for (int i = 0; i != nitems_; i++) {
items_ [i].revents = 0;
// The poll item is a 0MQ socket. Retrieve pending events
// using the ZMQ_EVENTS socket option.
if (items_ [i].socket) {
size_t zmq_events_size = sizeof (uint32_t);
uint32_t zmq_events;
if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
&zmq_events_size) == -1)
return -1;
if ((items_ [i].events & ZMQ_POLLOUT) &&
(zmq_events & ZMQ_POLLOUT))
items_ [i].revents |= ZMQ_POLLOUT;
if ((items_ [i].events & ZMQ_POLLIN) &&
(zmq_events & ZMQ_POLLIN))
items_ [i].revents |= ZMQ_POLLIN;
}
// Else, the poll item is a raw file descriptor, simply convert
// the events to zmq_pollitem_t-style format.
else {
if (FD_ISSET (items_ [i].fd, &inset))
items_ [i].revents |= ZMQ_POLLIN;
if (FD_ISSET (items_ [i].fd, &outset))
items_ [i].revents |= ZMQ_POLLOUT;
if (FD_ISSET (items_ [i].fd, &errset))
items_ [i].revents |= ZMQ_POLLERR;
}
if (items_ [i].revents)
nevents++;
}
// If timeout is zero, exit immediately whether there are events or not.
if (timeout_ == 0)
break;
// If there are events to return, we can exit immediately.
if (nevents)
break;
// At this point we are meant to wait for events but there are none.
// If timeout is infinite we can just loop until we get some events.
if (timeout_ < 0) {
if (first_pass)
first_pass = false;
continue;
}
// The timeout is finite and there are no events. In the first pass
// we get a timestamp of when the polling have begun. (We assume that
// first pass have taken negligible time). We also compute the time
// when the polling should time out.
if (first_pass) {
now = clock.now_ms ();
end = now + timeout_;
if (now == end)
break;
first_pass = false;
continue;
}
// Find out whether timeout have expired.
now = clock.now_ms ();
if (now >= end)
break;
}
return nevents;
#else
// Exotic platforms that support neither poll() nor select().
errno = ENOTSUP;
return -1;
#endif
}
// The poller functionality
void* zmq_poller_new ()
@ -1579,14 +1062,20 @@ int zmq_poller_close (void *poller_)
return 0;
}
int zmq_poller_add (void *poller_, void *socket_, void *user_data_, short events_)
int zmq_poller_add (void *poller_, void *s_, void *user_data_, short events_)
{
if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
errno = EFAULT;
return -1;
}
return ((zmq::socket_poller_t*)poller_)->add (socket_, user_data_, events_);
if (!s_ || !((zmq::socket_base_t*)s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
zmq::socket_base_t *socket = (zmq::socket_base_t*)s_;
return ((zmq::socket_poller_t*)poller_)->add (socket, user_data_, events_);
}
#if defined _WIN32
@ -1604,14 +1093,20 @@ int zmq_poller_add_fd (void *poller_, int fd_, void *user_data_, short events_)
}
int zmq_poller_modify (void *poller_, void *socket_, short events_)
int zmq_poller_modify (void *poller_, void *s_, short events_)
{
if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
errno = EFAULT;
return -1;
}
return ((zmq::socket_poller_t*)poller_)->modify (socket_, events_);
if (!s_ || !((zmq::socket_base_t*)s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
zmq::socket_base_t *socket = (zmq::socket_base_t*)s_;
return ((zmq::socket_poller_t*)poller_)->modify (socket, events_);
}
@ -1630,13 +1125,19 @@ int zmq_poller_modify_fd (void *poller_, int fd_, short events_)
}
int zmq_poller_remove (void *poller_, void *socket)
int zmq_poller_remove (void *poller_, void *s_)
{
if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
errno = EFAULT;
return -1;
}
if (!s_ || !((zmq::socket_base_t*)s_)->check_tag ()) {
errno = ENOTSOCK;
return -1;
}
zmq::socket_base_t *socket = (zmq::socket_base_t*)s_;
return ((zmq::socket_poller_t*)poller_)->remove (socket);
}

View File

@ -52,7 +52,6 @@ set(tests
test_client_server
test_sockopt_hwm
test_heartbeats
test_thread_safe_polling
test_poller
)
if(NOT WIN32)

View File

@ -78,6 +78,11 @@ int main (void)
rc = zmq_recv (sink, data, 1, 0);
assert (rc == 1);
// We expect timed out
rc = zmq_poller_wait (poller, &event, 0);
assert (rc == -1);
assert (errno == ETIMEDOUT);
// Stop polling sink
rc = zmq_poller_remove (poller, sink);
assert (rc == 0);
@ -141,7 +146,7 @@ int main (void)
assert (rc == 0);
rc = zmq_close (client);
assert (rc == 0);
rc = zmq_ctx_shutdown (ctx);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0;

View File

@ -1,164 +0,0 @@
/*
Copyright (c) 2007-2015 Contributors as noted in the AUTHORS file
This file is part of libzmq, the ZeroMQ core engine in C++.
libzmq is free software; you can redistribute it and/or modify it under
the terms of the GNU Lesser General Public License (LGPL) as published
by the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
As a special exception, the Contributors give you permission to link
this library with independent modules to produce an executable,
regardless of the license terms of these independent modules, and to
copy and distribute the resulting executable under terms of your choice,
provided that you also meet, for each linked independent module, the
terms and conditions of the license of that module. An independent
module is a module which is not derived from or based on this library.
If you modify this library, you must extend this exception to your
version of the library.
libzmq is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
License for more details.
You should have received a copy of the GNU Lesser General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "testutil.hpp"
void worker(void* s);
int main (void)
{
setup_test_environment();
void *ctx = zmq_ctx_new ();
assert (ctx);
void *server = zmq_socket (ctx, ZMQ_SERVER);
void *server2 = zmq_socket (ctx, ZMQ_SERVER);
void *pollfd = zmq_pollfd_new ();
int rc;
rc = zmq_add_pollfd (server, pollfd);
assert (rc == 0);
rc = zmq_add_pollfd (server2, pollfd);
assert (rc == 0);
zmq_pollitem_t items[] = {
{server, 0, ZMQ_POLLIN, 0},
{server2, 0, ZMQ_POLLIN, 0}};
rc = zmq_bind (server, "tcp://127.0.0.1:5560");
assert (rc == 0);
rc = zmq_bind (server2, "tcp://127.0.0.1:5561");
assert (rc == 0);
void* t = zmq_threadstart(worker, ctx);
assert (rc == 0);
rc = zmq_pollfd_poll (pollfd, items, 2, -1);
assert (rc == 1);
assert (items[0].revents == ZMQ_POLLIN);
assert (items[1].revents == 0);
zmq_msg_t msg;
rc = zmq_msg_init(&msg);
rc = zmq_msg_recv(&msg, server, ZMQ_DONTWAIT);
assert (rc == 1);
rc = zmq_pollfd_poll (pollfd, items, 2, -1);
assert (rc == 1);
assert (items[0].revents == 0);
assert (items[1].revents == ZMQ_POLLIN);
rc = zmq_msg_recv(&msg, server2, ZMQ_DONTWAIT);
assert (rc == 1);
rc = zmq_pollfd_poll (pollfd, items, 2, 0);
assert (rc == 0);
assert (items[0].revents == 0);
assert (items[1].revents == 0);
zmq_threadclose(t);
rc = zmq_msg_close(&msg);
assert (rc == 0);
rc = zmq_remove_pollfd (server, pollfd);
assert (rc == 0);
rc = zmq_remove_pollfd (server2, pollfd);
assert (rc == 0);
rc = zmq_pollfd_close (pollfd);
assert (rc == 0);
rc = zmq_close (server);
assert (rc == 0);
rc = zmq_close (server2);
assert (rc == 0);
rc = zmq_ctx_term (ctx);
assert (rc == 0);
return 0;
}
void worker(void* ctx)
{
void *client = zmq_socket (ctx, ZMQ_CLIENT);
int rc = zmq_connect (client, "tcp://127.0.0.1:5560");
assert (rc == 0);
msleep(100);
zmq_msg_t msg;
rc = zmq_msg_init_size(&msg,1);
assert (rc == 0);
char * data = (char *)zmq_msg_data(&msg);
data[0] = 1;
rc = zmq_msg_send(&msg, client, 0);
assert (rc == 1);
rc = zmq_disconnect (client, "tcp://127.0.0.1:5560");
assert (rc == 0);
rc = zmq_connect (client, "tcp://127.0.0.1:5561");
assert (rc == 0);
msleep(100);
rc = zmq_msg_close(&msg);
assert (rc == 0);
rc = zmq_msg_init_size(&msg,1);
assert (rc == 0);
data = (char *)zmq_msg_data(&msg);
data[0] = 1;
rc = zmq_msg_send(&msg, client, 0);
assert (rc == 1);
rc = zmq_msg_close(&msg);
assert (rc == 0);
rc = zmq_close (client);
assert (rc == 0);
}