From bdcaa935b95f6373444d1a38ca3094034a9be981 Mon Sep 17 00:00:00 2001 From: Min RK Date: Wed, 28 Sep 2016 13:39:38 +0200 Subject: [PATCH] zmq_poll calls zmq_poller if available enables zmq_poll on threadsafe sockets only supported in zmq_poller (radio, dish, etc.) --- src/zmq.cpp | 51 +++++++++++++++++++++++++++++++++++++++ tests/test_radio_dish.cpp | 9 +++++++ 2 files changed, 60 insertions(+) diff --git a/src/zmq.cpp b/src/zmq.cpp index 8aaae035..23a3d6ab 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -742,10 +742,60 @@ const char *zmq_msg_gets (zmq_msg_t *msg_, const char *property_) // Polling. +#if defined ZMQ_HAVE_POLLER +inline int zmq_poller_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) +{ + // implement zmq_poll on top of zmq_poller + int rc; + zmq_poller_event_t events[nitems_]; + void *poller = zmq_poller_new (); + alloc_assert(poller); + + // Register sockets with poller + for (int i = 0; i < nitems_; i++) { + if (items_[i].socket) { + // Poll item is a 0MQ socket. + rc = zmq_poller_add (poller, items_[i].socket, NULL, items_[i].events); + if (rc < 0) { + zmq_poller_destroy (&poller); + return rc; + } + } else { + // Poll item is a raw file descriptor. + rc = zmq_poller_add_fd (poller, items_[i].fd, NULL, items_[i].events); + if (rc < 0) { + zmq_poller_destroy (&poller); + return rc; + } + } + } + + // Wait for events + rc = zmq_poller_wait_all (poller, events, nitems_, timeout_); + if (rc < 0) { + zmq_poller_destroy (&poller); + return rc; + } + + // Put the event information where zmq_poll expects it to go. + for (int i = 0; i < nitems_; i++) { + items_[i].revents = events[i].events; + } + + // Cleanup + rc = zmq_poller_destroy (&poller); + return rc; +} +#endif // ZMQ_HAVE_POLLER + int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) { // TODO: the function implementation can just call zmq_pollfd_poll with // pollfd as NULL, however pollfd is not yet stable. +#if defined ZMQ_HAVE_POLLER + // if poller is present, use that. + return zmq_poller_poll(items_, nitems_, timeout_); +#else #if defined ZMQ_POLL_BASED_ON_POLL if (unlikely (nitems_ < 0)) { errno = EINVAL; @@ -1094,6 +1144,7 @@ int zmq_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) errno = ENOTSUP; return -1; #endif +#endif // ZMQ_HAVE_POLLER } // The poller functionality diff --git a/tests/test_radio_dish.cpp b/tests/test_radio_dish.cpp index e5e371fd..7d021331 100644 --- a/tests/test_radio_dish.cpp +++ b/tests/test_radio_dish.cpp @@ -161,6 +161,15 @@ int main (void) rc = msg_send (&msg, radio, "Movies", "Godfather"); assert (rc == 9); + // test zmq_poll with dish + zmq_pollitem_t items [] = { + { radio, 0, ZMQ_POLLIN, 0 }, // read publications + { dish, 0, ZMQ_POLLIN, 0 }, // read subscriptions + }; + rc = zmq_poll(items, 2, 2000); + assert (rc == 0); + assert (items[1].revents == ZMQ_POLLIN); + // Check the correct message arrived rc = msg_recv_cmp (&msg, dish, "Movies", "Godfather"); assert (rc == 9);