Merge branch 'master' of code.uocat.com:tqcq/sled

This commit is contained in:
tqcq
2024-03-22 18:36:44 +08:00
23 changed files with 958 additions and 126 deletions

40
src/exec/just_test.cc Normal file
View File

@@ -0,0 +1,40 @@
#include <gtest/gtest.h>
#include <sled/exec/exec.h>
struct cout_receiver {
template<typename T>
void SetValue(T &&val)
{
// 这个receiver什么都不干只对收集到的结果输出
std::cout << "Result: " << val << '\n';
}
void SetError(std::exception_ptr err) { std::terminate(); }
void SetStopped() { std::terminate(); }
};
TEST(Just, basic)
{
sled::Just(42).Connect(cout_receiver{}).Start();
sled::Just(11).Connect(cout_receiver{}).Start();
}
TEST(Then, basic)
{
auto s1 = sled::Just(42);
auto s2 = sled::Then(s1, [](int x) { return x + 1; });
auto s3 = sled::Then(s2, [](int x) { return x + 1; });
auto s4 = sled::Then(s3, [](int x) { return x + 1; });
s4.Connect(cout_receiver{}).Start();
}
TEST(SyncWait, basic)
{
auto s1 = sled::Just(42);
auto s2 = sled::Then(s1, [](int x) { return x + 1; });
auto s3 = sled::Then(s2, [](int x) { return x + 1; });
auto s4 = sled::Then(s3, [](int x) { return x + 1; });
auto s5 = sled::SyncWait(s4).value();
std::cout << "Result: " << s5 << '\n';
}

View File

@@ -0,0 +1,8 @@
#include <gtest/gtest.h>
#include <sled/futures/future.h>
TEST(Future, basic)
{
// sled::Future<int> x;
// auto res = x.Then([](int) {});
}

View File

