#pragma once #include #include #include #include #include #include #include #include #ifndef MPM_LEFTRIGHT_CACHE_LINE_SIZE #define MPM_LEFTRIGHT_CACHE_LINE_SIZE 64 #endif namespace mpm { //! \defgroup Concepts //! Concept is a term that describes a named set of requirements for a type. //! \defgroup ReaderRegistry //! \ingroup Concepts //! \{ //! //! Keeps track of active readers such that it can efficiently //! indicate whether there are any active readers when queried. //! //! \par Extends //! DefaultConstructible //! //! \par Requirements //! Given:\n //! R, an implementation of the ReaderRegistry concept \n //! r, an instance of R //! //! |Expression | Requirements | Return type | //! |:-----------|:---------------------------------------------------------|:-------------| //! | R() | R is default constructible. | R | //! | r.arrive() | Notes the arival of a reader. Wait-free and noexcept. | void | //! | r.depart() | Notes the departure of a reader. Wait-free and noexcept. | void | //! | r.empty() | const and noexcept. | true if there are no readers; false otherwise. | //! \} struct in_place_t {}; constexpr in_place_t in_place{}; //! Wrap any single-threaded datastructure with Left-Right //! concurrency control //! //! Left-Right concurrency allows wait-free population-oblivious reads //! and blocking writes. Writers never block readers. //! //! Instances of this class maintain two full copies of the underlying //! datastructure and all modifications are performed twice. Consequently, //! uses of this class should be limited to small amounts of data where //! the number of reads dominates the number of writes. //! //! Left-Right concurrency control is described in depth in //! A. Correia and P. Ramalhete. Left-Right: A Concurrency Control //! Technique with Wait-Free Population Oblivious Reads template class basic_leftright { static_assert(noexcept(std::declval().arrive()), "ReaderRegistry::arrive() must be noexcept"); static_assert(noexcept(std::declval().depart()), "ReaderRegistry::depart() must be noexcept"); static_assert(noexcept(std::declval().empty()), "ReaderRegistry::empty() must be noexcept"); public: using value_type = T; using reference = value_type &; using const_reference = const value_type &; basic_leftright() = default; //! Construct the two underlying instances of T //! by moving a seed instance basic_leftright(T &&seed) noexcept( std::is_nothrow_copy_constructible::value && std::is_nothrow_move_constructible::value); //! Construct the two underlying instances of T //! by copying a seed instance basic_leftright(const value_type &seed) noexcept( std::is_nothrow_copy_constructible::value); //! Construct the two underlying instances of T in place, forwarding //! the args after the mpm::in_place tag template basic_leftright(in_place_t, Args &&...) noexcept( std::is_nothrow_copy_constructible::value && std::is_nothrow_constructible::value); //! \internal //! Need a use-case for these. It seems that you would never want to //! move/copy/swap the full leftright instance but rather apply those //! operations to the encapsulated instance, in which case the relevant //! operation is accessed via modify() basic_leftright(const basic_leftright &other) = delete; basic_leftright(basic_leftright &&other) = delete; basic_leftright &operator=(const basic_leftright &rhs) = delete; basic_leftright &operator=(basic_leftright &&rhs) = delete; void swap(basic_leftright &other) noexcept = delete; //! Modify the state of the managed datastructure //! //! Blocks/is-blocked-by other concurrent writers; does not //! block concurrent readers //! //! This function requires that execution of the supplied functor //! be noexcept. //! //! The function passed will be executed twice and *must* //! result in the exact same mutation operation being applied //! in both cases. For example it would be incorrect to supply //! a function here that inserted a random number into the //! underlying datastructure if said random number were calculated //! for each invocation (i.e. each invocation would insert a //! different value). //! //! \throws std::system_error on failure to lock internal mutex //! //! \internal I wanted the declaration to be as below so that this //! function doesn't even exist for non-noexcept functors, //! but g++ has not yet implemented noexcept mangling //! as of version 5.2.1. Instead we just static_assert the //! noexcept-ness of the functor in the body of the function //! //! template //! auto modify(F f) //! -> typename std::enable_if())), //! typename std::result_of::type>::type; template typename std::result_of::type modify(F f); //! Observe the state of the managed datastructure //! //! Always wait-free provided ReaderRegistry::arrive() and //! ReaderRegistry::depart() are truly wait-free //! //! \throws Whatever the provided functor throws and nothing else template typename std::result_of::type observe(F f) const noexcept(noexcept(f(std::declval()))); private: class scoped_read_indication { public: scoped_read_indication(ReaderRegistry &rr) noexcept; ~scoped_read_indication() noexcept; private: ReaderRegistry &m_reg; }; template void toggle_reader_registry(Lock &l) noexcept; enum lr { read_left, read_right }; mutable std::array m_reader_registries; std::atomic_size_t m_registry_index{0}; std::atomic m_leftright{read_left}; T m_left alignas(MPM_LEFTRIGHT_CACHE_LINE_SIZE); T m_right alignas(MPM_LEFTRIGHT_CACHE_LINE_SIZE); std::mutex m_writemutex; }; //! Simple implementation of ReaderRegistry concept //! //! This implemtation is wait-free but readers will contend //! on a single cache line due to the use of a shared counter. //! //! \concept{ReaderRegistry} class alignas(MPM_LEFTRIGHT_CACHE_LINE_SIZE) atomic_reader_registry { public: void arrive() noexcept; void depart() noexcept; bool empty() const noexcept; private: std::atomic_uint_fast32_t m_count{0}; }; //! Distributed implementation of ReaderRegistry //! //! Uses an array of N counters (suitably padded) and hashes reader //! thread ids to indices into said array so that concurrent reader //! registration can be made unlikely to contend. The likelihood //! of a collision is dependent on the number of concurrent readers //! relative to N. //! //! arrive() and depart() will perform better if N is a power of two. //! //! \concept{ReaderRegistry} template> class alignas(MPM_LEFTRIGHT_CACHE_LINE_SIZE) distributed_atomic_reader_registry { public: void arrive() noexcept; void depart() noexcept; bool empty() const noexcept; private: class alignas(MPM_LEFTRIGHT_CACHE_LINE_SIZE) counter { public: void incr() noexcept; void decr() noexcept; std::uint_fast32_t relaxed_read() const noexcept; private: std::atomic_uint_fast32_t m_value{0}; }; std::array m_counters{}; }; //! Default leftright uses the simpler reader registry; prefer //! distributed_atomic_reader_registry when reads are highly contended template using leftright = basic_leftright; #ifndef DOCS template basic_leftright::basic_leftright(T &&seed) noexcept( std::is_nothrow_copy_constructible::value && std::is_nothrow_move_constructible::value) : m_left(std::move(seed)), m_right(m_left) {} template basic_leftright::basic_leftright(const T &seed) noexcept( std::is_nothrow_copy_constructible::value) : m_left(seed), m_right(seed) {} template template basic_leftright::basic_leftright(in_place_t, Args &&...args) noexcept( std::is_nothrow_copy_constructible::value && std::is_nothrow_constructible::value) : m_left(std::forward(args)...), m_right(m_left) {} template template typename std::result_of::type basic_leftright::observe(F f) const noexcept(noexcept(f(std::declval()))) { std::size_t idx = m_registry_index.load(std::memory_order_acquire); scoped_read_indication sri(m_reader_registries[idx]); return read_left == m_leftright.load(std::memory_order_acquire) ? f(m_left) : f(m_right); } template template typename std::result_of::type basic_leftright::modify(F f) { static_assert(noexcept(f(std::declval())), "Modify functor must be noexcept"); std::unique_lock xlock(m_writemutex); if (read_left == m_leftright.load(std::memory_order_relaxed)) { f(m_right); m_leftright.store(read_right, std::memory_order_release); toggle_reader_registry(xlock); return f(m_left); } else { f(m_left); m_leftright.store(read_left, std::memory_order_release); toggle_reader_registry(xlock); return f(m_right); } } template template void basic_leftright::toggle_reader_registry(Lock &l) noexcept { assert(l); const std::size_t current = m_registry_index.load(std::memory_order_acquire); const std::size_t next = (current + 1) & 0x1; while (!m_reader_registries[next].empty()) { std::this_thread::yield(); } m_registry_index.store(next, std::memory_order_release); while (!m_reader_registries[current].empty()) { std::this_thread::yield(); } } template basic_leftright::scoped_read_indication::scoped_read_indication( R &r) noexcept : m_reg(r) { m_reg.arrive(); } template basic_leftright::scoped_read_indication::~scoped_read_indication() noexcept { m_reg.depart(); } inline void atomic_reader_registry::arrive() noexcept { m_count.fetch_add(1, std::memory_order_acq_rel); } inline void atomic_reader_registry::depart() noexcept { m_count.fetch_sub(1, std::memory_order_acq_rel); } inline bool atomic_reader_registry::empty() const noexcept { return 0 == m_count.load(std::memory_order_acquire); } template void distributed_atomic_reader_registry::arrive() noexcept { std::size_t index = Hasher()(std::this_thread::get_id()) % N; m_counters[index].incr(); } template void distributed_atomic_reader_registry::depart() noexcept { std::size_t index = Hasher()(std::this_thread::get_id()) % N; m_counters[index].decr(); } template bool distributed_atomic_reader_registry::empty() const noexcept { bool retval = std::none_of(begin(m_counters), end(m_counters), [](const counter &ctr) { return ctr.relaxed_read(); }); std::atomic_thread_fence(std::memory_order_acquire); return retval; } template void distributed_atomic_reader_registry::counter::incr() noexcept { (void) m_value.fetch_add(1, std::memory_order_acq_rel); } template void distributed_atomic_reader_registry::counter::decr() noexcept { (void) m_value.fetch_sub(1, std::memory_order_acq_rel); } template std::uint_fast32_t distributed_atomic_reader_registry::counter::relaxed_read() const noexcept { return m_value.load(std::memory_order_relaxed); } #endif }// namespace mpm