feat add Timer
Some checks failed
rpcrypto-build / build (Debug, hisiv510.toolchain.cmake) (push) Failing after 59s
linux-x64-gcc / linux-gcc (push) Failing after 56s
rpcrypto-build / build (Release, hisiv510.toolchain.cmake) (push) Failing after 1m1s
linux-mips64-gcc / linux-gcc-mips64el (push) Failing after 1m1s
rpcrypto-build / build (Release, himix200.toolchain.cmake) (push) Failing after 1m17s
rpcrypto-build / build (Debug, himix200.toolchain.cmake) (push) Failing after 1m8s
linux-hisiv500-gcc / linux-gcc-hisiv500 (push) Failing after 1m25s
Some checks failed
rpcrypto-build / build (Debug, hisiv510.toolchain.cmake) (push) Failing after 59s
linux-x64-gcc / linux-gcc (push) Failing after 56s
rpcrypto-build / build (Release, hisiv510.toolchain.cmake) (push) Failing after 1m1s
linux-mips64-gcc / linux-gcc-mips64el (push) Failing after 1m1s
rpcrypto-build / build (Release, himix200.toolchain.cmake) (push) Failing after 1m17s
rpcrypto-build / build (Debug, himix200.toolchain.cmake) (push) Failing after 1m8s
linux-hisiv500-gcc / linux-gcc-hisiv500 (push) Failing after 1m25s
This commit is contained in:
parent
59197cd6ad
commit
90e10881d3
@ -20,7 +20,9 @@ set(CMAKE_POSITION_INDEPENDENT_CODE ON)
|
||||
if (ULIB_SHARED_LIB)
|
||||
add_library(${PROJECT_NAME} SHARED "")
|
||||
else()
|
||||
add_library(${PROJECT_NAME} STATIC "")
|
||||
add_library(${PROJECT_NAME} STATIC ""
|
||||
src/ulib/system/timer.cpp
|
||||
src/ulib/system/timer.h)
|
||||
endif()
|
||||
target_sources(${PROJECT_NAME} PRIVATE
|
||||
src/ulib/base/location.h
|
||||
@ -43,6 +45,8 @@ target_sources(${PROJECT_NAME} PRIVATE
|
||||
src/ulib/system/thread.cpp
|
||||
src/ulib/system/thread_pool.h
|
||||
src/ulib/system/thread_pool.cpp
|
||||
src/ulib/system/timer.h
|
||||
src/ulib/system/timer.cpp
|
||||
)
|
||||
|
||||
find_package(Threads REQUIRED)
|
||||
|
@ -123,4 +123,10 @@ Thread::Tid() const
|
||||
ULOG_ASSERT(impl_, "impl_ is null");
|
||||
return impl_->Tid();
|
||||
}
|
||||
|
||||
void
|
||||
Thread::Sleep(uint64_t usec)
|
||||
{
|
||||
usleep(usec);
|
||||
}
|
||||
}// namespace ulib
|
||||
|
@ -17,6 +17,8 @@ public:
|
||||
void Start();
|
||||
int Join();
|
||||
|
||||
static void Sleep(uint64_t usec);
|
||||
|
||||
bool Started() const;
|
||||
pid_t Tid() const;
|
||||
const std::string &name() const;
|
||||
|
189
src/ulib/system/timer.cpp
Normal file
189
src/ulib/system/timer.cpp
Normal file
@ -0,0 +1,189 @@
|
||||
//
|
||||
// Created by Feng Zhang on 2023/12/27.
|
||||
//
|
||||
|
||||
#include "timer.h"
|
||||
#include <unistd.h>
|
||||
#include <mutex>
|
||||
#include <sys/time.h>
|
||||
#include "ulib/log/log.h"
|
||||
|
||||
namespace ulib {
|
||||
|
||||
uint64_t
|
||||
TimeNowInMicroSeconds()
|
||||
{
|
||||
struct timeval tv;
|
||||
gettimeofday(&tv, NULL);
|
||||
return tv.tv_sec * 1000000ULL + tv.tv_usec;
|
||||
}
|
||||
|
||||
std::atomic_uint64_t Timer::next_timer_id_(1);
|
||||
|
||||
Timer::Timer(uint64_t when, uint64_t interval)
|
||||
: timer_id_(next_timer_id_.fetch_add(1)),
|
||||
when_(when),
|
||||
interval_(interval),
|
||||
auto_reset_(interval_ > 0),
|
||||
expire_callback_()
|
||||
{}
|
||||
|
||||
TimerManager::TimerManager()
|
||||
: stopped_(false),
|
||||
thread_(std::bind(&TimerManager::Loop, this), "timer_manager_thread"),
|
||||
timers_lock_(),
|
||||
timers_cond_(),
|
||||
timers_(),
|
||||
timer_inserting_set_lock_(),
|
||||
timer_inserting_set_(),
|
||||
timer_canceling_set_lock_(),
|
||||
timer_canceling_set_()
|
||||
{}
|
||||
|
||||
TimerId
|
||||
TimerManager::AddTimerImpl(const Timer::ExpireCallback &cb,
|
||||
uint64_t run_after_ms,
|
||||
uint64_t interval)
|
||||
{
|
||||
MutexGuard guard(timer_inserting_set_lock_);
|
||||
auto *timer =
|
||||
new Timer(TimeNowInMicroSeconds() + run_after_ms * 1000, interval * 1000);
|
||||
timer->on_expire() = cb;
|
||||
if (!timer_inserting_set_.insert({timer->when(), timer}).second) {
|
||||
ULOG_ERROR("system.timer_manager", "Add timer failed, already exists timer_id={}", timer->id());
|
||||
delete timer;
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (timers_lock_.TryLock()) {
|
||||
timers_cond_.NotifyAll();
|
||||
timers_lock_.Unlock();
|
||||
}
|
||||
return timer->id();
|
||||
}
|
||||
|
||||
void
|
||||
TimerManager::CancelTimerImpl(TimerId timer_id)
|
||||
{
|
||||
MutexGuard guard(timer_canceling_set_lock_);
|
||||
timer_canceling_set_.insert(timer_id);
|
||||
if (timers_lock_.TryLock()) {
|
||||
timers_cond_.NotifyAll();
|
||||
timers_lock_.Unlock();
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
TimerManager::Loop()
|
||||
{
|
||||
while (!stopped_) {
|
||||
// ULOG_TRACE("system.timer_manager", "now: {}", TimeNowInMicroSeconds());
|
||||
// Thread::Sleep(3000000);
|
||||
MutexGuard timers_guard(timers_lock_);
|
||||
// add timer
|
||||
{
|
||||
std::set<Entry> timer_inserting_set;
|
||||
{
|
||||
MutexGuard insert_guard(timer_inserting_set_lock_);
|
||||
timer_inserting_set.swap(timer_inserting_set_);
|
||||
}
|
||||
timers_.insert(timer_inserting_set.begin(),
|
||||
timer_inserting_set.end());
|
||||
}
|
||||
// cancel timer
|
||||
{
|
||||
// TODO imporve
|
||||
MutexGuard cancel_guard(timer_canceling_set_lock_);
|
||||
for (auto iter = timers_.begin(); !timer_canceling_set_.empty() && iter != timers_.end();) {
|
||||
if (timer_canceling_set_.find(iter->second->id())
|
||||
!= timer_canceling_set_.end()) {
|
||||
delete iter->second;
|
||||
iter = timers_.erase(iter);
|
||||
} else {
|
||||
++iter;
|
||||
}
|
||||
}
|
||||
timer_canceling_set_.clear();
|
||||
}
|
||||
|
||||
uint64_t now = TimeNowInMicroSeconds();
|
||||
// check timers
|
||||
// ULOG_TRACE("system.timer_manager","wait for {}-{}={} micro seconds", timers_.begin()->first, now, timers_.begin()->first - now);
|
||||
if (timers_.empty()) {
|
||||
timers_cond_.WaitForMilliseconds(timers_guard, 1000);
|
||||
} else if (timers_.begin()->first > now) {
|
||||
do {
|
||||
uint64_t wait_time_ms = (timers_.begin()->first - now) / 1000;
|
||||
timers_cond_.WaitForMilliseconds(timers_guard, wait_time_ms);
|
||||
now = TimeNowInMicroSeconds();
|
||||
} while(timers_.begin()->first > now);
|
||||
} else {
|
||||
ULOG_TRACE("system.timer_manager", "need trigger set size: {}", timers_.size());
|
||||
std::set<Entry> expired_timers;
|
||||
while (!timers_.empty() && timers_.begin()->first <= now) {
|
||||
expired_timers.insert(*timers_.begin());
|
||||
timers_.erase(timers_.begin());
|
||||
}
|
||||
|
||||
for (auto iter = expired_timers.begin();
|
||||
iter != expired_timers.end(); ++iter) {
|
||||
uint64_t when = iter->first;
|
||||
Timer* timer = iter->second;
|
||||
try {
|
||||
timer->on_expire()();
|
||||
if (timer->auto_reset()) {
|
||||
timers_.insert({when + timer->interval(), timer});
|
||||
} else {
|
||||
delete timer;
|
||||
}
|
||||
} catch (const std::exception &e) {
|
||||
ULOG_ERROR("system.timer",
|
||||
"timer crash: timer_id={}, reason: {}",
|
||||
iter->second->id(), e.what());
|
||||
} catch (...) {
|
||||
// do nothing
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool
|
||||
TimerManager::Initialize()
|
||||
{
|
||||
ULOG_ASSERT(!thread_.Started(), "double initialize");
|
||||
thread_.Start();
|
||||
return thread_.Started();
|
||||
}
|
||||
|
||||
TimerManager::~TimerManager()
|
||||
{
|
||||
stopped_ = true;
|
||||
thread_.Join();
|
||||
}
|
||||
|
||||
TimerManager &
|
||||
TimerManager::Instance()
|
||||
{
|
||||
static TimerManager timer_manager;
|
||||
static std::once_flag init_flag;
|
||||
std::call_once(init_flag, [&]{
|
||||
timer_manager.Initialize();
|
||||
});
|
||||
return timer_manager;
|
||||
}
|
||||
|
||||
TimerId
|
||||
TimerManager::AddTimer(const Timer::ExpireCallback &cb,
|
||||
uint64_t run_after_ms,
|
||||
uint64_t interval)
|
||||
{
|
||||
return Instance().AddTimerImpl(cb, run_after_ms, interval);
|
||||
}
|
||||
|
||||
void
|
||||
TimerManager::CancelTimer(TimerId timer_id)
|
||||
{
|
||||
Instance().CancelTimerImpl(timer_id);
|
||||
}
|
||||
}// namespace ulib
|
74
src/ulib/system/timer.h
Normal file
74
src/ulib/system/timer.h
Normal file
@ -0,0 +1,74 @@
|
||||
#ifndef ULIB_SRC_ULIB_SYSTEM_TIMER_H_
|
||||
#define ULIB_SRC_ULIB_SYSTEM_TIMER_H_
|
||||
|
||||
#include "ulib/base/noncopyable.h"
|
||||
#include "ulib/base/types.h"
|
||||
#include "ulib/concorrency/mutex.h"
|
||||
#include "ulib/concorrency/condition_variable.h"
|
||||
#include "ulib/system/thread.h"
|
||||
#include <functional>
|
||||
#include <atomic>
|
||||
#include <set>
|
||||
|
||||
namespace ulib {
|
||||
|
||||
using TimerId = uint64_t;
|
||||
|
||||
class Timer {
|
||||
public:
|
||||
using ExpireCallback = std::function<void()>;
|
||||
Timer(uint64_t when, uint64_t intarval);
|
||||
// void Start();
|
||||
// void Stop();
|
||||
ExpireCallback & on_expire() { return expire_callback_; }
|
||||
const TimerId id() const { return timer_id_; }
|
||||
uint64_t when() const { return when_; }
|
||||
uint64_t interval() const { return interval_; }
|
||||
bool auto_reset() const { return auto_reset_; }
|
||||
bool operator < (const Timer& rhs) const { return timer_id_ < rhs.timer_id_; }
|
||||
bool operator == (const Timer& rhs) const { return timer_id_ == rhs.timer_id_; }
|
||||
|
||||
private:
|
||||
const TimerId timer_id_;
|
||||
// micro seconds
|
||||
uint64_t when_;
|
||||
// micro seconds
|
||||
uint64_t interval_;
|
||||
bool auto_reset_;
|
||||
ExpireCallback expire_callback_;
|
||||
static std::atomic_uint64_t next_timer_id_;
|
||||
};
|
||||
|
||||
|
||||
|
||||
class TimerManager : NonCopyable {
|
||||
public:
|
||||
using Entry = std::pair<uint64_t, Timer*>;
|
||||
TimerManager();
|
||||
~TimerManager();
|
||||
static TimerManager& Instance();
|
||||
bool Initialize();
|
||||
static TimerId AddTimer(const Timer::ExpireCallback& cb, uint64_t run_after_ms, uint64_t interval = 0);
|
||||
static void CancelTimer(TimerId timer_id);
|
||||
|
||||
void Loop();
|
||||
private:
|
||||
TimerId AddTimerImpl(const Timer::ExpireCallback& cb, uint64_t run_after_ms, uint64_t interval = 0);
|
||||
void CancelTimerImpl(TimerId timer_id);
|
||||
friend class Timer;
|
||||
bool stopped_;
|
||||
Thread thread_;
|
||||
|
||||
Mutex timers_lock_;
|
||||
ConditionVariable timers_cond_;
|
||||
std::set<Entry> timers_;
|
||||
|
||||
Mutex timer_inserting_set_lock_;
|
||||
std::set<Entry> timer_inserting_set_;
|
||||
Mutex timer_canceling_set_lock_;
|
||||
std::set<TimerId> timer_canceling_set_;
|
||||
};
|
||||
|
||||
} // namespace ulib
|
||||
|
||||
#endif//ULIB_SRC_ULIB_SYSTEM_TIMER_H_
|
@ -9,6 +9,7 @@ add_executable(ulib_test
|
||||
ulib/concorrency/countdown_latch_unittest.cpp
|
||||
ulib/system/thread_unittest.cpp
|
||||
ulib/system/thread_pool_unittest.cpp
|
||||
ulib/system/timer_unittest.cpp
|
||||
)
|
||||
target_link_libraries(ulib_test PRIVATE
|
||||
ulib
|
||||
|
36
tests/ulib/system/timer_unittest.cpp
Normal file
36
tests/ulib/system/timer_unittest.cpp
Normal file
@ -0,0 +1,36 @@
|
||||
#include <gtest/gtest.h>
|
||||
#include <ulib/system/timer.h>
|
||||
#include <ulib/system/thread.h>
|
||||
#include <ulib/concorrency/countdown_latch.h>
|
||||
#include <ulib/log/log.h>
|
||||
|
||||
class TimerTest : public ::testing::Test {
|
||||
protected:
|
||||
void SetUp() override { latch_ = nullptr; }
|
||||
void TearDown() override { delete latch_; }
|
||||
|
||||
protected:
|
||||
ulib::CountDownLatch *latch_;
|
||||
};
|
||||
|
||||
TEST_F(TimerTest, OnceTrigger) {
|
||||
latch_ = new ulib::CountDownLatch(1);
|
||||
ulib::TimerId timer_id = ulib::TimerManager::AddTimer([&]{
|
||||
latch_->CountDown();
|
||||
}, 1000);
|
||||
latch_->Await();
|
||||
ulib::TimerManager::CancelTimer(timer_id);
|
||||
}
|
||||
|
||||
TEST_F(TimerTest, PeriodTrigger) {
|
||||
latch_ = new ulib::CountDownLatch(3);
|
||||
std::atomic_int cnt(3);
|
||||
ulib::TimerId timer_id = ulib::TimerManager::AddTimer([&]{
|
||||
if (cnt.fetch_sub(1) > 0) {
|
||||
ULOG_INFO("timer.unittest", "start count down: {}", cnt);
|
||||
latch_->CountDown();
|
||||
}
|
||||
}, 1000, 3000);
|
||||
latch_->Await();
|
||||
ulib::TimerManager::CancelTimer(timer_id);
|
||||
}
|
Loading…
Reference in New Issue
Block a user