feat support resume to Master Fiber

This commit is contained in:
tqcq 2024-06-13 22:10:34 +08:00
parent b881ba6efc
commit 840b240561
4 changed files with 99 additions and 38 deletions

View File

@ -12,26 +12,58 @@ namespace tile {
namespace fiber { namespace fiber {
namespace detail { namespace detail {
#define SET_PREV_FCTX(new_fctx) \
do { \
if (PrevFiber()) { \
PrevFiber()->ctx_->fctx = (new_fctx); \
} \
} while (0)
static thread_local Fiber *tls_current_fiber = nullptr; static thread_local Fiber *tls_current_fiber = nullptr;
static thread_local Fiber *tls_master_fiber = nullptr; static thread_local Fiber *tls_master_fiber = nullptr;
static thread_local Fiber *tls_prev_fiber = nullptr;
constexpr std::size_t kStackSize = 128 * 1024; // 128k constexpr std::size_t kStackSize = 128 * 1024; // 128k
constexpr std::size_t kAlignSize = 16; constexpr std::size_t kAlignSize = 16;
struct alignas(hardware_destructive_interference_size) Fiber::FiberContext {
fcontext_t fctx;
std::aligned_storage<kStackSize, kAlignSize>::type stack;
std::function<void()> proc;
};
// static void SetPrevFiber(Fiber *fiber) noexcept { tls_prev_fiber = fiber; }
static Fiber *PrevFiber() noexcept { return tls_prev_fiber; }
void FiberEntry(fcontext_transfer_t t) { void FiberEntry(fcontext_transfer_t t) {
// TILE_LOG_INFO("FiberEntry creater {}, proc {}", t.fctx, t.data); SET_PREV_FCTX(t.fctx);
std::function<void()> *fn = static_cast<std::function<void()> *>(t.data); std::function<void()> *fn = static_cast<std::function<void()> *>(t.data);
TILE_CHECK_NE(t.data, nullptr); TILE_CHECK_NE(t.data, nullptr);
TILE_CHECK_NE(t.fctx, nullptr); TILE_CHECK_NE(t.fctx, nullptr);
Fiber *self;
try { try {
// From Resume()
t = jump_fcontext(t.fctx, nullptr); t = jump_fcontext(t.fctx, nullptr);
self = Fiber::Current();
SET_PREV_FCTX(t.fctx);
(*fn)(); (*fn)();
} catch (const std::exception &e) { } catch (const std::exception &e) {
// TILE_LOG_ERROR("Exception caught in fiber: {}", e.what()); TILE_LOG_ERROR("Exception caught in fiber: {}", e.what());
} }
TILE_CHECK_NE(t.fctx, nullptr); TILE_CHECK_NE(t.fctx, nullptr);
if (self == Fiber::MasterFiber()) {
// TILE_LOG_INFO("FiberEntry End. Resume to {}", t.fctx); // TILE_LOG_INFO("FiberEntry End. Resume to {}", t.fctx);
jump_fcontext(t.fctx, nullptr); jump_fcontext(t.fctx, nullptr);
} else {
// TILE_LOG_INFO("FiberEntry End. Resume to {}",
// Fiber::MasterFiber()->ctx_->fctx);
Fiber::MasterFiber()->Resume();
}
} }
fcontext_t CreateFiber(void *stack, std::size_t stack_size, fcontext_t CreateFiber(void *stack, std::size_t stack_size,
@ -43,14 +75,11 @@ fcontext_t CreateFiber(void *stack, std::size_t stack_size,
return jump_fcontext(fctx, fn).fctx; return jump_fcontext(fctx, fn).fctx;
} }
struct alignas(hardware_destructive_interference_size) Fiber::FiberContext {
fcontext_t fctx;
std::aligned_storage<kStackSize, kAlignSize>::type stack;
std::function<void()> proc;
};
Fiber *Fiber::Current() noexcept { return tls_current_fiber; } Fiber *Fiber::Current() noexcept { return tls_current_fiber; }
void Fiber::SetCurrent(Fiber *fiber) noexcept { tls_current_fiber = fiber; } void Fiber::SetCurrent(Fiber *fiber) noexcept {
tls_prev_fiber = tls_current_fiber;
tls_current_fiber = fiber;
}
Fiber *Fiber::MasterFiber() noexcept { return tls_master_fiber; } Fiber *Fiber::MasterFiber() noexcept { return tls_master_fiber; }
void Fiber::SetMasterFiber(Fiber *fiber) noexcept { tls_master_fiber = fiber; } void Fiber::SetMasterFiber(Fiber *fiber) noexcept { tls_master_fiber = fiber; }
@ -62,13 +91,10 @@ std::unique_ptr<Fiber> Fiber::Create(std::function<void()> proc) noexcept {
Fiber::Fiber(std::function<void()> proc) Fiber::Fiber(std::function<void()> proc)
: ctx_(object_pool::Get<FiberContext>().Leak()) { : ctx_(object_pool::Get<FiberContext>().Leak()) {
TILE_CHECK(proc);
ctx_->proc = std::move(proc); ctx_->proc = std::move(proc);
if (ctx_->proc) {
ctx_->fctx = CreateFiber(&ctx_->stack, kStackSize, &ctx_->proc); ctx_->fctx = CreateFiber(&ctx_->stack, kStackSize, &ctx_->proc);
} else {
ctx_->fctx = nullptr;
}
} }
Fiber::~Fiber() { Fiber::~Fiber() {
@ -78,18 +104,32 @@ Fiber::~Fiber() {
} }
void Fiber::Resume() { void Fiber::Resume() {
TILE_CHECK(state_ != FiberState::Suspended);
TILE_CHECK(state_ != FiberState::Terminated);
TILE_CHECK_NE(ctx_->fctx, nullptr); TILE_CHECK_NE(ctx_->fctx, nullptr);
auto caller = Current(); auto caller = Current();
TILE_CHECK_NE(caller, this, "Can't `Resume()` self"); TILE_CHECK_NE(caller, this, "Can't `Resume()` self");
SetCurrent(this); if (caller != Fiber::MasterFiber() && this != Fiber::MasterFiber()) {
// TILE_LOG_INFO("Resume before proc: {}", fmt::ptr(&proc_)); caller->state_ = FiberState::Suspended;
ctx_->fctx = } else if (caller) {
jump_fcontext(internal::Exchange(ctx_->fctx, nullptr), nullptr).fctx; caller->state_ = FiberState::Ready;
// TILE_LOG_INFO("Resume after proc: {}", fmt::ptr(&proc_));
SetCurrent(caller);
} }
void Fiber::Yield() {} // TILE_LOG_INFO("Resume from {} to {}", fmt::ptr(caller), fmt::ptr(this));
SetCurrent(this);
this->state_ = FiberState::Running;
auto caller_fctx =
jump_fcontext(internal::Exchange(ctx_->fctx, nullptr), nullptr).fctx;
if (Current() == this) {
// fiber terminated
state_ = FiberState::Terminated;
} else {
state_ = FiberState::Ready;
}
// SET_PREV_FCTX(caller_fctx);
}
} // namespace detail } // namespace detail
} // namespace fiber } // namespace fiber

View File

@ -15,15 +15,15 @@ struct fcontext_transfer;
namespace tile { namespace tile {
namespace fiber { namespace fiber {
namespace detail { namespace detail {
void RunProc(void *arg);
void RunProc1(struct fcontext_transfer);
class Scheduler; class Scheduler;
class alignas(hardware_destructive_interference_size) Fiber { class alignas(hardware_destructive_interference_size) Fiber {
public: public:
enum FiberState { enum class FiberState {
Ready, Runnable,
Running,
Suspended,
Waiting, Waiting,
Terminated, Terminated,
}; };
@ -50,6 +50,8 @@ public:
private: private:
TILE_FRIEND_TEST(Fiber, Base); TILE_FRIEND_TEST(Fiber, Base);
friend Scheduler; friend Scheduler;
friend void FiberEntry(struct fcontext_transfer);
struct FiberContext; struct FiberContext;
friend class ::tile::PoolTraits<FiberContext>; friend class ::tile::PoolTraits<FiberContext>;
@ -57,7 +59,7 @@ private:
private: private:
std::unique_ptr<FiberContext> ctx_; std::unique_ptr<FiberContext> ctx_;
FiberState state_{Ready}; FiberState state_{FiberState::Runnable};
}; };
} // namespace detail } // namespace detail

View File

@ -7,7 +7,7 @@ namespace fiber {
namespace detail { namespace detail {
TEST(Fiber, Base) { TEST(Fiber, Base) {
constexpr auto kMaxCnt = 100 * 1000; constexpr auto kMaxCnt = 1; // 100 * 1000;
int cnt = 0; int cnt = 0;
int resume_cnt = 0; int resume_cnt = 0;
@ -24,22 +24,26 @@ TEST(Fiber, Base) {
ASSERT_EQ(cnt, 0); ASSERT_EQ(cnt, 0);
while (cnt < kMaxCnt) { while (cnt < kMaxCnt) {
std::unique_ptr<Fiber> worker_fiber = Fiber::Create([&] { std::unique_ptr<Fiber> worker_fiber = Fiber::Create([&] {
while (cnt < kMaxCnt) {
ASSERT_EQ(Fiber::Current(), worker_fiber.get()); ASSERT_EQ(Fiber::Current(), worker_fiber.get());
++cnt; ++cnt;
master_fiber->Resume();
}
}); });
while (cnt < kMaxCnt) {
int old = cnt; int old = cnt;
worker_fiber->Resume(); worker_fiber->Resume();
ASSERT_EQ(old + 1, cnt); ASSERT_EQ(old + 1, cnt);
ASSERT_EQ(Fiber::Current(), master_fiber.get()); ASSERT_EQ(Fiber::Current(), master_fiber.get());
} }
}
ASSERT_EQ(cnt, kMaxCnt); ASSERT_EQ(cnt, kMaxCnt);
}); });
Fiber::SetMasterFiber(master_fiber.get()); Fiber::SetMasterFiber(master_fiber.get());
// Fiber::SetCurrent(master_fiber.get()); Fiber::SetCurrent(nullptr);
// master_fiber->Resume();
master_fiber->Resume(); master_fiber->Resume();
} }

View File

@ -0,0 +1,15 @@
#ifndef TILE_FIBER_DETAIL_SCHEDULER_H
#define TILE_FIBER_DETAIL_SCHEDULER_H
#pragma once
namespace tile {
namespace fiber {
namespace detail {
class Scheduler {};
} // namespace detail
} // namespace fiber
} // namespace tile
#endif // TILE_FIBER_DETAIL_SCHEDULER_H