mirror of
https://github.com/zeromq/libzmq.git
synced 2025-01-16 20:41:18 +08:00
Merge pull request #2140 from minrk/zmq_poll_poller
Support all sockets in zmq_poll
This commit is contained in:
commit
55930f5e42
51
src/zmq.cpp
51
src/zmq.cpp
@ -742,10 +742,60 @@ const char *zmq_msg_gets (zmq_msg_t *msg_, const char *property_)
|
|||||||
|
|
||||||
// Polling.
|
// Polling.
|
||||||
|
|
||||||
|
#if defined ZMQ_HAVE_POLLER
|
||||||
|
inline int zmq_poller_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
|
||||||
|
{
|
||||||
|
// implement zmq_poll on top of zmq_poller
|
||||||
|
int rc;
|
||||||
|
zmq_poller_event_t events[nitems_];
|
||||||
|
void *poller = zmq_poller_new ();
|
||||||
|
alloc_assert(poller);
|
||||||
|
|
||||||
|
// Register sockets with poller
|
||||||
|
for (int i = 0; i < nitems_; i++) {
|
||||||
|
if (items_[i].socket) {
|
||||||
|
// Poll item is a 0MQ socket.
|
||||||
|
rc = zmq_poller_add (poller, items_[i].socket, NULL, items_[i].events);
|
||||||
|
if (rc < 0) {
|
||||||
|
zmq_poller_destroy (&poller);
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Poll item is a raw file descriptor.
|
||||||
|
rc = zmq_poller_add_fd (poller, items_[i].fd, NULL, items_[i].events);
|
||||||
|
if (rc < 0) {
|
||||||
|
zmq_poller_destroy (&poller);
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for events
|
||||||
|
rc = zmq_poller_wait_all (poller, events, nitems_, timeout_);
|
||||||
|
if (rc < 0) {
|
||||||
|
zmq_poller_destroy (&poller);
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Put the event information where zmq_poll expects it to go.
|
||||||
|
for (int i = 0; i < nitems_; i++) {
|
||||||
|
items_[i].revents = events[i].events;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Cleanup
|
||||||
|
rc = zmq_poller_destroy (&poller);
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
#endif // ZMQ_HAVE_POLLER
|
||||||
|
|
||||||
int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
|
int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
|
||||||
{
|
{
|
||||||
// TODO: the function implementation can just call zmq_pollfd_poll with
|
// TODO: the function implementation can just call zmq_pollfd_poll with
|
||||||
// pollfd as NULL, however pollfd is not yet stable.
|
// pollfd as NULL, however pollfd is not yet stable.
|
||||||
|
#if defined ZMQ_HAVE_POLLER
|
||||||
|
// if poller is present, use that.
|
||||||
|
return zmq_poller_poll(items_, nitems_, timeout_);
|
||||||
|
#else
|
||||||
#if defined ZMQ_POLL_BASED_ON_POLL
|
#if defined ZMQ_POLL_BASED_ON_POLL
|
||||||
if (unlikely (nitems_ < 0)) {
|
if (unlikely (nitems_ < 0)) {
|
||||||
errno = EINVAL;
|
errno = EINVAL;
|
||||||
@ -1094,6 +1144,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_)
|
|||||||
errno = ENOTSUP;
|
errno = ENOTSUP;
|
||||||
return -1;
|
return -1;
|
||||||
#endif
|
#endif
|
||||||
|
#endif // ZMQ_HAVE_POLLER
|
||||||
}
|
}
|
||||||
|
|
||||||
// The poller functionality
|
// The poller functionality
|
||||||
|
@ -161,6 +161,15 @@ int main (void)
|
|||||||
rc = msg_send (&msg, radio, "Movies", "Godfather");
|
rc = msg_send (&msg, radio, "Movies", "Godfather");
|
||||||
assert (rc == 9);
|
assert (rc == 9);
|
||||||
|
|
||||||
|
// test zmq_poll with dish
|
||||||
|
zmq_pollitem_t items [] = {
|
||||||
|
{ radio, 0, ZMQ_POLLIN, 0 }, // read publications
|
||||||
|
{ dish, 0, ZMQ_POLLIN, 0 }, // read subscriptions
|
||||||
|
};
|
||||||
|
rc = zmq_poll(items, 2, 2000);
|
||||||
|
assert (rc == 0);
|
||||||
|
assert (items[1].revents == ZMQ_POLLIN);
|
||||||
|
|
||||||
// Check the correct message arrived
|
// Check the correct message arrived
|
||||||
rc = msg_recv_cmp (&msg, dish, "Movies", "Godfather");
|
rc = msg_recv_cmp (&msg, dish, "Movies", "Godfather");
|
||||||
assert (rc == 9);
|
assert (rc == 9);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user