diff --git a/.clang-format b/.clang-format index 56105dc..d512f71 100644 --- a/.clang-format +++ b/.clang-format @@ -3,8 +3,10 @@ BinPackParameters: false BasedOnStyle: LLVM AccessModifierOffset: -4 AlignAfterOpenBracket: Align +AlignArrayOfStructures: Left +AlignConsecutiveAssignments: Consecutive AlignConsecutiveAssignments: None -AlignOperands: DontAlign +AlignOperands: BreakBeforeBinaryOperators AllowAllArgumentsOnNextLine: false AllowAllConstructorInitializersOnNextLine: false AllowAllParametersOfDeclarationOnNextLine: false @@ -31,7 +33,7 @@ BraceWrapping: IndentBraces: false SplitEmptyFunction: false SplitEmptyRecord: true -BreakBeforeBinaryOperators: NonAssignment +BreakBeforeBinaryOperators: All BreakBeforeTernaryOperators: true BreakConstructorInitializers: BeforeColon ConstructorInitializerAllOnOneLineOrOnePerLine: true diff --git a/include/sled/futures/detail/base_future.h b/include/sled/futures/detail/base_future.h deleted file mode 100644 index 55285cb..0000000 --- a/include/sled/futures/detail/base_future.h +++ /dev/null @@ -1,211 +0,0 @@ -#ifndef SLED_FUTURES_DETAIL_BASE_FUTURE_H -#define SLED_FUTURES_DETAIL_BASE_FUTURE_H -#include "sled/log/log.h" -#include "sled/optional.h" -#include "sled/synchronization/mutex.h" -#include -#include - -namespace sled { - -namespace { -enum class State { - kCancel, - kPending, - kTimeout, - kError, - kValue, -}; -}; - -template -struct FutureState { - mutable sled::Mutex mutex; - mutable sled::ConditionVariable cond_var; - - sled::optional value; - std::exception_ptr error; - - State state = State::kPending; - - void AssertHasValue() const { ASSERT(state == State::kValue, "can't find value"); } - - void AssertHasError() const { ASSERT(state == State::kError, "can't find error"); } - - void AssertHasTimeout() const { ASSERT(state == State::kTimeout, "can't find timeout"); } - - void Wait(sled::MutexLock *lock_ptr = nullptr) const - { - if (lock_ptr) { - if (state != State::kPending) { return; } - cond_var.Wait(*lock_ptr, [this] { return state != State::kPending; }); - } else { - sled::MutexLock lock(&mutex); - if (state != State::kPending) { return; } - cond_var.Wait(lock, [this] { return state != State::kPending; }); - } - } - - void SetError(std::exception_ptr e) - { - sled::MutexLock lock(&mutex); - if (state == State::kCancel) { return; } - ASSERT(state == State::kPending, "state must be kPending"); - error = std::move(e); - state = State::kError; - cond_var.NotifyAll(); - } - - void SetTimeout() - { - sled::MutexLock lock(&mutex); - if (state == State::kCancel) { return; } - ASSERT(state == State::kPending, "state must be kPending"); - state = State::kTimeout; - cond_var.NotifyAll(); - } - - template - typename std::enable_if::value>::type SetValue(U &&val) - { - sled::MutexLock lock(&mutex); - if (state == State::kCancel) { return; } - ASSERT(state == State::kPending, "state must be kPending"); - value = std::forward(val); - state = State::kValue; - cond_var.NotifyAll(); - } -}; - -template<> -struct FutureState { - mutable sled::Mutex mutex; - mutable sled::ConditionVariable cond_var; - - std::exception_ptr error; - - State state = State::kPending; - - void AssertHasValue() const { ASSERT(state == State::kValue, "can't find value"); } - - void AssertHasError() const { ASSERT(state == State::kError, "can't find error"); } - - void AssertHasTimeout() const { ASSERT(state == State::kTimeout, "can't find timeout"); } - - void Wait(sled::MutexLock *lock_ptr = nullptr) const - { - if (lock_ptr) { - if (state != State::kPending) { return; } - cond_var.Wait(*lock_ptr, [this] { return state != State::kPending; }); - } else { - sled::MutexLock lock(&mutex); - if (state != State::kPending) { return; } - cond_var.Wait(lock, [this] { return state != State::kPending; }); - } - } - - void SetTimeout() - { - sled::MutexLock lock(&mutex); - if (state == State::kCancel) { return; } - ASSERT(state == State::kPending, "state must be kPending"); - state = State::kTimeout; - cond_var.NotifyAll(); - } - - void SetError(std::exception_ptr e) - { - sled::MutexLock lock(&mutex); - if (state == State::kCancel) { return; } - ASSERT(state == State::kPending, "state must be kPending"); - error = std::move(e); - state = State::kError; - cond_var.NotifyAll(); - } - - void SetValue() - { - sled::MutexLock lock(&mutex); - if (state == State::kCancel) { return; } - ASSERT(state == State::kPending, "state must be kPending"); - state = State::kValue; - cond_var.NotifyAll(); - } -}; - -template -class Future { -public: - // using ValueType = typename std::remove_reference::type; - Future(std::shared_ptr> state) : state_(std::move(state)) {} - - T Get() const & - { - sled::MutexLock lock(&state_->mutex); - state_->Wait(&lock); - state_->AssertHasValue(); - return state_->value.value(); - } - - T &Get() & - { - sled::MutexLock lock(&state_->mutex); - state_->Wait(&lock); - state_->AssertHasValue(); - return state_->value.value(); - } - - T &&Get() && - { - sled::MutexLock lock(&state_->mutex); - state_->Wait(&lock); - state_->AssertHasValue(); - return std::move(state_->value.value()); - } - -private: - std::shared_ptr> state_; -}; - -template<> -class Future { -public: - Future(std::shared_ptr> state) : state_(std::move(state)) {} - - void Wait() const { state_->Wait(); } - - void Get() const { Wait(); } - -protected: - std::shared_ptr> state_; -}; - -template -class Promise { -public: - Promise() : state_(new FutureState()) {} - - Future GetFuture() const { return Future(state_); } - - template - typename std::enable_if::value && std::is_convertible::value>::type SetValue(U &&val) - { - state_->SetValue(val); - } - - template - typename std::enable_if::value>::type SetValue() - { - state_->SetValue(); - } - - void SetError(std::exception_ptr e) { state_->SetError(e); } - - void SetTimeout() { state_->SetTimeout(); } - -private: - std::shared_ptr> state_; -}; - -}// namespace sled -#endif// SLED_FUTURES_DETAIL_BASE_FUTURE_H diff --git a/include/sled/futures/detail/delay.h b/include/sled/futures/detail/delay.h index c4c5641..bc6e8a1 100644 --- a/include/sled/futures/detail/delay.h +++ b/include/sled/futures/detail/delay.h @@ -47,6 +47,7 @@ struct DelayOperation { template struct DelaySender { using result_t = typename S::result_t; + using this_type = DelaySender; S sender; sled::TimeDelta delta; @@ -55,6 +56,12 @@ struct DelaySender { { return {sender.Connect(DelayReceiver{receiver, delta})}; } + + template + friend ContinueResultT operator|(this_type sender, Lazy lazy) + { + return lazy.Continue(sender); + } }; template @@ -64,6 +71,22 @@ Delay(S sender, sled::TimeDelta const &delta) return {sender, delta}; } +struct DelayLazy { + sled::TimeDelta delta; + + template + DelaySender Continue(S sender) const + { + return {sender, delta}; + } +}; + +inline DelayLazy +Delay(sled::TimeDelta const &delta) +{ + return {delta}; +} + }// namespace detail }// namespace sled #endif// SLED_FUTURES_DETAIL_DELAY_H diff --git a/include/sled/futures/detail/future.h b/include/sled/futures/detail/future.h new file mode 100644 index 0000000..45be496 --- /dev/null +++ b/include/sled/futures/detail/future.h @@ -0,0 +1,103 @@ +#ifndef SLED_FUTURES_DETAIL_FUTURE_H +#define SLED_FUTURES_DETAIL_FUTURE_H +#include "sled/synchronization/mutex.h" +#include "traits.h" + +namespace sled { +namespace detail { + +namespace { +enum class State { + kPending, + kError, + kValue, +}; + +template +struct FutureState { + + sled::Mutex mutex; + sled::ConditionVariable cv; + State state = State::kPending; + + union { + T value; + std::exception_ptr error; + }; +}; +}// namespace + +template +struct FutureOperation { + std::shared_ptr> state; + R receiver; + mutable bool stopped = false; + + void Start() + { + if (stopped) { return; } + sled::MutexLock lock(&state->mutex); + state->cv.Wait(lock, [&] { return state->state != State::kPending; }); + if (state->state == State::kValue) { + receiver.SetValue(std::move(state->value)); + } else { + receiver.SetError(state->error); + } + } + + void Stop() + { + if (stopped) { return; } + stopped = true; + receiver.SetStopped(); + } +}; + +template +struct FutureSender { + using result_t = T; + using this_type = FutureSender; + std::shared_ptr> state; + + template + FutureOperation Connect(R receiver) + { + return {state, receiver}; + } + + template + friend ContinueResultT operator|(this_type sender, Lazy lazy) + { + return lazy.Continue(sender); + } +}; + +template +struct Promise { + std::shared_ptr> state; + + Promise() : state(std::make_shared>()) {} + + template + typename std::enable_if::value>::type SetValue(U &&val) + { + sled::MutexLock lock(&state->mutex); + state->value = std::forward(val); + state->state = State::kValue; + state->cv.NotifyAll(); + } + + void SetError(std::exception_ptr e) + { + sled::MutexLock lock(&state->mutex); + state->error = e; + state->state = State::kError; + state->cv.NotifyAll(); + } + + FutureSender GetFuture() { return {state}; } +}; + +}// namespace detail +}// namespace sled +#endif// SLED_FUTURES_DETAIL_FUTURE_H diff --git a/include/sled/futures/detail/just.h b/include/sled/futures/detail/just.h index 38a78f8..2554bd9 100644 --- a/include/sled/futures/detail/just.h +++ b/include/sled/futures/detail/just.h @@ -12,7 +12,7 @@ struct JustOperation { T value; R receiver; - void Start() { receiver.SetValue(std::move(value)); } + void Start() { receiver.SetValue(std::forward(value)); } void Stop() { receiver.SetStopped(); } }; @@ -20,6 +20,7 @@ struct JustOperation { template struct JustSender { using result_t = T; + using this_type = JustSender; T value; template @@ -27,6 +28,12 @@ struct JustSender { { return {std::forward(value), receiver}; } + + template + friend ContinueResultT operator|(this_type sender, Lazy lazy) + { + return lazy.Continue(sender); + } }; template diff --git a/include/sled/futures/detail/on.h b/include/sled/futures/detail/on.h new file mode 100644 index 0000000..cb07a18 --- /dev/null +++ b/include/sled/futures/detail/on.h @@ -0,0 +1,114 @@ +#ifndef SLED_FUTURES_DETAIL_VIA_H +#define SLED_FUTURES_DETAIL_VIA_H +#include "traits.h" +#include +#include +#include + +namespace sled { +namespace detail { +template +struct OnReceiver { + R receiver; + F schedule; + bool stopped = false; + + template + typename std::enable_if::value + || (!std::is_copy_assignable::value && !std::is_copy_constructible::value)>::type + SetValue(U &&val) + { + static_assert(std::is_rvalue_reference::value, "U must be an rvalue reference"); + if (stopped) { return; } + try { + auto moved = make_move_on_copy(val); + schedule([this, moved]() mutable { receiver.SetValue(std::move(moved.value)); }); + } catch (...) { + SetError(std::current_exception()); + } + } + + template + typename std::enable_if::value + && (std::is_copy_assignable::value || std::is_copy_constructible::value)>::type + SetValue(U &&val) + { + if (stopped) { return; } + try { + schedule([this, val] { receiver.SetValue(val); }); + } catch (...) { + SetError(std::current_exception()); + } + } + + void SetError(std::exception_ptr e) + { + if (stopped) { return; } + receiver.SetError(e); + } + + void SetStopped() + { + if (stopped) { return; } + stopped = true; + receiver.SetStopped(); + } +}; + +template +struct OnOperation { + ConnectResultT op; + + void Start() { op.Start(); } + + void Stop() { op.Stop(); } +}; + +template +struct OnSender { + using result_t = typename S::result_t; + using this_type = OnSender; + S sender; + F schedule; + + template + OnOperation> Connect(R receiver) + { + return {sender.Connect(OnReceiver{receiver, schedule})}; + } + + template + friend ContinueResultT operator|(this_type sender, Lazy lazy) + { + return lazy.Continue(sender); + } +}; + +template +struct OnLazy { + F func; + + template + OnSender Continue(S sender) const + { + return {sender, func}; + } +}; + +template +OnSender +On(S sender, F &&schedule) +{ + return {sender, std::forward(schedule)}; +} + +template +OnLazy +On(F &&schedule) +{ + return {schedule}; +} + +}// namespace detail +}// namespace sled +#endif// SLED_FUTURES_DETAIL_VIA_H diff --git a/include/sled/futures/detail/retry.h b/include/sled/futures/detail/retry.h index 106d783..6ed8100 100644 --- a/include/sled/futures/detail/retry.h +++ b/include/sled/futures/detail/retry.h @@ -97,6 +97,7 @@ struct RetryOperation { template struct RetrySender { using result_t = typename S::result_t; + using this_type = RetrySender; S sender; int retry_count; @@ -107,6 +108,12 @@ struct RetrySender { auto op = sender.Connect(RetryReceiver{state, receiver}); return {retry_count, state, op}; } + + template + friend ContinueResultT operator|(this_type sender, Lazy lazy) + { + return lazy.Continue(sender); + } }; template @@ -116,6 +123,22 @@ Retry(S sender, int retry_count) return {sender, retry_count}; } +struct RetryLazy { + int retry_count; + + template + RetrySender Continue(S sender) const + { + return {sender, retry_count}; + } +}; + +inline RetryLazy +Retry(int retry_count) +{ + return {retry_count}; +} + }// namespace detail }// namespace sled #endif// SLED_FUTURES_DETAIL_RETRY_H diff --git a/include/sled/futures/detail/then.h b/include/sled/futures/detail/then.h index 2b497e8..b46b00b 100644 --- a/include/sled/futures/detail/then.h +++ b/include/sled/futures/detail/then.h @@ -50,6 +50,7 @@ struct ThenOperation { template struct ThenSender { using result_t = invoke_result_t::result_t>; + using this_type = ThenSender; S sender; F func; @@ -58,6 +59,12 @@ struct ThenSender { { return {sender.Connect(ThenReceiver{receiver, func})}; } + + template + friend ContinueResultT operator|(this_type sender, Lazy lazy) + { + return lazy.Continue(sender); + } }; template @@ -67,6 +74,24 @@ Then(S &&sender, F &&func) return {std::forward(sender), std::forward(func)}; } +template +struct ThenLazy { + F func; + + template + ThenSender Continue(S sender) const + { + return {sender, func}; + } +}; + +template +ThenLazy +Then(F &&func) +{ + return {func}; +} + }// namespace detail }// namespace sled #endif// SLED_FUTURES_DETAIL_THEN_H diff --git a/include/sled/futures/detail/traits.h b/include/sled/futures/detail/traits.h index 63b9b14..868f0aa 100644 --- a/include/sled/futures/detail/traits.h +++ b/include/sled/futures/detail/traits.h @@ -3,6 +3,7 @@ #include "sled/exec/detail/invoke_result.h" +#include #include namespace sled { @@ -15,12 +16,41 @@ struct ConnectResult { template using ConnectResultT = typename ConnectResult::type; +template +struct ContinueResult { + typedef decltype(std::declval().Continue(std::declval())) type; +}; + +template +using ContinueResultT = typename ContinueResult::type; + template using invoke_result_t = eggs::invoke_result_t; template using decay_t = typename std::decay::type; +template +struct move_on_copy { + using type = typename std::remove_reference::type; + + move_on_copy(type &&value) : value(std::move(value)) {} + + move_on_copy(const move_on_copy &other) : value(std::move(other.value)) {} + + move_on_copy(move_on_copy &&) = delete; + move_on_copy &operator=(const move_on_copy &) = delete; + + mutable type value; +}; + +template +move_on_copy +make_move_on_copy(T &&value) +{ + return {std::move(value)}; +} + }// namespace detail }// namespace sled #endif// SLED_FUTURES_DETAIL_TRAITS_H diff --git a/include/sled/futures/detail/via.h b/include/sled/futures/detail/via.h deleted file mode 100644 index c743216..0000000 --- a/include/sled/futures/detail/via.h +++ /dev/null @@ -1,74 +0,0 @@ -#ifndef SLED_FUTURES_DETAIL_VIA_H -#define SLED_FUTURES_DETAIL_VIA_H -#include "traits.h" -#include -#include -#include - -namespace sled { -namespace detail { -template -struct ViaReceiver { - R receiver; - F schedule; - bool stopped = false; - - template - void SetValue(U &&val) - { - if (stopped) { return; } - try { - // auto func = std::bind(&R::SetValue, &receiver, std::forward(val)); - // schedule(std::move(func)); - schedule([this, val]() mutable { receiver.SetValue(std::move(val)); }); - } catch (...) { - SetError(std::current_exception()); - } - } - - void SetError(std::exception_ptr e) - { - if (stopped) { return; } - receiver.SetError(e); - } - - void SetStopped() - { - if (stopped) { return; } - stopped = true; - receiver.SetStopped(); - } -}; - -template -struct ViaOperation { - ConnectResultT op; - - void Start() { op.Start(); } - - void Stop() { op.Stop(); } -}; - -template -struct ViaSender { - using result_t = typename S::result_t; - S sender; - F schedule; - - template - ViaOperation> Connect(R receiver) - { - return {sender.Connect(ViaReceiver{receiver, schedule})}; - } -}; - -template -ViaSender -Via(S sender, F &&schedule) -{ - return {sender, std::forward(schedule)}; -} - -}// namespace detail -}// namespace sled -#endif// SLED_FUTURES_DETAIL_VIA_H