feat/support_fiber #2

Merged
tqcq merged 57 commits from feat/support_fiber into master 2024-06-21 10:33:52 +08:00
4 changed files with 62 additions and 82 deletions
Showing only changes of commit 3ee740d0bd - Show all commits

View File

@ -12,59 +12,42 @@ namespace tile {
namespace fiber {
namespace detail {
#define SET_PREV_FCTX(new_fctx) \
do { \
if (PrevFiber()) { \
PrevFiber()->ctx_->fctx = (new_fctx); \
} \
} while (0)
static thread_local Fiber *tls_current_fiber = nullptr;
static thread_local Fiber *tls_master_fiber = nullptr;
static thread_local Fiber *tls_prev_fiber = nullptr;
constexpr std::size_t kStackSize = 128 * 1024; // 128k
constexpr std::size_t kAlignSize = 16;
struct alignas(hardware_destructive_interference_size) Fiber::FiberContext {
fcontext_t fctx;
std::aligned_storage<kStackSize, kAlignSize>::type stack;
std::function<void()> proc;
};
// static void SetPrevFiber(Fiber *fiber) noexcept { tls_prev_fiber = fiber; }
static Fiber *PrevFiber() noexcept { return tls_prev_fiber; }
void FiberEntry(fcontext_transfer_t t) {
SET_PREV_FCTX(t.fctx);
std::function<void()> *fn = static_cast<std::function<void()> *>(t.data);
TILE_CHECK_NE(t.data, nullptr);
TILE_CHECK_NE(t.fctx, nullptr);
Fiber *self;
try {
// From Resume()
t = jump_fcontext(t.fctx, nullptr);
self = Fiber::Current();
SET_PREV_FCTX(t.fctx);
auto self = reinterpret_cast<Fiber *>(t.data);
self->caller_->ctx_ = t.fctx;
(*fn)();
} catch (const std::exception &e) {
TILE_LOG_ERROR("Exception caught in fiber: {}", e.what());
}
TILE_CHECK_NE(t.fctx, nullptr);
if (self == Fiber::MasterFiber()) {
// TILE_LOG_INFO("FiberEntry End. Resume to {}", t.fctx);
jump_fcontext(t.fctx, nullptr);
if (GetMasterFiber() != GetCurrentFiber()) {
GetMasterFiber()->Resume();
} else {
// TILE_LOG_INFO("FiberEntry End. Resume to {}",
// Fiber::MasterFiber()->ctx_->fctx);
Fiber::MasterFiber()->Resume();
// master fiber end
jump_fcontext(t.fctx, GetMasterFiber());
}
}
fcontext_transfer_t FiberOnTop(fcontext_transfer_t t) {}
fcontext_t CreateFiber(void *stack, std::size_t stack_size,
std::function<void()> *fn) {
@ -75,14 +58,11 @@ fcontext_t CreateFiber(void *stack, std::size_t stack_size,
return jump_fcontext(fctx, fn).fctx;
}
Fiber *Fiber::Current() noexcept { return tls_current_fiber; }
void Fiber::SetCurrent(Fiber *fiber) noexcept {
tls_prev_fiber = tls_current_fiber;
tls_current_fiber = fiber;
}
Fiber *GetCurrentFiber() noexcept { return tls_current_fiber; }
void SetUpCurrentFiber(Fiber *fiber) noexcept { tls_current_fiber = fiber; }
Fiber *Fiber::MasterFiber() noexcept { return tls_master_fiber; }
void Fiber::SetMasterFiber(Fiber *fiber) noexcept { tls_master_fiber = fiber; }
Fiber *GetMasterFiber() noexcept { return tls_master_fiber; }
void SetUpMasterFiber(Fiber *fiber) noexcept { tls_master_fiber = fiber; }
std::unique_ptr<Fiber> Fiber::Create(std::function<void()> proc) noexcept {
return std::unique_ptr<Fiber>(new Fiber(std::move(proc)));
@ -90,45 +70,34 @@ std::unique_ptr<Fiber> Fiber::Create(std::function<void()> proc) noexcept {
}
Fiber::Fiber(std::function<void()> proc)
: ctx_(object_pool::Get<FiberContext>().Leak()) {
: data_(object_pool::Get<FiberContext>().Leak()) {
TILE_CHECK(proc);
ctx_->proc = std::move(proc);
ctx_->fctx = CreateFiber(&ctx_->stack, kStackSize, &ctx_->proc);
data_->proc = std::move(proc);
ctx_ = CreateFiber(&data_->stack, kStackSize, &data_->proc);
}
Fiber::~Fiber() {
if (ctx_) {
object_pool::Put<FiberContext>(ctx_.release());
if (data_) {
object_pool::Put<FiberContext>(data_.release());
}
}
void Fiber::Resume() {
TILE_CHECK(state_ != FiberState::Suspended);
TILE_CHECK(state_ != FiberState::Terminated);
TILE_CHECK_NE(ctx_->fctx, nullptr);
auto caller = GetCurrentFiber();
TILE_CHECK_NE(caller, this, "Calling `ResumeOn()`, on self is undefined.");
auto caller = Current();
TILE_CHECK_NE(caller, this, "Can't `Resume()` self");
if (caller != Fiber::MasterFiber() && this != Fiber::MasterFiber()) {
caller->state_ = FiberState::Suspended;
} else if (caller) {
caller->state_ = FiberState::Ready;
auto t = jump_fcontext(ctx_, this);
caller = reinterpret_cast<Fiber *>(t.data);
caller->ctx_ = t.fctx;
SetUpCurrentFiber(caller);
}
// TILE_LOG_INFO("Resume from {} to {}", fmt::ptr(caller), fmt::ptr(this));
SetCurrent(this);
this->state_ = FiberState::Running;
auto caller_fctx =
jump_fcontext(internal::Exchange(ctx_->fctx, nullptr), nullptr).fctx;
if (Current() == this) {
// fiber terminated
state_ = FiberState::Terminated;
} else {
state_ = FiberState::Ready;
}
// SET_PREV_FCTX(caller_fctx);
void Fiber::ResumeOn(std::function<void()> &&cb) noexcept {
auto caller = GetCurrentFiber();
TILE_CHECK_NE(caller, this, "Calling `ResumeOn()`, on self is undefined.");
ontop_fcontext(ctx_, this, FiberOnTop);
}
} // namespace detail

View File

@ -18,8 +18,6 @@ namespace detail {
class Scheduler;
class alignas(hardware_destructive_interference_size) Fiber {
public:
enum class FiberState {
Runnable,
Running,
@ -28,11 +26,8 @@ public:
Terminated,
};
static Fiber *Current() noexcept;
static void SetCurrent(Fiber *fiber) noexcept;
static Fiber *MasterFiber() noexcept;
static void SetMasterFiber(Fiber *fiber) noexcept;
class alignas(hardware_destructive_interference_size) Fiber {
public:
static std::unique_ptr<Fiber>
Create(std::function<void()> proc = nullptr) noexcept;
@ -43,24 +38,34 @@ public:
Fiber(Fiber &&other) noexcept = default;
Fiber &operator=(Fiber &&other) noexcept = default;
// for `Scheduler`
void Resume();
void ResumeOn(std::function<void()> &&cb) noexcept;
void Yield();
private:
TILE_FRIEND_TEST(Fiber, Base);
friend Scheduler;
friend void FiberEntry(struct fcontext_transfer);
struct FiberContext;
friend class ::tile::PoolTraits<FiberContext>;
friend void FiberEntry(struct fcontext_transfer);
friend struct fcontext_transfer FiberOnTop(struct fcontext_transfer);
Fiber(std::function<void()> proc = nullptr);
private:
std::unique_ptr<FiberContext> ctx_;
std::unique_ptr<FiberContext> data_;
FiberState state_{FiberState::Runnable};
void *ctx_{nullptr};
Fiber *caller_{nullptr};
};
Fiber *GetCurrentFiber() noexcept;
Fiber *GetMasterFiber() noexcept;
void SetUpCurrentFiber(Fiber *fiber) noexcept;
void SetUpMasterFiber(Fiber *fiber) noexcept;
inline bool IsFiberContext() noexcept { return GetCurrentFiber() != nullptr; }
} // namespace detail
} // namespace fiber

View File

@ -18,14 +18,14 @@ TEST(Fiber, Base) {
master_fiber = Fiber::Create([&] {
TILE_LOG_INFO("master fiber");
// ASSERT_EQ(cnt, 0);
ASSERT_EQ(Fiber::MasterFiber(), master_fiber.get());
ASSERT_EQ(Fiber::Current(), master_fiber.get());
ASSERT_EQ(GetMasterFiber(), master_fiber.get());
ASSERT_EQ(GetCurrentFiber(), master_fiber.get());
ASSERT_EQ(cnt, 0);
while (cnt < kMaxCnt) {
std::unique_ptr<Fiber> worker_fiber = Fiber::Create([&] {
while (cnt < kMaxCnt) {
ASSERT_EQ(Fiber::Current(), worker_fiber.get());
ASSERT_EQ(GetCurrentFiber(), worker_fiber.get());
++cnt;
master_fiber->Resume();
}
@ -35,15 +35,15 @@ TEST(Fiber, Base) {
int old = cnt;
worker_fiber->Resume();
ASSERT_EQ(old + 1, cnt);
ASSERT_EQ(Fiber::Current(), master_fiber.get());
ASSERT_EQ(GetCurrentFiber(), master_fiber.get());
}
}
ASSERT_EQ(cnt, kMaxCnt);
});
Fiber::SetMasterFiber(master_fiber.get());
Fiber::SetCurrent(nullptr);
SetUpMasterFiber(master_fiber.get());
SetUpCurrentFiber(nullptr);
master_fiber->Resume();
}

View File

@ -3,11 +3,17 @@
#pragma once
#include "tile/fiber/detail/fiber.h"
namespace tile {
namespace fiber {
namespace detail {
class Scheduler {};
class Scheduler {
public:
void SwitchTo(Fiber *self, Fiber *to);
void Yield(Fiber *self);
};
} // namespace detail
} // namespace fiber
} // namespace tile