From 61ee4ebeef0d920512c6fff10642d54143e6403b Mon Sep 17 00:00:00 2001 From: tqcq <99722391+tqcq@users.noreply.github.com> Date: Wed, 1 May 2024 14:14:57 +0800 Subject: [PATCH 1/3] feat support any safety task --- .../task_queue/pending_task_safety_flag.h | 28 ++++++++++++++++--- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/src/sled/task_queue/pending_task_safety_flag.h b/src/sled/task_queue/pending_task_safety_flag.h index 8c3b6e4..00b326b 100644 --- a/src/sled/task_queue/pending_task_safety_flag.h +++ b/src/sled/task_queue/pending_task_safety_flag.h @@ -8,6 +8,7 @@ #define SLED_TASK_QUEUE_PENDING_TASK_SAFETY_FLAG_H #pragma once +#include "sled/meta/type_traits.h" #include "sled/ref_counted_base.h" #include "sled/scoped_refptr.h" #include "sled/synchronization/sequence_checker.h" @@ -53,11 +54,30 @@ private: scoped_refptr flag_; }; -inline std::function -SafeTask(scoped_refptr flag, std::function task) +// inline std::function +// SafeTask(scoped_refptr flag, std::function task) +// { +// return [flag, task]() mutable { +// if (flag->alive()) { std::move(task)(); } +// }; +// } + +template>::value>> +std::function(Args &&...)> +SafeTask(scoped_refptr flag, F &&f) { - return [flag, task]() mutable { - if (flag->alive()) { std::move(task)(); } + return [flag, f](Args &&...args) mutable -> InvokeResultT { + if (flag->alive()) { return f(std::forward(args)...); } + return {}; + }; +} + +template>::value>> +std::function +SafeTask(scoped_refptr flag, F &&f) +{ + return [flag, f](Args &&...args) mutable -> void { + if (flag->alive()) { f(std::forward(args)...); } }; } From 3da9625ce210814cea4710990ba053aa02506f40 Mon Sep 17 00:00:00 2001 From: tqcq <99722391+tqcq@users.noreply.github.com> Date: Wed, 1 May 2024 14:27:02 +0800 Subject: [PATCH 2/3] feat update --- .../task_queue/pending_task_safety_flag.h | 8 ------- src/sled/timer/timer.cc | 21 +++++++++++-------- 2 files changed, 12 insertions(+), 17 deletions(-) diff --git a/src/sled/task_queue/pending_task_safety_flag.h b/src/sled/task_queue/pending_task_safety_flag.h index 00b326b..502cdaa 100644 --- a/src/sled/task_queue/pending_task_safety_flag.h +++ b/src/sled/task_queue/pending_task_safety_flag.h @@ -54,14 +54,6 @@ private: scoped_refptr flag_; }; -// inline std::function -// SafeTask(scoped_refptr flag, std::function task) -// { -// return [flag, task]() mutable { -// if (flag->alive()) { std::move(task)(); } -// }; -// } - template>::value>> std::function(Args &&...)> SafeTask(scoped_refptr flag, F &&f) diff --git a/src/sled/timer/timer.cc b/src/sled/timer/timer.cc index 12cfff1..a6df078 100644 --- a/src/sled/timer/timer.cc +++ b/src/sled/timer/timer.cc @@ -47,9 +47,9 @@ Timer::Stop() { if (is_running()) { timeout_->Stop(); - generation_ = TimerGeneration(generation_ + 1); + generation_ = TimerGeneration(generation_ + 1); expiration_count_ = 0; - is_running_ = false; + is_running_ = false; } } @@ -81,23 +81,26 @@ Timer::Trigger(TimerGeneration generation) std::unique_ptr TimerManager::CreateTimer(const std::string &name, Timer::OnExpired on_expired) { - next_id_ = TimerID(next_id_ + 1); + next_id_ = TimerID(next_id_ + 1); TimerID id = next_id_; std::unique_ptr timeout = timeout_creator_(sled::TaskQueueBase::DelayPrecision::kHigh); - auto timer = std::unique_ptr(new Timer( - id, name, std::move(on_expired), - /* ungrgister_handler=*/[this, id]() { timers_.erase(id); }, std::move(timeout))); - timers_[id] = timer.get(); + auto timer = std::unique_ptr(new Timer( + id, + name, + std::move(on_expired), + /* ungrgister_handler=*/[this, id]() { timers_.erase(id); }, + std::move(timeout))); + timers_[id] = timer.get(); return timer; } void TimerManager::HandleTimeout(TimeoutID id) { - TimerID timer_id = id >> 32; + TimerID timer_id = id >> 32; TimerGeneration generation = id & 0xffffffff; - auto it = timers_.find(timer_id); + auto it = timers_.find(timer_id); if (it != timers_.end()) { it->second->Trigger(generation); } } }// namespace sled From 4656a7146d41e2ae3a7f832f1fbb373828c14b14 Mon Sep 17 00:00:00 2001 From: tqcq <99722391+tqcq@users.noreply.github.com> Date: Wed, 1 May 2024 14:37:40 +0800 Subject: [PATCH 3/3] feat timer support Start/Stop in thread --- src/sled/timer/timer.cc | 31 +++++++++++++++++++++++++++++++ src/sled/timer/timer.h | 35 ++++++++++++++++++++++------------- 2 files changed, 53 insertions(+), 13 deletions(-) diff --git a/src/sled/timer/timer.cc b/src/sled/timer/timer.cc index a6df078..39b5174 100644 --- a/src/sled/timer/timer.cc +++ b/src/sled/timer/timer.cc @@ -1,4 +1,5 @@ #include "sled/timer/timer.h" +#include "sled/log/log.h" namespace sled { namespace { @@ -9,6 +10,22 @@ MakeTimeoutId(TimerID timer_id, TimerGeneration generation) } }// namespace +TimerThreadDeleter::TimerThreadDeleter(TaskQueueBase *owner) : owner_(owner) {} + +void +TimerThreadDeleter::operator()(Timer *timer) +{ + if (!timer) { return; } + if (owner_) { + owner_->PostTask([timer]() { + timer->Stop(); + delete timer; + }); + } else { + delete timer; + } +} + Timer::Timer(TimerID id, const std::string &name, OnExpired on_expired, @@ -53,6 +70,20 @@ Timer::Stop() } } +void +Timer::Start(TaskQueueBase *owner) +{ + SLED_ASSERT(owner != nullptr, "owner must not be nullptr"); + owner->BlockingCall([this]() { Start(); }); +} + +void +Timer::Stop(TaskQueueBase *owner) +{ + SLED_ASSERT(owner != nullptr, "owner must not be nullptr"); + owner->BlockingCall([this]() { Stop(); }); +} + void Timer::Trigger(TimerGeneration generation) { diff --git a/src/sled/timer/timer.h b/src/sled/timer/timer.h index 85e6a24..462f74f 100644 --- a/src/sled/timer/timer.h +++ b/src/sled/timer/timer.h @@ -13,18 +13,31 @@ namespace sled { typedef uint64_t TimerID; typedef uint32_t TimerGeneration; +class Timer; + +struct TimerThreadDeleter { + TimerThreadDeleter(TaskQueueBase *owner); + inline void operator()(Timer *timer); + +private: + TaskQueueBase *owner_; +}; + class Timer { public: - using OnExpired = std::function()>; - Timer(const Timer &) = delete; + using OnExpired = std::function()>; + Timer(const Timer &) = delete; Timer &operator=(const Timer &) = delete; ~Timer(); void Start(); void Stop(); + void Start(TaskQueueBase *owner); + void Stop(TaskQueueBase *owner); + void set_duration(DurationMs duration) { duration_ = duration; } - const DurationMs &duration() const { return duration_; } + const DurationMs duration() const { return duration_; } int expiration_count() const { return expiration_count_; } @@ -46,24 +59,20 @@ private: const UnregisterHandler unregister_handler_; std::unique_ptr timeout_; - DurationMs duration_; + std::atomic duration_; TimerGeneration generation_ = TimerGeneration(0); - bool is_running_ = false; - int expiration_count_ = 0; + bool is_running_ = false; + int expiration_count_ = 0; }; class TimerManager { - using TimeoutCreator = std::function( - sled::TaskQueueBase::DelayPrecision)>; + using TimeoutCreator = std::function(sled::TaskQueueBase::DelayPrecision)>; public: - explicit TimerManager(TimeoutCreator timeout_creator) - : timeout_creator_(timeout_creator) - {} + explicit TimerManager(TimeoutCreator timeout_creator) : timeout_creator_(timeout_creator) {} - std::unique_ptr CreateTimer(const std::string &name, - Timer::OnExpired on_expired); + std::unique_ptr CreateTimer(const std::string &name, Timer::OnExpired on_expired); void HandleTimeout(TimeoutID timeout_id); private: