diff --git a/src/proxy.cpp b/src/proxy.cpp index 7161acea..8c626af8 100644 --- a/src/proxy.cpp +++ b/src/proxy.cpp @@ -130,15 +130,6 @@ int zmq::proxy ( { backend_, 0, ZMQ_POLLOUT, 0 } }; - int control_idx = 2; - if (frontend_ == backend_) { - // when frontend & backend are the same, - // avoid duplicate poll entries - qt_poll_items -= 1; - items[1] = items[2]; - control_idx = 1; - } - // Proxy can be in these three states enum { active, @@ -163,7 +154,7 @@ int zmq::proxy ( } // Process a control command if any - if (control_ && items [control_idx].revents & ZMQ_POLLIN) { + if (control_ && items [2].revents & ZMQ_POLLIN) { rc = control_->recv (&msg, 0); if (unlikely (rc < 0)) return -1; diff --git a/src/zmq.cpp b/src/zmq.cpp index bb79f5df..8d479413 100644 --- a/src/zmq.cpp +++ b/src/zmq.cpp @@ -753,12 +753,28 @@ inline int zmq_poller_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) void *poller = zmq_poller_new (); alloc_assert(poller); + bool repeat_items = false; // Register sockets with poller for (int i = 0; i < nitems_; i++) { items_[i].revents = 0; + + bool modify = false; + short e = items_[i].events; if (items_[i].socket) { // Poll item is a 0MQ socket. - rc = zmq_poller_add (poller, items_[i].socket, NULL, items_[i].events); + for (int j = 0; j < i; ++j) { + // Check for repeat entries + if (items_[j].socket == items_[i].socket) { + repeat_items = true; + modify = true; + e |= items_[j].events; + } + } + if (modify) { + rc = zmq_poller_modify (poller, items_[i].socket, e); + } else { + rc = zmq_poller_add (poller, items_[i].socket, NULL, e); + } if (rc < 0) { zmq_poller_destroy (&poller); delete [] events; @@ -766,7 +782,19 @@ inline int zmq_poller_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) } } else { // Poll item is a raw file descriptor. - rc = zmq_poller_add_fd (poller, items_[i].fd, NULL, items_[i].events); + for (int j = 0; j < i; ++j) { + // Check for repeat entries + if (items_[j].fd == items_[i].fd) { + repeat_items = true; + modify = true; + e |= items_[j].events; + } + } + if (modify) { + rc = zmq_poller_modify_fd (poller, items_[i].fd, e); + } else { + rc = zmq_poller_add_fd (poller, items_[i].fd, NULL, e); + } if (rc < 0) { zmq_poller_destroy (&poller); delete [] events; @@ -786,18 +814,30 @@ inline int zmq_poller_poll (zmq_pollitem_t *items_, int nitems_, long timeout_) return rc; } - // Transform poller events into zmq_pollitem events + // Transform poller events into zmq_pollitem events. // items_ contains all items, while events only contains fired events. - // The two are still co-ordered, so the step through items - // Checking for matches only on the first event - int found_events = rc; - for (int i = 0, j = 0; i < nitems_ && j < found_events; i++) { - if ( - (items_[i].socket && items_[i].socket == events[j].socket) || - (items_[i].fd && items_[i].fd == events[j].fd) - ) { - items_[i].revents = events[j].events; - j++; + // If no sockets are repeated (likely), the two are still co-ordered, so the step through items + // Checking for matches only on the first event. + // If there are repeat items, they cannot be assumed to be co-ordered, + // so each pollitem must check fired events from the beginning. + int j_start = 0, found_events = rc; + for (int i = 0; i < nitems_; i++) { + for (int j = j_start; j < found_events; ++j) { + if ( + (items_[i].socket && items_[i].socket == events[j].socket) || + (items_[i].fd && items_[i].fd == events[j].fd) + ) { + items_[i].revents = events[j].events & items_[i].events; + if (!repeat_items) { + // no repeats, we can ignore events we've already seen + j_start++; + } + break; + } + if (!repeat_items) { + // no repeats, never have to look at j > j_start + break; + } } }