diff --git a/CMakeLists.txt b/CMakeLists.txt index 4870106..c3a6264 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -61,6 +61,8 @@ target_sources( src/sled/debugging/demangle.cc src/sled/debugging/symbolize.cc src/sled/event_bus/event_bus.cc + src/sled/futures/future.cc + src/sled/futures/internal/failure_handling.cc src/sled/filesystem/path.cc src/sled/log/log.cc src/sled/network/async_resolver.cc @@ -210,6 +212,7 @@ if(SLED_BUILD_TESTS) sled_add_test(NAME sled_inja_test SRCS src/sled/nonstd/inja_test.cc) sled_add_test(NAME sled_fsm_test SRCS src/sled/nonstd/fsm_test.cc) sled_add_test(NAME sled_timestamp_test SRCS src/sled/units/timestamp_test.cc) + sled_add_test(NAME sled_future_test SRCS src/sled/futures/future_test.cc) sled_add_test( NAME sled_cache_test SRCS src/sled/cache/lru_cache_test.cc src/sled/cache/fifo_cache_test.cc src/sled/cache/expire_cache_test.cc) diff --git a/src/sled/async/async_test.cc b/src/sled/async/async_test.cc index 4e669dd..892cfe4 100644 --- a/src/sled/async/async_test.cc +++ b/src/sled/async/async_test.cc @@ -11,7 +11,7 @@ TEST_SUITE("Async") CHECK_EQ(value, 126); return value; }); - task1.wait(); + // task1.wait(); CHECK_EQ(126, task1.get()); } diff --git a/src/sled/futures/detail/delay.h b/src/sled/futures/detail/delay.h deleted file mode 100644 index 5771953..0000000 --- a/src/sled/futures/detail/delay.h +++ /dev/null @@ -1,92 +0,0 @@ -#ifndef SLED_FUTURES_DETAIL_DELAY_H -#define SLED_FUTURES_DETAIL_DELAY_H - -#include "sled/units/time_delta.h" -#include "traits.h" -#include - -namespace sled { -namespace detail { - -template -struct DelayReceiver { - R receiver; - sled::TimeDelta delta; - bool stopped = false; - - template - void SetValue(U &&val) - { - if (stopped) { return; } - receiver.SetValue(std::forward(val)); - } - - void SetError(std::exception_ptr e) - { - if (stopped) { return; } - receiver.SetError(e); - } - - void SetStopped() - { - if (stopped) { return; } - stopped = true; - receiver.SetStopped(); - } -}; - -template -struct DelayOperation { - ConnectResultT op; - - void Start() { op.Start(); } - - void Stop() { op.Stop(); } -}; - -template -struct DelaySender { - using result_t = typename S::result_t; - using this_type = DelaySender; - S sender; - sled::TimeDelta delta; - - template - DelayOperation> Connect(R receiver) - { - return {sender.Connect(DelayReceiver{receiver, delta})}; - } - - template - friend ContinueResultT operator|(this_type sender, Lazy lazy) - { - return lazy.Continue(sender); - } -}; - -template -DelaySender -Delay(S sender, sled::TimeDelta const &delta) -{ - return {sender, delta}; -} - -struct DelayLazy { - sled::TimeDelta delta; - - template - DelaySender Continue(S sender) const - { - return {sender, delta}; - } -}; - -inline DelayLazy -Delay(sled::TimeDelta const &delta) -{ - return {delta}; -} - -}// namespace detail -}// namespace sled -#endif// SLED_FUTURES_DETAIL_DELAY_H diff --git a/src/sled/futures/detail/future.h b/src/sled/futures/detail/future.h deleted file mode 100644 index 9d30a7c..0000000 --- a/src/sled/futures/detail/future.h +++ /dev/null @@ -1,103 +0,0 @@ -#ifndef SLED_FUTURES_DETAIL_FUTURE_H -#define SLED_FUTURES_DETAIL_FUTURE_H -#include "sled/synchronization/mutex.h" -#include "traits.h" - -namespace sled { -namespace detail { - -namespace { -enum class State { - kPending, - kError, - kValue, -}; - -template -struct FutureState { - - sled::Mutex mutex; - sled::ConditionVariable cv; - State state = State::kPending; - - union { - T value; - std::exception_ptr error; - }; -}; -}// namespace - -template -struct FutureOperation { - std::shared_ptr> state; - R receiver; - mutable bool stopped = false; - - void Start() - { - if (stopped) { return; } - sled::MutexLock lock(&state->mutex); - state->cv.Wait(lock, [&] { return state->state != State::kPending; }); - if (state->state == State::kValue) { - receiver.SetValue(std::move(state->value)); - } else { - receiver.SetError(state->error); - } - } - - void Stop() - { - if (stopped) { return; } - stopped = true; - receiver.SetStopped(); - } -}; - -template -struct FutureSender { - using result_t = T; - using this_type = FutureSender; - std::shared_ptr> state; - - template - FutureOperation Connect(R receiver) - { - return {state, receiver}; - } - - template - friend ContinueResultT operator|(this_type sender, Lazy lazy) - { - return lazy.Continue(sender); - } -}; - -template -struct Promise { - std::shared_ptr> state; - - Promise() : state(std::make_shared>()) {} - - template - typename std::enable_if::value>::type SetValue(U &&val) - { - sled::MutexLock lock(&state->mutex); - state->value = std::forward(val); - state->state = State::kValue; - state->cv.NotifyAll(); - } - - void SetError(std::exception_ptr e) - { - sled::MutexLock lock(&state->mutex); - state->error = e; - state->state = State::kError; - state->cv.NotifyAll(); - } - - FutureSender GetFuture() { return {state}; } -}; - -}// namespace detail -}// namespace sled -#endif// SLED_FUTURES_DETAIL_FUTURE_H diff --git a/src/sled/futures/detail/just.h b/src/sled/futures/detail/just.h deleted file mode 100644 index 4882c0a..0000000 --- a/src/sled/futures/detail/just.h +++ /dev/null @@ -1,48 +0,0 @@ -#ifndef SLED_FUTURES_DETAIL_JUST_H -#define SLED_FUTURES_DETAIL_JUST_H - -#include "traits.h" -#include - -namespace sled { -namespace detail { - -template -struct JustOperation { - T value; - R receiver; - - void Start() { receiver.SetValue(std::forward(value)); } - - void Stop() { receiver.SetStopped(); } -}; - -template -struct JustSender { - using result_t = T; - using this_type = JustSender; - T value; - - template - JustOperation Connect(R receiver) - { - return {std::forward(value), receiver}; - } - - template - friend ContinueResultT operator|(this_type sender, Lazy lazy) - { - return lazy.Continue(sender); - } -}; - -template -JustSender -Just(T &&value) -{ - return {std::forward(value)}; -} - -}// namespace detail -}// namespace sled -#endif// SLED_FUTURES_DETAIL_JUST_H diff --git a/src/sled/futures/detail/just_test.cc b/src/sled/futures/detail/just_test.cc deleted file mode 100644 index 39b7b37..0000000 --- a/src/sled/futures/detail/just_test.cc +++ /dev/null @@ -1,3 +0,0 @@ -#include - -TEST(Just, basic) { auto s1 = sled::detail::Just(42); } diff --git a/src/sled/futures/detail/on.h b/src/sled/futures/detail/on.h deleted file mode 100644 index 0b9491f..0000000 --- a/src/sled/futures/detail/on.h +++ /dev/null @@ -1,114 +0,0 @@ -#ifndef SLED_FUTURES_DETAIL_VIA_H -#define SLED_FUTURES_DETAIL_VIA_H -#include "traits.h" -#include -#include -#include - -namespace sled { -namespace detail { -template -struct OnReceiver { - R receiver; - F schedule; - bool stopped = false; - - template - typename std::enable_if::value - || (!std::is_copy_assignable::value && !std::is_copy_constructible::value)>::type - SetValue(U &&val) - { - static_assert(std::is_rvalue_reference::value, "U must be an rvalue reference"); - if (stopped) { return; } - try { - auto moved = make_move_on_copy(val); - schedule([this, moved]() mutable { receiver.SetValue(std::move(moved.value)); }); - } catch (...) { - SetError(std::current_exception()); - } - } - - template - typename std::enable_if::value - && (std::is_copy_assignable::value || std::is_copy_constructible::value)>::type - SetValue(U &&val) - { - if (stopped) { return; } - try { - schedule([this, val] { receiver.SetValue(val); }); - } catch (...) { - SetError(std::current_exception()); - } - } - - void SetError(std::exception_ptr e) - { - if (stopped) { return; } - receiver.SetError(e); - } - - void SetStopped() - { - if (stopped) { return; } - stopped = true; - receiver.SetStopped(); - } -}; - -template -struct OnOperation { - ConnectResultT op; - - void Start() { op.Start(); } - - void Stop() { op.Stop(); } -}; - -template -struct OnSender { - using result_t = typename S::result_t; - using this_type = OnSender; - S sender; - F schedule; - - template - OnOperation> Connect(R receiver) - { - return {sender.Connect(OnReceiver{receiver, schedule})}; - } - - template - friend ContinueResultT operator|(this_type sender, Lazy lazy) - { - return lazy.Continue(sender); - } -}; - -template -struct OnLazy { - F func; - - template - OnSender Continue(S sender) const - { - return {sender, func}; - } -}; - -template -OnSender -On(S sender, F &&schedule) -{ - return {sender, std::forward(schedule)}; -} - -template -OnLazy -On(F &&schedule) -{ - return {schedule}; -} - -}// namespace detail -}// namespace sled -#endif// SLED_FUTURES_DETAIL_VIA_H diff --git a/src/sled/futures/detail/retry.h b/src/sled/futures/detail/retry.h deleted file mode 100644 index 38a4195..0000000 --- a/src/sled/futures/detail/retry.h +++ /dev/null @@ -1,144 +0,0 @@ -#ifndef SLED_FUTURES_DETAIL_RETRY_H -#define SLED_FUTURES_DETAIL_RETRY_H - -#include "sled/log/log.h" -#include "sled/synchronization/mutex.h" -#include "traits.h" -#include - -namespace sled { -namespace detail { - -namespace { -struct RetryState { - enum State { kPending, kDone, kRetry }; - - sled::Mutex mutex; - sled::ConditionVariable cv; - int retry_count = 0; - State state = kPending; -}; -}// namespace - -template -struct RetryReceiver { - std::shared_ptr state; - R receiver; - bool stopped = false; - - template - void SetValue(U &&val) - { - { - sled::MutexLock lock(&state->mutex); - if (stopped) { return; } - state->state = RetryState::kDone; - state->cv.NotifyAll(); - } - receiver.SetValue(std::forward(val)); - } - - void SetError(std::exception_ptr e) - { - // notify - { - sled::MutexLock lock(&state->mutex); - if (stopped) { return; } - if (state->retry_count > 0) { - --state->retry_count; - state->state = RetryState::kRetry; - return; - } else { - state->state = RetryState::kDone; - state->cv.NotifyAll(); - } - } - receiver.SetError(e); - } - - void SetStopped() - { - { - sled::MutexLock lock(&state->mutex); - if (stopped) { return; } - stopped = true; - state->state = RetryState::kDone; - state->cv.NotifyAll(); - } - receiver.SetStopped(); - } -}; - -template -struct RetryOperation { - int retry_count; - std::shared_ptr state; - ConnectResultT op; - - void Start() - { - { - sled::MutexLock lock(&state->mutex); - state->retry_count = retry_count; - state->state = RetryState::kPending; - } - do { - op.Start(); - sled::MutexLock lock(&state->mutex); - state->cv.Wait(lock, [this] { return state->state != RetryState::kPending; }); - if (state->state == RetryState::kDone) { break; } - state->state = RetryState::kPending; - } while (true); - } - - void Stop() { op.Stop(); } -}; - -template -struct RetrySender { - using result_t = typename S::result_t; - using this_type = RetrySender; - S sender; - int retry_count; - - template - RetryOperation> Connect(R receiver) - { - auto state = std::make_shared(); - auto op = sender.Connect(RetryReceiver{state, receiver}); - return {retry_count, state, op}; - } - - template - friend ContinueResultT operator|(this_type sender, Lazy lazy) - { - return lazy.Continue(sender); - } -}; - -template -RetrySender -Retry(S sender, int retry_count) -{ - return {sender, retry_count}; -} - -struct RetryLazy { - int retry_count; - - template - RetrySender Continue(S sender) const - { - return {sender, retry_count}; - } -}; - -inline RetryLazy -Retry(int retry_count) -{ - return {retry_count}; -} - -}// namespace detail -}// namespace sled -#endif// SLED_FUTURES_DETAIL_RETRY_H diff --git a/src/sled/futures/detail/then.h b/src/sled/futures/detail/then.h deleted file mode 100644 index f1faaac..0000000 --- a/src/sled/futures/detail/then.h +++ /dev/null @@ -1,97 +0,0 @@ -#ifndef SLED_FUTURES_DETAIL_THEN_H -#define SLED_FUTURES_DETAIL_THEN_H - -#include "traits.h" -#include - -namespace sled { -namespace detail { - -template -struct ThenReceiver { - R receiver; - F func; - bool stopped = false; - - template> - void SetValue(U &&val) - { - if (stopped) { return; } - try { - receiver.SetValue(std::forward(func(std::forward(val)))); - } catch (...) { - SetError(std::current_exception()); - } - } - - void SetError(std::exception_ptr e) - { - if (stopped) { return; } - receiver.SetError(e); - } - - void SetStopped() - { - if (stopped) { return; } - stopped = true; - receiver.SetStopped(); - } -}; - -template -struct ThenOperation { - ConnectResultT op; - - void Start() { op.Start(); } - - void Stop() { op.Stop(); } -}; - -template -struct ThenSender { - using result_t = invoke_result_t::result_t>; - using this_type = ThenSender; - S sender; - F func; - - template - ThenOperation> Connect(R receiver) - { - return {sender.Connect(ThenReceiver{receiver, func})}; - } - - template - friend ContinueResultT operator|(this_type sender, Lazy lazy) - { - return lazy.Continue(sender); - } -}; - -template -ThenSender -Then(S &&sender, F &&func) -{ - return {std::forward(sender), std::forward(func)}; -} - -template -struct ThenLazy { - F func; - - template - ThenSender Continue(S sender) const - { - return {sender, func}; - } -}; - -template -ThenLazy -Then(F &&func) -{ - return {func}; -} - -}// namespace detail -}// namespace sled -#endif// SLED_FUTURES_DETAIL_THEN_H diff --git a/src/sled/futures/detail/traits.h b/src/sled/futures/detail/traits.h deleted file mode 100644 index 8f42b3a..0000000 --- a/src/sled/futures/detail/traits.h +++ /dev/null @@ -1,56 +0,0 @@ -#ifndef SLED_FUTURES_DETAIL_TRAITS_H -#define SLED_FUTURES_DETAIL_TRAITS_H - -#include "sled/exec/detail/invoke_result.h" - -#include -#include - -namespace sled { -namespace detail { -template -struct ConnectResult { - typedef decltype(std::declval().Connect(std::declval())) type; -}; - -template -using ConnectResultT = typename ConnectResult::type; - -template -struct ContinueResult { - typedef decltype(std::declval().Continue(std::declval())) type; -}; - -template -using ContinueResultT = typename ContinueResult::type; - -template -using invoke_result_t = eggs::invoke_result_t; - -template -using decay_t = typename std::decay::type; - -template -struct move_on_copy { - using type = typename std::remove_reference::type; - - move_on_copy(type &&value) : value(std::move(value)) {} - - move_on_copy(const move_on_copy &other) : value(std::move(other.value)) {} - - move_on_copy(move_on_copy &&) = delete; - move_on_copy &operator=(const move_on_copy &) = delete; - - mutable type value; -}; - -template -move_on_copy -make_move_on_copy(T &&value) -{ - return {std::move(value)}; -} - -}// namespace detail -}// namespace sled -#endif// SLED_FUTURES_DETAIL_TRAITS_H diff --git a/src/sled/futures/future.cc b/src/sled/futures/future.cc new file mode 100644 index 0000000..036b561 --- /dev/null +++ b/src/sled/futures/future.cc @@ -0,0 +1,15 @@ + +#include "sled/futures/future.h" + +namespace sled { +namespace detail { +void +IncrementFuturesUsage() +{} + +void +DecrementFuturesUsage() +{} + +}// namespace detail +}// namespace sled diff --git a/src/sled/futures/future.h b/src/sled/futures/future.h index a21378b..712b18e 100644 --- a/src/sled/futures/future.h +++ b/src/sled/futures/future.h @@ -1,6 +1,327 @@ #ifndef SLED_FUTURES_FUTURE_H #define SLED_FUTURES_FUTURE_H -namespace sled {} +#pragma once +#include "sled/futures/internal/failure_handling.h" +#include "sled/futures/internal/promise.h" +#include "sled/lang/attributes.h" +#include "sled/log/log.h" +#include "sled/synchronization/event.h" +#include "sled/synchronization/mutex.h" +#include "sled/variant.h" +#include +#include -#endif// SLED_FUTURES_FUTURE_H +namespace sled { +namespace detail { +template +struct is_invocable : std::is_constructible, + std::reference_wrapper::type>> {}; + +template +struct is_invocable_r : std::is_constructible, + std::reference_wrapper::type>> {}; + +enum FutureState { + kNotCompletedFuture = 0, + kSuccessFuture = 1, + kFailedFuture = 2, +}; + +SLED_EXPORT void IncrementFuturesUsage(); +SLED_EXPORT void DecrementFuturesUsage(); + +template +struct FutureData { + FutureData() { IncrementFuturesUsage(); } + + FutureData(const FutureData &) = delete; + FutureData(FutureData &&) = delete; + FutureData &operator=(const FutureData &) = delete; + FutureData &operator=(FutureData &&) = delete; + + ~FutureData() { DecrementFuturesUsage(); } + + std::atomic_int state{kNotCompletedFuture}; + sled::variant value; + std::list> success_callbacks; + std::list> failure_callbacks; + sled::Mutex mutex_; +}; +}// namespace detail + +// + +template +class Future { + static_assert(!std::is_same::value, "Future is not allowed. Use Future instead"); + static_assert(!std::is_same::value, "Future<_, void> is not allowed. Use Future<_, bool> instead"); + template + friend class Future; + friend class Promise; + friend struct detail::FutureData; + +public: + using Value = T; + using Failure = FailureT; + + Future() noexcept = default; + + explicit Future(const Promise &promise) { data_ = promise.future().data_; } + + Future(const Future &) noexcept = default; + Future(Future &&) noexcept = default; + Future &operator=(const Future &) noexcept = default; + Future &operator=(Future &&) noexcept = default; + ~Future() = default; + + bool operator==(const Future &other) const noexcept { return data_ == other.data_; } + + bool operator!=(const Future &other) const noexcept { return !operator==(other); } + + bool IsCompleted() const noexcept + { + SLED_ASSERT(data_ != nullptr, "Future is not valid"); + int value = data_->state.load(std::memory_order_acquire); + return value == detail::kSuccessFuture || value == detail::kFailedFuture; + } + + bool IsFailed() const noexcept + { + SLED_ASSERT(data_ != nullptr, "Future is not valid"); + return data_->state.load(std::memory_order_acquire) == detail::kFailedFuture; + } + + bool IsSucceeded() const noexcept + { + SLED_ASSERT(data_ != nullptr, "Future is not valid"); + return data_->state.load(std::memory_order_acquire) == detail::kSuccessFuture; + } + + bool IsValid() const noexcept { return static_cast(data_); } + + bool Wait(int64_t timeout_ms) const noexcept { return Wait(sled::TimeDelta::Millis(timeout_ms)); } + + bool Wait(sled::TimeDelta timeout = sled::Event::kForever) const noexcept + { + SLED_ASSERT(data_ != nullptr, "Future is not valid"); + if (IsCompleted()) { return true; } + + bool wait_forever = timeout <= sled::TimeDelta::Zero(); + sled::TimeDelta wait_time = wait_forever ? sled::Event::kForever : timeout; + + auto event_ptr = std::make_shared(); + OnComplete([event_ptr]() { event_ptr->Set(); }); + event_ptr->Wait(wait_time); + + return IsCompleted(); + } + + template::value, Dummy>::type> + T Result() const noexcept + { + SLED_ASSERT(data_ != nullptr, "Future is not valid"); + if (!IsCompleted()) Wait(); + if (IsSucceeded()) { + try { + return sled::get(data_->value); + } catch (...) {} + } + return T(); + } + + const T &ResultRef() const + { + SLED_ASSERT(data_ != nullptr, "Future is not valid"); + if (!IsCompleted()) { Wait(); } + return sled::get(data_->value); + } + + FailureT FailureReason() const + { + SLED_ASSERT(data_ != nullptr, "Future is not valid"); + if (!IsCompleted()) { Wait(); } + if (IsFailed()) { + try { + return sled::get(data_->value); + } catch (...) {} + } + return FailureT(); + } + + template::value>::type> + Future OnSuccess(Func &&f) const noexcept + { + SLED_ASSERT(data_ != nullptr, "Future is not valid"); + bool call_it = false; + { + sled::MutexLock lock(&data_->mutex_); + if (IsCompleted()) { + call_it = IsSucceeded(); + } else { + try { + data_->success_callbacks.emplace_back(std::forward(f)); + } catch (std::exception &e) { + return Future::Failed(detail::ExceptionFailure(e)); + } catch (...) { + return Future::Failed(detail::ExceptionFailure()); + } + } + } + if (call_it) { + try { + f(sled::get(data_->value)); + } catch (...) {} + } + return Future(data_); + } + + template::value>::type> + Future OnFailure(Func &&f) const noexcept + { + SLED_ASSERT(data_ != nullptr, "Future is not valid"); + bool call_it = false; + { + sled::MutexLock lock(&data_->mutex_); + if (IsCompleted()) { + call_it = IsFailed(); + } else { + try { + data_->failure_callbacks.emplace_back(std::forward(f)); + } catch (std::exception &e) { + return Future::Failed(detail::ExceptionFailure(e)); + } catch (...) { + return Future::Failed(detail::ExceptionFailure()); + } + } + } + if (call_it) { + try { + f(sled::get(data_->value)); + } catch (...) {} + } + return Future(data_); + } + + template::value>::type> + Future OnComplete(Func &&f) const noexcept + { + SLED_ASSERT(data_ != nullptr, "Future is not valid"); + OnSuccess([f](const T &) noexcept { f(); }); + OnFailure([f](const FailureT &) noexcept { f(); }); + return Future(data_); + } + + // template::value>::type> + // Future OnComplete(Func &&F) const noexcept + // { + // SLED_ASSERT(data_ != nullptr, "Future is not valid"); + // OnSuccess([f](const auto &) noexcept { f(); }) + // } + + static Future::type, FailureT> Successful(T &&value) noexcept + { + Future::type, FailureT> result + = Future::type, FailureT>::Create(); + result.FillSuccess(std::forward(value)); + return result; + } + + static Future successful(const T &value) noexcept + { + Future result = Future::Create(); + result.FillSuccess(value); + return result; + } + + static Future Successful() { return Future::Successful(T()); } + + static Future Failed(const FailureT &failure) noexcept + { + Future result = Future::Create(); + result.FillFailure(failure); + return result; + } + +private: + explicit Future(std::shared_ptr> other_data) { data_ = other_data; } + + inline static Future Create() + { + Future result; + result.data_ = std::make_shared>(); + return result; + } + + void FillSuccess(const T &value) + { + T copy = value; + FillSuccess(std::move(copy)); + } + + void FillSuccess(T &&value) + { + SLED_ASSERT(data_ != nullptr, "Future is not valid"); + if (detail::HasLastFailure()) { + FailureT failure = detail::LastFailure(); + detail::InvalidateLastFailure(); + FillFailure(std::move(failure)); + return; + } + + std::list> callbacks; + { + sled::MutexLock lock(&data_->mutex_); + if (IsCompleted()) { return; } + + try { + data_->value.template emplace(std::move(value)); + } catch (...) {} + data_->state.store(detail::kSuccessFuture, std::memory_order_release); + callbacks = std::move(data_->success_callbacks); + data_->success_callbacks = std::list>(); + data_->failure_callbacks.clear(); + } + + for (const auto &f : callbacks) { + try { + f(sled::get(data_->value)); + } catch (...) {} + } + } + + void FillFailure(const FailureT &reason) + { + FailureT copy = reason; + FillFailure(std::move(copy)); + } + + void FillFailure(FailureT &&reason) + { + SLED_ASSERT(data_ != nullptr, "Future is not valid"); + std::list> callbacks; + { + sled::MutexLock lock(&data_->mutex_); + if (IsCompleted()) { return; } + try { + data_->value.template emplace(std::move(reason)); + } catch (...) {} + data_->state.store(detail::kFailedFuture, std::memory_order_release); + callbacks = std::move(data_->failure_callbacks); + data_->failure_callbacks = std::list>(); + data_->success_callbacks.clear(); + } + + for (const auto &f : callbacks) { + try { + f(sled::get(data_->value)); + } catch (...) {} + } + } + + std::shared_ptr> data_; +}; +}// namespace sled + +#endif// SLED_FUTURES_FUTURE_H diff --git a/src/sled/futures/future_test.cc b/src/sled/futures/future_test.cc new file mode 100644 index 0000000..f90d9a0 --- /dev/null +++ b/src/sled/futures/future_test.cc @@ -0,0 +1,26 @@ +#include + +TEST_SUITE("future") +{ + TEST_CASE("base success") + { + sled::Promise p; + auto f = p.GetFuture(); + p.Success(42); + CHECK(f.Wait(-1)); + CHECK(f.IsValid()); + CHECK_EQ(f.Result(), 42); + } + TEST_CASE("base failed") + { + sled::Promise p; + auto f = p.GetFuture(); + p.Failure("error"); + REQUIRE(p.IsFilled()); + REQUIRE(f.IsCompleted()); + CHECK(f.Wait(-1)); + CHECK(f.IsValid()); + CHECK_EQ(f.FailureReason(), "error"); + } + TEST_CASE("thread success") {} +} diff --git a/src/sled/futures/internal/failure_handling.cc b/src/sled/futures/internal/failure_handling.cc new file mode 100644 index 0000000..0ca392d --- /dev/null +++ b/src/sled/futures/internal/failure_handling.cc @@ -0,0 +1,32 @@ +#include "sled/futures/internal/failure_handling.h" + +namespace sled { +namespace detail { +static thread_local sled::any last_failure; + +bool +HasLastFailure() noexcept +{ + return last_failure.has_value(); +} + +void +InvalidateLastFailure() noexcept +{ + last_failure.reset(); +} + +const sled::any & +LastFailureAny() noexcept +{ + return last_failure; +} + +void +SetLastFailure(const sled::any &failure) noexcept +{ + last_failure = failure; +} + +}// namespace detail +}// namespace sled diff --git a/src/sled/futures/internal/failure_handling.h b/src/sled/futures/internal/failure_handling.h new file mode 100644 index 0000000..14977e6 --- /dev/null +++ b/src/sled/futures/internal/failure_handling.h @@ -0,0 +1,108 @@ +#ifndef SLED_FUTURES_INTERNAL_FAILURE_HANDLING_H +#define SLED_FUTURES_INTERNAL_FAILURE_HANDLING_H + +#pragma once +#include "sled/any.h" +#include + +namespace sled { +namespace failure { +template +inline FailureT +FailureFromString(std::string &&) +{ + return FailureT(); +} + +template +inline FailureT +FailureFromString(const std::string &str) +{ + std::string copy = str; + return FailureFromString(std::move(copy)); +} + +template<> +inline std::string +FailureFromString(std::string &&str) +{ + return std::move(str); +} + +}// namespace failure + +namespace detail { +bool HasLastFailure() noexcept; +void InvalidateLastFailure() noexcept; +const sled::any &LastFailureAny() noexcept; +void SetLastFailure(const sled::any &) noexcept; + +template +inline FailureT +LastFailure() noexcept +{ + if (!HasLastFailure()) { return FailureT(); } + try { + return sled::any_cast(LastFailureAny()); + } catch (...) { + return FailureT(); + } +} + +template +inline void +SetLastFailure(const FailureT &failure) noexcept +{ + SetLastFailure(sled::any(failure)); +} + +template +FailureT +ExceptionFailure(const std::exception &e) +{ + return failure::FailureFromString(e.what()); +} + +template +FailureT +ExceptionFailure() +{ + return failure::FailureFromString("Exception"); +} +}// namespace detail + +template +class Future; + +template +struct WithFuture { + explicit WithFuture(const FailureT &f = FailureT()) noexcept : failure_(f) {} + + explicit WithFuture(FailureT &&f) noexcept : failure_(std::move(f)) {} + + template + explicit WithFuture(Args &&...args) noexcept : failure_(std::forward(args)...) + {} + + template + operator T() const noexcept + { + detail::SetLastFailure(std::move(failure_)); + return T(); + } + + template + operator Future() noexcept + { + Future result = Future::Create(); + result.Faillure(failure_); + return result; + } + +private: + FailureT failure_; +}; + +}// namespace sled + +#endif// SLED_FUTURES_INTERNAL_FAILURE_HANDLING_H diff --git a/src/sled/futures/internal/promise.h b/src/sled/futures/internal/promise.h new file mode 100644 index 0000000..8c73a62 --- /dev/null +++ b/src/sled/futures/internal/promise.h @@ -0,0 +1,43 @@ +#ifndef SLED_FUTURES_INTERNAL_PROMISE_H +#define SLED_FUTURES_INTERNAL_PROMISE_H + +#pragma once + +#include +#include + +namespace sled { +template +class Future; + +template +class Promise { + static_assert(!std::is_same::value, "Promise is not allowed. Use Promise instead"); + static_assert(!std::is_same::value, + "Promise<_, void> is not allowed. Use Promise<_, bool> instead"); + +public: + using Value = T; + Promise() = default; + Promise(const Promise &) noexcept = default; + Promise(Promise &&) noexcept = default; + Promise &operator=(const Promise &) noexcept = default; + Promise &operator=(Promise &&) noexcept = default; + ~Promise() = default; + + Future GetFuture() const { return future_; }; + + bool IsFilled() const noexcept { return future_.IsCompleted(); } + + void Failure(const FailureT &reason) { return future_.FillFailure(reason); } + + void Success(const T &value) { return future_.FillSuccess(value); } + + void Success(T &&value) { return future_.FillSuccess(std::move(value)); } + +private: + Future future_ = Future::Create(); +}; +}// namespace sled + +#endif// SLED_FUTURES_INTERNAL_PROMISE_H diff --git a/src/sled/futures/promise.h b/src/sled/futures/promise.h deleted file mode 100644 index e869167..0000000 --- a/src/sled/futures/promise.h +++ /dev/null @@ -1,1438 +0,0 @@ -/******************************************************************************* - * This file is part of the "https://github.com/blackmatov/promise.hpp" - * For conditions of distribution and use, see copyright notice in LICENSE.md - * Copyright (C) 2018-2023, by Matvey Cherevko (blackmatov@gmail.com) - ******************************************************************************/ - -#ifndef SLED_FUTURES_PROMISE_H -#define SLED_FUTURES_PROMISE_H -#pragma once -#include "sled/exec/detail/invoke_result.h" -#include "sled/synchronization/mutex.h" - -#include -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace promise_hpp { -// -// forward declaration -// - -template -class promise; - -// -// is_promise -// - -namespace impl { -template -struct is_promise_impl : std::false_type {}; - -template -struct is_promise_impl> : std::true_type {}; -}// namespace impl - -template -struct is_promise : impl::is_promise_impl::type> {}; - -template -constexpr bool is_promise_v = is_promise::value; - -// -// is_promise_r -// - -namespace impl { -template -struct is_promise_r_impl : std::false_type {}; - -template -struct is_promise_r_impl> : std::is_convertible {}; -}// namespace impl - -template -struct is_promise_r : impl::is_promise_r_impl> {}; - -template -constexpr bool is_promise_r_v = is_promise_r::value; - -// -// promise_wait_status -// - -enum class promise_wait_status { no_timeout, timeout }; - -// -// aggregate_exception -// - -class aggregate_exception final : public std::exception { -private: - using exceptions_t = std::vector; - using internal_state_t = std::shared_ptr; - -public: - aggregate_exception() : state_(std::make_shared()) {} - - explicit aggregate_exception(exceptions_t exceptions) - : state_(std::make_shared(std::move(exceptions))) - {} - - aggregate_exception(const aggregate_exception &other) noexcept : state_(other.state_) {} - - aggregate_exception &operator=(const aggregate_exception &other) noexcept - { - if (this != &other) { state_ = other.state_; } - return *this; - } - - const char *what() const noexcept override { return "Aggregate exception"; } - - bool empty() const noexcept { return (*state_).empty(); } - - std::size_t size() const noexcept { return (*state_).size(); } - - std::exception_ptr at(std::size_t index) const { return (*state_).at(index); } - - std::exception_ptr operator[](std::size_t index) const noexcept { return (*state_)[index]; } - -private: - internal_state_t state_; -}; -}// namespace promise_hpp - -// ----------------------------------------------------------------------------- -// -// detail -// -// ----------------------------------------------------------------------------- - -namespace promise_hpp { -namespace detail { -template -void -destroy_in_place(T &ref) noexcept -{ - ref.~T(); -} - -template -void -construct_in_place(T &ref, Args &&...args) noexcept(std::is_nothrow_constructible::value) -{ - ::new (std::addressof(ref)) T(std::forward(args)...); -} - -class noncopyable { -public: - noncopyable(const noncopyable &) = delete; - noncopyable &operator=(const noncopyable &) = delete; - -protected: - noncopyable() = default; - ~noncopyable() = default; -}; - -template -class storage final : private noncopyable { -public: - storage() = default; - - ~storage() noexcept - { - if (initialized_) { destroy_in_place(*ptr_()); } - } - - storage &operator=(T &&value) noexcept(std::is_nothrow_move_constructible::value) - { - assert(!initialized_); - construct_in_place(*ptr_(), std::move(value)); - initialized_ = true; - return *this; - } - - storage &operator=(const T &value) noexcept(std::is_nothrow_copy_constructible::value) - { - assert(!initialized_); - construct_in_place(*ptr_(), value); - initialized_ = true; - return *this; - } - - T &operator*() noexcept - { - assert(initialized_); - return *ptr_(); - } - - const T &operator*() const noexcept - { - assert(initialized_); - return *ptr_(); - } - -private: - T *ptr_() noexcept { return reinterpret_cast(&data_); } - - const T *ptr_() const noexcept { return reinterpret_cast(&data_); } - -private: - std::aligned_storage_t data_; - bool initialized_ = false; -}; - -template -class storage final : private noncopyable { -public: - storage() = default; - ~storage() = default; - - storage &operator=(T &value) noexcept - { - assert(!initialized_); - value_ = &value; - initialized_ = true; - return *this; - } - - T &operator*() noexcept - { - assert(initialized_); - return *value_; - } - - const T &operator*() const noexcept - { - assert(initialized_); - return *value_; - } - -private: - T *value_{nullptr}; - bool initialized_ = false; -}; -}// namespace detail -}// namespace promise_hpp - -// ----------------------------------------------------------------------------- -// -// promise -// -// ----------------------------------------------------------------------------- - -namespace promise_hpp { - -template -class promise final { -public: - using value_type = T; - - promise() : state_(std::make_shared()) {} - - promise(promise &&) = default; - promise &operator=(promise &&) = default; - - promise(const promise &) = default; - promise &operator=(const promise &) = default; - - void swap(promise &other) noexcept { state_.swap(other.state_); } - - std::size_t hash() const noexcept { return std::hash()(state_.get()); } - - friend bool operator<(const promise &l, const promise &r) noexcept { return l.state_ < r.state_; } - - friend bool operator==(const promise &l, const promise &r) noexcept { return l.state_ == r.state_; } - - friend bool operator!=(const promise &l, const promise &r) noexcept { return l.state_ != r.state_; } - - promise GetFuture() const { return promise(*this); } - - // - // get - // - - const T &Get() const { return state_->Get(); } - - template - T GetOr(U &&def) const - { - try { - return Get(); - } catch (...) { - return std::forward(def); - } - } - - // - // wait - // - - void Wait() const noexcept { state_->Wait(); } - - inline promise_wait_status WaitFor(const sled::TimeDelta &timeout_duration) const - { - return state_->wait_for(std::chrono::microseconds(timeout_duration.us())); - } - - template - promise_wait_status WaitFor(const std::chrono::duration &timeout_duration) const - { - return state_->WaitFor(timeout_duration); - } - - template - promise_wait_status WaitUntil(const std::chrono::time_point &timeout_time) const - { - return state_->WaitUntil(timeout_time); - } - - // - // resolve/reject - // - - template - bool Resolve(U &&value) - { - return state_->Resolve(std::forward(value)); - } - - bool Reject(std::exception_ptr e) noexcept { return state_->Reject(e); } - - template - bool Reject(E &&e) - { - return state_->Reject(std::make_exception_ptr(std::forward(e))); - } - - // - // then - // - - template> - std::enable_if_t, promise> Then(ResolveF &&on_resolve) - { - promise next; - - Then([n = next, f = std::forward(on_resolve)](auto &&v) mutable { - // auto np = eggs::invoke(std::forward(f), std::forward(v)); - auto np = eggs::invoke(std::forward(f), std::forward(v)); - std::move(np) - .then([n](auto &&...nvs) mutable { n.resolve(std::forward(nvs)...); }) - .except([n](std::exception_ptr e) mutable { n.reject(e); }); - }).except([n = next](std::exception_ptr e) mutable { n.reject(e); }); - - return next; - } - - template - auto Tap(TapF &&on_tap) - { - return Then([f = std::forward(on_tap)](auto &&v) mutable { - eggs::invoke(std::forward(f), const_cast(v)); - return std::forward(v); - }); - } - - template - auto ThenAll(ResolveF &&on_resolve) - { - return Then([f = std::forward(on_resolve)](auto &&v) mutable { - auto r = eggs::invoke(std::forward(f), std::forward(v)); - return make_all_promise(std::move(r)); - }); - } - - template - auto ThenAny(ResolveF &&on_resolve) - { - return Then([f = std::forward(on_resolve)](auto &&v) mutable { - auto r = eggs::invoke(std::forward(f), std::forward(v)); - return make_any_promise(std::move(r)); - }); - } - - template - auto ThenRace(ResolveF &&on_resolve) - { - return Then([f = std::forward(on_resolve)](auto &&v) mutable { - auto r = eggs::invoke(std::forward(f), std::forward(v)); - return make_race_promise(std::move(r)); - }); - } - - template - auto ThenTuple(ResolveF &&on_resolve) - { - return Then([f = std::forward(on_resolve)](auto &&v) mutable { - auto r = eggs::invoke(std::forward(f), std::forward(v)); - return make_tuple_promise(std::move(r)); - }); - } - - // - // then - // - - template> - std::enable_if_t, promise> Then(ResolveF &&on_resolve) - { - promise next; - - state_->Attach( - next, std::forward(on_resolve), - [](std::exception_ptr e) -> ResolveR { std::rethrow_exception(e); }, false); - - return next; - } - - template> - std::enable_if_t, promise> Then(ResolveF &&on_resolve, RejectF &&on_reject) - { - promise next; - - state_->Attach(next, std::forward(on_resolve), std::forward(on_reject), true); - - return next; - } - - // - // except - // - - template - promise Except(RejectF &&on_reject) - { - return Then([](auto &&v) { return std::forward(v); }, std::forward(on_reject)); - } - - // - // finally - // - - template - promise Finally(FinallyF &&on_finally) - { - return Then( - [f = on_finally](auto &&v) { - eggs::invoke(std::move(f)); - return std::forward(v); - }, - [f = on_finally](std::exception_ptr e) -> T { - eggs::invoke(std::move(f)); - std::rethrow_exception(e); - }); - } - -private: - class state; - std::shared_ptr state_; - -private: - class state final : private detail::noncopyable { - public: - state() = default; - - const T &Get() - { - sled::MutexLock lock(&mutex_); - // std::unique_lock lock(mutex_); - // cond_var_.wait(lock, [this]() { return status_ != status::pending; }); - cond_var_.Wait(lock, [this]() { return status_ != status::pending; }); - if (status_ == status::rejected) { std::rethrow_exception(exception_); } - assert(status_ == status::resolved); - return *storage_; - } - - void Wait() const noexcept - { - sled::MutexLock lock(&mutex_); - // std::unique_lock lock(mutex_); - // cond_var_.wait(lock, [this]() { return status_ != status::pending; }); - cond_var_.Wait(lock, [this]() { return status_ != status::pending; }); - } - - template - promise_wait_status WaitFor(const std::chrono::duration &timeout_duration) const - { - sled::MutexLock lock(&mutex_); - // std::unique_lock lock(mutex_); - // return cond_var_.wait_for(lock, timeout_duration, [this]() { return status_ != status::pending; }) - return cond_var_.WaitFor( - lock, - sled::TimeDelta::Micros(std::chrono::duration_cast(timeout_duration)), - [this]() { return status_ != status::pending; }) - ? promise_wait_status::no_timeout - : promise_wait_status::timeout; - } - - template - promise_wait_status WaitUntil(const std::chrono::time_point &timeout_time) const - { - sled::MutexLock lock(&mutex_); - // std::unique_lock lock(mutex_); - // return cond_var_.wait_until(lock, timeout_time, [this]() { return status_ != status::pending; }) - auto now = std::chrono::system_clock::now(); - auto duration = timeout_time - now; - - return cond_var_.WaitFor( - lock, sled::TimeDelta::Micros(std::chrono::duration_cast(duration)), - [this]() { return status_ != status::pending; }) - ? promise_wait_status::no_timeout - : promise_wait_status::timeout; - } - - template - bool Resolve(U &&value) - { - sled::MutexLock lock(&mutex_); - // std::lock_guard guard(mutex_); - if (status_ != status::pending) { return false; } - storage_ = std::forward(value); - status_ = status::resolved; - invoke_resolve_handlers_(); - // cond_var_.notify_all(); - cond_var_.NotifyAll(); - return true; - } - - bool Reject(std::exception_ptr e) noexcept - { - sled::MutexLock lock(&mutex_); - // std::lock_guard guard(mutex_); - if (status_ != status::pending) { return false; } - exception_ = e; - status_ = status::rejected; - invoke_reject_handlers_(); - cond_var_.NotifyAll(); - return true; - } - - public: - template - std::enable_if_t::value, void> - Attach(promise &next, ResolveF &&on_resolve, RejectF &&on_reject, bool has_reject) - { - auto reject_h = [n = next, f = std::forward(on_reject), has_reject](std::exception_ptr e) mutable { - if (has_reject) { - try { - eggs::invoke(std::forward(f), e); - n.Resolve(); - } catch (...) { - n.Reject(std::current_exception()); - } - } else { - n.Reject(e); - } - }; - - auto resolve_h = [n = next, f = std::forward(on_resolve)](auto &&v) mutable { - try { - eggs::invoke(std::forward(f), std::forward(v)); - n.Resolve(); - } catch (...) { - n.Reject(std::current_exception()); - } - }; - - sled::MutexLock lock(&mutex_); - // std::lock_guard guard(mutex_); - add_handlers_(std::move(resolve_h), std::move(reject_h)); - } - - template - std::enable_if_t::value, void> - Attach(promise &next, ResolveF &&on_resolve, RejectF &&on_reject, bool has_reject) - { - auto reject_h = [n = next, f = std::forward(on_reject), has_reject](std::exception_ptr e) mutable { - if (has_reject) { - try { - auto r = eggs::invoke(std::forward(f), e); - n.Resolve(std::move(r)); - } catch (...) { - n.Reject(std::current_exception()); - } - } else { - n.Reject(e); - } - }; - - auto resolve_h = [n = next, f = std::forward(on_resolve)](auto &&v) mutable { - try { - auto r = eggs::invoke(std::forward(f), std::forward(v)); - n.Resolve(std::move(r)); - } catch (...) { - n.Reject(std::current_exception()); - } - }; - - sled::MutexLock lock(&mutex_); - // std::lock_guard guard(mutex_); - add_handlers_(std::move(resolve_h), std::move(reject_h)); - } - - private: - template - void add_handlers_(ResolveF &&resolve, RejectF &&reject) - { - if (status_ == status::resolved) { - eggs::invoke(std::forward(resolve), *storage_); - } else if (status_ == status::rejected) { - eggs::invoke(std::forward(reject), exception_); - } else { - handlers_.push_back({std::forward(resolve), std::forward(reject)}); - } - } - - void invoke_resolve_handlers_() noexcept - { - for (const auto &h : handlers_) { h.resolve_(*storage_); } - handlers_.clear(); - } - - void invoke_reject_handlers_() noexcept - { - for (const auto &h : handlers_) { h.reject_(exception_); } - handlers_.clear(); - } - - private: - enum class status { pending, resolved, rejected }; - - status status_{status::pending}; - std::exception_ptr exception_{nullptr}; - - mutable sled::Mutex mutex_; - mutable sled::ConditionVariable cond_var_; - - // mutable std::mutex mutex_; - // mutable std::condition_variable cond_var_; - - struct handler { - using resolve_t = std::function; - using reject_t = std::function; - - resolve_t resolve_; - reject_t reject_; - }; - - detail::storage storage_; - std::vector handlers_; - }; -}; - -template -using future = promise; -}// namespace promise_hpp - -// ----------------------------------------------------------------------------- -// -// promise -// -// ----------------------------------------------------------------------------- - -namespace promise_hpp { -template<> -class promise final { -public: - using value_type = void; - - promise() : state_(std::make_shared()) {} - - promise(promise &&) = default; - promise &operator=(promise &&) = default; - - promise(const promise &) = default; - promise &operator=(const promise &) = default; - - void swap(promise &other) noexcept { state_.swap(other.state_); } - - std::size_t hash() const noexcept { return std::hash()(state_.get()); } - - friend bool operator<(const promise &l, const promise &r) noexcept { return l.state_ < r.state_; } - - friend bool operator==(const promise &l, const promise &r) noexcept { return l.state_ == r.state_; } - - friend bool operator!=(const promise &l, const promise &r) noexcept { return l.state_ != r.state_; } - - promise GetFuture() const { return promise(*this); } - - // - // get - // - - void Get() const { state_->get(); } - - void GetOr() const - { - try { - return Get(); - } catch (...) { - // nothing - } - } - - // - // wait - // - - void Wait() const noexcept { state_->wait(); } - - inline promise_wait_status WaitFor(const sled::TimeDelta &timeout_duration) const - { - return state_->wait_for(timeout_duration); - } - - template - promise_wait_status WaitFor(const std::chrono::duration &timeout_duration) const - { - return state_->wait_for(timeout_duration); - } - - template - promise_wait_status WaitUntil(const std::chrono::time_point &timeout_time) const - { - return state_->wait_until(timeout_time); - } - - // - // resolve/reject - // - - bool Resolve() { return state_->Resolve(); } - - bool Reject(std::exception_ptr e) noexcept { return state_->Reject(e); } - - template - bool Reject(E &&e) - { - return state_->Reject(std::make_exception_ptr(std::forward(e))); - } - - // - // then - // - - template> - std::enable_if_t, promise> Then(ResolveF &&on_resolve) - { - promise next; - - Then([n = next, f = std::forward(on_resolve)]() mutable { - auto np = eggs::invoke(std::forward(f)); - std::move(np) - .then([n](auto &&...nvs) mutable { n.resolve(std::forward(nvs)...); }) - .except([n](std::exception_ptr e) mutable { n.reject(e); }); - }).except([n = next](std::exception_ptr e) mutable { n.reject(e); }); - - return next; - } - - template - auto ThenAll(ResolveF &&on_resolve) - { - return Then([f = std::forward(on_resolve)]() mutable { - auto r = eggs::invoke(std::forward(f)); - return make_all_promise(std::move(r)); - }); - } - - template - auto ThenAny(ResolveF &&on_resolve) - { - return Then([f = std::forward(on_resolve)]() mutable { - auto r = eggs::invoke(std::forward(f)); - return make_any_promise(std::move(r)); - }); - } - - template - auto ThenRace(ResolveF &&on_resolve) - { - return Then([f = std::forward(on_resolve)]() mutable { - auto r = eggs::invoke(std::forward(f)); - return make_race_promise(std::move(r)); - }); - } - - template - auto ThenTuple(ResolveF &&on_resolve) - { - return Then([f = std::forward(on_resolve)]() mutable { - auto r = eggs::invoke(std::forward(f)); - return make_tuple_promise(std::move(r)); - }); - } - - // - // then - // - - template> - std::enable_if_t, promise> Then(ResolveF &&on_resolve) - { - promise next; - - state_->attach( - next, std::forward(on_resolve), - [](std::exception_ptr e) -> ResolveR { std::rethrow_exception(e); }, false); - - return next; - } - - template> - std::enable_if_t, promise> Then(ResolveF &&on_resolve, RejectF &&on_reject) - { - promise next; - - state_->attach(next, std::forward(on_resolve), std::forward(on_reject), true); - - return next; - } - - // - // except - // - - template - promise Except(RejectF &&on_reject) - { - return Then([]() {}, std::forward(on_reject)); - } - - // - // finally - // - - template - promise Finally(FinallyF &&on_finally) - { - return Then([f = on_finally]() { eggs::invoke(std::move(f)); }, - [f = on_finally](std::exception_ptr e) { - eggs::invoke(std::move(f)); - std::rethrow_exception(e); - }); - } - -private: - class state; - std::shared_ptr state_; - -private: - class state final : private detail::noncopyable { - public: - state() = default; - - void get() - { - sled::MutexLock lock(&mutex_); - // std::unique_lock lock(mutex_); - // cond_var_.wait(lock, [this]() { return status_ != status::pending; }); - cond_var_.Wait(lock, [this]() { return status_ != status::pending; }); - if (status_ == status::rejected) { std::rethrow_exception(exception_); } - assert(status_ == status::resolved); - } - - void wait() const noexcept - { - sled::MutexLock lock(&mutex_); - // std::unique_lock lock(mutex_); - // cond_var_.wait(lock, [this]() { return status_ != status::pending; }); - cond_var_.Wait(lock, [this]() { return status_ != status::pending; }); - } - - promise_wait_status wait_for(const sled::TimeDelta &timeout_duration) const - { - sled::MutexLock lock(&mutex_); - // std::unique_lock lock(mutex_); - // return cond_var_.wait_for(lock, timeout_duration, [this]() { return status_ != status::pending; }) - return cond_var_.WaitFor(lock, timeout_duration, [this]() { return status_ != status::pending; }) - ? promise_wait_status::no_timeout - : promise_wait_status::timeout; - } - - template - promise_wait_status wait_for(const std::chrono::duration &timeout_duration) const - { - sled::MutexLock lock(&mutex_); - // std::unique_lock lock(mutex_); - // return cond_var_.wait_for(lock, timeout_duration, [this]() { return status_ != status::pending; }) - return cond_var_.WaitFor(lock, timeout_duration, [this]() { return status_ != status::pending; }) - ? promise_wait_status::no_timeout - : promise_wait_status::timeout; - } - - template - promise_wait_status wait_until(const std::chrono::time_point &timeout_time) const - { - sled::MutexLock lock(&mutex_); - // std::unique_lock lock(mutex_); - // return cond_var_.wait_until(lock, timeout_time, [this]() { return status_ != status::pending; }) - auto duration = timeout_time - std::chrono::system_clock::now(); - return cond_var_.WaitFor(lock, duration, [this]() { return status_ != status::pending; }) - ? promise_wait_status::no_timeout - : promise_wait_status::timeout; - } - - bool Resolve() - { - sled::MutexLock lock(&mutex_); - // std::unique_lock lock(mutex_); - // std::lock_guard guard(mutex_); - if (status_ != status::pending) { return false; } - status_ = status::resolved; - invoke_resolve_handlers_(); - cond_var_.NotifyAll(); - return true; - } - - bool Reject(std::exception_ptr e) noexcept - { - sled::MutexLock lock(&mutex_); - // std::lock_guard guard(mutex_); - if (status_ != status::pending) { return false; } - exception_ = e; - status_ = status::rejected; - invoke_reject_handlers_(); - cond_var_.NotifyAll(); - return true; - } - - public: - template - std::enable_if_t::value, void> - attach(promise &next, ResolveF &&on_resolve, RejectF &&on_reject, bool has_reject) - { - auto reject_h = [n = next, f = std::forward(on_reject), has_reject](std::exception_ptr e) mutable { - if (has_reject) { - try { - eggs::invoke(std::forward(f), e); - n.Resolve(); - } catch (...) { - n.Reject(std::current_exception()); - } - } else { - n.Reject(e); - } - }; - - auto resolve_h = [n = next, f = std::forward(on_resolve)]() mutable { - try { - eggs::invoke(std::forward(f)); - n.Resolve(); - } catch (...) { - n.Reject(std::current_exception()); - } - }; - - sled::MutexLock lock(&mutex_); - // std::lock_guard guard(mutex_); - add_handlers_(std::move(resolve_h), std::move(reject_h)); - } - - template - std::enable_if_t::value, void> - attach(promise &next, ResolveF &&on_resolve, RejectF &&on_reject, bool has_reject) - { - auto reject_h = [n = next, f = std::forward(on_reject), has_reject](std::exception_ptr e) mutable { - if (has_reject) { - try { - auto r = eggs::invoke(std::forward(f), e); - n.Resolve(std::move(r)); - } catch (...) { - n.Reject(std::current_exception()); - } - } else { - n.Reject(e); - } - }; - - auto resolve_h = [n = next, f = std::forward(on_resolve)]() mutable { - try { - auto r = eggs::invoke(std::forward(f)); - n.Resolve(std::move(r)); - } catch (...) { - n.Reject(std::current_exception()); - } - }; - - sled::MutexLock lock(&mutex_); - // std::lock_guard guard(mutex_); - add_handlers_(std::move(resolve_h), std::move(reject_h)); - } - - private: - template - void add_handlers_(ResolveF &&resolve, RejectF &&reject) - { - if (status_ == status::resolved) { - eggs::invoke(std::forward(resolve)); - } else if (status_ == status::rejected) { - eggs::invoke(std::forward(reject), exception_); - } else { - handlers_.push_back({std::forward(resolve), std::forward(reject)}); - } - } - - void invoke_resolve_handlers_() noexcept - { - for (const auto &h : handlers_) { h.resolve_(); } - handlers_.clear(); - } - - void invoke_reject_handlers_() noexcept - { - for (const auto &h : handlers_) { h.reject_(exception_); } - handlers_.clear(); - } - - private: - enum class status { pending, resolved, rejected }; - - status status_{status::pending}; - std::exception_ptr exception_{nullptr}; - - mutable sled::Mutex mutex_; - mutable sled::ConditionVariable cond_var_; - - // mutable std::mutex mutex_; - // mutable std::condition_variable cond_var_; - - struct handler { - using resolve_t = std::function; - using reject_t = std::function; - - resolve_t resolve_; - reject_t reject_; - }; - - std::vector handlers_; - }; -}; -}// namespace promise_hpp - -namespace promise_hpp { -// -// swap -// - -template -void -swap(promise &l, promise &r) noexcept -{ - l.swap(r); -} - -// -// make_promise -// - -template -promise -make_promise() -{ - return promise(); -} - -template -promise -make_promise(F &&f) -{ - promise result; - - auto resolver = [result](auto &&v) mutable { return result.Resolve(std::forward(v)); }; - - auto rejector = [result](auto &&e) mutable { return result.Reject(std::forward(e)); }; - - try { - eggs::invoke(std::forward(f), std::move(resolver), std::move(rejector)); - } catch (...) { - result.Reject(std::current_exception()); - } - - return result; -} - -// -// make_resolved_promise -// - -inline promise -make_resolved_promise() -{ - promise result; - result.Resolve(); - return result; -} - -template -promise> -make_resolved_promise(R &&v) -{ - promise> result; - result.Resolve(std::forward(v)); - return result; -} - -// -// make_rejected_promise -// - -template -promise -make_rejected_promise(E &&e) -{ - promise result; - result.Reject(std::forward(e)); - return result; -} - -template -promise -make_rejected_promise(E &&e) -{ - promise result; - result.Reject(std::forward(e)); - return result; -} - -// -// make_all_promise -// - -template::value_type, - typename SubPromiseResult = typename SubPromise::value_type, - typename ResultPromiseValueType = std::vector> -promise -make_all_promise(Iter begin, Iter end) -{ - if (begin == end) { return make_resolved_promise(ResultPromiseValueType()); } - - struct context_t { - std::atomic_size_t success_counter{0u}; - std::vector> results; - - context_t(std::size_t count) : success_counter(count), results(count) {} - }; - - return make_promise([begin, end](auto &&resolver, auto &&rejector) { - std::size_t result_index = 0; - auto context = std::make_shared(std::distance(begin, end)); - for (Iter iter = begin; iter != end; ++iter, ++result_index) { - (*iter) - .then([context, resolver, result_index](auto &&v) mutable { - context->results[result_index] = std::forward(v); - if (!--context->success_counter) { - std::vector results; - results.reserve(context->results.size()); - for (auto &&r : context->results) { results.push_back(std::move(*r)); } - resolver(std::move(results)); - } - }) - .except([rejector](std::exception_ptr e) mutable { rejector(e); }); - } - }); -} - -template -auto -make_all_promise(Container &&container) -{ - return make_all_promise(std::begin(container), std::end(container)); -} - -// -// make_any_promise -// - -template::value_type, - typename SubPromiseResult = typename SubPromise::value_type, - typename ResultPromiseValueType = SubPromiseResult> -promise -make_any_promise(Iter begin, Iter end) -{ - if (begin == end) { return make_rejected_promise(aggregate_exception()); } - - struct context_t { - std::atomic_size_t failure_counter{0u}; - std::vector exceptions; - - context_t(std::size_t count) : failure_counter(count), exceptions(count) {} - }; - - return make_promise([begin, end](auto &&resolver, auto &&rejector) { - std::size_t exception_index = 0; - auto context = std::make_shared(std::distance(begin, end)); - for (Iter iter = begin; iter != end; ++iter, ++exception_index) { - (*iter) - .then([resolver](auto &&v) mutable { resolver(std::forward(v)); }) - .except([context, rejector, exception_index](std::exception_ptr e) mutable { - context->exceptions[exception_index] = e; - if (!--context->failure_counter) { rejector(aggregate_exception(std::move(context->exceptions))); } - }); - } - }); -} - -template -auto -make_any_promise(Container &&container) -{ - return make_any_promise(std::begin(container), std::end(container)); -} - -// -// make_race_promise -// - -template::value_type, - typename SubPromiseResult = typename SubPromise::value_type, - typename ResultPromiseValueType = SubPromiseResult> -promise -make_race_promise(Iter begin, Iter end) -{ - return make_promise([begin, end](auto &&resolver, auto &&rejector) { - for (Iter iter = begin; iter != end; ++iter) { (*iter).then(resolver).except(rejector); } - }); -} - -template -auto -make_race_promise(Container &&container) -{ - return make_race_promise(std::begin(container), std::end(container)); -} - -// -// make_tuple_promise -// - -namespace impl { -template -struct tuple_promise_result_impl {}; - -template -struct tuple_promise_result_impl...>> { - using type = std::tuple; -}; - -template -struct tuple_promise_result { - using type = typename tuple_promise_result_impl>::type; -}; - -template -using tuple_promise_result_t = typename tuple_promise_result::type; - -template -class tuple_promise_context_t final : private detail::noncopyable { -public: - template - bool apply_result(T &&value) - { - std::get(results_) = std::forward(value); - return ++counter_ == sizeof...(ResultTypes); - } - - std::tuple get_results() - { - return get_results_impl(std::make_index_sequence()); - } - -private: - template - std::tuple get_results_impl(std::index_sequence) - { - return {std::move(*std::get(results_))...}; - } - -private: - std::atomic_size_t counter_{0}; - std::tuple...> results_; -}; - -template -using tuple_promise_context_ptr = std::shared_ptr>; - -template -promise -make_tuple_sub_promise_impl(Tuple &&tuple, - Resolver &&resolver, - Rejector &&rejector, - const tuple_promise_context_ptr &context) -{ - return std::get(tuple) - .then([context, resolver](auto &&v) mutable { - if (context->template apply_result(std::forward(v))) { resolver(context->get_results()); } - }) - .except(rejector); -} - -template>> -std::enable_if_t> -make_tuple_promise_impl(Tuple &&, std::index_sequence) -{ - return make_resolved_promise(ResultTuple()); -} - -template>> -std::enable_if_t> -make_tuple_promise_impl(Tuple &&tuple, std::index_sequence) -{ - auto result = promise(); - - auto resolver = [result](auto &&v) mutable { return result.resolve(std::forward(v)); }; - - auto rejector = [result](auto &&e) mutable { return result.reject(std::forward(e)); }; - - try { - auto context = std::make_shared...>>(); - auto promises = std::make_tuple(make_tuple_sub_promise_impl(tuple, resolver, rejector, context)...); - (void) promises; - } catch (...) { - result.reject(std::current_exception()); - } - - return result; -} -}// namespace impl - -template>> -promise -make_tuple_promise(Tuple &&tuple) -{ - return impl::make_tuple_promise_impl(std::forward(tuple), - std::make_index_sequence::value>()); -} -}// namespace promise_hpp - -namespace std { -template -struct hash> final { - std::size_t operator()(const promise_hpp::promise &p) const noexcept { return p.hash(); } -}; -}// namespace std - -namespace sled { - -template -struct IsPromise : promise_hpp::is_promise {}; - -template -struct IsPromiseRet : promise_hpp::is_promise_r {}; - -template -using Promise = promise_hpp::promise; - -template -using Future = promise_hpp::future; - -template -inline auto -MakePromise() -> Promise -{ - return promise_hpp::make_promise(); -} - -template -inline auto -MakePromise(F &&f) -{ - return promise_hpp::make_promise(std::forward(f)); -} - -inline auto -MakeResolvedPromise() -> Promise -{ - Promise result; - result.Resolve(); - return result; -} - -template -inline auto -MakeResolvedPromise() -{ - return promise_hpp::make_resolved_promise(); -} - -template -inline Promise -make_rejected_promise(E &&e) -{ - Promise result; - result.Reject(std::forward(e)); - return result; -} - -template -inline auto -MakeRejectedPromise(std::exception_ptr e) -{ - return promise_hpp::make_rejected_promise(e); -} - -template -inline auto -MakeAllPromise(Iter begin, Iter end) -{ - return promise_hpp::make_all_promise(begin, end); -} - -template -inline auto -MakeAllPromise(Container &&container) -{ - return promise_hpp::make_all_promise(std::begin(container), std::end(container)); -} - -template -inline auto -MakeAnyPromise(Iter begin, Iter end) -{ - return promise_hpp::make_any_promise(begin, end); -} - -template -inline auto -MakeAnyPromise(Container &&container) -{ - return promise_hpp::make_any_promise(std::begin(container), std::end(container)); -} - -template -inline auto -MakeRacePromise(Iter begin, Iter end) -{ - return promise_hpp::make_any_promise(begin, end); -} - -template -inline auto -MakeRacePromise(Container &&container) -{ - return promise_hpp::make_any_promise(std::begin(container), std::end(container)); -} - -}// namespace sled - -#endif// SLED_FUTURES_PROMISE_H diff --git a/src/sled/futures/promise_test.cc b/src/sled/futures/promise_test.cc deleted file mode 100644 index d4d675b..0000000 --- a/src/sled/futures/promise_test.cc +++ /dev/null @@ -1,57 +0,0 @@ -#include -#include - -TEST(Promise, Basic) -{ - auto p = sled::Promise(); - auto v = p.Then([](int v) { - EXPECT_EQ(v, 1); - return v + 10; - }) - .Tap([](int v) { - EXPECT_EQ(v, 11); - // no effect - return v + 1; - }) - .Then([](int v) { - EXPECT_EQ(v, 11); - return v + 10; - }); - p.Resolve(1); -} - -TEST(Future, Basic) -{ - auto p = sled::Promise(); - auto future = p.GetFuture() - .Then([](int v) { - EXPECT_EQ(v, 1); - return v + 10; - }) - .Then([](int v) { - EXPECT_EQ(v, 11); - return v + 10; - }); - p.Resolve(1); - EXPECT_EQ(future.Get(), 21); -} - -TEST(Future, Except) -{ - auto p = sled::Promise(); - p.Resolve(1); - p.GetFuture() - .Then([](int) { - return 1; - // throw std::runtime_error("test"); - }) - .Except([](std::exception_ptr e) { - try { - std::rethrow_exception(e); - } catch (const std::exception &e) { - EXPECT_STREQ(e.what(), "test"); - } - return false; - }) - .Get(); -}