@@ -45,8 +45,7 @@ void
ThreadManager::RemoveInternal(Thread *message_queue)
{
MutexLock lock(&cirt_);
auto iter = std::find(message_queues_.begin(), message_queues_.end(),
message_queue);
auto iter = std::find(message_queues_.begin(), message_queues_.end(), message_queue);
if (iter != message_queues_.end()) { message_queues_.erase(iter); }
}
@@ -96,8 +95,7 @@ ThreadManager::ProcessAllMessageQueueInternal()
MutexLock lock(&cirt_);
for (Thread *queue : message_queues_) {
queues_not_done.fetch_add(1);
auto sub =
MakeCleanup([&queues_not_done] { queues_not_done.fetch_sub(1); });
auto sub = MakeCleanup([&queues_not_done] { queues_not_done.fetch_sub(1); });
queue->PostDelayedTask([&sub] {}, TimeDelta::Zero());
}
@@ -115,9 +113,7 @@ ThreadManager::SetCurrentThreadInternal(Thread *message_queue)
Thread::Thread(SocketServer *ss) : Thread(ss, /*do_init=*/true) {}
Thread::Thread(std::unique_ptr<SocketServer> ss)
: Thread(std::move(ss), /*do_init=*/true)
{}
Thread::Thread(std::unique_ptr<SocketServer> ss) : Thread(std::move(ss), /*do_init=*/true) {}
Thread::Thread(SocketServer *ss, bool do_init)
: delayed_next_num_(0),
@@ -131,11 +127,7 @@ Thread::Thread(SocketServer *ss, bool do_init)
if (do_init) { DoInit(); }
}
Thread::Thread(std::unique_ptr<SocketServer> ss, bool do_init)
: Thread(ss.get(), do_init)
{
own_ss_ = std::move(ss);
}
Thread::Thread(std::unique_ptr<SocketServer> ss, bool do_init) : Thread(ss.get(), do_init) { own_ss_ = std::move(ss); }
Thread::~Thread()
{
@@ -244,13 +236,10 @@ Thread::Get(int cmsWait)
cmsNext = cmsDelayNext;
} else {
cmsNext = std::max<int64_t>(0, cmsTotal - cmsElapsed);
if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) {
cmsNext = cmsDelayNext;
}
if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) { cmsNext = cmsDelayNext; }
}
{
if (!ss_->Wait(cmsNext == kForever ? SocketServer::kForever
: TimeDelta::Millis(cmsNext),
if (!ss_->Wait(cmsNext == kForever ? SocketServer::kForever : TimeDelta::Millis(cmsNext),
/*process_io=*/true)) {
return nullptr;
}
@@ -266,9 +255,7 @@ Thread::Get(int cmsWait)
}
void
Thread::PostTaskImpl(std::function<void()> &&task,
const PostTaskTraits &traits,
const Location &location)
Thread::PostTaskImpl(std::function<void()> &&task, const PostTaskTraits &traits, const Location &location)
{
if (IsQuitting()) { return; }
{
@@ -303,8 +290,7 @@ Thread::PostDelayedTaskImpl(std::function<void()> &&task,
}
void
Thread::BlockingCallImpl(std::function<void()> functor,
const Location &location)
Thread::BlockingCallImpl(std::function<void()> &&functor, const Location &location)
{
if (IsQuitting()) { return; }
if (IsCurrent()) {
@@ -373,8 +359,7 @@ Thread::SetName(const std::string &name, const void *obj)
void
Thread::EnsureIsCurrentTaskQueue()
{
task_queue_registration_.reset(
new TaskQueueBase::CurrentTaskQueueSetter(this));
task_queue_registration_.reset(new TaskQueueBase::CurrentTaskQueueSetter(this));
}
void
@@ -426,8 +411,7 @@ Thread::PreRun(void *pv)
}
bool
Thread::WrapCurrentWithThreadManager(ThreadManager *thread_manager,
bool need_synchronize_access)
Thread::WrapCurrentWithThreadManager(ThreadManager *thread_manager, bool need_synchronize_access)
{
// assert(!IsRunning());
owned_ = false;
@@ -498,8 +482,7 @@ Thread::Current()
return thread;
}
AutoSocketServerThread::AutoSocketServerThread(SocketServer *ss)
: Thread(ss, /*do_init=*/false)
AutoSocketServerThread::AutoSocketServerThread(SocketServer *ss) : Thread(ss, /*do_init=*/false)
{
DoInit();
old_thread_ = ThreadManager::Instance()->CurrentThread();

View File

@@ -1,15 +1,39 @@
#include "sled/system/thread_pool.h"
#include "sled/system/location.h"
#include "sled/task_queue/task_queue_base.h"
namespace sled {
ThreadPool::ThreadPool(int num_threads)
{
if (num_threads == -1) {
num_threads = std::thread::hardware_concurrency();
}
scheduler = new sled::Scheduler(
sled::Scheduler::Config().setWorkerThreadCount(num_threads));
if (num_threads == -1) { num_threads = std::thread::hardware_concurrency(); }
scheduler_ = new sled::Scheduler(sled::Scheduler::Config().setWorkerThreadCount(num_threads));
}
ThreadPool::~ThreadPool() { delete scheduler; }
ThreadPool::~ThreadPool() { delete scheduler_; }
void
ThreadPool::Delete()
{}
void
ThreadPool::PostTaskImpl(std::function<void()> &&task, const PostTaskTraits &traits, const Location &location)
{
scheduler_->enqueue(marl::Task([task] { task(); }));
}
void
ThreadPool::PostDelayedTaskImpl(std::function<void()> &&task,
TimeDelta delay,
const PostDelayedTaskTraits &traits,
const Location &location)
{
if (traits.high_precision) {
delayed_thread_->PostDelayedTaskWithPrecision(TaskQueueBase::DelayPrecision::kHigh, std::move(task), delay,
location);
} else {
delayed_thread_->PostDelayedTaskWithPrecision(TaskQueueBase::DelayPrecision::kLow, std::move(task), delay,
location);
}
}
}// namespace sled

View File

@@ -1,4 +1,5 @@
#include "sled/task_queue/task_queue_base.h"
#include "sled/synchronization/event.h"
namespace sled {
namespace {
@@ -11,12 +12,22 @@ TaskQueueBase::Current()
return current;
}
TaskQueueBase::CurrentTaskQueueSetter::CurrentTaskQueueSetter(TaskQueueBase *task_queue)
: previous_(current)
TaskQueueBase::CurrentTaskQueueSetter::CurrentTaskQueueSetter(TaskQueueBase *task_queue) : previous_(current)
{
current = task_queue;
}
TaskQueueBase::CurrentTaskQueueSetter::~CurrentTaskQueueSetter() { current = previous_; }
void
TaskQueueBase::BlockingCallImpl(std::function<void()> &&functor, const sled::Location &from)
{
Event done;
PostTask([functor, &done] {
functor();
done.Set();
});
done.Wait(Event::kForever);
}
}// namespace sled