From 56026fcfca51d3ce5f17f4e375f3cb12398c4f7e Mon Sep 17 00:00:00 2001 From: tqcq <99722391+tqcq@users.noreply.github.com> Date: Mon, 1 Apr 2024 18:36:40 +0800 Subject: [PATCH] feat add event_bus --- CMakeLists.txt | 11 +- src/sled/event_bus/event_bus.h | 154 +++++++++++++++++++++++++ src/sled/event_bus/event_bus_test.cc | 65 +++++++++++ src/sled/sigslot.cc | 11 +- src/sled/sigslot.h | 161 +++++++++++---------------- src/sled/sled.h | 3 + src/sled/synchronization/mutex.h | 124 ++++++++++++++------- src/sled/timer/task_queue_timeout.cc | 2 +- 8 files changed, 391 insertions(+), 140 deletions(-) create mode 100644 src/sled/event_bus/event_bus.h create mode 100644 src/sled/event_bus/event_bus_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index 85d3790..e6358e6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -134,8 +134,9 @@ function(sled_add_test) add_executable(${SLED_TEST_NAME} ${SLED_TEST_SRCS}) if(CMAKE_CXX_COMPILER_ID STREQUAL "Clang") - set(EXTRA_FLAGS -Wthread-safety -g -fsanitize=address - -fno-omit-frame-pointer -fno-optimize-sibling-calls) + set(EXTRA_FLAGS # -Wthread-safety + -g -fsanitize=address -fno-omit-frame-pointer + -fno-optimize-sibling-calls) target_compile_options(${SLED_TEST_NAME} PRIVATE ${EXTRA_FLAGS}) target_link_options(${SLED_TEST_NAME} PRIVATE ${EXTRA_FLAGS}) @@ -179,11 +180,13 @@ if(SLED_BUILD_TESTS) src/sled/rx_test.cc src/sled/uri_test.cc) - sled_add_test(NAME sled_symbolize_test SRCS - src/sled/debugging/symbolize_test.cc NO_MAIN) + sled_add_test(NAME sled_event_bus_test SRCS + src/sled/event_bus/event_bus_test.cc) sled_add_test(NAME sled_lua_test SRCS tests/lua_test.cc) sled_add_test(NAME sled_move_on_copy_test SRCS src/sled/utility/move_on_copy_test.cc) + sled_add_test(NAME sled_symbolize_test SRCS + src/sled/debugging/symbolize_test.cc NO_MAIN) endif(SLED_BUILD_TESTS) if(SLED_BUILD_FUZZ) diff --git a/src/sled/event_bus/event_bus.h b/src/sled/event_bus/event_bus.h new file mode 100644 index 0000000..8554d47 --- /dev/null +++ b/src/sled/event_bus/event_bus.h @@ -0,0 +1,154 @@ +#ifndef SLED_EVENT_BUS_EVENT_BUS_H +#define SLED_EVENT_BUS_EVENT_BUS_H + +#include "sled/sigslot.h" +#include "sled/synchronization/mutex.h" +#include + +namespace sled { + +class EventBus; +class EventSubscriber; + +namespace { + +template +class EventRegistry { +public: + using Dispatcher = sigslot::signal1; + using SubscriberTable = std::unordered_map; + + static EventRegistry &Instance() + { + static EventRegistry instance_; + return instance_; + } + + static std::function &GetCleanupHandler() + { + static std::function cleanup_handler + = std::bind(&EventRegistry::OnBusDestroyed, &Instance(), std::placeholders::_1); + return cleanup_handler; + } + + void Post(EventBus *bus, Event event) + { + sled::SharedMutexReadLock lock(&shared_mutex_); + if (signals_.empty()) { return; } + auto iter = signals_.find(bus); + if (iter != signals_.end()) { iter->second(event); } + } + + template + void Subscribe(EventBus *bus, C *instance, void (C::*method)(Event)) + { + sled::SharedMutexWriteLock lock(&shared_mutex_); + auto iter = signals_.find(bus); + if (iter == signals_.end()) { + signals_.emplace(bus, Dispatcher()); + iter = signals_.find(bus); + } + auto &dispatcher = iter->second; + dispatcher.connect(instance, method); + } + + template + void Unsubscribe(EventBus *bus, C *instance) + { + sled::SharedMutexWriteLock lock(&shared_mutex_); + auto iter = signals_.find(bus); + if (iter == signals_.end()) { return; } + auto &dispatcher = iter->second; + dispatcher.disconnect(instance); + } + + bool IsEmpty(EventBus *bus) const + { + sled::SharedMutexReadLock lock(&shared_mutex_); + auto iter = signals_.find(bus); + if (iter == signals_.end()) { return true; } + return iter->second.is_empty(); + } + + void OnBusDestroyed(EventBus *bus) + { + sled::SharedMutexWriteLock lock(&shared_mutex_); + signals_.erase(bus); + } + + template + void OnSubscriberDestroyed(C *instance) + { + sled::SharedMutexWriteLock lock(&shared_mutex_); + for (auto &entry : signals_) { + auto &dispatcher = entry.second; + dispatcher.disconnect(instance); + } + } + +private: + mutable sled::SharedMutex shared_mutex_; + + SubscriberTable signals_; +}; + +}// namespace + +class EventSubscriber : public sigslot::has_slots<> { +public: + virtual ~EventSubscriber() {} +}; + +class EventBus { +public: + EventBus() = default; + + ~EventBus() + { + for (const auto &handler : cleanup_handlers_) { handler.second(this); } + } + + EventBus(const EventBus &) = delete; + EventBus &operator=(const EventBus &) = delete; + + template + void Post(const Event &event) + { + EventRegistry::Instance().Post(this, event); + } + + // On ([](const Event1 &){}) + template + typename std::enable_if::value>::type + Subscribe(C *instance, void (C::*method)(Event)) + { + { + sled::MutexLock lock(&mutex_); + cleanup_handlers_[std::type_index(typeid(Event))] = EventRegistry::GetCleanupHandler(); + } + + EventRegistry::Instance().Subscribe(this, instance, method); + } + + template + typename std::enable_if::value>::type Unsubscribe(C *instance) + { + EventRegistry::Instance().Unsubscribe(this, instance); + { + sled::MutexLock lock(&mutex_); + if (EventRegistry::Instance().IsEmpty(this)) { + auto iter = cleanup_handlers_.find(std::type_index(typeid(Event))); + if (iter != cleanup_handlers_.end()) { + iter->second(this); + cleanup_handlers_.erase(iter); + } + } + } + } + +private: + sled::Mutex mutex_; + std::unordered_map> cleanup_handlers_ GUARDED_BY(mutex_); +}; +}// namespace sled +#endif// SLED_EVENT_BUS_EVENT_BUS_H diff --git a/src/sled/event_bus/event_bus_test.cc b/src/sled/event_bus/event_bus_test.cc new file mode 100644 index 0000000..a1274c7 --- /dev/null +++ b/src/sled/event_bus/event_bus_test.cc @@ -0,0 +1,65 @@ +#include +#include + +struct Event1 { + int a; +}; + +struct Event2 { + std::string str; +}; + +struct Subscriber : public sled::EventSubscriber { + void OnEvent1(Event1 event) { a += event.a; } + + void OnEvent2(Event2 event) { str += event.str; } + + int a = 0; + std::string str = ""; +}; + +TEST_SUITE("EventBus") +{ + TEST_CASE("single thread") + { + sled::EventBus bus; + bus.Post(Event1{1}); + bus.Post(Event2{"1"}); + + Subscriber subscriber; + bus.Subscribe(&subscriber, &Subscriber::OnEvent1); + bus.Subscribe(&subscriber, &Subscriber::OnEvent2); + + bus.Post(Event1{1}); + bus.Post(Event2{"1"}); + + CHECK_EQ(subscriber.a, 1); + CHECK_EQ(subscriber.str, "1"); + + bus.Post(Event1{1}); + bus.Post(Event2{"1"}); + + CHECK_EQ(subscriber.a, 2); + CHECK_EQ(subscriber.str, "11"); + } + + TEST_CASE("multi thread") + { + auto thread = sled::Thread::Create(); + thread->Start(); + + sled::EventBus bus; + Subscriber subscriber; + + bus.Subscribe(&subscriber, &Subscriber::OnEvent1); + bus.Subscribe(&subscriber, &Subscriber::OnEvent2); + + thread->BlockingCall([&] { + bus.Post(Event1{1}); + bus.Post(Event2{"1"}); + }); + + CHECK_EQ(subscriber.a, 1); + CHECK_EQ(subscriber.str, "1"); + } +} diff --git a/src/sled/sigslot.cc b/src/sled/sigslot.cc index 4fa6b11..0438f60 100644 --- a/src/sled/sigslot.cc +++ b/src/sled/sigslot.cc @@ -4,12 +4,19 @@ namespace sigslot { #ifdef _SIGSLOT_HAS_POSIX_THREADS -pthread_mutex_t * +sled::Mutex * multi_threaded_global::get_mutex() { - static pthread_mutex_t g_mutex = PTHREAD_MUTEX_INITIALIZER; + static sled::Mutex g_mutex; return &g_mutex; } +// pthread_mutex_t * +// multi_threaded_global::get_mutex() +// { +// static pthread_mutex_t g_mutex = PTHREAD_MUTEX_INITIALIZER; +// return &g_mutex; +// } + #endif// _SIGSLOT_HAS_POSIX_THREADS }// namespace sigslot diff --git a/src/sled/sigslot.h b/src/sled/sigslot.h index 6efb1d2..424d61e 100644 --- a/src/sled/sigslot.h +++ b/src/sled/sigslot.h @@ -94,10 +94,11 @@ // If signalx is single threaded the user must ensure that disconnect, connect // or signal is not happening concurrently or data race may occur. -#pragma once #ifndef SLED_SIGSLOT_H #define SLED_SIGSLOT_H +#pragma once +#include "sled/synchronization/mutex.h" #include #include #include @@ -105,9 +106,7 @@ // On our copy of sigslot.h, we set single threading as default. #define SIGSLOT_DEFAULT_MT_POLICY single_threaded -#if defined(SIGSLOT_PURE_ISO) \ - || (!defined(WEBRTC_WIN) && !defined(__GNUG__) \ - && !defined(SIGSLOT_USE_POSIX_THREADS)) +#if defined(SIGSLOT_PURE_ISO) || (!defined(WEBRTC_WIN) && !defined(__GNUG__) && !defined(SIGSLOT_USE_POSIX_THREADS)) #define _SIGSLOT_SINGLE_THREADED #elif defined(WEBRTC_WIN) #define _SIGSLOT_HAS_WIN32_THREADS @@ -167,10 +166,7 @@ class multi_threaded_local { public: multi_threaded_local() { InitializeCriticalSection(&m_critsec); } - multi_threaded_local(const multi_threaded_local &) - { - InitializeCriticalSection(&m_critsec); - } + multi_threaded_local(const multi_threaded_local &) { InitializeCriticalSection(&m_critsec); } ~multi_threaded_local() { DeleteCriticalSection(&m_critsec); } @@ -187,31 +183,49 @@ private: // The multi threading policies only get compiled in if they are enabled. class multi_threaded_global { public: - void lock() { pthread_mutex_lock(get_mutex()); } + void lock() + { + get_mutex()->Lock(); + // pthread_mutex_lock(get_mutex()); + } - void unlock() { pthread_mutex_unlock(get_mutex()); } + void unlock() + { + get_mutex()->Unlock(); + // pthread_mutex_unlock(get_mutex()); + } private: - static pthread_mutex_t *get_mutex(); + static sled::Mutex *get_mutex(); + // static pthread_mutex_t *get_mutex(); }; class multi_threaded_local { public: - multi_threaded_local() { pthread_mutex_init(&m_mutex, nullptr); } + // multi_threaded_local() { pthread_mutex_init(&m_mutex, nullptr); } + // + // multi_threaded_local(const multi_threaded_local &) + // { + // pthread_mutex_init(&m_mutex, nullptr); + // } + // + // ~multi_threaded_local() { pthread_mutex_destroy(&m_mutex); } - multi_threaded_local(const multi_threaded_local &) + void lock() { - pthread_mutex_init(&m_mutex, nullptr); + mutex_.Lock(); + // pthread_mutex_lock(&m_mutex); } - ~multi_threaded_local() { pthread_mutex_destroy(&m_mutex); } - - void lock() { pthread_mutex_lock(&m_mutex); } - - void unlock() { pthread_mutex_unlock(&m_mutex); } + void unlock() + { + mutex_.Unlock(); + // pthread_mutex_unlock(&m_mutex); + } private: - pthread_mutex_t m_mutex; + sled::Mutex mutex_; + // pthread_mutex_t m_mutex; }; #endif// _SIGSLOT_HAS_POSIX_THREADS @@ -229,10 +243,8 @@ class _signal_base_interface; class has_slots_interface { private: - typedef void (*signal_connect_t)(has_slots_interface *self, - _signal_base_interface *sender); - typedef void (*signal_disconnect_t)(has_slots_interface *self, - _signal_base_interface *sender); + typedef void (*signal_connect_t)(has_slots_interface *self, _signal_base_interface *sender); + typedef void (*signal_disconnect_t)(has_slots_interface *self, _signal_base_interface *sender); typedef void (*disconnect_all_t)(has_slots_interface *self); const signal_connect_t m_signal_connect; @@ -240,9 +252,7 @@ private: const disconnect_all_t m_disconnect_all; protected: - has_slots_interface(signal_connect_t conn, - signal_disconnect_t disc, - disconnect_all_t disc_all) + has_slots_interface(signal_connect_t conn, signal_disconnect_t disc, disconnect_all_t disc_all) : m_signal_connect(conn), m_signal_disconnect(disc), m_disconnect_all(disc_all) @@ -253,23 +263,16 @@ protected: virtual ~has_slots_interface() {} public: - void signal_connect(_signal_base_interface *sender) - { - m_signal_connect(this, sender); - } + void signal_connect(_signal_base_interface *sender) { m_signal_connect(this, sender); } - void signal_disconnect(_signal_base_interface *sender) - { - m_signal_disconnect(this, sender); - } + void signal_disconnect(_signal_base_interface *sender) { m_signal_disconnect(this, sender); } void disconnect_all() { m_disconnect_all(this); } }; class _signal_base_interface { private: - typedef void (*slot_disconnect_t)(_signal_base_interface *self, - has_slots_interface *pslot); + typedef void (*slot_disconnect_t)(_signal_base_interface *self, has_slots_interface *pslot); typedef void (*slot_duplicate_t)(_signal_base_interface *self, const has_slots_interface *poldslot, has_slots_interface *pnewslot); @@ -286,13 +289,9 @@ protected: ~_signal_base_interface() {} public: - void slot_disconnect(has_slots_interface *pslot) - { - m_slot_disconnect(this, pslot); - } + void slot_disconnect(has_slots_interface *pslot) { m_slot_disconnect(this, pslot); } - void slot_duplicate(const has_slots_interface *poldslot, - has_slots_interface *pnewslot) + void slot_duplicate(const has_slots_interface *poldslot, has_slots_interface *pnewslot) { m_slot_duplicate(this, poldslot, pnewslot); } @@ -323,15 +322,14 @@ public: _opaque_connection(DestT *pd, void (DestT::*pm)(Args...)) : pdest(pd) { typedef void (DestT::*pm_t)(Args...); - static_assert(sizeof(pm_t) <= sizeof(pmethod), - "Size of slot function pointer too large."); + static_assert(sizeof(pm_t) <= sizeof(pmethod), "Size of slot function pointer too large."); std::memcpy(pmethod, &pm, sizeof(pm_t)); typedef void (*em_t)(const _opaque_connection *self, Args...); union_caster caster2; caster2.from = &_opaque_connection::emitter; - pemit = caster2.to; + pemit = caster2.to; } has_slots_interface *getdest() const { return pdest; } @@ -339,7 +337,7 @@ public: _opaque_connection duplicate(has_slots_interface *newtarget) const { _opaque_connection res = *this; - res.pdest = newtarget; + res.pdest = newtarget; return res; } @@ -360,8 +358,7 @@ private: { typedef void (DestT::*pm_t)(Args...); pm_t pm; - static_assert(sizeof(pm_t) <= sizeof(pmethod), - "Size of slot function pointer too large."); + static_assert(sizeof(pm_t) <= sizeof(pmethod), "Size of slot function pointer too large."); std::memcpy(&pm, self->pmethod, sizeof(pm_t)); (static_cast(self->pdest)->*(pm))(args...); } @@ -373,8 +370,7 @@ protected: typedef std::list<_opaque_connection> connections_list; _signal_base() - : _signal_base_interface(&_signal_base::do_slot_disconnect, - &_signal_base::do_slot_duplicate), + : _signal_base_interface(&_signal_base::do_slot_disconnect, &_signal_base::do_slot_duplicate), m_current_iterator(m_connected_slots.end()) {} @@ -385,8 +381,7 @@ private: public: _signal_base(const _signal_base &o) - : _signal_base_interface(&_signal_base::do_slot_disconnect, - &_signal_base::do_slot_duplicate), + : _signal_base_interface(&_signal_base::do_slot_disconnect, &_signal_base::do_slot_duplicate), m_current_iterator(m_connected_slots.end()) { lock_block lock(this); @@ -409,8 +404,7 @@ public: while (!m_connected_slots.empty()) { has_slots_interface *pdest = m_connected_slots.front().getdest(); m_connected_slots.pop_front(); - pdest->signal_disconnect( - static_cast<_signal_base_interface *>(this)); + pdest->signal_disconnect(static_cast<_signal_base_interface *>(this)); } // If disconnect_all is called while the signal is firing, advance the // current slot iterator to the end to avoid an invalidated iterator from @@ -422,7 +416,7 @@ public: bool connected(has_slots_interface *pclass) { lock_block lock(this); - connections_list::const_iterator it = m_connected_slots.begin(); + connections_list::const_iterator it = m_connected_slots.begin(); connections_list::const_iterator itEnd = m_connected_slots.end(); while (it != itEnd) { if (it->getdest() == pclass) return true; @@ -435,7 +429,7 @@ public: void disconnect(has_slots_interface *pclass) { lock_block lock(this); - connections_list::iterator it = m_connected_slots.begin(); + connections_list::iterator it = m_connected_slots.begin(); connections_list::iterator itEnd = m_connected_slots.end(); while (it != itEnd) { @@ -447,8 +441,7 @@ public: } else { m_connected_slots.erase(it); } - pclass->signal_disconnect( - static_cast<_signal_base_interface *>(this)); + pclass->signal_disconnect(static_cast<_signal_base_interface *>(this)); return; } ++it; @@ -456,12 +449,11 @@ public: } private: - static void do_slot_disconnect(_signal_base_interface *p, - has_slots_interface *pslot) + static void do_slot_disconnect(_signal_base_interface *p, has_slots_interface *pslot) { _signal_base *const self = static_cast<_signal_base *>(p); lock_block lock(self); - connections_list::iterator it = self->m_connected_slots.begin(); + connections_list::iterator it = self->m_connected_slots.begin(); connections_list::iterator itEnd = self->m_connected_slots.end(); while (it != itEnd) { @@ -472,8 +464,7 @@ private: // If we're currently using this iterator because the signal is firing, // advance it to avoid it being invalidated. if (self->m_current_iterator == it) { - self->m_current_iterator = - self->m_connected_slots.erase(it); + self->m_current_iterator = self->m_connected_slots.erase(it); } else { self->m_connected_slots.erase(it); } @@ -483,19 +474,16 @@ private: } } - static void do_slot_duplicate(_signal_base_interface *p, - const has_slots_interface *oldtarget, - has_slots_interface *newtarget) + static void + do_slot_duplicate(_signal_base_interface *p, const has_slots_interface *oldtarget, has_slots_interface *newtarget) { _signal_base *const self = static_cast<_signal_base *>(p); lock_block lock(self); - connections_list::iterator it = self->m_connected_slots.begin(); + connections_list::iterator it = self->m_connected_slots.begin(); connections_list::iterator itEnd = self->m_connected_slots.end(); while (it != itEnd) { - if (it->getdest() == oldtarget) { - self->m_connected_slots.push_back(it->duplicate(newtarget)); - } + if (it->getdest() == oldtarget) { self->m_connected_slots.push_back(it->duplicate(newtarget)); } ++it; } @@ -540,16 +528,14 @@ public: private: has_slots &operator=(has_slots const &); - static void do_signal_connect(has_slots_interface *p, - _signal_base_interface *sender) + static void do_signal_connect(has_slots_interface *p, _signal_base_interface *sender) { has_slots *const self = static_cast(p); lock_block lock(self); self->m_senders.insert(sender); } - static void do_signal_disconnect(has_slots_interface *p, - _signal_base_interface *sender) + static void do_signal_disconnect(has_slots_interface *p, _signal_base_interface *sender) { has_slots *const self = static_cast(p); lock_block lock(self); @@ -563,7 +549,7 @@ private: while (!self->m_senders.empty()) { std::set<_signal_base_interface *> senders; senders.swap(self->m_senders); - const_iterator it = senders.begin(); + const_iterator it = senders.begin(); const_iterator itEnd = senders.end(); while (it != itEnd) { @@ -627,22 +613,13 @@ using signal0 = signal_with_thread_policy; template using signal1 = signal_with_thread_policy; -template +template using signal2 = signal_with_thread_policy; -template +template using signal3 = signal_with_thread_policy; -template +template using signal4 = signal_with_thread_policy; template -using signal7 = - signal_with_thread_policy; +using signal7 = signal_with_thread_policy; template -using signal8 = - signal_with_thread_policy; +using signal8 = signal_with_thread_policy; }// namespace sigslot -#endif // SLED_SIGSLOT_H +#endif// SLED_SIGSLOT_H diff --git a/src/sled/sled.h b/src/sled/sled.h index b7f67ce..515a236 100644 --- a/src/sled/sled.h +++ b/src/sled/sled.h @@ -9,6 +9,9 @@ namespace async {} #include "inja.hpp" #include "rx.h" +// event_bus +#include "sled/event_bus/event_bus.h" + // filesystem #include "sled/filesystem/path.h" #include "sled/filesystem/temporary_file.h" diff --git a/src/sled/synchronization/mutex.h b/src/sled/synchronization/mutex.h index 1b029a6..c9e1a06 100644 --- a/src/sled/synchronization/mutex.h +++ b/src/sled/synchronization/mutex.h @@ -146,46 +146,90 @@ private: marl::ConditionVariable cv_; }; -// class ConditionVariable final { -// public: -// static constexpr TimeDelta kForever = TimeDelta::PlusInfinity(); -// ConditionVariable() = default; -// ConditionVariable(const ConditionVariable &) = delete; -// ConditionVariable &operator=(const ConditionVariable &) = delete; -// -// template -// inline bool Wait(LockGuard &guard, Predicate pred) -// { -// std::unique_lock lock(guard.mutex_->impl_, std::adopt_lock); -// cv_.wait(lock, pred); -// return true; -// } -// -// template -// inline bool -// WaitFor(LockGuard &guard, TimeDelta timeout, Predicate pred) -// { -// std::unique_lock lock(guard.mutex_->impl_, std::adopt_lock); -// if (timeout == kForever) { -// cv_.wait(lock, pred); -// return true; -// } else { -// return cv_.wait_for(lock, std::chrono::milliseconds(timeout.ms()), -// pred); -// } -// } -// -// // template -// // bool WaitUntil(Mutex *mutex, TimeDelta timeout, Predicate pred) -// // {} -// -// inline void NotifyOne() { cv_.notify_one(); } -// -// inline void NotifyAll() { cv_.notify_all(); } -// -// private: -// std::condition_variable cv_; -// }; +class SCOPED_CAPABILITY SharedMutex final { +public: + enum class Mode { + kReaderPriority, + kWriterPriority, + }; + + inline SharedMutex(Mode mode = SharedMutex::Mode::kWriterPriority) : mode_(mode) {} + + inline void Lock() SLED_EXCLUSIVE_LOCK_FUNCTION() + { + wait_w_count_.fetch_add(1); + + sled::MutexLock lock(&mutex_); + if (Mode::kReaderPriority == mode_) { + // 读取优先,必须在没有任何读取的消费者的情况下才能持有锁 + cv_.Wait(lock, [this] { return r_count_ == 0 && w_count_ == 0 && wait_r_count_.load() == 0; }); + w_count_++; + } else { + // 写入优先,只要没有持有读锁的消费者,就可以加锁 + cv_.Wait(lock, [this] { return r_count_ == 0 && w_count_ == 0; }); + w_count_++; + cv_.Wait(lock, [this] { return r_count_ == 0; }); + } + wait_w_count_.fetch_sub(1); + } + + inline void Unlock() SLED_UNLOCK_FUNCTION() + { + sled::MutexLock lock(&mutex_); + w_count_--; + if (w_count_ == 0) { cv_.NotifyAll(); } + } + + inline void LockShared() SLED_SHARED_LOCK_FUNCTION() + { + wait_r_count_.fetch_add(1); + sled::MutexLock lock(&mutex_); + if (Mode::kReaderPriority == mode_) { + cv_.Wait(lock, [this] { return w_count_ == 0; }); + r_count_++; + } else { + cv_.Wait(lock, [this] { return w_count_ == 0 && wait_w_count_.load() == 0; }); + r_count_++; + } + wait_r_count_.fetch_sub(1); + } + + inline void UnlockShared() SLED_UNLOCK_FUNCTION() + { + sled::MutexLock lock(&mutex_); + r_count_--; + if (r_count_ == 0) { cv_.NotifyAll(); } + } + +private: + const Mode mode_; + sled::Mutex mutex_; + sled::ConditionVariable cv_; + int r_count_{0}; + int w_count_{0}; + std::atomic wait_r_count_{0}; + std::atomic wait_w_count_{0}; +}; + +class SharedMutexReadLock final { +public: + explicit SharedMutexReadLock(SharedMutex *mutex) : mutex_(mutex) { mutex_->LockShared(); } + + ~SharedMutexReadLock() { mutex_->UnlockShared(); } + +private: + SharedMutex *mutex_; +}; + +class SharedMutexWriteLock final { +public: + explicit SharedMutexWriteLock(SharedMutex *mutex) : mutex_(mutex) { mutex_->Lock(); } + + ~SharedMutexWriteLock() { mutex_->Unlock(); } + +private: + SharedMutex *mutex_; +}; }// namespace sled diff --git a/src/sled/timer/task_queue_timeout.cc b/src/sled/timer/task_queue_timeout.cc index 869b88f..4ddeb32 100644 --- a/src/sled/timer/task_queue_timeout.cc +++ b/src/sled/timer/task_queue_timeout.cc @@ -41,7 +41,7 @@ TaskQueueTimeoutFactory::TaskQueueTimeout::Start(DurationMs duration_ms, Timeout precision_, SafeTask(safety_flag_, [timeout_id, this]() { - LOGV("timer", "Timeout expired: {}", timeout_id); + LOGV("timer", "Timeout expired id={}", timeout_id); SLED_DCHECK_RUN_ON(&parent_.thread_checker_); SLED_DCHECK(posted_task_expiration_ != std::numeric_limits::max(), ""); posted_task_expiration_ = std::numeric_limits::max();