From ea241f2dd7335789d5d54a043ed04665051b0439 Mon Sep 17 00:00:00 2001 From: tqcq <99722391+tqcq@users.noreply.github.com> Date: Thu, 21 Mar 2024 11:26:03 +0800 Subject: [PATCH 1/4] fix fmt::format not support ostream --- include/sled/log/log.h | 1 + 1 file changed, 1 insertion(+) diff --git a/include/sled/log/log.h b/include/sled/log/log.h index 1a17f8f..b1b7ffb 100644 --- a/include/sled/log/log.h +++ b/include/sled/log/log.h @@ -10,6 +10,7 @@ #include "sled/system/location.h" #include #include +#include // support fmt base ostream namespace sled { enum class LogLevel { From 2bb58ab55a811d315020b81dd8e9f28895629cf0 Mon Sep 17 00:00:00 2001 From: tqcq <99722391+tqcq@users.noreply.github.com> Date: Thu, 21 Mar 2024 14:31:43 +0800 Subject: [PATCH 2/4] feat update --- CMakeLists.txt | 7 ++++- include/sled/exec/just.h | 68 ++++++++++++++++++++++++++++++++++++++++ src/exec/just_test.cc | 9 ++++++ src/rx_test.cc | 1 - 4 files changed, 83 insertions(+), 2 deletions(-) create mode 100644 include/sled/exec/just.h create mode 100644 src/exec/just_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index 2971c7e..f1abe0c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -32,7 +32,11 @@ if (SLED_LOCATION_PATH) target_compile_definitions(sled PRIVATE __SLED_LOCATION_PATH="${SLED_LOCATION_PATH}") endif() # add_subdirectory(3party/eigen EXCLUDE_FROM_ALL) -target_include_directories(sled PUBLIC 3party/eigen 3party/inja 3party/rxcpp) +target_include_directories(sled PUBLIC + include + 3party/eigen + 3party/inja + 3party/rxcpp) target_sources( sled PRIVATE @@ -95,6 +99,7 @@ if(SLED_BUILD_TESTS) ) FetchContent_MakeAvailable(googletest) add_executable(sled_tests + src/exec/just_test.cc src/any_test.cc src/filesystem/path_test.cc src/strings/base64_test.cc diff --git a/include/sled/exec/just.h b/include/sled/exec/just.h new file mode 100644 index 0000000..9b59521 --- /dev/null +++ b/include/sled/exec/just.h @@ -0,0 +1,68 @@ +#pragma once +#ifndef SLED_EXEC_JUST_H +#define SLED_EXEC_JUST_H + +#include +#include + +namespace sled { + +struct immovable { + immovable() = default; + immovable(immovable &&) = delete; +}; + +template +using connect_result_t = decltype(connect(std::declval(), std::declval())); + +template +using sender_result_t = typename T::result_t; + +template +struct just_operation : immovable { + R receiver; + T value; + + friend void start(just_operation &self) { set_value(self.receiver, self.value); } +}; + +template +struct just_sender { + using result_t = T; + T value; + + template + just_operation connect(R receiver) + { + return {{}, receiver, this->value}; + } +}; + +template +just_sender +just(T t) +{ + return {t}; +} + +struct cout_receiver { + template + friend void set_value(cout_receiver self, T &&val) + { + std::cout << "Result: " << val << std::endl; + } + + friend void set_error(cout_receiver self, std::exception_ptr e) + { + try { + std::rethrow_exception(e); + } catch (const std::exception &e) { + std::cout << "Error: " << e.what() << std::endl; + } + } + + friend void set_stopped(cout_receiver self) { std::cout << "Stopped" << std::endl; } +}; + +}// namespace sled +#endif// SLED_EXEC_JUST_H diff --git a/src/exec/just_test.cc b/src/exec/just_test.cc new file mode 100644 index 0000000..31f7a9a --- /dev/null +++ b/src/exec/just_test.cc @@ -0,0 +1,9 @@ +#include "sled/ref_count.h" +#include +#include + +TEST(just, basic) +{ + sled::just_sender sender = sled::just(1); + auto op = connect(sender, sled::cout_receiver{}); +} diff --git a/src/rx_test.cc b/src/rx_test.cc index bbf8295..ad583da 100644 --- a/src/rx_test.cc +++ b/src/rx_test.cc @@ -1,4 +1,3 @@ -#include "schedulers/rx-currentthread.hpp" #include #include From 7fbf260c430b52fbf2b1aaf2ee1b27883ff503e8 Mon Sep 17 00:00:00 2001 From: tqcq <99722391+tqcq@users.noreply.github.com> Date: Thu, 21 Mar 2024 20:59:37 +0800 Subject: [PATCH 3/4] feat add just,then,traits.h --- CMakeLists.txt | 2 +- include/sled/exec/detail/invoke_result.h | 391 ++++++++++++++++++ include/sled/exec/detail/just.h | 39 ++ include/sled/exec/detail/sync_wait.h | 68 +++ include/sled/exec/detail/then.h | 54 +++ include/sled/exec/detail/traits.h | 16 + include/sled/exec/exec.h | 10 + include/sled/exec/just.h | 68 --- include/sled/network/physical_socket_server.h | 26 +- include/sled/synchronization/mutex.h | 7 +- src/exec/just_test.cc | 41 +- 11 files changed, 625 insertions(+), 97 deletions(-) create mode 100644 include/sled/exec/detail/invoke_result.h create mode 100644 include/sled/exec/detail/just.h create mode 100644 include/sled/exec/detail/sync_wait.h create mode 100644 include/sled/exec/detail/then.h create mode 100644 include/sled/exec/detail/traits.h create mode 100644 include/sled/exec/exec.h delete mode 100644 include/sled/exec/just.h diff --git a/CMakeLists.txt b/CMakeLists.txt index befa154..ec49ef8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -120,7 +120,7 @@ if(SLED_BUILD_TESTS) src/exec/just_test.cc src/any_test.cc src/filesystem/path_test.cc - src/profiling/profiling_test.cc + # src/profiling/profiling_test.cc src/strings/base64_test.cc src/cleanup_test.cc src/status_or_test.cc diff --git a/include/sled/exec/detail/invoke_result.h b/include/sled/exec/detail/invoke_result.h new file mode 100644 index 0000000..32f7018 --- /dev/null +++ b/include/sled/exec/detail/invoke_result.h @@ -0,0 +1,391 @@ +//! \file eggs/invoke.hpp +// Eggs.Invoke +// +// Copyright Agustin K-ballo Berge, Fusion Fenix 2017-2020 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE.txt or copy at http://www.boost.org/LICENSE_1_0.txt) + +#ifndef EGGS_INVOKE_HPP +#define EGGS_INVOKE_HPP + +#include +#include +#include + +namespace eggs { +namespace detail { +#define EGGS_FWD(...) static_cast(__VA_ARGS__) + +/////////////////////////////////////////////////////////////////////////// +template::value> +struct invoke_mem_ptr; + +// when `pm` is a pointer to member of a class `C` and +// `is_base_of_v>` is `true`; +template +struct invoke_mem_ptr { + T C::*pm; + +#if !__cpp_aggregate_paren_init + constexpr invoke_mem_ptr(T C::*pm) noexcept : pm(pm) {} +#endif + + template + constexpr auto operator()(T1 &&t1) const noexcept(noexcept(EGGS_FWD(t1).*pm)) -> decltype(EGGS_FWD(t1).*pm) + { + return EGGS_FWD(t1).*pm; + } +}; + +template +struct invoke_mem_ptr { + T C::*pm; + +#if !__cpp_aggregate_paren_init + constexpr invoke_mem_ptr(T C::*pm) noexcept : pm(pm) {} +#endif + + template + constexpr auto operator()(T1 &&t1, Tn &&...tn) const noexcept(noexcept((EGGS_FWD(t1).*pm)(EGGS_FWD(tn)...))) + -> decltype((EGGS_FWD(t1).*pm)(EGGS_FWD(tn)...)) + { + return (EGGS_FWD(t1).*pm)(EGGS_FWD(tn)...); + } +}; + +// when `pm` is a pointer to member of a class `C` and +// `remove_cvref_t` is a specialization of `reference_wrapper`; +template +struct invoke_mem_ptr { + T C::*pm; + +#if !__cpp_aggregate_paren_init + constexpr invoke_mem_ptr(T C::*pm) noexcept : pm(pm) {} +#endif + + template + constexpr auto operator()(T1 &&t1) const noexcept(noexcept(t1.get().*pm)) -> decltype(t1.get().*pm) + { + return t1.get().*pm; + } +}; + +template +struct invoke_mem_ptr { + T C::*pm; + +#if !__cpp_aggregate_paren_init + constexpr invoke_mem_ptr(T C::*pm) noexcept : pm(pm) {} +#endif + + template + constexpr auto operator()(T1 &&t1, Tn &&...tn) const noexcept(noexcept((t1.get().*pm)(EGGS_FWD(tn)...))) + -> decltype((t1.get().*pm)(EGGS_FWD(tn)...)) + { + return (t1.get().*pm)(EGGS_FWD(tn)...); + } +}; + +// when `pm` is a pointer to member of a class `C` and `T` does not +// satisfy the previous two items; +template +struct invoke_mem_ptr { + T C::*pm; + +#if !__cpp_aggregate_paren_init + constexpr invoke_mem_ptr(T C::*pm) noexcept : pm(pm) {} +#endif + + template + constexpr auto operator()(T1 &&t1) const noexcept(noexcept((*EGGS_FWD(t1)).*pm)) -> decltype((*EGGS_FWD(t1)).*pm) + { + return (*EGGS_FWD(t1)).*pm; + } +}; + +template +struct invoke_mem_ptr { + T C::*pm; + +#if !__cpp_aggregate_paren_init + constexpr invoke_mem_ptr(T C::*pm) noexcept : pm(pm) {} +#endif + + template + constexpr auto operator()(T1 &&t1, Tn &&...tn) const noexcept(noexcept(((*EGGS_FWD(t1)).*pm)(EGGS_FWD(tn)...))) + -> decltype(((*EGGS_FWD(t1)).*pm)(EGGS_FWD(tn)...)) + { + return ((*EGGS_FWD(t1)).*pm)(EGGS_FWD(tn)...); + } +}; + +/////////////////////////////////////////////////////////////////////////// +template +auto invoke(F &&, ...) -> F &&; + +template +auto invoke(T C::*, T1 const &, ...) -> invoke_mem_ptr::value, + /*RefWrapper=*/false>; + +template +auto invoke(T C::*, std::reference_wrapper, ...) -> invoke_mem_ptr; + +//! EGGS_INVOKE(F, ...) +//! +//! - _Returns_: `INVOKE(F __VA_OPT__(,) __VA_ARGS__)`. +#if __cplusplus > 201703L// C++20: P0306 +#define EGGS_INVOKE(F, ...) \ + (static_cast(F)(__VA_ARGS__)) +#elif _MSVC_TRADITIONAL +#define EGGS_INVOKE(F, ...) (static_cast(F)(__VA_ARGS__)) +#else +#define EGGS_INVOKE(F, ...) (static_cast(F)(__VA_ARGS__)) +#endif + +/////////////////////////////////////////////////////////////////////////// +// `INVOKE(f, t1, t2, ..., tN)` implicitly converted to `R`. +template::type> +struct invoke_r { +private: + static R conversion(R) noexcept; + +public: + template + static constexpr auto + call(F &&f, Args &&...args) noexcept(noexcept(conversion(EGGS_INVOKE(EGGS_FWD(f), EGGS_FWD(args)...)))) + -> decltype(conversion(EGGS_INVOKE(EGGS_FWD(f), EGGS_FWD(args)...))) + { + return EGGS_INVOKE(EGGS_FWD(f), EGGS_FWD(args)...); + } +}; + +// `static_cast(INVOKE(f, t1, t2, ..., tN))` if `R` is _cv_ `void`. +template +struct invoke_r { + template + static constexpr auto call(F &&f, Args &&...args) noexcept(noexcept(EGGS_INVOKE(EGGS_FWD(f), EGGS_FWD(args)...))) + -> decltype(static_cast(EGGS_INVOKE(EGGS_FWD(f), EGGS_FWD(args)...))) + { + return static_cast(EGGS_INVOKE(EGGS_FWD(f), EGGS_FWD(args)...)); + } +}; + +//! EGGS_INVOKE(R, F, ...) +//! +//! - _Returns_: `INVOKE(F __VA_OPT__(,) __VA_ARGS__)`. +#define EGGS_INVOKE_R(R, ...) (::eggs::detail::invoke_r::call(__VA_ARGS__)) + +}// namespace detail +}// namespace eggs + +namespace eggs { +/////////////////////////////////////////////////////////////////////////// +namespace detail { +template +struct invoke_result_impl {}; + +template +struct invoke_result_impl(), std::declval()...))> { + using type = decltype(EGGS_INVOKE(std::declval(), std::declval()...)); +}; +}// namespace detail + +//! template struct invoke_result; +//! +//! - _Comments_: If the expression `INVOKE(std::declval(), +//! std::declval()...)` is well-formed when treated as an +//! unevaluated operand, the member typedef `type` names the type +//! `decltype(INVOKE(std::declval(), std::declval()...))`; +//! otherwise, there shall be no member `type`. Access checking is +//! performed as if in a context unrelated to `Fn` and `ArgTypes`. Only +//! the validity of the immediate context of the expression is considered. +//! +//! - _Preconditions_: `Fn` and all types in the template parameter pack +//! `ArgTypes` are complete types, _cv_ `void`, or arrays of unknown +//! bound. +template +struct invoke_result : detail::invoke_result_impl {}; + +//! template +//! using invoke_result_t = typename invoke_result::type; +template +using invoke_result_t = typename invoke_result::type; + +/////////////////////////////////////////////////////////////////////////// +namespace detail { +template +struct is_invocable_impl : std::false_type {}; + +template +struct is_invocable_impl(), std::declval()...))> + : std::true_type {}; +}// namespace detail + +//! template struct is_invocable; +//! +//! - _Condition_: The expression `INVOKE(std::declval(), +//! std::declval()...)` is well-formed when treated as an +//! unevaluated operand. +//! +//! - _Comments_: `Fn` and all types in the template parameter pack +//! `ArgTypes` shall be complete types, _cv_ `void`, or arrays of +//! unknown bound. +template +struct is_invocable : detail::is_invocable_impl::type {}; + +#if __cpp_variable_templates +//! template // (C++14) +//! inline constexpr bool is_invocable_v = +//! eggs::is_invocable::value; +template +#if __cpp_inline_variables +inline +#endif + constexpr bool is_invocable_v = is_invocable::value; +#endif + +/////////////////////////////////////////////////////////////////////////// +namespace detail { +template +struct is_invocable_r_impl : std::false_type {}; + +template +struct is_invocable_r_impl(), std::declval()...))> + : std::true_type {}; +}// namespace detail + +//! template struct is_invocable_r; +//! +//! - _Condition_: The expression `INVOKE(std::declval(), +//! std::declval()...)` is well-formed when treated as an +//! unevaluated operand. +//! +//! - _Comments_: `Fn`, `R`, and all types in the template parameter pack +//! `ArgTypes` shall be complete types, _cv_ `void`, or arrays of +//! unknown bound. +template +struct is_invocable_r : detail::is_invocable_r_impl::type {}; + +#if __cpp_variable_templates +//! template // (C++14) +//! inline constexpr bool is_invocable_r_v = +//! eggs::is_invocable_r::value; +template +#if __cpp_inline_variables +inline +#endif + constexpr bool is_invocable_r_v = is_invocable_r::value; +#endif + +/////////////////////////////////////////////////////////////////////////// +namespace detail { +template +struct is_nothrow_invocable_impl : std::false_type {}; + +template +struct is_nothrow_invocable_impl(), std::declval()...))> + : std::integral_constant(), std::declval()...))> {}; +}// namespace detail + +//! template struct is_nothrow_invocable; +//! +//! - _Condition_: `eggs::is_invocable_v` is `true` and +//! the expression `INVOKE(std::declval(), std::declval()...)` +//! is known not to throw any exceptions. +//! +//! - _Comments_: `Fn` and all types in the template parameter pack +//! `ArgTypes` shall be complete types, _cv_ `void`, or arrays of +//! unknown bound. +template +struct is_nothrow_invocable : detail::is_nothrow_invocable_impl::type {}; + +#if __cpp_variable_templates +//! template // (C++14) +//! inline constexpr bool is_nothrow_invocable_v = +//! eggs::is_nothrow_invocable::value; +template +#if __cpp_inline_variables +inline +#endif + constexpr bool is_nothrow_invocable_v = is_nothrow_invocable::value; +#endif + +/////////////////////////////////////////////////////////////////////////// +namespace detail { +template +struct is_nothrow_invocable_r_impl : std::false_type {}; + +template +struct is_nothrow_invocable_r_impl(), std::declval()...))> + : std::integral_constant(), std::declval()...))> {}; +}// namespace detail + +//! template struct is_nothrow_invocable_r; +//! +//! - _Condition_: `eggs::is_invocable_r_v` is `true` +//! and the expression `INVOKE(std::declval(), std::declval()...)` +//! is known not to throw any exceptions. +//! +//! - _Comments_: `Fn`, `R`, and all types in the template parameter pack +//! `ArgTypes` shall be complete types, _cv_ `void`, or arrays of +//! unknown bound. +template +struct is_nothrow_invocable_r : detail::is_nothrow_invocable_r_impl::type {}; + +#if __cpp_variable_templates +//! template // (C++14) +//! inline constexpr bool is_nothrow_invocable_r_v = +//! eggs::is_nothrow_invocable_r::value; +template +#if __cpp_inline_variables +inline +#endif + constexpr bool is_nothrow_invocable_r_v = is_nothrow_invocable_r::value; +#endif + +/////////////////////////////////////////////////////////////////////////// +//! template +//! constexpr eggs::invoke_result_t invoke(F&& f, Args&&... args) +//! noexcept(eggs::is_nothrow_invocable_v); +//! +//! - _Returns_: `INVOKE(std::forward(f), std::forward(args)...)`. +//! +//! - _Remarks_: This function shall not participate in overload resolution +//! unless `eggs::is_invocable_v` is `true`. +template +constexpr auto +invoke(Fn &&f, ArgTypes &&...args) noexcept(noexcept(EGGS_INVOKE(EGGS_FWD(f), EGGS_FWD(args)...))) + -> decltype(EGGS_INVOKE(EGGS_FWD(f), EGGS_FWD(args)...)) +{ + return EGGS_INVOKE(EGGS_FWD(f), EGGS_FWD(args)...); +} + +/////////////////////////////////////////////////////////////////////////// +//! template // (extension) +//! constexpr R eggs::invoke_r(F&& f, Args&&... args) +//! noexcept(eggs::is_nothrow_invocable_r_v); +//! +//! - _Returns_: `INVOKE(std::forward(f), std::forward(args)...)`. +//! +//! - _Remarks_: This function shall not participate in overload resolution +//! unless `eggs::is_invocable_r_v` is `true`. +template +constexpr auto +invoke_r(Fn &&f, ArgTypes &&...args) noexcept(noexcept(EGGS_INVOKE_R(R, EGGS_FWD(f), EGGS_FWD(args)...))) + -> decltype(EGGS_INVOKE_R(R, EGGS_FWD(f), EGGS_FWD(args)...)) +{ + return EGGS_INVOKE_R(R, EGGS_FWD(f), EGGS_FWD(args)...); +} + +#undef EGGS_FWD +}// namespace eggs + +#endif /*EGGS_INVOKE_HPP*/ diff --git a/include/sled/exec/detail/just.h b/include/sled/exec/detail/just.h new file mode 100644 index 0000000..df38285 --- /dev/null +++ b/include/sled/exec/detail/just.h @@ -0,0 +1,39 @@ +#ifndef SLED_EXEC_DETAIL_JUST_H +#define SLED_EXEC_DETAIL_JUST_H +#pragma once + +#include +#include + +namespace sled { + +template +struct JustOperation { + TReceiver receiver; + T value; + + void Start() { receiver.SetValue(std::move(value)); } +}; + +template +struct JustSender { + using result_t = T; + T value; + + template + JustOperation Connect(TReceiver &&receiver) + { + return {std::forward(receiver), std::forward(value)}; + } +}; + +template +JustSender +Just(T &&t) +{ + return {std::forward(t)}; +} + +}// namespace sled + +#endif// SLED_EXEC_DETAIL_JUST_H diff --git a/include/sled/exec/detail/sync_wait.h b/include/sled/exec/detail/sync_wait.h new file mode 100644 index 0000000..9b380c3 --- /dev/null +++ b/include/sled/exec/detail/sync_wait.h @@ -0,0 +1,68 @@ +#ifndef SLED_EXEC_DETAIL_SYNC_WAIT_H +#define SLED_EXEC_DETAIL_SYNC_WAIT_H +#pragma once + +#include "sled/optional.h" +#include "sled/synchronization/mutex.h" +#include "traits.h" +#include + +namespace sled { +struct SyncWaitData { + sled::Mutex lock; + sled::ConditionVariable cv; + std::exception_ptr err; + bool done = false; +}; + +template +struct SyncWaitReceiver { + SyncWaitData &data; + sled::optional &value; + + void SetValue(T &&val) + { + sled::MutexLock lock(&data.lock); + value.emplace(val); + data.done = true; + data.cv.NotifyOne(); + } + + void SetError(std::exception_ptr err) + { + sled::MutexLock lock(&data.lock); + data.err = err; + data.done = true; + data.cv.NotifyOne(); + } + + void SetStopped(std::exception_ptr err) + { + sled::MutexLock lock(&data.lock); + data.done = true; + data.cv.NotifyOne(); + } +}; + +template +sled::optional> +SyncWait(TSender sender) +{ + using T = SenderResultT; + SyncWaitData data; + sled::optional value; + + auto op = sender.Connect(SyncWaitReceiver{data, value}); + op.Start(); + + sled::MutexLock lock(&data.lock); + data.cv.Wait(lock, [&data] { return data.done; }); + + if (data.err) { std::rethrow_exception(data.err); } + + return value; +} + +}// namespace sled + +#endif// SLED_EXEC_DETAIL_SYNC_WAIT_H diff --git a/include/sled/exec/detail/then.h b/include/sled/exec/detail/then.h new file mode 100644 index 0000000..2df6db5 --- /dev/null +++ b/include/sled/exec/detail/then.h @@ -0,0 +1,54 @@ +#ifndef SLED_EXEC_DETAIL_THEN_H +#define SLED_EXEC_DETAIL_THEN_H +#pragma once + +#include "traits.h" +#include +#include + +namespace sled { +template +struct ThenReceiver { + TReceiver receiver; + F func; + + template + void SetValue(T &&value) + { + receiver.SetValue(func(std::forward(value))); + } + + void SetError(std::exception_ptr err) { receiver.SetError(err); } + + void SetStopped() { receiver.SetStopped(); } +}; + +template +struct ThenOperation { + ConnectResultT> op; + + void Start() { op.Start(); } +}; + +template +struct ThenSender { + using result_t = typename eggs::invoke_result_t>; + TSender sender; + F func; + + template + ThenOperation Connect(TReceiver &&receiver) + { + return {sender.Connect(ThenReceiver{std::forward(receiver), func})}; + } +}; + +template +ThenSender +Then(TSender sender, F &&func) +{ + return {std::forward(sender), std::forward(func)}; +} + +}// namespace sled +#endif// SLED_EXEC_DETAIL_THEN_H diff --git a/include/sled/exec/detail/traits.h b/include/sled/exec/detail/traits.h new file mode 100644 index 0000000..47cfe03 --- /dev/null +++ b/include/sled/exec/detail/traits.h @@ -0,0 +1,16 @@ +#ifndef SLED_EXEC_DETAIL_TRAITS_H +#define SLED_EXEC_DETAIL_TRAITS_H +#pragma once +#include "invoke_result.h" +#include + +namespace sled { +template +using ConnectResultT = decltype(std::declval().Connect(std::declval())); + +template +using SenderResultT = typename TSender::result_t; + +}// namespace sled + +#endif// SLED_EXEC_DETAIL_TRAITS_H diff --git a/include/sled/exec/exec.h b/include/sled/exec/exec.h new file mode 100644 index 0000000..9a9ea19 --- /dev/null +++ b/include/sled/exec/exec.h @@ -0,0 +1,10 @@ +#ifndef SLED_EXEC_EXEC_H +#define SLED_EXEC_EXEC_H +#pragma once +#include "detail/just.h" +#include "detail/sync_wait.h" +#include "detail/then.h" + +namespace sled {} + +#endif// SLED_EXEC_EXEC_H diff --git a/include/sled/exec/just.h b/include/sled/exec/just.h deleted file mode 100644 index 9b59521..0000000 --- a/include/sled/exec/just.h +++ /dev/null @@ -1,68 +0,0 @@ -#pragma once -#ifndef SLED_EXEC_JUST_H -#define SLED_EXEC_JUST_H - -#include -#include - -namespace sled { - -struct immovable { - immovable() = default; - immovable(immovable &&) = delete; -}; - -template -using connect_result_t = decltype(connect(std::declval(), std::declval())); - -template -using sender_result_t = typename T::result_t; - -template -struct just_operation : immovable { - R receiver; - T value; - - friend void start(just_operation &self) { set_value(self.receiver, self.value); } -}; - -template -struct just_sender { - using result_t = T; - T value; - - template - just_operation connect(R receiver) - { - return {{}, receiver, this->value}; - } -}; - -template -just_sender -just(T t) -{ - return {t}; -} - -struct cout_receiver { - template - friend void set_value(cout_receiver self, T &&val) - { - std::cout << "Result: " << val << std::endl; - } - - friend void set_error(cout_receiver self, std::exception_ptr e) - { - try { - std::rethrow_exception(e); - } catch (const std::exception &e) { - std::cout << "Error: " << e.what() << std::endl; - } - } - - friend void set_stopped(cout_receiver self) { std::cout << "Stopped" << std::endl; } -}; - -}// namespace sled -#endif// SLED_EXEC_JUST_H diff --git a/include/sled/network/physical_socket_server.h b/include/sled/network/physical_socket_server.h index 248534a..d24b6c0 100644 --- a/include/sled/network/physical_socket_server.h +++ b/include/sled/network/physical_socket_server.h @@ -59,9 +59,9 @@ private: bool WaitSelect(int64_t cusWait, bool process_io); uint64_t next_dispatcher_key_ = 0; - std::unordered_map dispatcher_by_key_; - std::unordered_map key_by_dispatcher_; - std::vector current_dispatcher_keys_; + std::unordered_map dispatcher_by_key_ GUARDED_BY(lock_); + std::unordered_map key_by_dispatcher_ GUARDED_BY(lock_); + std::vector current_dispatcher_keys_ GUARDED_BY(lock_); Signaler *signal_wakeup_; // Mutex lock_; RecursiveMutex lock_; @@ -91,10 +91,7 @@ public: int Send(const void *pv, size_t cb) override; int SendTo(const void *pv, size_t cb, const SocketAddress &addr) override; int Recv(void *pv, size_t cb, int64_t *timestamp) override; - int RecvFrom(void *pv, - size_t cb, - SocketAddress *paddr, - int64_t *timestamp) override; + int RecvFrom(void *pv, size_t cb, SocketAddress *paddr, int64_t *timestamp) override; int Listen(int backlog) override; Socket *Accept(SocketAddress *paddr) override; @@ -108,16 +105,9 @@ protected: int DoConnect(const SocketAddress &addr); virtual SOCKET DoAccept(SOCKET socket, sockaddr *addr, socklen_t *addrlen); virtual int DoSend(SOCKET socket, const char *buf, int len, int flags); - virtual int DoSendTo(SOCKET socket, - const char *buf, - int len, - int flags, - const struct sockaddr *dest_addr, - socklen_t addrlen); - int DoReadFromSocket(void *buffer, - size_t length, - SocketAddress *out_addr, - int64_t *timestamp); + virtual int + DoSendTo(SOCKET socket, const char *buf, int len, int flags, const struct sockaddr *dest_addr, socklen_t addrlen); + int DoReadFromSocket(void *buffer, size_t length, SocketAddress *out_addr, int64_t *timestamp); void OnResolveResult(AsyncResolverInterface *resolver); void UpdateLastError(); @@ -134,7 +124,7 @@ protected: bool udp_; int family_ = 0; mutable Mutex mutex_; - int error_; + int error_ GUARDED_BY(mutex_); ConnState state_; AsyncResolver *resolver_; diff --git a/include/sled/synchronization/mutex.h b/include/sled/synchronization/mutex.h index 46205f8..8a4c7a3 100644 --- a/include/sled/synchronization/mutex.h +++ b/include/sled/synchronization/mutex.h @@ -78,10 +78,7 @@ public: LockGuard(const LockGuard &) = delete; LockGuard &operator=(const LockGuard &) = delete; - explicit LockGuard(TLock *lock) EXCLUSIVE_LOCK_FUNCTION() : mutex_(lock) - { - mutex_->Lock(); - }; + explicit LockGuard(TLock *lock) EXCLUSIVE_LOCK_FUNCTION() : mutex_(lock) { mutex_->Lock(); }; ~LockGuard() UNLOCK_FUNCTION() { mutex_->Unlock(); }; @@ -150,7 +147,7 @@ public: template inline void Wait(MutexLock &lock, Predicate &&pred) { - cv_.wait(lock, std::forward(pred)); + cv_.wait(lock.lock_, std::forward(pred)); } template diff --git a/src/exec/just_test.cc b/src/exec/just_test.cc index 31f7a9a..452dd8e 100644 --- a/src/exec/just_test.cc +++ b/src/exec/just_test.cc @@ -1,9 +1,40 @@ -#include "sled/ref_count.h" #include -#include +#include -TEST(just, basic) +struct cout_receiver { + template + void SetValue(T &&val) + { + // 这个receiver什么都不干,只对收集到的结果输出 + std::cout << "Result: " << val << '\n'; + } + + void SetError(std::exception_ptr err) { std::terminate(); } + + void SetStopped() { std::terminate(); } +}; + +TEST(Just, basic) { - sled::just_sender sender = sled::just(1); - auto op = connect(sender, sled::cout_receiver{}); + sled::Just(42).Connect(cout_receiver{}).Start(); + sled::Just(11).Connect(cout_receiver{}).Start(); +} + +TEST(Then, basic) +{ + auto s1 = sled::Just(42); + auto s2 = sled::Then(s1, [](int x) { return x + 1; }); + auto s3 = sled::Then(s2, [](int x) { return x + 1; }); + auto s4 = sled::Then(s3, [](int x) { return x + 1; }); + s4.Connect(cout_receiver{}).Start(); +} + +TEST(SyncWait, basic) +{ + auto s1 = sled::Just(42); + auto s2 = sled::Then(s1, [](int x) { return x + 1; }); + auto s3 = sled::Then(s2, [](int x) { return x + 1; }); + auto s4 = sled::Then(s3, [](int x) { return x + 1; }); + auto s5 = sled::SyncWait(s4).value(); + std::cout << "Result: " << s5 << '\n'; } From d304f6da0219d05f7fd31c267d425c700ebbe780 Mon Sep 17 00:00:00 2001 From: tqcq <99722391+tqcq@users.noreply.github.com> Date: Fri, 22 Mar 2024 17:44:43 +0800 Subject: [PATCH 4/4] feat update --- CMakeLists.txt | 1 + include/sled/exec/detail/just.h | 2 +- include/sled/exec/detail/retry.h | 67 +++++++++++++++++++++++ include/sled/exec/detail/sync_wait.h | 6 +- include/sled/exec/detail/then.h | 13 +++-- include/sled/futures/base_cell.h | 15 +++++ include/sled/futures/future.h | 51 +++++++++++++++++ include/sled/futures/just.h | 28 ++++++++++ include/sled/futures/then.h | 40 ++++++++++++++ include/sled/system/thread.h | 35 ++---------- include/sled/system/thread_pool.h | 24 +++++--- include/sled/task_queue/task_queue_base.h | 59 ++++++++++---------- src/futures/future_test.cc | 8 +++ src/system/thread.cc | 39 ++++--------- src/system/thread_pool.cc | 36 ++++++++++-- src/task_queue/task_queue_base.cc | 15 ++++- 16 files changed, 330 insertions(+), 109 deletions(-) create mode 100644 include/sled/exec/detail/retry.h create mode 100644 include/sled/futures/base_cell.h create mode 100644 include/sled/futures/future.h create mode 100644 include/sled/futures/just.h create mode 100644 include/sled/futures/then.h create mode 100644 src/futures/future_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index ec49ef8..45260b0 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -120,6 +120,7 @@ if(SLED_BUILD_TESTS) src/exec/just_test.cc src/any_test.cc src/filesystem/path_test.cc + src/futures/future_test.cc # src/profiling/profiling_test.cc src/strings/base64_test.cc src/cleanup_test.cc diff --git a/include/sled/exec/detail/just.h b/include/sled/exec/detail/just.h index df38285..a75c370 100644 --- a/include/sled/exec/detail/just.h +++ b/include/sled/exec/detail/just.h @@ -12,7 +12,7 @@ struct JustOperation { TReceiver receiver; T value; - void Start() { receiver.SetValue(std::move(value)); } + void Start() { receiver.SetValue(value); } }; template diff --git a/include/sled/exec/detail/retry.h b/include/sled/exec/detail/retry.h new file mode 100644 index 0000000..ec16a18 --- /dev/null +++ b/include/sled/exec/detail/retry.h @@ -0,0 +1,67 @@ +#ifndef SLED_EXEC_DETAIL_RETRY_H +#define SLED_EXEC_DETAIL_RETRY_H +#include +#include +#include +#pragma once + +#include "traits.h" + +namespace sled { + +struct RetryState { + int retry_count; + bool need_retry; +}; + +template +struct RetryReceiver { + TReceiver receiver; + std::shared_ptr state; + + template + void SetValue(T &&value) + { + receiver.SetValue(value); + } + + void SetError(std::exception_ptr err) + { + if (state->retry_count < 0) {} + } + + void SetStopped() { receiver.SetStopped(); } +}; + +template +struct RetryOperation { + ConnectResultT> op; + std::shared_ptr state; + + void Start() {} +}; + +template +struct RetrySender { + using S = typename std::remove_cv::type>::type; + using result_t = SenderResultT; + S sender; + int retry_count; + + template + RetryOperation Connect(TReceiver &&receiver) + { + auto retry_state = std::make_shared(new RetryState{retry_count, false}); + return {sender.Connect(RetryReceiver{receiver, retry_state}), retry_state}; + } +}; + +template +RetrySender +Retry(TSender &&sender, int retry_count) +{ + return {std::forward(sender), retry_count}; +} + +}// namespace sled +#endif// SLED_EXEC_DETAIL_RETRY_H diff --git a/include/sled/exec/detail/sync_wait.h b/include/sled/exec/detail/sync_wait.h index 9b380c3..63dbace 100644 --- a/include/sled/exec/detail/sync_wait.h +++ b/include/sled/exec/detail/sync_wait.h @@ -8,7 +8,7 @@ #include namespace sled { -struct SyncWaitData { +struct SyncWaitState { sled::Mutex lock; sled::ConditionVariable cv; std::exception_ptr err; @@ -17,7 +17,7 @@ struct SyncWaitData { template struct SyncWaitReceiver { - SyncWaitData &data; + SyncWaitState &data; sled::optional &value; void SetValue(T &&val) @@ -49,7 +49,7 @@ sled::optional> SyncWait(TSender sender) { using T = SenderResultT; - SyncWaitData data; + SyncWaitState data; sled::optional value; auto op = sender.Connect(SyncWaitReceiver{data, value}); diff --git a/include/sled/exec/detail/then.h b/include/sled/exec/detail/then.h index 2df6db5..77b403b 100644 --- a/include/sled/exec/detail/then.h +++ b/include/sled/exec/detail/then.h @@ -15,7 +15,11 @@ struct ThenReceiver { template void SetValue(T &&value) { - receiver.SetValue(func(std::forward(value))); + try { + receiver.SetValue(func(std::forward(value))); + } catch (...) { + receiver.SetError(std::current_exception()); + } } void SetError(std::exception_ptr err) { receiver.SetError(err); } @@ -32,8 +36,9 @@ struct ThenOperation { template struct ThenSender { - using result_t = typename eggs::invoke_result_t>; - TSender sender; + using S = typename std::remove_cv::type>::type; + using result_t = typename eggs::invoke_result_t>; + S sender; F func; template @@ -45,7 +50,7 @@ struct ThenSender { template ThenSender -Then(TSender sender, F &&func) +Then(TSender &&sender, F &&func) { return {std::forward(sender), std::forward(func)}; } diff --git a/include/sled/futures/base_cell.h b/include/sled/futures/base_cell.h new file mode 100644 index 0000000..e13fee0 --- /dev/null +++ b/include/sled/futures/base_cell.h @@ -0,0 +1,15 @@ +#ifndef SLED_FUTURES_BASE_CELL_H +#define SLED_FUTURES_BASE_CELL_H +#include +#pragma once + +namespace sled { +namespace futures { + +struct BaseCell { + void *scheduler; +}; +}// namespace futures + +}// namespace sled +#endif// SLED_FUTURES_BASE_CELL_H diff --git a/include/sled/futures/future.h b/include/sled/futures/future.h new file mode 100644 index 0000000..8d06c56 --- /dev/null +++ b/include/sled/futures/future.h @@ -0,0 +1,51 @@ +#ifndef SLED_FUTURES_FUTHRE_H +#define SLED_FUTURES_FUTHRE_H + +#include "sled/any.h" +#include "sled/exec/detail/invoke_result.h" +#include "sled/optional.h" +#include "sled/synchronization/mutex.h" +#include +#include + +namespace sled { + +template +class Future; +template +class Promise; + +template +struct FPState : std::enable_shared_from_this> { + sled::Mutex lock; + sled::optional data; + std::exception_ptr err; + bool done; + sled::any priv; +}; + +template +class Future { +public: + using result_t = T; + + Future(std::shared_ptr> state) : state_(state) {} + +private: + std::shared_ptr> state_; +}; + +template +class Promise { +public: + using result_t = T; + + void SetValue(T &&value) {} + + void SetError(std::exception_ptr err) {} + + Future GetFuture() {} +}; + +}// namespace sled +#endif// SLED_FUTURES_FUTHRE_H diff --git a/include/sled/futures/just.h b/include/sled/futures/just.h new file mode 100644 index 0000000..6265c33 --- /dev/null +++ b/include/sled/futures/just.h @@ -0,0 +1,28 @@ +#ifndef SLED_FUTURES_JUST_H +#define SLED_FUTURES_JUST_H +#include +#pragma once + +namespace sled { +namespace futures { +template +struct JustCell { + T value; + + template + void Start(R receiver) + { + receiver.SetValue(value); + } +}; + +template +JustCell +Just(T &&t) +{ + return {std::forward(t)}; +} + +}// namespace futures +}// namespace sled +#endif// SLED_FUTURES_JUST_H diff --git a/include/sled/futures/then.h b/include/sled/futures/then.h new file mode 100644 index 0000000..a709054 --- /dev/null +++ b/include/sled/futures/then.h @@ -0,0 +1,40 @@ + +#ifndef SLED_FUTURES_THEN_H +#define SLED_FUTURES_THEN_H +#include +#include +#pragma once + +namespace sled { +namespace futures { + +template +struct ThenCell { + S sender; + F func; + + // T value; + + template + void Start(R receiver) + { + sender.Start(); + } + + template + void SetValue(U &&value) + {} + + void SetError(std::exception_ptr err) {} +}; + +template +ThenCell +Then(S sender, F &&func) +{ + return {std::forward(sender), std::forward(func)}; +} + +}// namespace futures +}// namespace sled +#endif// SLED_FUTURES_THEN_H diff --git a/include/sled/system/thread.h b/include/sled/system/thread.h index 084f1ae..339cae2 100644 --- a/include/sled/system/thread.h +++ b/include/sled/system/thread.h @@ -59,25 +59,6 @@ public: Thread(const Thread &) = delete; Thread &operator=(const Thread &) = delete; - void BlockingCall(std::function functor, - const Location &location = Location::Current()) - { - BlockingCallImpl(functor, location); - } - - template::type, - typename = typename std::enable_if::value, - ReturnT>::type> - ReturnT BlockingCall(Functor &&functor, - const Location &location = Location::Current()) - { - ReturnT result; - BlockingCall([&] { result = std::forward(functor)(); }, - location); - return result; - } - static std::unique_ptr CreateWithSocketServer(); static std::unique_ptr Create(); static Thread *Current(); @@ -122,8 +103,7 @@ protected: bool operator<(const DelayedMessage &dmsg) const { return (dmsg.run_time_ms < run_time_ms) - || ((dmsg.run_time_ms == run_time_ms) - && (dmsg.message_number < message_number)); + || ((dmsg.run_time_ms == run_time_ms) && (dmsg.message_number < message_number)); } int64_t delay_ms; @@ -132,15 +112,12 @@ protected: mutable std::function functor; }; - void PostTaskImpl(std::function &&task, - const PostTaskTraits &traits, - const Location &location) override; + void PostTaskImpl(std::function &&task, const PostTaskTraits &traits, const Location &location) override; void PostDelayedTaskImpl(std::function &&task, TimeDelta delay, const PostDelayedTaskTraits &traits, const Location &location) override; - virtual void BlockingCallImpl(std::function functor, - const Location &location); + void BlockingCallImpl(std::function &&functor, const Location &location) override; void DoInit(); void DoDestroy(); @@ -150,8 +127,7 @@ private: std::function Get(int cmsWait); void Dispatch(std::function &&task); static void *PreRun(void *pv); - bool WrapCurrentWithThreadManager(ThreadManager *thread_manager, - bool need_synchronize_access); + bool WrapCurrentWithThreadManager(ThreadManager *thread_manager, bool need_synchronize_access); bool IsRunning(); // for ThreadManager @@ -171,8 +147,7 @@ private: std::unique_ptr thread_; bool owned_; - std::unique_ptr - task_queue_registration_; + std::unique_ptr task_queue_registration_; friend class ThreadManager; }; diff --git a/include/sled/system/thread_pool.h b/include/sled/system/thread_pool.h index 98e8945..05fc6b5 100644 --- a/include/sled/system/thread_pool.h +++ b/include/sled/system/thread_pool.h @@ -2,11 +2,12 @@ #ifndef SLED_SYSTEM_THREAD_POOL_H #define SLED_SYSTEM_THREAD_POOL_H #include "sled/system/fiber/scheduler.h" +#include "sled/system/thread.h" #include #include namespace sled { -class ThreadPool final { +class ThreadPool final : public TaskQueueBase { public: /** * @param num_threads The number of threads to create in the thread pool. If @@ -18,16 +19,25 @@ public: template auto submit(F &&f, Args &&...args) -> std::future { - std::function func = - std::bind(std::forward(f), std::forward(args)...); - auto task_ptr = - std::make_shared>(func); - scheduler->enqueue(marl::Task([task_ptr]() { (*task_ptr)(); })); + std::function func = std::bind(std::forward(f), std::forward(args)...); + auto task_ptr = std::make_shared>(func); + scheduler_->enqueue(marl::Task([task_ptr]() { (*task_ptr)(); })); return task_ptr->get_future(); } + void Delete() override; + +protected: + void PostTaskImpl(std::function &&task, const PostTaskTraits &traits, const Location &location) override; + + void PostDelayedTaskImpl(std::function &&task, + TimeDelta delay, + const PostDelayedTaskTraits &traits, + const Location &location) override; + private: - sled::Scheduler *scheduler; + sled::Scheduler *scheduler_; + std::unique_ptr delayed_thread_; }; }// namespace sled diff --git a/include/sled/task_queue/task_queue_base.h b/include/sled/task_queue/task_queue_base.h index 4c17666..10463ce 100644 --- a/include/sled/task_queue/task_queue_base.h +++ b/include/sled/task_queue/task_queue_base.h @@ -22,42 +22,34 @@ public: }; struct Deleter { - void operator()(TaskQueueBase *task_queue) const - { - task_queue->Delete(); - } + void operator()(TaskQueueBase *task_queue) const { task_queue->Delete(); } }; virtual void Delete() = 0; - inline void PostTask(std::function &&task, - const Location &location = Location::Current()) + inline void PostTask(std::function &&task, const Location &location = Location::Current()) { PostTaskImpl(std::move(task), PostTaskTraits{}, location); } - inline void PostDelayedTask(std::function &&task, - TimeDelta delay, - const Location &location = Location::Current()) + inline void + PostDelayedTask(std::function &&task, TimeDelta delay, const Location &location = Location::Current()) { - PostDelayedTaskImpl(std::move(task), delay, PostDelayedTaskTraits{}, - location); + PostDelayedTaskImpl(std::move(task), delay, PostDelayedTaskTraits{}, location); } - inline void - PostDelayedHighPrecisionTask(std::function &&task, - TimeDelta delay, - const Location &location = Location::Current()) + inline void PostDelayedHighPrecisionTask(std::function &&task, + TimeDelta delay, + const Location &location = Location::Current()) { static PostDelayedTaskTraits traits(true); PostDelayedTaskImpl(std::move(task), delay, traits, location); } - inline void - PostDelayedTaskWithPrecision(DelayPrecision precision, - std::function &&task, - TimeDelta delay, - const Location &location = Location::Current()) + inline void PostDelayedTaskWithPrecision(DelayPrecision precision, + std::function &&task, + TimeDelta delay, + const Location &location = Location::Current()) { switch (precision) { case DelayPrecision::kLow: @@ -69,6 +61,21 @@ public: } } + void BlockingCall(std::function functor, const Location &location = Location::Current()) + { + BlockingCallImpl(std::move(functor), location); + } + + template::type, + typename = typename std::enable_if::value, ReturnT>::type> + ReturnT BlockingCall(Functor &&functor, const Location &location = Location::Current()) + { + ReturnT result; + BlockingCall([&] { result = std::forward(functor)(); }, location); + return result; + } + static TaskQueueBase *Current(); bool IsCurrent() const { return Current() == this; }; @@ -77,20 +84,17 @@ protected: struct PostTaskTraits {}; struct PostDelayedTaskTraits { - PostDelayedTaskTraits(bool high_precision = false) - : high_precision(high_precision) - {} + PostDelayedTaskTraits(bool high_precision = false) : high_precision(high_precision) {} bool high_precision = false; }; - virtual void PostTaskImpl(std::function &&task, - const PostTaskTraits &traits, - const Location &location) = 0; + virtual void PostTaskImpl(std::function &&task, const PostTaskTraits &traits, const Location &location) = 0; virtual void PostDelayedTaskImpl(std::function &&task, TimeDelta delay, const PostDelayedTaskTraits &traits, const Location &location) = 0; + virtual void BlockingCallImpl(std::function &&task, const Location &location); virtual ~TaskQueueBase() = default; class CurrentTaskQueueSetter { @@ -98,8 +102,7 @@ protected: explicit CurrentTaskQueueSetter(TaskQueueBase *task_queue); ~CurrentTaskQueueSetter(); CurrentTaskQueueSetter(const CurrentTaskQueueSetter &) = delete; - CurrentTaskQueueSetter & - operator=(const CurrentTaskQueueSetter &) = delete; + CurrentTaskQueueSetter &operator=(const CurrentTaskQueueSetter &) = delete; private: TaskQueueBase *const previous_; diff --git a/src/futures/future_test.cc b/src/futures/future_test.cc new file mode 100644 index 0000000..bf568cb --- /dev/null +++ b/src/futures/future_test.cc @@ -0,0 +1,8 @@ +#include +#include + +TEST(Future, basic) +{ + // sled::Future x; + // auto res = x.Then([](int) {}); +} diff --git a/src/system/thread.cc b/src/system/thread.cc index 1ea853b..1ead5cc 100644 --- a/src/system/thread.cc +++ b/src/system/thread.cc @@ -45,8 +45,7 @@ void ThreadManager::RemoveInternal(Thread *message_queue) { MutexLock lock(&cirt_); - auto iter = std::find(message_queues_.begin(), message_queues_.end(), - message_queue); + auto iter = std::find(message_queues_.begin(), message_queues_.end(), message_queue); if (iter != message_queues_.end()) { message_queues_.erase(iter); } } @@ -96,8 +95,7 @@ ThreadManager::ProcessAllMessageQueueInternal() MutexLock lock(&cirt_); for (Thread *queue : message_queues_) { queues_not_done.fetch_add(1); - auto sub = - MakeCleanup([&queues_not_done] { queues_not_done.fetch_sub(1); }); + auto sub = MakeCleanup([&queues_not_done] { queues_not_done.fetch_sub(1); }); queue->PostDelayedTask([&sub] {}, TimeDelta::Zero()); } @@ -115,9 +113,7 @@ ThreadManager::SetCurrentThreadInternal(Thread *message_queue) Thread::Thread(SocketServer *ss) : Thread(ss, /*do_init=*/true) {} -Thread::Thread(std::unique_ptr ss) - : Thread(std::move(ss), /*do_init=*/true) -{} +Thread::Thread(std::unique_ptr ss) : Thread(std::move(ss), /*do_init=*/true) {} Thread::Thread(SocketServer *ss, bool do_init) : delayed_next_num_(0), @@ -131,11 +127,7 @@ Thread::Thread(SocketServer *ss, bool do_init) if (do_init) { DoInit(); } } -Thread::Thread(std::unique_ptr ss, bool do_init) - : Thread(ss.get(), do_init) -{ - own_ss_ = std::move(ss); -} +Thread::Thread(std::unique_ptr ss, bool do_init) : Thread(ss.get(), do_init) { own_ss_ = std::move(ss); } Thread::~Thread() { @@ -244,13 +236,10 @@ Thread::Get(int cmsWait) cmsNext = cmsDelayNext; } else { cmsNext = std::max(0, cmsTotal - cmsElapsed); - if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) { - cmsNext = cmsDelayNext; - } + if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) { cmsNext = cmsDelayNext; } } { - if (!ss_->Wait(cmsNext == kForever ? SocketServer::kForever - : TimeDelta::Millis(cmsNext), + if (!ss_->Wait(cmsNext == kForever ? SocketServer::kForever : TimeDelta::Millis(cmsNext), /*process_io=*/true)) { return nullptr; } @@ -266,9 +255,7 @@ Thread::Get(int cmsWait) } void -Thread::PostTaskImpl(std::function &&task, - const PostTaskTraits &traits, - const Location &location) +Thread::PostTaskImpl(std::function &&task, const PostTaskTraits &traits, const Location &location) { if (IsQuitting()) { return; } { @@ -303,8 +290,7 @@ Thread::PostDelayedTaskImpl(std::function &&task, } void -Thread::BlockingCallImpl(std::function functor, - const Location &location) +Thread::BlockingCallImpl(std::function &&functor, const Location &location) { if (IsQuitting()) { return; } if (IsCurrent()) { @@ -373,8 +359,7 @@ Thread::SetName(const std::string &name, const void *obj) void Thread::EnsureIsCurrentTaskQueue() { - task_queue_registration_.reset( - new TaskQueueBase::CurrentTaskQueueSetter(this)); + task_queue_registration_.reset(new TaskQueueBase::CurrentTaskQueueSetter(this)); } void @@ -426,8 +411,7 @@ Thread::PreRun(void *pv) } bool -Thread::WrapCurrentWithThreadManager(ThreadManager *thread_manager, - bool need_synchronize_access) +Thread::WrapCurrentWithThreadManager(ThreadManager *thread_manager, bool need_synchronize_access) { // assert(!IsRunning()); owned_ = false; @@ -498,8 +482,7 @@ Thread::Current() return thread; } -AutoSocketServerThread::AutoSocketServerThread(SocketServer *ss) - : Thread(ss, /*do_init=*/false) +AutoSocketServerThread::AutoSocketServerThread(SocketServer *ss) : Thread(ss, /*do_init=*/false) { DoInit(); old_thread_ = ThreadManager::Instance()->CurrentThread(); diff --git a/src/system/thread_pool.cc b/src/system/thread_pool.cc index 43ec03f..51773f5 100644 --- a/src/system/thread_pool.cc +++ b/src/system/thread_pool.cc @@ -1,15 +1,39 @@ #include "sled/system/thread_pool.h" +#include "sled/system/location.h" +#include "sled/task_queue/task_queue_base.h" namespace sled { ThreadPool::ThreadPool(int num_threads) { - if (num_threads == -1) { - num_threads = std::thread::hardware_concurrency(); - } - scheduler = new sled::Scheduler( - sled::Scheduler::Config().setWorkerThreadCount(num_threads)); + if (num_threads == -1) { num_threads = std::thread::hardware_concurrency(); } + scheduler_ = new sled::Scheduler(sled::Scheduler::Config().setWorkerThreadCount(num_threads)); } -ThreadPool::~ThreadPool() { delete scheduler; } +ThreadPool::~ThreadPool() { delete scheduler_; } + +void +ThreadPool::Delete() +{} + +void +ThreadPool::PostTaskImpl(std::function &&task, const PostTaskTraits &traits, const Location &location) +{ + scheduler_->enqueue(marl::Task([task] { task(); })); +} + +void +ThreadPool::PostDelayedTaskImpl(std::function &&task, + TimeDelta delay, + const PostDelayedTaskTraits &traits, + const Location &location) +{ + if (traits.high_precision) { + delayed_thread_->PostDelayedTaskWithPrecision(TaskQueueBase::DelayPrecision::kHigh, std::move(task), delay, + location); + } else { + delayed_thread_->PostDelayedTaskWithPrecision(TaskQueueBase::DelayPrecision::kLow, std::move(task), delay, + location); + } +} }// namespace sled diff --git a/src/task_queue/task_queue_base.cc b/src/task_queue/task_queue_base.cc index 877979f..47b2a78 100644 --- a/src/task_queue/task_queue_base.cc +++ b/src/task_queue/task_queue_base.cc @@ -1,4 +1,5 @@ #include "sled/task_queue/task_queue_base.h" +#include "sled/synchronization/event.h" namespace sled { namespace { @@ -11,12 +12,22 @@ TaskQueueBase::Current() return current; } -TaskQueueBase::CurrentTaskQueueSetter::CurrentTaskQueueSetter(TaskQueueBase *task_queue) - : previous_(current) +TaskQueueBase::CurrentTaskQueueSetter::CurrentTaskQueueSetter(TaskQueueBase *task_queue) : previous_(current) { current = task_queue; } TaskQueueBase::CurrentTaskQueueSetter::~CurrentTaskQueueSetter() { current = previous_; } +void +TaskQueueBase::BlockingCallImpl(std::function &&functor, const sled::Location &from) +{ + Event done; + PostTask([functor, &done] { + functor(); + done.Set(); + }); + done.Wait(Event::kForever); +} + }// namespace sled