feat/support_fiber #2

Merged
tqcq merged 57 commits from feat/support_fiber into master 2024-06-21 10:33:52 +08:00
13 changed files with 219 additions and 433 deletions
Showing only changes of commit 7321310b1f - Show all commits

View File

@ -160,11 +160,10 @@ set(TILE_SRCS
"tile/base/thread/rw_mutex.cc"
"tile/base/thread/scoped_lock.cc"
"tile/base/thread/spinlock.cc"
"tile/fiber/fiber.cc"
"tile/fiber/detail/os_fiber.cc"
"tile/fiber/detail/mutex.cc"
"tile/fiber/detail/fiber.cc"
# "tile/fiber/detail/os_fiber.cc" "tile/fiber/detail/posix_os_fiber.cc"
# "tile/fiber/detail/mutex.cc"
"tile/fiber/detail/ucontext.c"
"tile/fiber/detail/posix_os_fiber.cc"
"tile/fiber/scheduler.cc"
"tile/io/detail/eintr_safe.cc"
"tile/io/native/acceptor.cc"
@ -267,9 +266,7 @@ if(TILE_BUILD_TESTS)
target_sources(${PROJECT_NAME}_test_all PRIVATE ${test_file})
endmacro()
# -> fiber
tile_add_test(fiber_detail_posix_os_fiber_test
"tile/fiber/detail/posix_os_fiber_test.cc")
tile_add_test(fiber_detail_fiber_test "tile/fiber/detail/fiber_test.cc")
tile_add_test(base_internal_meta_test "tile/base/internal/meta_test.cc")
# tile_add_test(net_internal_http_engine_test

View File

@ -0,0 +1,93 @@
#include "tile/fiber/detail/fiber.h"
#include "tile/base/align.h"
#include "tile/base/internal/move_on_copy.h"
#include "tile/base/logging.h"
#include "tile/base/make_unique.h"
#include "tile/base/object_pool.h"
#include "tile/fiber/detail/ucontext.h"
namespace tile {
namespace fiber {
namespace detail {
static thread_local Fiber *tls_current_fiber = nullptr;
static thread_local Fiber *tls_master_fiber = nullptr;
constexpr auto kStackSize = 8192;
constexpr auto kAlignSize = 16;
void RunProc(void *arg) {
auto proc = static_cast<std::function<void()> *>(arg);
if (proc) {
(*proc)();
}
Fiber::MasterFiber()->Resume();
}
struct alignas(hardware_destructive_interference_size) Fiber::FiberContext {
tile_ucontext_t uctx;
std::aligned_storage<kStackSize, kAlignSize>::type stack;
};
Fiber *Fiber::Current() noexcept { return tls_current_fiber; }
void Fiber::SetCurrent(Fiber *fiber) noexcept { tls_current_fiber = fiber; }
Fiber *Fiber::MasterFiber() noexcept { return tls_master_fiber; }
void Fiber::SetMasterFiber(Fiber *fiber) noexcept { tls_master_fiber = fiber; }
std::unique_ptr<Fiber> Fiber::Create(std::function<void()> proc) noexcept {
return std::unique_ptr<Fiber>(new Fiber(std::move(proc)));
// return make_unique<Fiber>(std::move(proc));
}
Fiber::Fiber(std::function<void()> proc)
: ctx_(object_pool::Get<FiberContext>().Leak()), proc_(std::move(proc)) {
memset(&ctx_->uctx, 0, sizeof(tile_ucontext_t));
if (proc_) {
tile_ucontext_set_target(&ctx_->uctx, &ctx_->stack, kStackSize, RunProc,
&proc_);
}
}
Fiber::~Fiber() {
if (ctx_) {
object_pool::Put<FiberContext>(ctx_.release());
}
}
void Fiber::Resume() {
auto caller = Current();
TILE_CHECK_NE(caller, this, "Can't `Resume()` self");
SetCurrent(this);
tile_ucontext_swap(&caller->ctx_->uctx, &ctx_->uctx);
SetCurrent(caller);
}
} // namespace detail
} // namespace fiber
template <> struct PoolTraits<tile::fiber::detail::Fiber::FiberContext> {
static constexpr auto kType = PoolType::MemoryNodeShared;
static constexpr std::size_t kLowWaterMark = 128;
static constexpr std::size_t kHighWaterMark =
std::numeric_limits<std::size_t>::max();
static constexpr std::chrono::seconds kMaxIdle = std::chrono::seconds(10);
static constexpr std::size_t kMinimumThreadCacheSize = 64;
static constexpr std::size_t kTransferBatchSize = 16;
};
constexpr PoolType PoolTraits<fiber::detail::Fiber::FiberContext>::kType;
constexpr std::size_t
PoolTraits<fiber::detail::Fiber::FiberContext>::kLowWaterMark;
constexpr std::size_t
PoolTraits<fiber::detail::Fiber::FiberContext>::kHighWaterMark;
constexpr std::chrono::seconds
PoolTraits<fiber::detail::Fiber::FiberContext>::kMaxIdle;
constexpr std::size_t
PoolTraits<fiber::detail::Fiber::FiberContext>::kMinimumThreadCacheSize;
constexpr std::size_t
PoolTraits<fiber::detail::Fiber::FiberContext>::kTransferBatchSize;
} // namespace tile

65
tile/fiber/detail/fiber.h Normal file
View File

@ -0,0 +1,65 @@
#ifndef TILE_FIBER_DETAIL_FIBER_H
#define TILE_FIBER_DETAIL_FIBER_H
#pragma once
#include "tile/base/align.h"
#include "tile/base/internal/test_prod.h"
#include "tile/base/object_pool.h"
#include "tile/base/ref_ptr.h"
#include <memory>
struct tile_ucontext_t;
namespace tile {
namespace fiber {
namespace detail {
void RunProc(void *arg);
class Scheduler;
class alignas(hardware_destructive_interference_size) Fiber {
public:
enum FiberState {
Ready,
Waiting,
Terminated,
};
static Fiber *Current() noexcept;
static void SetCurrent(Fiber *fiber) noexcept;
static Fiber *MasterFiber() noexcept;
static void SetMasterFiber(Fiber *fiber) noexcept;
static std::unique_ptr<Fiber>
Create(std::function<void()> proc = nullptr) noexcept;
~Fiber();
Fiber(const Fiber &) = delete;
Fiber &operator=(const Fiber &) = delete;
Fiber(Fiber &&other) noexcept = default;
Fiber &operator=(Fiber &&other) noexcept = default;
private:
TILE_FRIEND_TEST(Fiber, Base);
friend void RunProc(void *);
friend Scheduler;
struct FiberContext;
friend class ::tile::PoolTraits<FiberContext>;
Fiber(std::function<void()> proc = nullptr);
void Resume();
private:
std::unique_ptr<FiberContext> ctx_;
std::function<void()> proc_;
FiberState state_{Ready};
};
} // namespace detail
} // namespace fiber
} // namespace tile
#endif // TILE_FIBER_DETAIL_FIBER_H

View File

@ -0,0 +1,45 @@
#include "tile/base/random.h"
#include "tile/fiber/detail/fiber.h"
#include "gtest/gtest.h"
namespace tile {
namespace fiber {
namespace detail {
TEST(Fiber, Base) {
constexpr auto kMaxCnt = 5000;
int cnt = 0;
// 0 -> master fiber
// [1, 9] -> worker fibers
std::vector<std::unique_ptr<Fiber>> fibers;
fibers.emplace_back(Fiber::Create());
Fiber::SetMasterFiber(fibers[0].get());
Fiber::SetCurrent(fibers[0].get());
for (int i = 1; i != 10; ++i) {
fibers.emplace_back(Fiber::Create([&, i] {
while (cnt < kMaxCnt) {
ASSERT_EQ(Fiber::Current(), fibers[i].get());
++cnt;
Fiber::MasterFiber()->Resume();
TILE_LOG_INFO("worke cnt: {}", cnt);
}
}));
}
while (cnt < kMaxCnt) {
int old = cnt;
auto next_fiber = fibers[Random(1, 1)].get();
TILE_LOG_INFO("cnt: {}", cnt);
next_fiber->Resume();
ASSERT_EQ(old + 1, cnt);
ASSERT_EQ(Fiber::Current(), Fiber::MasterFiber());
}
}
} // namespace detail
} // namespace fiber
} // namespace tile

View File

@ -1,79 +0,0 @@
#include "tile/fiber/detail/os_fiber.h"
namespace tile {
namespace fiber {
namespace detail {
TILE_DEFINE_CLASS_DEPENDENCY_REGISTRY(os_fiber_registry, OSFiber);
static thread_local OSFiber *current_fiber = nullptr;
static thread_local OSFiber *main_fiber = nullptr;
OSFiber *OSFiber::Current() noexcept { return current_fiber; }
OSFiber *OSFiber::MainFiber() noexcept { return main_fiber; }
void OSFiber::SetCurrent(OSFiber *fiber) noexcept {
if (current_fiber && current_fiber->IsAlive()) {
current_fiber->state_ = Fiber::FiberState::Waiting;
}
if (fiber) {
fiber->state_ = Fiber::FiberState::Running;
}
current_fiber = fiber;
}
void OSFiber::SetMainFiber(OSFiber *fiber) noexcept {
if (TILE_UNLIKELY(main_fiber == nullptr && fiber)) {
fiber->state_ = Fiber::FiberState::Running;
}
main_fiber = fiber;
if (main_fiber) {
TILE_LOG_INFO("Upate Main Fiber [{}]", main_fiber->GetId());
}
}
OSFiber::~OSFiber() {
if (MainFiber() == this) {
SetMainFiber(nullptr);
}
if (Current() == this) {
SetCurrent(nullptr);
}
}
bool OSFiber::SwitchTo(OSFiber *to) {
if (to == this) {
return true;
}
TILE_DCHECK(to);
TILE_DCHECK(to->GetFiberState() == Fiber::FiberState::Waiting ||
to->GetFiberState() == Fiber::FiberState::Idle);
// TILE_LOG_INFO("SwitchTo fiber from [{}] to [{}]", GetId(), to->GetId());
SetCurrent(to);
bool rc = SwitchToImpl(to);
SetCurrent(this);
TILE_LOG_WARNING_IF_EVERY_SECOND(
!rc, "SwitchToImpl fiber from [{}] to [{}] failed.", GetId(),
to->GetId());
return rc;
}
void OSFiber::Yield() {
TILE_DCHECK(MainFiber());
TILE_DCHECK(state_ == Fiber::FiberState::Running ||
state_ == Fiber::FiberState::Terminated,
"Fiber [{}] is can't call `Yield()`", GetId());
SwitchTo(MainFiber());
}
bool OSFiber::IsAlive() const { return alive_.load(std::memory_order_relaxed); }
} // namespace detail
} // namespace fiber
} // namespace tile

View File

@ -1,62 +0,0 @@
#ifndef TILE_FIBER_DETAIL_OS_FIBER_H
#define TILE_FIBER_DETAIL_OS_FIBER_H
#pragma once
#include "tile/base/dependency_registry.h"
#include "tile/base/thread/spinlock.h"
#include "tile/fiber/fiber.h"
namespace tile {
namespace fiber {
namespace detail {
class OSFiber {
public:
// static tool
static OSFiber *Current() noexcept;
static OSFiber *MainFiber() noexcept;
static void SetCurrent(OSFiber *fiber) noexcept;
static void SetMainFiber(OSFiber *fiber) noexcept;
public:
struct Options {
Fiber::Task task;
std::size_t stack_size;
};
virtual ~OSFiber();
virtual Fiber::Id GetId() const = 0;
virtual bool Initialize(Options options) = 0;
Fiber::FiberState GetFiberState() const { return state_; };
bool SwitchTo(OSFiber *to);
void Yield();
protected:
virtual bool SwitchToImpl(OSFiber *to) = 0;
bool IsAlive() const;
enum Fiber::FiberState state_;
std::atomic<bool> alive_{true};
private:
friend class Scheduler;
friend class Fiber;
Spinlock scheduler_lock_;
};
// 1. WorkerProc == nullptr, Fiber for current contextz.
// 2. WorkerProc != nullptr, Fiber for new func
TILE_DECLARE_CLASS_DEPENDENCY_REGISTRY(os_fiber_registry, OSFiber);
inline bool IsFiberEnv() noexcept { return OSFiber::Current() != nullptr; }
} // namespace detail
} // namespace fiber
} // namespace tile
#endif // TILE_FIBER_DETAIL_OS_FIBER_H

View File

@ -1,102 +0,0 @@
#include "tile/fiber/detail/posix_os_fiber.h"
#include "tile/base/down_cast.h"
#include "tile/base/internal/macro.h"
#include "tile/base/internal/move_on_copy.h"
#include "tile/base/likely.h"
#include "tile/base/make_unique.h"
#include <signal.h>
#include "tile/fiber/detail/ucontext.h"
#include <map>
namespace tile {
namespace fiber {
namespace detail {
TILE_REGISTER_CLASS_DEPENDENCY(os_fiber_registry, "os_fiber", PosixOSFiber);
namespace {
struct AlignMemoryDeleter {
void operator()(char *ptr) const {
if (ptr) {
delete[] ptr;
}
}
};
static void UContextAdaptor(int a1, int a2, int a3, int a4) {
int array[] = {a1, a2, a3, a4};
auto ptr = *reinterpret_cast<Fiber::Task **>(array);
(**reinterpret_cast<Fiber::Task **>(array))();
}
static void SampleAdaptor(void *arg) {
(*reinterpret_cast<Fiber::Task *>(arg))();
}
template <std::size_t N> struct InvokeHelper {
template <typename Array, typename F, typename... Args>
static void Invoke(const Array &array, F &&f, Args &&...args) {
return InvokeHelper<N - 1>::Invoke(array, f, std::forward<Args>(args)...,
array[4 - N]);
};
};
template <> struct InvokeHelper<0> {
template <typename Array, typename F, typename... Args>
static void Invoke(const Array &array, F &&f, Args &&...args) {
f(std::forward<Args>(args)...);
};
};
} // namespace
struct alignas(hardware_destructive_interference_size) PosixOSFiber::Context {
tile_ucontext_t ctx;
std::unique_ptr<char[]> stack;
std::function<void()> worker_proc;
};
PosixOSFiber::PosixOSFiber() : context_(make_unique<Context>()) {}
PosixOSFiber::~PosixOSFiber() {
// TILE_LOG_INFO("Release fiber [{}]", GetId());
}
bool PosixOSFiber::Initialize(Options options) {
memset(&context_->ctx, 0, sizeof(context_->ctx));
if (options.task) {
const auto kAllocSize = 128 + options.stack_size;
context_->stack = make_unique<char[]>(kAllocSize);
auto moved_task = MakeMoveOnCopy(options.task);
context_->worker_proc = [this, moved_task] {
moved_task.Ref()();
state_ = Fiber::FiberState::Terminated;
TILE_CHECK(this != MainFiber());
SwitchToImpl(MainFiber());
};
tile_ucontext_set_target(&context_->ctx, context_->stack.get(), kAllocSize,
SampleAdaptor, &context_->worker_proc);
}
// TILE_LOG_INFO("init fiber id [{}]", GetId());
return true;
}
Fiber::Id PosixOSFiber::GetId() const {
return reinterpret_cast<Fiber::Id>(this);
}
bool PosixOSFiber::SwitchToImpl(OSFiber *to) {
TILE_DCHECK(down_cast<PosixOSFiber>(to));
auto to_fiber = down_cast<PosixOSFiber>(to);
tile_ucontext_swap(&context_->ctx, &to_fiber->context_->ctx);
return true;
}
} // namespace detail
} // namespace fiber
} // namespace tile

View File

@ -1,31 +0,0 @@
#ifndef TILE_FIBER_DETAIL_POSIX_OS_FIBER_H
#define TILE_FIBER_DETAIL_POSIX_OS_FIBER_H
#pragma once
#include "tile/fiber/detail/os_fiber.h"
namespace tile {
namespace fiber {
namespace detail {
class PosixOSFiber : public OSFiber {
public:
PosixOSFiber();
~PosixOSFiber() override;
bool Initialize(Options options) override;
Fiber::Id GetId() const override;
protected:
bool SwitchToImpl(OSFiber *to) override;
private:
struct Context;
std::unique_ptr<Context> context_;
};
} // namespace detail
} // namespace fiber
} // namespace tile
#endif // TILE_FIBER_DETAIL_POSIX_OS_FIBER_H

View File

@ -1,91 +0,0 @@
#include "tile/fiber/detail/posix_os_fiber.h"
#include "gtest/gtest.h"
namespace tile {
namespace fiber {
namespace detail {
void DisplayAllFiberId() {
TILE_LOG_INFO("CurrentFiber: {}, MainFiber: {}",
OSFiber::Current() ? OSFiber::Current()->GetId() : 0,
OSFiber::MainFiber() ? OSFiber::MainFiber()->GetId() : 0);
}
std::unique_ptr<OSFiber> BuildMainFiber() {
auto main_fiber = os_fiber_registry.New("os_fiber");
EXPECT_TRUE(main_fiber != nullptr);
{
OSFiber::Options main_options;
main_options.task = nullptr;
main_options.stack_size = 8192;
main_fiber->Initialize(main_options);
}
OSFiber::SetMainFiber(main_fiber.get());
OSFiber::SetCurrent(main_fiber.get());
return main_fiber;
}
std::unique_ptr<OSFiber> BuildWorkerFiber(Fiber::Task task) {
auto worker_fiber = os_fiber_registry.New("os_fiber");
EXPECT_TRUE(worker_fiber != nullptr);
{
OSFiber::Options worker_options;
worker_options.task = task;
worker_options.stack_size = 8192;
worker_fiber->Initialize(worker_options);
}
return worker_fiber;
}
TEST(PosixOSFiber, Base) {
DisplayAllFiberId();
{
int called = 1;
auto main_fiber = BuildMainFiber();
std::unique_ptr<OSFiber> worker_fiber = BuildWorkerFiber([&] {
DisplayAllFiberId();
TILE_LOG_INFO("task start");
ASSERT_EQ(OSFiber::Current(), worker_fiber.get());
ASSERT_EQ(OSFiber::MainFiber(), main_fiber.get());
called = 2;
TILE_LOG_INFO("task end");
});
DisplayAllFiberId();
ASSERT_EQ(called, 1);
main_fiber->SwitchTo(worker_fiber.get());
DisplayAllFiberId();
ASSERT_EQ(called, 2);
ASSERT_EQ(OSFiber::Current(), main_fiber.get());
ASSERT_EQ(OSFiber::MainFiber(), main_fiber.get());
DisplayAllFiberId();
}
TILE_LOG_INFO("End");
}
TEST(PosixOSFiber, Yield) {
constexpr int kYieldCount = 1000;
auto main_fiber = BuildMainFiber();
for (int i = 0; i != 1000; ++i) {
int yield = 0;
int schedule_cnt = 0;
auto worker_fiber = BuildWorkerFiber([&] {
for (int j = 0; j != kYieldCount; ++j) {
++yield;
ASSERT_EQ(yield, schedule_cnt);
OSFiber::Current()->Yield();
}
});
while (yield != kYieldCount) {
++schedule_cnt;
main_fiber->SwitchTo(worker_fiber.get());
}
}
}
} // namespace detail
} // namespace fiber
} // namespace tile

View File

@ -1,48 +1,3 @@
#include "tile/fiber/detail/scheduler.h"
#include "tile/fiber/detail/os_fiber.h"
namespace tile {
namespace fiber {
namespace detail {
thread_local Scheduler *current_scheduler = nullptr;
Scheduler::Scheduler() { main_fiber_ = os_fiber_registry.New("os_fiber"); }
Scheduler::~Scheduler() {}
Scheduler *Scheduler::Current() noexcept { return current_scheduler; }
void Scheduler::Yield(OSFiber *self) noexcept {
auto main_fiber = OSFiber::MainFiber();
SwitchTo(self, main_fiber);
}
void Scheduler::Halt(OSFiber *self) noexcept {}
void Scheduler::SwitchTo(OSFiber *self, OSFiber *to) noexcept {
TILE_CHECK_EQ(self, OSFiber::Current());
TILE_CHECK_NE(to->GetFiberState(), Fiber::FiberState::Terminated,
"Fiber `to` is terminated.");
TILE_CHECK_NE(self, to, "Fiber `self` is the same as fiber `to`.");
self->SwitchTo(to);
TILE_CHECK_EQ(self, OSFiber::Current());
}
OSFiber *Scheduler::StartFiber(std::function<void()> task) {
auto os_fiber = os_fiber_registry.New("os_fiber");
OSFiber::Options options;
options.stack_size = FLAGS_tile_fiber_stack_size;
options.task = std::move(task);
if (!os_fiber->Initialize(options)) {
return nullptr;
}
return os_fiber.get();
}
} // namespace detail
} // namespace fiber
} // namespace tile

View File

@ -2,19 +2,21 @@
#define TILE_FIBER_DETAIL_SCHEDULER_H
#pragma once
#include "tile/base/thread/event.h"
#include "tile/base/thread/spinlock.h"
#include "tile/fiber/fiber.h"
#include <array>
#include <functional>
#include <list>
#include <map>
#include <memory>
#include <unordered_map>
namespace tile {
namespace fiber {
namespace detail {
class OSFiber;
class Fiber;
class Scheduler {
public:
@ -23,17 +25,18 @@ public:
void Yield(OSFiber *self) noexcept;
void Halt(OSFiber *self) noexcept;
void SwitchTo(OSFiber *self, OSFiber *to) noexcept;
OSFiber *StartFiber(std::function<void()> task);
void Queue(std::function<void()> task);
private:
OSFiber *StartFiber(std::function<void()> task);
Scheduler();
~Scheduler();
private:
std::unique_ptr<OSFiber> main_fiber_;
std::map<Fiber::Id, std::unique_ptr<OSFiber>> fibers_;
Spinlock fibers_lock_;
// std::array<std::unique_ptr<OSFiber>, 64> fibers_;
std::list<std::function<void()>> tasks_;
Event new_task_event_;
};
} // namespace detail
} // namespace fiber

View File

@ -34,15 +34,8 @@ void tile_ucontext_set_target(struct tile_ucontext_t *ctx, void *stack,
__hwasan_tag_memory(stack, 0, stack_size);
}
#endif
assert(((stack_size & ~((uintptr_t)15)) >= 1024) &&
"stack size must be greater than 16 bytes");
uintptr_t *stack_top = (uintptr_t *)((uint8_t *)(stack) + stack_size);
// align by 2^15=32796 bytes
{
stack_size -= ((uintptr_t)stack_top & 15);
stack_top -= ((uintptr_t)stack_top) & 15;
}
assert(((uintptr_t)stack_top & 15) == 0);
#if defined(__x86_64__)
TILE_UCONTEXT_ARG0(ctx, stack_top) = (uintptr_t)&tile_ucontext_trampoline;

View File

@ -28,8 +28,8 @@ public:
using Id = std::uint64_t;
using Task = std::function<void()>;
enum class FiberState {
Idle, // No task
Running, // Process task
Ready, // No task
// Running, // Process task
Waiting, // Blocked
Terminated, // Terminated
};