Loading src/sled/task_queue/pending_task_safety_flag.h +16 −4 Original line number Diff line number Diff line Loading @@ -8,6 +8,7 @@ #define SLED_TASK_QUEUE_PENDING_TASK_SAFETY_FLAG_H #pragma once #include "sled/meta/type_traits.h" #include "sled/ref_counted_base.h" #include "sled/scoped_refptr.h" #include "sled/synchronization/sequence_checker.h" Loading Loading @@ -53,11 +54,22 @@ private: scoped_refptr<PendingTaskSafetyFlag> flag_; }; inline std::function<void()> SafeTask(scoped_refptr<PendingTaskSafetyFlag> flag, std::function<void()> task) template<typename F, typename... Args, typename = EnableIfT<!std::is_void<InvokeResultT<F, Args...>>::value>> std::function<InvokeResultT<F, Args...>(Args &&...)> SafeTask(scoped_refptr<PendingTaskSafetyFlag> flag, F &&f) { return [flag, task]() mutable { if (flag->alive()) { std::move(task)(); } return [flag, f](Args &&...args) mutable -> InvokeResultT<F, Args...> { if (flag->alive()) { return f(std::forward<Args>(args)...); } return {}; }; } template<typename F, typename... Args, typename = EnableIfT<std::is_void<InvokeResultT<F, Args...>>::value>> std::function<void(Args &&...)> SafeTask(scoped_refptr<PendingTaskSafetyFlag> flag, F &&f) { return [flag, f](Args &&...args) mutable -> void { if (flag->alive()) { f(std::forward<Args>(args)...); } }; } Loading src/sled/timer/timer.cc +43 −9 Original line number Diff line number Diff line #include "sled/timer/timer.h" #include "sled/log/log.h" namespace sled { namespace { Loading @@ -9,6 +10,22 @@ MakeTimeoutId(TimerID timer_id, TimerGeneration generation) } }// namespace TimerThreadDeleter::TimerThreadDeleter(TaskQueueBase *owner) : owner_(owner) {} void TimerThreadDeleter::operator()(Timer *timer) { if (!timer) { return; } if (owner_) { owner_->PostTask([timer]() { timer->Stop(); delete timer; }); } else { delete timer; } } Timer::Timer(TimerID id, const std::string &name, OnExpired on_expired, Loading Loading @@ -53,6 +70,20 @@ Timer::Stop() } } void Timer::Start(TaskQueueBase *owner) { SLED_ASSERT(owner != nullptr, "owner must not be nullptr"); owner->BlockingCall([this]() { Start(); }); } void Timer::Stop(TaskQueueBase *owner) { SLED_ASSERT(owner != nullptr, "owner must not be nullptr"); owner->BlockingCall([this]() { Stop(); }); } void Timer::Trigger(TimerGeneration generation) { Loading Loading @@ -86,8 +117,11 @@ TimerManager::CreateTimer(const std::string &name, Timer::OnExpired on_expired) std::unique_ptr<Timeout> timeout = timeout_creator_(sled::TaskQueueBase::DelayPrecision::kHigh); auto timer = std::unique_ptr<Timer>(new Timer( id, name, std::move(on_expired), /* ungrgister_handler=*/[this, id]() { timers_.erase(id); }, std::move(timeout))); id, name, std::move(on_expired), /* ungrgister_handler=*/[this, id]() { timers_.erase(id); }, std::move(timeout))); timers_[id] = timer.get(); return timer; } Loading src/sled/timer/timer.h +22 −13 Original line number Diff line number Diff line Loading @@ -13,6 +13,16 @@ namespace sled { typedef uint64_t TimerID; typedef uint32_t TimerGeneration; class Timer; struct TimerThreadDeleter { TimerThreadDeleter(TaskQueueBase *owner); inline void operator()(Timer *timer); private: TaskQueueBase *owner_; }; class Timer { public: using OnExpired = std::function<sled::optional<DurationMs>()>; Loading @@ -22,9 +32,12 @@ public: void Start(); void Stop(); void Start(TaskQueueBase *owner); void Stop(TaskQueueBase *owner); void set_duration(DurationMs duration) { duration_ = duration; } const DurationMs &duration() const { return duration_; } const DurationMs duration() const { return duration_; } int expiration_count() const { return expiration_count_; } Loading @@ -46,7 +59,7 @@ private: const UnregisterHandler unregister_handler_; std::unique_ptr<Timeout> timeout_; DurationMs duration_; std::atomic<DurationMs> duration_; TimerGeneration generation_ = TimerGeneration(0); bool is_running_ = false; Loading @@ -54,16 +67,12 @@ private: }; class TimerManager { using TimeoutCreator = std::function<std::unique_ptr<Timeout>( sled::TaskQueueBase::DelayPrecision)>; using TimeoutCreator = std::function<std::unique_ptr<Timeout>(sled::TaskQueueBase::DelayPrecision)>; public: explicit TimerManager(TimeoutCreator timeout_creator) : timeout_creator_(timeout_creator) {} explicit TimerManager(TimeoutCreator timeout_creator) : timeout_creator_(timeout_creator) {} std::unique_ptr<Timer> CreateTimer(const std::string &name, Timer::OnExpired on_expired); std::unique_ptr<Timer> CreateTimer(const std::string &name, Timer::OnExpired on_expired); void HandleTimeout(TimeoutID timeout_id); private: Loading Loading
src/sled/task_queue/pending_task_safety_flag.h +16 −4 Original line number Diff line number Diff line Loading @@ -8,6 +8,7 @@ #define SLED_TASK_QUEUE_PENDING_TASK_SAFETY_FLAG_H #pragma once #include "sled/meta/type_traits.h" #include "sled/ref_counted_base.h" #include "sled/scoped_refptr.h" #include "sled/synchronization/sequence_checker.h" Loading Loading @@ -53,11 +54,22 @@ private: scoped_refptr<PendingTaskSafetyFlag> flag_; }; inline std::function<void()> SafeTask(scoped_refptr<PendingTaskSafetyFlag> flag, std::function<void()> task) template<typename F, typename... Args, typename = EnableIfT<!std::is_void<InvokeResultT<F, Args...>>::value>> std::function<InvokeResultT<F, Args...>(Args &&...)> SafeTask(scoped_refptr<PendingTaskSafetyFlag> flag, F &&f) { return [flag, task]() mutable { if (flag->alive()) { std::move(task)(); } return [flag, f](Args &&...args) mutable -> InvokeResultT<F, Args...> { if (flag->alive()) { return f(std::forward<Args>(args)...); } return {}; }; } template<typename F, typename... Args, typename = EnableIfT<std::is_void<InvokeResultT<F, Args...>>::value>> std::function<void(Args &&...)> SafeTask(scoped_refptr<PendingTaskSafetyFlag> flag, F &&f) { return [flag, f](Args &&...args) mutable -> void { if (flag->alive()) { f(std::forward<Args>(args)...); } }; } Loading
src/sled/timer/timer.cc +43 −9 Original line number Diff line number Diff line #include "sled/timer/timer.h" #include "sled/log/log.h" namespace sled { namespace { Loading @@ -9,6 +10,22 @@ MakeTimeoutId(TimerID timer_id, TimerGeneration generation) } }// namespace TimerThreadDeleter::TimerThreadDeleter(TaskQueueBase *owner) : owner_(owner) {} void TimerThreadDeleter::operator()(Timer *timer) { if (!timer) { return; } if (owner_) { owner_->PostTask([timer]() { timer->Stop(); delete timer; }); } else { delete timer; } } Timer::Timer(TimerID id, const std::string &name, OnExpired on_expired, Loading Loading @@ -53,6 +70,20 @@ Timer::Stop() } } void Timer::Start(TaskQueueBase *owner) { SLED_ASSERT(owner != nullptr, "owner must not be nullptr"); owner->BlockingCall([this]() { Start(); }); } void Timer::Stop(TaskQueueBase *owner) { SLED_ASSERT(owner != nullptr, "owner must not be nullptr"); owner->BlockingCall([this]() { Stop(); }); } void Timer::Trigger(TimerGeneration generation) { Loading Loading @@ -86,8 +117,11 @@ TimerManager::CreateTimer(const std::string &name, Timer::OnExpired on_expired) std::unique_ptr<Timeout> timeout = timeout_creator_(sled::TaskQueueBase::DelayPrecision::kHigh); auto timer = std::unique_ptr<Timer>(new Timer( id, name, std::move(on_expired), /* ungrgister_handler=*/[this, id]() { timers_.erase(id); }, std::move(timeout))); id, name, std::move(on_expired), /* ungrgister_handler=*/[this, id]() { timers_.erase(id); }, std::move(timeout))); timers_[id] = timer.get(); return timer; } Loading
src/sled/timer/timer.h +22 −13 Original line number Diff line number Diff line Loading @@ -13,6 +13,16 @@ namespace sled { typedef uint64_t TimerID; typedef uint32_t TimerGeneration; class Timer; struct TimerThreadDeleter { TimerThreadDeleter(TaskQueueBase *owner); inline void operator()(Timer *timer); private: TaskQueueBase *owner_; }; class Timer { public: using OnExpired = std::function<sled::optional<DurationMs>()>; Loading @@ -22,9 +32,12 @@ public: void Start(); void Stop(); void Start(TaskQueueBase *owner); void Stop(TaskQueueBase *owner); void set_duration(DurationMs duration) { duration_ = duration; } const DurationMs &duration() const { return duration_; } const DurationMs duration() const { return duration_; } int expiration_count() const { return expiration_count_; } Loading @@ -46,7 +59,7 @@ private: const UnregisterHandler unregister_handler_; std::unique_ptr<Timeout> timeout_; DurationMs duration_; std::atomic<DurationMs> duration_; TimerGeneration generation_ = TimerGeneration(0); bool is_running_ = false; Loading @@ -54,16 +67,12 @@ private: }; class TimerManager { using TimeoutCreator = std::function<std::unique_ptr<Timeout>( sled::TaskQueueBase::DelayPrecision)>; using TimeoutCreator = std::function<std::unique_ptr<Timeout>(sled::TaskQueueBase::DelayPrecision)>; public: explicit TimerManager(TimeoutCreator timeout_creator) : timeout_creator_(timeout_creator) {} explicit TimerManager(TimeoutCreator timeout_creator) : timeout_creator_(timeout_creator) {} std::unique_ptr<Timer> CreateTimer(const std::string &name, Timer::OnExpired on_expired); std::unique_ptr<Timer> CreateTimer(const std::string &name, Timer::OnExpired on_expired); void HandleTimeout(TimeoutID timeout_id); private: Loading