diff --git a/src/poller_base.cpp b/src/poller_base.cpp index 463095ba..87a25792 100644 --- a/src/poller_base.cpp +++ b/src/poller_base.cpp @@ -42,7 +42,7 @@ zmq::poller_base_t::~poller_base_t () zmq_assert (get_load () == 0); } -int zmq::poller_base_t::get_load () +int zmq::poller_base_t::get_load () const { return load.get (); } diff --git a/src/poller_base.hpp b/src/poller_base.hpp index a5ec664b..73eb0f85 100644 --- a/src/poller_base.hpp +++ b/src/poller_base.hpp @@ -47,7 +47,7 @@ class poller_base_t // Returns load of the poller. Note that this function can be // invoked from a different thread! - int get_load (); + int get_load () const; // Add a timeout to expire in timeout_ milliseconds. After the // expiration timer_event on sink_ object will be called with diff --git a/src/select.cpp b/src/select.cpp index c15ba05b..c21f3d84 100644 --- a/src/select.cpp +++ b/src/select.cpp @@ -92,6 +92,7 @@ zmq::select_t::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_) #endif adjust_load (1); + assert_load_consistent (); return fd_; } @@ -109,6 +110,27 @@ zmq::select_t::find_fd_entry_by_handle (fd_entries_t &fd_entries, return fd_entry_it; } +size_t zmq::select_t::count_non_retired (const fd_entries_t &fd_entries) +{ + return fd_entries.size () + - std::count_if (fd_entries.begin (), fd_entries.end (), + is_retired_fd); +} + +void zmq::select_t::assert_load_consistent () const +{ +#ifdef _DEBUG + // check if load is consistent with number of non-retired fd entries + int expected = get_load (); + + for (family_entries_t::const_iterator family_it = family_entries.begin (); + family_it != family_entries.end (); ++family_it) { + expected -= (int) count_non_retired (family_it->second.fd_entries); + } + zmq_assert (expected == 0); +#endif +} + void zmq::select_t::trigger_events (const fd_entries_t &fd_entries_, const fds_set_t &local_fds_set_, int event_count_) @@ -150,7 +172,7 @@ void zmq::select_t::trigger_events (const fd_entries_t &fd_entries_, } #if defined ZMQ_HAVE_WINDOWS -bool zmq::select_t::try_remove_fd_entry ( +int zmq::select_t::try_retire_fd_entry ( family_entries_t::iterator family_entry_it, zmq::fd_t &handle_) { family_entry_t &family_entry = family_entry_it->second; @@ -158,7 +180,7 @@ bool zmq::select_t::try_remove_fd_entry ( fd_entries_t::iterator fd_entry_it = find_fd_entry_by_handle (family_entry.fd_entries, handle_); if (fd_entry_it == family_entry.fd_entries.end ()) - return false; + return 0; if (family_entry_it != current_family_entry_it) { // Family is not currently being iterated and can be safely // modified in-place. So later it can be skipped without @@ -168,10 +190,10 @@ bool zmq::select_t::try_remove_fd_entry ( // Otherwise mark removed entries as retired. It will be cleaned up // at the end of the iteration. See zmq::select_t::loop fd_entry_it->fd = retired_fd; - family_entry.retired = true; + family_entry.has_retired = true; } family_entry.fds_set.remove_fd (handle_); - return true; + return 1; } #endif @@ -179,12 +201,12 @@ void zmq::select_t::rm_fd (handle_t handle_) { #if defined ZMQ_HAVE_WINDOWS u_short family = get_fd_family (handle_); + int retired = 0; if (family != AF_UNSPEC) { family_entries_t::iterator family_entry_it = family_entries.find (family); - int removed = try_remove_fd_entry (family_entry_it, handle_); - assert (removed); + retired += try_retire_fd_entry (family_entry_it, handle_); } else { // get_fd_family may fail and return AF_UNSPEC if the socket was not // successfully connected. In that case, we need to look for the @@ -193,8 +215,9 @@ void zmq::select_t::rm_fd (handle_t handle_) for (family_entries_t::iterator family_entry_it = family_entries.begin (); family_entry_it != end; ++family_entry_it) { - if (try_remove_fd_entry (family_entry_it, handle_)) + if (retired += try_retire_fd_entry (family_entry_it, handle_)) { break; + } } } #else @@ -202,6 +225,8 @@ void zmq::select_t::rm_fd (handle_t handle_) find_fd_entry_by_handle (family_entry.fd_entries, handle_); assert (fd_entry_it != fd_entries.end ()); + ++retired; + fd_entry_it->fd = retired_fd; family_entry.fds_set.remove_fd (handle_); @@ -213,9 +238,12 @@ void zmq::select_t::rm_fd (handle_t handle_) maxfd = fd_entry_it->fd; } - family_entry.retired = true; + family_entry.has_retired = true; #endif + zmq_assert (retired == 1); adjust_load (-1); + + assert_load_consistent (); } void zmq::select_t::set_pollin (handle_t handle_) @@ -406,8 +434,8 @@ void zmq::select_t::select_family_entry (family_entry_t &family_entry_, trigger_events (fd_entries, local_fds_set, rc); - if (family_entry_.retired) { - family_entry_.retired = false; + if (family_entry_.has_retired) { + family_entry_.has_retired = false; family_entry_.fd_entries.erase (std::remove_if (fd_entries.begin (), fd_entries.end (), is_retired_fd), @@ -543,7 +571,7 @@ u_short zmq::select_t::determine_fd_family (fd_t fd_) return AF_UNSPEC; } -zmq::select_t::family_entry_t::family_entry_t () : retired (false) +zmq::select_t::family_entry_t::family_entry_t () : has_retired (false) { } diff --git a/src/select.hpp b/src/select.hpp index a410dab8..9e253b53 100644 --- a/src/select.hpp +++ b/src/select.hpp @@ -119,7 +119,7 @@ class select_t : public poller_base_t fd_entries_t fd_entries; fds_set_t fds_set; - bool retired; + bool has_retired; }; void select_family_entry (family_entry_t &family_entry_, @@ -143,8 +143,8 @@ class select_t : public poller_base_t // See loop for details. family_entries_t::iterator current_family_entry_it; - bool try_remove_fd_entry (family_entries_t::iterator family_entry_it, - zmq::fd_t &handle_); + int try_retire_fd_entry (family_entries_t::iterator family_entry_it, + zmq::fd_t &handle_); static const size_t fd_family_cache_size = 8; std::pair fd_family_cache[fd_family_cache_size]; @@ -166,6 +166,10 @@ class select_t : public poller_base_t static fd_entries_t::iterator find_fd_entry_by_handle (fd_entries_t &fd_entries, handle_t handle_); + static size_t + count_non_retired (const zmq::select_t::fd_entries_t &fd_entries); + void assert_load_consistent () const; + // If true, start has been called. bool started;