feat add ThreadPool
Some checks failed
linux-x64-gcc / linux-gcc (push) Failing after 44s
rpcrypto-build / build (Debug, hisiv510.toolchain.cmake) (push) Failing after 1m2s
linux-mips64-gcc / linux-gcc-mips64el (push) Failing after 1m8s
rpcrypto-build / build (Release, hisiv510.toolchain.cmake) (push) Failing after 1m3s
linux-hisiv500-gcc / linux-gcc-hisiv500 (push) Failing after 59s
rpcrypto-build / build (Release, himix200.toolchain.cmake) (push) Failing after 58s
rpcrypto-build / build (Debug, himix200.toolchain.cmake) (push) Failing after 25m25s

This commit is contained in:
tqcq 2023-12-27 19:23:05 +08:00
parent ce1570008a
commit 59197cd6ad
7 changed files with 177 additions and 0 deletions

View File

@ -41,6 +41,8 @@ target_sources(${PROJECT_NAME} PRIVATE
src/ulib/concorrency/event.h
src/ulib/system/thread.h
src/ulib/system/thread.cpp
src/ulib/system/thread_pool.h
src/ulib/system/thread_pool.cpp
)
find_package(Threads REQUIRED)

View File

@ -91,30 +91,36 @@ Thread::Thread(const ThreadFunc &func, const std::string &thread_name)
: impl_(new Impl(func, thread_name))
{}
Thread::Thread(Thread &&other) : impl_(other.impl_) { other.impl_ = nullptr; }
// Thread(ThreadFunc &&func, const std::string &thread_name = std::string());
Thread::~Thread() { delete impl_; }
void
Thread::Start()
{
ULOG_ASSERT(impl_, "impl_ is null");
impl_->Start();
}
int
Thread::Join()
{
ULOG_ASSERT(impl_, "impl_ is null");
return impl_->Join();
}
bool
Thread::Started() const
{
ULOG_ASSERT(impl_, "impl_ is null");
return impl_->Started();
}
pid_t
Thread::Tid() const
{
ULOG_ASSERT(impl_, "impl_ is null");
return impl_->Tid();
}
}// namespace ulib

View File

@ -11,6 +11,7 @@ public:
Thread(const ThreadFunc &func,
const std::string &thread_name = std::string());
// Thread(ThreadFunc &&func, const std::string &thread_name = std::string());
Thread(Thread &&);
~Thread();
void Start();

View File

@ -0,0 +1,66 @@
#include "thread_pool.h"
namespace ulib {
ThreadPool::ThreadPool(int max_thread_num,
int init_thread_num,
const std::string &thread_pool_name)
: max_thread_num_(max_thread_num),
idle_thread_num_(0),
thread_pool_name_(thread_pool_name),
stopped_(false),
joined_(false)
{
ULOG_ASSERT(max_thread_num_ > 0, "max_thread_num_ must be greater than 0");
workers_.reserve(max_thread_num_);
for (int i = 0; i < init_thread_num; ++i) {
AddThread(thread_pool_name_ + std::to_string(i));
}
}
void
ThreadPool::AddThread(const std::string &&thread_name)
{
workers_.emplace_back(
[this]() {
while (true) {
Thread::ThreadFunc task;
{
MutexGuard guard(tasks_lock_);
++idle_thread_num_;
while (tasks_.empty() && !stopped_) {
tasks_cond_.Wait(guard);
}
--idle_thread_num_;
if (tasks_.empty() && stopped_) { break; }
task = tasks_.front();
tasks_.pop();
}
task();
}
},
thread_name);
workers_.back().Start();
}
ThreadPool::~ThreadPool()
{
if (!stopped_) { Stop(); }
if (!joined_) { Join(); }
}
void
ThreadPool::Stop()
{
MutexGuard guard(tasks_lock_);
stopped_ = true;
tasks_cond_.NotifyAll();
}
void
ThreadPool::Join()
{
joined_ = true;
for (auto &worker : workers_) { worker.Join(); }
}
}// namespace ulib

View File

@ -0,0 +1,67 @@
#ifndef ULIB_SRC_ULIB_SYSTEM_THREAD_POOL_H_
#define ULIB_SRC_ULIB_SYSTEM_THREAD_POOL_H_
#include "thread.h"
#include "ulib/log/log.h"
#include "ulib/concorrency/condition_variable.h"
#include "ulib/concorrency/mutex.h"
#include <future>
#include <vector>
#include <queue>
namespace ulib {
class ThreadPool {
public:
ThreadPool(int max_thread_num,
int init_thread_num = 1,
const std::string &thread_pool_name = std::string());
ThreadPool(const ThreadPool &) = delete;
ThreadPool(ThreadPool &&) = delete;
ThreadPool &operator=(const ThreadPool &) = delete;
~ThreadPool();
void Stop();
void Join();
template<typename F, typename... Args>
auto Submit(F &&f, Args &&...args)
-> std::future<typename std::result_of<F(Args...)>::type>
{
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
std::future<return_type> res = task->get_future();
{
MutexGuard guard(tasks_lock_);
ULOG_ASSERT(!stopped_, "thread pool is stopped");
tasks_.emplace([task]() { (*task)(); });
if (tasks_.size() > idle_thread_num_
&& workers_.size() < max_thread_num_) {
AddThread(thread_pool_name_ + std::to_string(workers_.size()));
}
if (tasks_.size() > 1) {
tasks_cond_.NotifyAll();
} else {
tasks_cond_.NotifyOne();
}
}
return res;
}
private:
void AddThread(const std::string &&thread_name);
int max_thread_num_;
int idle_thread_num_;
std::string thread_pool_name_;
bool stopped_;
bool joined_;
std::vector<Thread> workers_;
std::queue<Thread::ThreadFunc> tasks_;
Mutex tasks_lock_;
ConditionVariable tasks_cond_;
};
}// namespace ulib
#endif// ULIB_SRC_ULIB_SYSTEM_THREAD_POOL_H_

View File

@ -8,6 +8,7 @@ add_executable(ulib_test
ulib/concorrency/event_unittest.cpp
ulib/concorrency/countdown_latch_unittest.cpp
ulib/system/thread_unittest.cpp
ulib/system/thread_pool_unittest.cpp
)
target_link_libraries(ulib_test PRIVATE
ulib

View File

@ -0,0 +1,34 @@
#include <ulib/system/thread_pool.h>
#include <gtest/gtest.h>
#include <memory>
class ThreadPoolTest : public ::testing::Test {
protected:
void SetUp() override { thread_pool_ = std::make_unique<ulib::ThreadPool>(10); }
std::unique_ptr<ulib::ThreadPool> thread_pool_;
};
TEST_F(ThreadPoolTest, Submit)
{
auto future = thread_pool_->Submit([]() { return 1; });
ASSERT_EQ(future.get(), 1);
}
TEST_F(ThreadPoolTest, SubmitWithArgs)
{
auto future =
thread_pool_->Submit([](int a, int b) { return a + b; }, 1, 2);
ASSERT_EQ(future.get(), 3);
}
TEST_F(ThreadPoolTest, MultiTask)
{
std::vector<std::future<int>> futures;
for (int i = 0; i < 1000; ++i) {
futures.emplace_back(
thread_pool_->Submit([](int a, int b) { return a + b; }, 1, 2));
}
for (auto &future : futures) { ASSERT_EQ(future.get(), 3); }
}