diff --git a/include/zmq.h b/include/zmq.h index 5f35d191..108e8644 100644 --- a/include/zmq.h +++ b/include/zmq.h @@ -577,6 +577,7 @@ ZMQ_EXPORT int zmq_poller_add (void *poller, void *socket, void *user_data, sho ZMQ_EXPORT int zmq_poller_modify (void *poller, void *socket, short events); ZMQ_EXPORT int zmq_poller_remove (void *poller, void *socket); ZMQ_EXPORT int zmq_poller_wait (void *poller, zmq_poller_event_t *event, long timeout); +ZMQ_EXPORT int zmq_poller_wait_all (void *poller, zmq_poller_event_t *events, long timeout); #if defined _WIN32 ZMQ_EXPORT int zmq_poller_add_fd (void *poller, SOCKET fd, void *user_data, short events); diff --git a/src/socket_poller.cpp b/src/socket_poller.cpp index d694df3a..70a7fe1e 100644 --- a/src/socket_poller.cpp +++ b/src/socket_poller.cpp @@ -380,7 +380,7 @@ int zmq::socket_poller_t::rebuild () return 0; } -int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long timeout_) +int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *events_, long timeout_) { if (need_rebuild) if (rebuild () == -1) @@ -412,6 +412,7 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long time uint64_t end = 0; bool first_pass = true; + bool found = false; while (true) { // Compute the timeout for the subsequent poll. @@ -439,7 +440,13 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long time signaler.recv (); // Check for the events. - for (items_t::iterator it = items.begin (); it != items.end (); ++it) { + int i=0; + for (items_t::iterator it = items.begin (); it != items.end (); ++i, ++it) { + + events_[i].socket = NULL; + events_[i].fd = 0; + events_[i].user_data = NULL; + events_[i].events = 0; // The poll item is a 0MQ socket. Retrieve pending events // using the ZMQ_EVENTS socket option. @@ -451,12 +458,10 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long time } if (it->events & events) { - event_->socket = it->socket; - event_->user_data = it->user_data; - event_->events = it->events & events; - - // If there is event to return, we can exit immediately. - return 0; + events_[i].socket = it->socket; + events_[i].user_data = it->user_data; + events_[i].events = it->events & events; + found = true; } } // Else, the poll item is a raw file descriptor, simply convert @@ -475,16 +480,17 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long time events |= ZMQ_POLLERR; if (events) { - event_->socket = NULL; - event_->user_data = it->user_data; - event_->fd = it->fd; - event_->events = events; - - // If there is event to return, we can exit immediately. - return 0; + events_[i].socket = NULL; + events_[i].user_data = it->user_data; + events_[i].fd = it->fd; + events_[i].events = events; + found = true; } } } + if (found) { + return 0; + } // If timeout is zero, exit immediately whether there are events or not. if (timeout_ == 0) @@ -516,7 +522,6 @@ int zmq::socket_poller_t::wait (zmq::socket_poller_t::event_t *event_, long time if (now >= end) break; } - errno = ETIMEDOUT; return -1; diff --git a/src/socket_poller.hpp b/src/socket_poller.hpp index 04db836f..5fe43c06 100644 --- a/src/socket_poller.hpp +++ b/src/socket_poller.hpp @@ -75,6 +75,8 @@ namespace zmq int wait (event_t *event, long timeout); + inline int size (void) { return items.size (); }; + // Return false if object is not a socket. bool check_tag (); diff --git a/src/zmq.cpp b/src/zmq.cpp index d6f58db1..ada3f7f9 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -1222,15 +1222,46 @@ int zmq_poller_wait (void *poller_, zmq_poller_event_t *event, long timeout_) zmq_assert (event != NULL); - zmq::socket_poller_t::event_t e; - memset (&e, 0, sizeof (e)); + int n_items = ((zmq::socket_poller_t*)poller_)->size (); + zmq_poller_event_t *events; + events = new zmq_poller_event_t[n_items]; + alloc_assert(events); - int rc = ((zmq::socket_poller_t*)poller_)->wait (&e, timeout_); + int rc = zmq_poller_wait_all(poller_, events, timeout_); - event->socket = e.socket; - event->fd = e.fd; - event->user_data = e.user_data; - event->events = e.events; + if (rc >= 0) { + *event = events[0]; + } else { + memset (event, 0, sizeof(zmq_poller_event_t)); + } + delete [] events; + + return rc; +} + +int zmq_poller_wait_all (void *poller_, zmq_poller_event_t *events, long timeout_) +{ + if (!poller_ || !((zmq::socket_poller_t*)poller_)->check_tag ()) { + errno = EFAULT; + return -1; + } + + zmq_assert (events != NULL); + + int n_items = ((zmq::socket_poller_t*)poller_)->size (); + zmq::socket_poller_t::event_t *evts; + evts = new zmq::socket_poller_t::event_t[n_items]; + alloc_assert(evts); + + int rc = ((zmq::socket_poller_t*)poller_)->wait (evts, timeout_); + + for(int i = 0; i < n_items; ++i) { + events[i].socket = evts[i].socket; + events[i].fd = evts[i].fd; + events[i].user_data = evts[i].user_data; + events[i].events = evts[i].events; + } + delete [] evts; return rc; } diff --git a/src/zmq_draft.h b/src/zmq_draft.h index 92956471..907f0b83 100644 --- a/src/zmq_draft.h +++ b/src/zmq_draft.h @@ -80,6 +80,7 @@ int zmq_poller_add (void *poller, void *socket, void *user_data, short events); int zmq_poller_modify (void *poller, void *socket, short events); int zmq_poller_remove (void *poller, void *socket); int zmq_poller_wait (void *poller, zmq_poller_event_t *event, long timeout); +int zmq_poller_wait_all (void *poller, zmq_poller_event_t *events, long timeout); #if defined _WIN32 int zmq_poller_add_fd (void *poller, SOCKET fd, void *user_data, short events);