From a10703176cbb7f1a3300f59c081f9eec9b5edd68 Mon Sep 17 00:00:00 2001 From: tqcq <99722391+tqcq@users.noreply.github.com> Date: Mon, 25 Mar 2024 23:29:18 +0800 Subject: [PATCH] fix --- include/sled/futures/detail/delay.h | 69 ++++++++++++++++++++++++++++ include/sled/futures/detail/just.h | 2 + include/sled/futures/detail/retry.h | 1 + include/sled/futures/detail/then.h | 1 + include/sled/futures/detail/traits.h | 5 ++ include/sled/futures/detail/via.h | 6 ++- 6 files changed, 82 insertions(+), 2 deletions(-) create mode 100644 include/sled/futures/detail/delay.h diff --git a/include/sled/futures/detail/delay.h b/include/sled/futures/detail/delay.h new file mode 100644 index 0000000..c4c5641 --- /dev/null +++ b/include/sled/futures/detail/delay.h @@ -0,0 +1,69 @@ +#ifndef SLED_FUTURES_DETAIL_DELAY_H +#define SLED_FUTURES_DETAIL_DELAY_H + +#include "sled/units/time_delta.h" +#include "traits.h" +#include + +namespace sled { +namespace detail { + +template +struct DelayReceiver { + R receiver; + sled::TimeDelta delta; + bool stopped = false; + + template + void SetValue(U &&val) + { + if (stopped) { return; } + receiver.SetValue(std::forward(val)); + } + + void SetError(std::exception_ptr e) + { + if (stopped) { return; } + receiver.SetError(e); + } + + void SetStopped() + { + if (stopped) { return; } + stopped = true; + receiver.SetStopped(); + } +}; + +template +struct DelayOperation { + ConnectResultT op; + + void Start() { op.Start(); } + + void Stop() { op.Stop(); } +}; + +template +struct DelaySender { + using result_t = typename S::result_t; + S sender; + sled::TimeDelta delta; + + template + DelayOperation> Connect(R receiver) + { + return {sender.Connect(DelayReceiver{receiver, delta})}; + } +}; + +template +DelaySender +Delay(S sender, sled::TimeDelta const &delta) +{ + return {sender, delta}; +} + +}// namespace detail +}// namespace sled +#endif// SLED_FUTURES_DETAIL_DELAY_H diff --git a/include/sled/futures/detail/just.h b/include/sled/futures/detail/just.h index 58ac377..cba8b15 100644 --- a/include/sled/futures/detail/just.h +++ b/include/sled/futures/detail/just.h @@ -1,6 +1,7 @@ #ifndef SLED_FUTURES_DETAIL_JUST_H #define SLED_FUTURES_DETAIL_JUST_H +#include "traits.h" #include namespace sled { @@ -18,6 +19,7 @@ struct JustOperation { template struct JustSender { + using result_t = T; T value; template diff --git a/include/sled/futures/detail/retry.h b/include/sled/futures/detail/retry.h index d82736c..106d783 100644 --- a/include/sled/futures/detail/retry.h +++ b/include/sled/futures/detail/retry.h @@ -96,6 +96,7 @@ struct RetryOperation { template struct RetrySender { + using result_t = typename S::result_t; S sender; int retry_count; diff --git a/include/sled/futures/detail/then.h b/include/sled/futures/detail/then.h index 811941b..075d152 100644 --- a/include/sled/futures/detail/then.h +++ b/include/sled/futures/detail/then.h @@ -49,6 +49,7 @@ struct ThenOperation { template struct ThenSender { + using result_t = invoke_result_t; S sender; F func; diff --git a/include/sled/futures/detail/traits.h b/include/sled/futures/detail/traits.h index 332f930..fe709a9 100644 --- a/include/sled/futures/detail/traits.h +++ b/include/sled/futures/detail/traits.h @@ -1,6 +1,8 @@ #ifndef SLED_FUTURES_DETAIL_TRAITS_H #define SLED_FUTURES_DETAIL_TRAITS_H +#include "sled/exec/detail/invoke_result.h" + #include namespace sled { @@ -13,6 +15,9 @@ struct ConnectResult { template using ConnectResultT = typename ConnectResult::type; +template +using invoke_result_t = eggs::invoke_result_t; + }// namespace detail }// namespace sled #endif// SLED_FUTURES_DETAIL_TRAITS_H diff --git a/include/sled/futures/detail/via.h b/include/sled/futures/detail/via.h index 7aa8990..c743216 100644 --- a/include/sled/futures/detail/via.h +++ b/include/sled/futures/detail/via.h @@ -18,8 +18,9 @@ struct ViaReceiver { { if (stopped) { return; } try { - auto func = std::bind(&R::SetValue, &receiver, std::forward(val)); - schedule(std::move(func)); + // auto func = std::bind(&R::SetValue, &receiver, std::forward(val)); + // schedule(std::move(func)); + schedule([this, val]() mutable { receiver.SetValue(std::move(val)); }); } catch (...) { SetError(std::current_exception()); } @@ -50,6 +51,7 @@ struct ViaOperation { template struct ViaSender { + using result_t = typename S::result_t; S sender; F schedule;