mirror of
https://github.com/zeromq/libzmq.git
synced 2025-03-10 07:56:09 +00:00
Problem: zmq_poller only signals one event
Solution: zmq_poller_wait_all signals all events allows signaling multiple events with one call to zmq_poller_wait_all rather than emitting only one event. this prepares for zmq_poll being based on zmq_poller, which requires events for all sockets rather than just one.
This commit is contained in:
parent
555a087763
commit
2bc9796651
@ -577,6 +577,7 @@ ZMQ_EXPORT int zmq_poller_add (void *poller, void *socket, void *user_data, sho
|
|||||||
ZMQ_EXPORT int zmq_poller_modify (void *poller, void *socket, short events);
|
ZMQ_EXPORT int zmq_poller_modify (void *poller, void *socket, short events);
|
||||||
ZMQ_EXPORT int zmq_poller_remove (void *poller, void *socket);
|
ZMQ_EXPORT int zmq_poller_remove (void *poller, void *socket);
|
||||||
ZMQ_EXPORT int zmq_poller_wait (void *poller, zmq_poller_event_t *event, long timeout);
|
ZMQ_EXPORT int zmq_poller_wait (void *poller, zmq_poller_event_t *event, long timeout);
|
||||||
|
ZMQ_EXPORT int zmq_poller_wait_all (void *poller, zmq_poller_event_t *events, long timeout);
|
||||||
|
|
||||||
#if defined _WIN32
|
#if defined _WIN32
|
||||||
ZMQ_EXPORT int zmq_poller_add_fd (void *poller, SOCKET fd, void *user_data, short events);
|
ZMQ_EXPORT int zmq_poller_add_fd (void *poller, SOCKET fd, void *user_data, short events);
|
||||||
|
@ -380,7 +380,7 @@ int zmq::socket_poller_t::rebuild ()
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long timeout_)
|
int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, long timeout_)
|
||||||
{
|
{
|
||||||
if (need_rebuild)
|
if (need_rebuild)
|
||||||
if (rebuild () == -1)
|
if (rebuild () == -1)
|
||||||
@ -412,6 +412,7 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long time
|
|||||||
uint64_t end = 0;
|
uint64_t end = 0;
|
||||||
|
|
||||||
bool first_pass = true;
|
bool first_pass = true;
|
||||||
|
bool found = false;
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
// Compute the timeout for the subsequent poll.
|
// Compute the timeout for the subsequent poll.
|
||||||
@ -439,7 +440,13 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long time
|
|||||||
signaler.recv ();
|
signaler.recv ();
|
||||||
|
|
||||||
// Check for the events.
|
// Check for the events.
|
||||||
for (items_t::iterator it = items.begin (); it != items.end (); ++it) {
|
int i=0;
|
||||||
|
for (items_t::iterator it = items.begin (); it != items.end (); ++i, ++it) {
|
||||||
|
|
||||||
|
events_[i].socket = NULL;
|
||||||
|
events_[i].fd = 0;
|
||||||
|
events_[i].user_data = NULL;
|
||||||
|
events_[i].events = 0;
|
||||||
|
|
||||||
// The poll item is a 0MQ socket. Retrieve pending events
|
// The poll item is a 0MQ socket. Retrieve pending events
|
||||||
// using the ZMQ_EVENTS socket option.
|
// using the ZMQ_EVENTS socket option.
|
||||||
@ -451,12 +458,10 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long time
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (it->events & events) {
|
if (it->events & events) {
|
||||||
event_->socket = it->socket;
|
events_[i].socket = it->socket;
|
||||||
event_->user_data = it->user_data;
|
events_[i].user_data = it->user_data;
|
||||||
event_->events = it->events & events;
|
events_[i].events = it->events & events;
|
||||||
|
found = true;
|
||||||
// If there is event to return, we can exit immediately.
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Else, the poll item is a raw file descriptor, simply convert
|
// Else, the poll item is a raw file descriptor, simply convert
|
||||||
@ -475,16 +480,17 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long time
|
|||||||
events |= ZMQ_POLLERR;
|
events |= ZMQ_POLLERR;
|
||||||
|
|
||||||
if (events) {
|
if (events) {
|
||||||
event_->socket = NULL;
|
events_[i].socket = NULL;
|
||||||
event_->user_data = it->user_data;
|
events_[i].user_data = it->user_data;
|
||||||
event_->fd = it->fd;
|
events_[i].fd = it->fd;
|
||||||
event_->events = events;
|
events_[i].events = events;
|
||||||
|
found = true;
|
||||||
// If there is event to return, we can exit immediately.
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (found) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
// If timeout is zero, exit immediately whether there are events or not.
|
// If timeout is zero, exit immediately whether there are events or not.
|
||||||
if (timeout_ == 0)
|
if (timeout_ == 0)
|
||||||
@ -516,7 +522,6 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long time
|
|||||||
if (now >= end)
|
if (now >= end)
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
errno = ETIMEDOUT;
|
errno = ETIMEDOUT;
|
||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
|
@ -75,6 +75,8 @@ namespace zmq
|
|||||||
|
|
||||||
int wait (event_t *event, long timeout);
|
int wait (event_t *event, long timeout);
|
||||||
|
|
||||||
|
inline int size (void) { return items.size (); };
|
||||||
|
|
||||||
// Return false if object is not a socket.
|
// Return false if object is not a socket.
|
||||||
bool check_tag ();
|
bool check_tag ();
|
||||||
|
|
||||||
|
45
src/zmq.cpp
45
src/zmq.cpp
@ -1222,15 +1222,46 @@ int zmq_poller_wait (void *poller_, zmq_poller_event_t *event, long timeout_)
|
|||||||
|
|
||||||
zmq_assert (event != NULL);
|
zmq_assert (event != NULL);
|
||||||
|
|
||||||
zmq::socket_poller_t::event_t e;
|
int n_items = ((zmq::socket_poller_t*)poller_)->size ();
|
||||||
memset (&e, 0, sizeof (e));
|
zmq_poller_event_t *events;
|
||||||
|
events = new zmq_poller_event_t[n_items];
|
||||||
|
alloc_assert(events);
|
||||||
|
|
||||||
int rc = ((zmq::socket_poller_t*)poller_)->wait (&e, timeout_);
|
int rc = zmq_poller_wait_all(poller_, events, timeout_);
|
||||||
|
|
||||||
event->socket = e.socket;
|
if (rc >= 0) {
|
||||||
event->fd = e.fd;
|
*event = events[0];
|
||||||
event->user_data = e.user_data;
|
} else {
|
||||||
event->events = e.events;
|
memset (event, 0, sizeof(zmq_poller_event_t));
|
||||||
|
}
|
||||||
|
delete [] events;
|
||||||
|
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
|
int zmq_poller_wait_all (void *poller_, zmq_poller_event_t *events, long timeout_)
|
||||||
|
{
|
||||||
|
if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) {
|
||||||
|
errno = EFAULT;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
zmq_assert (events != NULL);
|
||||||
|
|
||||||
|
int n_items = ((zmq::socket_poller_t*)poller_)->size ();
|
||||||
|
zmq::socket_poller_t::event_t *evts;
|
||||||
|
evts = new zmq::socket_poller_t::event_t[n_items];
|
||||||
|
alloc_assert(evts);
|
||||||
|
|
||||||
|
int rc = ((zmq::socket_poller_t*)poller_)->wait (evts, timeout_);
|
||||||
|
|
||||||
|
for(int i = 0; i < n_items; ++i) {
|
||||||
|
events[i].socket = evts[i].socket;
|
||||||
|
events[i].fd = evts[i].fd;
|
||||||
|
events[i].user_data = evts[i].user_data;
|
||||||
|
events[i].events = evts[i].events;
|
||||||
|
}
|
||||||
|
delete [] evts;
|
||||||
|
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
@ -80,6 +80,7 @@ int zmq_poller_add (void *poller, void *socket, void *user_data, short events);
|
|||||||
int zmq_poller_modify (void *poller, void *socket, short events);
|
int zmq_poller_modify (void *poller, void *socket, short events);
|
||||||
int zmq_poller_remove (void *poller, void *socket);
|
int zmq_poller_remove (void *poller, void *socket);
|
||||||
int zmq_poller_wait (void *poller, zmq_poller_event_t *event, long timeout);
|
int zmq_poller_wait (void *poller, zmq_poller_event_t *event, long timeout);
|
||||||
|
int zmq_poller_wait_all (void *poller, zmq_poller_event_t *events, long timeout);
|
||||||
|
|
||||||
#if defined _WIN32
|
#if defined _WIN32
|
||||||
int zmq_poller_add_fd (void *poller, SOCKET fd, void *user_data, short events);
|
int zmq_poller_add_fd (void *poller, SOCKET fd, void *user_data, short events);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user