feat/support_fiber #2

Merged
tqcq merged 57 commits from feat/support_fiber into master 2024-06-21 10:33:52 +08:00
7 changed files with 151 additions and 15 deletions
Showing only changes of commit 0941e0b4ff - Show all commits

View File

@ -268,6 +268,7 @@ if(TILE_BUILD_TESTS)
target_sources(${PROJECT_NAME}_test_all PRIVATE ${test_file}) target_sources(${PROJECT_NAME}_test_all PRIVATE ${test_file})
endmacro() endmacro()
tile_add_test(fiber_detail_scheduler_test "tile/fiber/detail/scheduler_test.cc")
tile_add_test(base_internal_meta_test "tile/base/internal/meta_test.cc") tile_add_test(base_internal_meta_test "tile/base/internal/meta_test.cc")
# tile_add_test(net_internal_http_engine_test # tile_add_test(net_internal_http_engine_test
# "tile/net/internal/http_engine_test.cc") # "tile/net/internal/http_engine_test.cc")

View File

@ -26,6 +26,14 @@ template <typename T> struct has_to_string {
static constexpr bool value = decltype(Test<T>(0))::value; static constexpr bool value = decltype(Test<T>(0))::value;
}; };
template <typename T> struct can_to_string {
template <typename U>
static auto Test(int) -> decltype(ToString(std::declval<U>()),
std::true_type());
template <typename...> static auto Test(...) -> std::false_type;
static constexpr bool value = decltype(Test<T>(0))::value;
};
template <typename Rep, typename Period> template <typename Rep, typename Period>
std::string format_as_impl(const std::chrono::duration<Rep, Period> &val, std::string format_as_impl(const std::chrono::duration<Rep, Period> &val,
const char *suffix) { const char *suffix) {
@ -52,11 +60,21 @@ auto format_as(const T &val)
template <typename T> template <typename T>
auto format_as(const T &val) auto format_as(const T &val)
-> tile::enable_if_t<!tile::detail::is_streamable<std::ostream, T>::value && -> tile::enable_if_t<!tile::detail::is_streamable<std::ostream, T>::value &&
tile::detail::has_to_string<T>::value, tile::detail::has_to_string<T>::value &&
!tile::detail::can_to_string<T>::value,
std::string> { std::string> {
return val.ToString(); return val.ToString();
} }
template <typename T>
auto format_as(const T &val)
-> tile::enable_if_t<!tile::detail::is_streamable<std::ostream, T>::value &&
!tile::detail::has_to_string<T>::value &&
tile::detail::can_to_string<T>::value,
std::string> {
return ToString(val);
}
template <typename T> template <typename T>
auto format_as(T *ptr) auto format_as(T *ptr)
-> tile::enable_if_t<!std::is_void<tile::remove_cvref_t<T>>::value, -> tile::enable_if_t<!std::is_void<tile::remove_cvref_t<T>>::value,

View File

@ -1,10 +1,12 @@
#include "tile/fiber/detail/fiber.h" #include "tile/fiber/detail/fiber.h"
#include "tile/base/align.h" #include "tile/base/align.h"
#include "tile/base/internal/index_alloc.h"
#include "tile/base/internal/move_on_copy.h" #include "tile/base/internal/move_on_copy.h"
#include "tile/base/logging.h" #include "tile/base/logging.h"
#include "tile/base/make_unique.h" #include "tile/base/make_unique.h"
#include "tile/base/object_pool.h" #include "tile/base/object_pool.h"
#include "tile/base/string.h"
#include "nova/context/fcontext.h" #include "nova/context/fcontext.h"
@ -12,6 +14,25 @@ namespace tile {
namespace fiber { namespace fiber {
namespace detail { namespace detail {
std::string ToString(const FiberState &state) noexcept {
switch (state) {
case FiberState::Runnable:
return "Runnable";
case FiberState::Running:
return "Running";
case FiberState::Waiting:
return "Waiting";
case FiberState::Terminated:
return "Terminated";
default:
TILE_UNEXPECTED("");
return "Unknown";
}
}
std::ostream &operator<<(std::ostream &os, const FiberState &state) noexcept {
return os << ToString(state);
}
static thread_local Fiber *tls_current_fiber = nullptr; static thread_local Fiber *tls_current_fiber = nullptr;
static thread_local Fiber *tls_master_fiber = nullptr; static thread_local Fiber *tls_master_fiber = nullptr;
@ -32,8 +53,14 @@ void FiberEntry(fcontext_transfer_t t) {
try { try {
// From Resume() // From Resume()
t = jump_fcontext(t.fctx, nullptr); t = jump_fcontext(t.fctx, nullptr);
self = reinterpret_cast<Fiber *>(t.data); Fiber *caller = reinterpret_cast<Fiber *>(t.data);
self->caller_->ctx_ = t.fctx;
self = GetCurrentFiber();
if (caller) {
caller->ctx_ = t.fctx;
} else {
TILE_CHECK_EQ(self, GetMasterFiber());
}
(*fn)(); (*fn)();
} catch (const std::exception &e) { } catch (const std::exception &e) {
@ -41,13 +68,16 @@ void FiberEntry(fcontext_transfer_t t) {
} }
self->state_ = FiberState::Terminated; self->state_ = FiberState::Terminated;
if (GetMasterFiber() != GetCurrentFiber()) { if (GetMasterFiber() == self) {
GetMasterFiber()->Resume();
} else {
// master fiber end // master fiber end
jump_fcontext(t.fctx, GetMasterFiber()); jump_fcontext(t.fctx, GetMasterFiber());
} else {
TILE_CHECK(GetMasterFiber() != nullptr);
TILE_CHECK_NE(GetMasterFiber()->state(), FiberState::Terminated);
GetMasterFiber()->Resume();
} }
} }
fcontext_transfer_t FiberOnTop(fcontext_transfer_t t) {} fcontext_transfer_t FiberOnTop(fcontext_transfer_t t) {}
fcontext_t CreateFiber(void *stack, std::size_t stack_size, fcontext_t CreateFiber(void *stack, std::size_t stack_size,
@ -70,7 +100,8 @@ std::unique_ptr<Fiber> Fiber::Create(std::function<void()> proc) noexcept {
} }
Fiber::Fiber(std::function<void()> proc) Fiber::Fiber(std::function<void()> proc)
: data_(object_pool::Get<FiberContext>().Leak()) { : id_(internal::IndexAlloc::For<Fiber>()->Next()),
data_(object_pool::Get<FiberContext>().Leak()) {
TILE_CHECK(proc); TILE_CHECK(proc);
data_->proc = std::move(proc); data_->proc = std::move(proc);
@ -81,15 +112,20 @@ Fiber::~Fiber() {
if (data_) { if (data_) {
object_pool::Put<FiberContext>(data_.release()); object_pool::Put<FiberContext>(data_.release());
} }
internal::IndexAlloc::For<Fiber>()->Free(id_);
} }
void Fiber::Resume() { void Fiber::Resume() {
auto caller = GetCurrentFiber(); auto caller = GetCurrentFiber();
TILE_CHECK_NE(caller, this, "Calling `ResumeOn()`, on self is undefined."); TILE_CHECK_NE(caller, this, "Calling `Resume()`, on self is undefined.");
auto t = jump_fcontext(ctx_, this); SetUpCurrentFiber(this);
caller = reinterpret_cast<Fiber *>(t.data);
caller->ctx_ = t.fctx; auto t = jump_fcontext(internal::Exchange(ctx_, nullptr), caller);
if (auto from = reinterpret_cast<Fiber *>(t.data)) {
from->ctx_ = t.fctx;
}
SetUpCurrentFiber(caller); SetUpCurrentFiber(caller);
} }
@ -97,7 +133,21 @@ void Fiber::Resume() {
void Fiber::ResumeOn(std::function<void()> &&cb) noexcept { void Fiber::ResumeOn(std::function<void()> &&cb) noexcept {
auto caller = GetCurrentFiber(); auto caller = GetCurrentFiber();
TILE_CHECK_NE(caller, this, "Calling `ResumeOn()`, on self is undefined."); TILE_CHECK_NE(caller, this, "Calling `ResumeOn()`, on self is undefined.");
ontop_fcontext(ctx_, this, FiberOnTop); SetUpCurrentFiber(this);
auto t = ontop_fcontext(ctx_, this, FiberOnTop);
caller = reinterpret_cast<Fiber *>(t.data);
SetUpCurrentFiber(caller);
}
std::string ToString(const Fiber &fiber) noexcept { return ToString(&fiber); }
std::string ToString(const std::unique_ptr<Fiber> &fiber) noexcept {
return ToString(fiber.get());
}
std::string ToString(Fiber const *const fiber) noexcept {
if (TILE_UNLIKELY(fiber == nullptr)) {
return "Fiber(nullptr)";
}
return Format("Fiber({})[{}]", fiber->id(), fmt::ptr(fiber));
} }
} // namespace detail } // namespace detail

View File

@ -22,10 +22,11 @@ class Scheduler;
enum class FiberState { enum class FiberState {
Runnable, Runnable,
Running, Running,
Suspended,
Waiting, Waiting,
Terminated, Terminated,
}; };
std::string ToString(const FiberState &state) noexcept;
std::ostream &operator<<(std::ostream &os, const FiberState &state) noexcept;
class Scheduler; class Scheduler;
@ -46,6 +47,8 @@ public:
void Resume(); void Resume();
void ResumeOn(std::function<void()> &&cb) noexcept; void ResumeOn(std::function<void()> &&cb) noexcept;
void Yield(); void Yield();
Id id() const noexcept { return id_; }
FiberState state() const noexcept { return state_; }
Scheduler *scheduler() const { return scheduler_; } Scheduler *scheduler() const { return scheduler_; }
@ -61,10 +64,14 @@ private:
Fiber(std::function<void()> proc = nullptr); Fiber(std::function<void()> proc = nullptr);
private: private:
Id id_;
std::unique_ptr<FiberContext> data_; std::unique_ptr<FiberContext> data_;
FiberState state_{FiberState::Runnable}; FiberState state_{FiberState::Runnable};
void *ctx_{nullptr}; void *ctx_{nullptr};
Fiber *caller_{nullptr};
// for scheduler
Scheduler *scheduler_{nullptr}; Scheduler *scheduler_{nullptr};
}; };
Fiber *GetCurrentFiber() noexcept; Fiber *GetCurrentFiber() noexcept;
@ -75,6 +82,10 @@ void SetUpMasterFiber(Fiber *fiber) noexcept;
inline bool IsFiberContext() noexcept { return GetCurrentFiber() != nullptr; } inline bool IsFiberContext() noexcept { return GetCurrentFiber() != nullptr; }
std::string ToString(Fiber const *const fiber) noexcept;
std::string ToString(const std::unique_ptr<Fiber> &fiber) noexcept;
std::string ToString(const Fiber &fiber) noexcept;
} // namespace detail } // namespace detail
} // namespace fiber } // namespace fiber
} // namespace tile } // namespace tile

View File

@ -8,7 +8,7 @@ static thread_local Scheduler *current_scheduler = nullptr;
void Scheduler::SwitchTo(Fiber *self, Fiber *to) { void Scheduler::SwitchTo(Fiber *self, Fiber *to) {
TILE_CHECK_EQ(self, GetCurrentFiber()); TILE_CHECK_EQ(self, GetCurrentFiber());
TILE_CHECK(to->state_ == FiberState::Runnable, TILE_CHECK(to->state_ == FiberState::Terminated,
"Fiber `to` is not in Runnable."); "Fiber `to` is not in Runnable.");
TILE_CHECK_NE(self, to, "Switch to yourself result in U.B."); TILE_CHECK_NE(self, to, "Switch to yourself result in U.B.");

View File

@ -0,0 +1,55 @@
#include "tile/fiber/detail/scheduler.h"
#include "tile/base/deferred.h"
#include "tile/base/random.h"
#include "gtest/gtest.h"
namespace tile {
namespace fiber {
namespace detail {
TEST(Fiber, Resume) {
constexpr int kFiberCount = 1000;
constexpr int kSwitchCount = 100000;
std::unique_ptr<Fiber> master;
std::unique_ptr<Fiber> worker[kFiberCount];
int cnt = 0;
auto end_check = tile::Deferred([&]() { ASSERT_EQ(cnt, kSwitchCount); });
master = Fiber::Create([&]() {
while (cnt < kSwitchCount) {
ASSERT_EQ(GetCurrentFiber(), master.get());
worker[Random(kFiberCount - 1)]->Resume();
ASSERT_EQ(GetCurrentFiber(), master.get());
}
for (int i = 0; i != kFiberCount; ++i) {
worker[i]->Resume();
}
});
for (int i = 0; i != kFiberCount; ++i) {
worker[i] = Fiber::Create([&, i]() {
while (cnt < kSwitchCount) {
++cnt;
ASSERT_EQ(GetCurrentFiber(), worker[i].get());
master->Resume();
ASSERT_EQ(GetCurrentFiber(), worker[i].get());
}
});
}
SetUpMasterFiber(master.get());
SetUpCurrentFiber(nullptr);
master->Resume();
ASSERT_EQ(cnt, kSwitchCount);
for (int i = 0; i != kFiberCount; ++i) {
ASSERT_EQ(worker[i]->state(), FiberState::Terminated);
}
}
} // namespace detail
} // namespace fiber
} // namespace tile

View File

@ -18,6 +18,7 @@ void SleepFor(std::chrono::nanoseconds expires_in) {
Fiber::Id GetId() { Fiber::Id GetId() {
auto self = fiber::detail::GetCurrentFiber(); auto self = fiber::detail::GetCurrentFiber();
TILE_CHECK(self, "GetId() should be called in a fiber."); TILE_CHECK(self, "GetId() should be called in a fiber.");
return self->id();
} }
} // namespace this_fiber } // namespace this_fiber
} // namespace tile } // namespace tile