From c0a2b73d0d3d31164825fa4e635595b3a45e4e63 Mon Sep 17 00:00:00 2001 From: tqcq <99722391+tqcq@users.noreply.github.com> Date: Sat, 23 Mar 2024 18:18:15 +0800 Subject: [PATCH] feat add Promise --- CMakeLists.txt | 1 + include/sled/futures/promise.h | 1378 ++++++++++++++++++++++++++ include/sled/synchronization/mutex.h | 4 +- include/sled/units/time_delta.h | 16 +- src/futures/promise_test.cc | 21 + 5 files changed, 1411 insertions(+), 9 deletions(-) create mode 100644 include/sled/futures/promise.h create mode 100644 src/futures/promise_test.cc diff --git a/CMakeLists.txt b/CMakeLists.txt index c0c6b59..7923d31 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -124,6 +124,7 @@ if(SLED_BUILD_TESTS) src/any_test.cc src/filesystem/path_test.cc src/futures/future_test.cc + src/futures/promise_test.cc src/log/fmt_test.cc # src/profiling/profiling_test.cc src/strings/base64_test.cc diff --git a/include/sled/futures/promise.h b/include/sled/futures/promise.h new file mode 100644 index 0000000..7f40c35 --- /dev/null +++ b/include/sled/futures/promise.h @@ -0,0 +1,1378 @@ +/******************************************************************************* + * This file is part of the "https://github.com/blackmatov/promise.hpp" + * For conditions of distribution and use, see copyright notice in LICENSE.md + * Copyright (C) 2018-2023, by Matvey Cherevko (blackmatov@gmail.com) + ******************************************************************************/ + +#ifndef SLED_FUTURES_PROMISE_H +#define SLED_FUTURES_PROMISE_H +#pragma once +#include "sled/exec/detail/invoke_result.h" +#include "sled/synchronization/mutex.h" + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace promise_hpp { +// +// forward declaration +// + +template +class promise; + +// +// is_promise +// + +namespace impl { +template +struct is_promise_impl : std::false_type {}; + +template +struct is_promise_impl> : std::true_type {}; +}// namespace impl + +template +struct is_promise : impl::is_promise_impl::type> {}; + +template +constexpr bool is_promise_v = is_promise::value; + +// +// is_promise_r +// + +namespace impl { +template +struct is_promise_r_impl : std::false_type {}; + +template +struct is_promise_r_impl> : std::is_convertible {}; +}// namespace impl + +template +struct is_promise_r : impl::is_promise_r_impl> {}; + +template +constexpr bool is_promise_r_v = is_promise_r::value; + +// +// promise_wait_status +// + +enum class promise_wait_status { no_timeout, timeout }; + +// +// aggregate_exception +// + +class aggregate_exception final : public std::exception { +private: + using exceptions_t = std::vector; + using internal_state_t = std::shared_ptr; + +public: + aggregate_exception() : state_(std::make_shared()) {} + + explicit aggregate_exception(exceptions_t exceptions) + : state_(std::make_shared(std::move(exceptions))) + {} + + aggregate_exception(const aggregate_exception &other) noexcept : state_(other.state_) {} + + aggregate_exception &operator=(const aggregate_exception &other) noexcept + { + if (this != &other) { state_ = other.state_; } + return *this; + } + + const char *what() const noexcept override { return "Aggregate exception"; } + + bool empty() const noexcept { return (*state_).empty(); } + + std::size_t size() const noexcept { return (*state_).size(); } + + std::exception_ptr at(std::size_t index) const { return (*state_).at(index); } + + std::exception_ptr operator[](std::size_t index) const noexcept { return (*state_)[index]; } + +private: + internal_state_t state_; +}; +}// namespace promise_hpp + +// ----------------------------------------------------------------------------- +// +// detail +// +// ----------------------------------------------------------------------------- + +namespace promise_hpp { +namespace detail { +template +void +destroy_in_place(T &ref) noexcept +{ + ref.~T(); +} + +template +void +construct_in_place(T &ref, Args &&...args) noexcept(std::is_nothrow_constructible::value) +{ + ::new (std::addressof(ref)) T(std::forward(args)...); +} + +class noncopyable { +public: + noncopyable(const noncopyable &) = delete; + noncopyable &operator=(const noncopyable &) = delete; + +protected: + noncopyable() = default; + ~noncopyable() = default; +}; + +template +class storage final : private noncopyable { +public: + storage() = default; + + ~storage() noexcept + { + if (initialized_) { destroy_in_place(*ptr_()); } + } + + storage &operator=(T &&value) noexcept(std::is_nothrow_move_constructible::value) + { + assert(!initialized_); + construct_in_place(*ptr_(), std::move(value)); + initialized_ = true; + return *this; + } + + storage &operator=(const T &value) noexcept(std::is_nothrow_copy_constructible::value) + { + assert(!initialized_); + construct_in_place(*ptr_(), value); + initialized_ = true; + return *this; + } + + T &operator*() noexcept + { + assert(initialized_); + return *ptr_(); + } + + const T &operator*() const noexcept + { + assert(initialized_); + return *ptr_(); + } + +private: + T *ptr_() noexcept { return reinterpret_cast(&data_); } + + const T *ptr_() const noexcept { return reinterpret_cast(&data_); } + +private: + std::aligned_storage_t data_; + bool initialized_ = false; +}; + +template +class storage final : private noncopyable { +public: + storage() = default; + ~storage() = default; + + storage &operator=(T &value) noexcept + { + assert(!initialized_); + value_ = &value; + initialized_ = true; + return *this; + } + + T &operator*() noexcept + { + assert(initialized_); + return *value_; + } + + const T &operator*() const noexcept + { + assert(initialized_); + return *value_; + } + +private: + T *value_{nullptr}; + bool initialized_ = false; +}; +}// namespace detail +}// namespace promise_hpp + +// ----------------------------------------------------------------------------- +// +// promise +// +// ----------------------------------------------------------------------------- + +namespace promise_hpp { +template +class promise final { +public: + using value_type = T; + + promise() : state_(std::make_shared()) {} + + promise(promise &&) = default; + promise &operator=(promise &&) = default; + + promise(const promise &) = default; + promise &operator=(const promise &) = default; + + void swap(promise &other) noexcept { state_.swap(other.state_); } + + std::size_t hash() const noexcept { return std::hash()(state_.get()); } + + friend bool operator<(const promise &l, const promise &r) noexcept { return l.state_ < r.state_; } + + friend bool operator==(const promise &l, const promise &r) noexcept { return l.state_ == r.state_; } + + friend bool operator!=(const promise &l, const promise &r) noexcept { return l.state_ != r.state_; } + + // + // get + // + + const T &Get() const { return state_->Get(); } + + template + T GetOr(U &&def) const + { + try { + return Get(); + } catch (...) { + return std::forward(def); + } + } + + // + // wait + // + + void Wait() const noexcept { state_->Wait(); } + + template + promise_wait_status WaitFor(const std::chrono::duration &timeout_duration) const + { + return state_->WaitFor(timeout_duration); + } + + template + promise_wait_status WaitUntil(const std::chrono::time_point &timeout_time) const + { + return state_->WaitUntil(timeout_time); + } + + // + // resolve/reject + // + + template + bool Resolve(U &&value) + { + return state_->Resolve(std::forward(value)); + } + + bool Reject(std::exception_ptr e) noexcept { return state_->Reject(e); } + + template + bool Reject(E &&e) + { + return state_->Reject(std::make_exception_ptr(std::forward(e))); + } + + // + // then + // + + template> + std::enable_if_t, promise> Then(ResolveF &&on_resolve) + { + promise next; + + Then([n = next, f = std::forward(on_resolve)](auto &&v) mutable { + // auto np = eggs::invoke(std::forward(f), std::forward(v)); + auto np = eggs::invoke(std::forward(f), std::forward(v)); + std::move(np) + .then([n](auto &&...nvs) mutable { n.resolve(std::forward(nvs)...); }) + .except([n](std::exception_ptr e) mutable { n.reject(e); }); + }).except([n = next](std::exception_ptr e) mutable { n.reject(e); }); + + return next; + } + + template + auto Tap(TapF &&on_tap) + { + return Then([f = std::forward(on_tap)](auto &&v) mutable { + eggs::invoke(std::forward(f), const_cast(v)); + return std::forward(v); + }); + } + + template + auto ThenAll(ResolveF &&on_resolve) + { + return Then([f = std::forward(on_resolve)](auto &&v) mutable { + auto r = eggs::invoke(std::forward(f), std::forward(v)); + return make_all_promise(std::move(r)); + }); + } + + template + auto ThenAny(ResolveF &&on_resolve) + { + return Then([f = std::forward(on_resolve)](auto &&v) mutable { + auto r = eggs::invoke(std::forward(f), std::forward(v)); + return make_any_promise(std::move(r)); + }); + } + + template + auto ThenRace(ResolveF &&on_resolve) + { + return Then([f = std::forward(on_resolve)](auto &&v) mutable { + auto r = eggs::invoke(std::forward(f), std::forward(v)); + return make_race_promise(std::move(r)); + }); + } + + template + auto ThenTuple(ResolveF &&on_resolve) + { + return Then([f = std::forward(on_resolve)](auto &&v) mutable { + auto r = eggs::invoke(std::forward(f), std::forward(v)); + return make_tuple_promise(std::move(r)); + }); + } + + // + // then + // + + template> + std::enable_if_t, promise> Then(ResolveF &&on_resolve) + { + promise next; + + state_->Attach( + next, std::forward(on_resolve), + [](std::exception_ptr e) -> ResolveR { std::rethrow_exception(e); }, false); + + return next; + } + + template> + std::enable_if_t, promise> Then(ResolveF &&on_resolve, RejectF &&on_reject) + { + promise next; + + state_->Attach(next, std::forward(on_resolve), std::forward(on_reject), true); + + return next; + } + + // + // except + // + + template + promise Except(RejectF &&on_reject) + { + return Then([](auto &&v) { return std::forward(v); }, std::forward(on_reject)); + } + + // + // finally + // + + template + promise Finally(FinallyF &&on_finally) + { + return Then( + [f = on_finally](auto &&v) { + eggs::invoke(std::move(f)); + return std::forward(v); + }, + [f = on_finally](std::exception_ptr e) -> T { + eggs::invoke(std::move(f)); + std::rethrow_exception(e); + }); + } + +private: + class state; + std::shared_ptr state_; + +private: + class state final : private detail::noncopyable { + public: + state() = default; + + const T &Get() + { + sled::MutexLock lock(&mutex_); + // std::unique_lock lock(mutex_); + // cond_var_.wait(lock, [this]() { return status_ != status::pending; }); + cond_var_.Wait(lock, [this]() { return status_ != status::pending; }); + if (status_ == status::rejected) { std::rethrow_exception(exception_); } + assert(status_ == status::resolved); + return *storage_; + } + + void Wait() const noexcept + { + sled::MutexLock lock(&mutex_); + // std::unique_lock lock(mutex_); + // cond_var_.wait(lock, [this]() { return status_ != status::pending; }); + cond_var_.Wait(lock, [this]() { return status_ != status::pending; }); + } + + template + promise_wait_status WaitFor(const std::chrono::duration &timeout_duration) const + { + sled::MutexLock lock(&mutex_); + // std::unique_lock lock(mutex_); + // return cond_var_.wait_for(lock, timeout_duration, [this]() { return status_ != status::pending; }) + return cond_var_.WaitFor( + lock, + sled::TimeDelta::Micros(std::chrono::duration_cast(timeout_duration)), + [this]() { return status_ != status::pending; }) + ? promise_wait_status::no_timeout + : promise_wait_status::timeout; + } + + template + promise_wait_status WaitUntil(const std::chrono::time_point &timeout_time) const + { + sled::MutexLock lock(&mutex_); + // std::unique_lock lock(mutex_); + // return cond_var_.wait_until(lock, timeout_time, [this]() { return status_ != status::pending; }) + auto now = std::chrono::system_clock::now(); + auto duration = timeout_time - now; + + return cond_var_.WaitFor( + lock, sled::TimeDelta::Micros(std::chrono::duration_cast(duration)), + [this]() { return status_ != status::pending; }) + ? promise_wait_status::no_timeout + : promise_wait_status::timeout; + } + + template + bool Resolve(U &&value) + { + sled::MutexLock lock(&mutex_); + // std::lock_guard guard(mutex_); + if (status_ != status::pending) { return false; } + storage_ = std::forward(value); + status_ = status::resolved; + invoke_resolve_handlers_(); + // cond_var_.notify_all(); + cond_var_.NotifyAll(); + return true; + } + + bool Reject(std::exception_ptr e) noexcept + { + sled::MutexLock lock(&mutex_); + // std::lock_guard guard(mutex_); + if (status_ != status::pending) { return false; } + exception_ = e; + status_ = status::rejected; + invoke_reject_handlers_(); + cond_var_.NotifyAll(); + return true; + } + + public: + template + std::enable_if_t::value, void> + Attach(promise &next, ResolveF &&on_resolve, RejectF &&on_reject, bool has_reject) + { + auto reject_h = [n = next, f = std::forward(on_reject), has_reject](std::exception_ptr e) mutable { + if (has_reject) { + try { + eggs::invoke(std::forward(f), e); + n.Resolve(); + } catch (...) { + n.Reject(std::current_exception()); + } + } else { + n.Reject(e); + } + }; + + auto resolve_h = [n = next, f = std::forward(on_resolve)](auto &&v) mutable { + try { + eggs::invoke(std::forward(f), std::forward(v)); + n.Resolve(); + } catch (...) { + n.Reject(std::current_exception()); + } + }; + + sled::MutexLock lock(&mutex_); + // std::lock_guard guard(mutex_); + add_handlers_(std::move(resolve_h), std::move(reject_h)); + } + + template + std::enable_if_t::value, void> + Attach(promise &next, ResolveF &&on_resolve, RejectF &&on_reject, bool has_reject) + { + auto reject_h = [n = next, f = std::forward(on_reject), has_reject](std::exception_ptr e) mutable { + if (has_reject) { + try { + auto r = eggs::invoke(std::forward(f), e); + n.Resolve(std::move(r)); + } catch (...) { + n.Reject(std::current_exception()); + } + } else { + n.Reject(e); + } + }; + + auto resolve_h = [n = next, f = std::forward(on_resolve)](auto &&v) mutable { + try { + auto r = eggs::invoke(std::forward(f), std::forward(v)); + n.Resolve(std::move(r)); + } catch (...) { + n.Reject(std::current_exception()); + } + }; + + sled::MutexLock lock(&mutex_); + // std::lock_guard guard(mutex_); + add_handlers_(std::move(resolve_h), std::move(reject_h)); + } + + private: + template + void add_handlers_(ResolveF &&resolve, RejectF &&reject) + { + if (status_ == status::resolved) { + eggs::invoke(std::forward(resolve), *storage_); + } else if (status_ == status::rejected) { + eggs::invoke(std::forward(reject), exception_); + } else { + handlers_.push_back({std::forward(resolve), std::forward(reject)}); + } + } + + void invoke_resolve_handlers_() noexcept + { + for (const auto &h : handlers_) { h.resolve_(*storage_); } + handlers_.clear(); + } + + void invoke_reject_handlers_() noexcept + { + for (const auto &h : handlers_) { h.reject_(exception_); } + handlers_.clear(); + } + + private: + enum class status { pending, resolved, rejected }; + + status status_{status::pending}; + std::exception_ptr exception_{nullptr}; + + mutable sled::Mutex mutex_; + mutable sled::ConditionVariable cond_var_; + + // mutable std::mutex mutex_; + // mutable std::condition_variable cond_var_; + + struct handler { + using resolve_t = std::function; + using reject_t = std::function; + + resolve_t resolve_; + reject_t reject_; + }; + + detail::storage storage_; + std::vector handlers_; + }; +}; +}// namespace promise_hpp + +// ----------------------------------------------------------------------------- +// +// promise +// +// ----------------------------------------------------------------------------- + +namespace promise_hpp { +template<> +class promise final { +public: + using value_type = void; + + promise() : state_(std::make_shared()) {} + + promise(promise &&) = default; + promise &operator=(promise &&) = default; + + promise(const promise &) = default; + promise &operator=(const promise &) = default; + + void swap(promise &other) noexcept { state_.swap(other.state_); } + + std::size_t hash() const noexcept { return std::hash()(state_.get()); } + + friend bool operator<(const promise &l, const promise &r) noexcept { return l.state_ < r.state_; } + + friend bool operator==(const promise &l, const promise &r) noexcept { return l.state_ == r.state_; } + + friend bool operator!=(const promise &l, const promise &r) noexcept { return l.state_ != r.state_; } + + // + // get + // + + void get() const { state_->get(); } + + void get_or_default() const + { + try { + return get(); + } catch (...) { + // nothing + } + } + + // + // wait + // + + void wait() const noexcept { state_->wait(); } + + template + promise_wait_status wait_for(const std::chrono::duration &timeout_duration) const + { + return state_->wait_for(timeout_duration); + } + + template + promise_wait_status wait_until(const std::chrono::time_point &timeout_time) const + { + return state_->wait_until(timeout_time); + } + + // + // resolve/reject + // + + bool resolve() { return state_->resolve(); } + + bool reject(std::exception_ptr e) noexcept { return state_->reject(e); } + + template + bool reject(E &&e) + { + return state_->reject(std::make_exception_ptr(std::forward(e))); + } + + // + // then + // + + template> + std::enable_if_t, promise> then(ResolveF &&on_resolve) + { + promise next; + + then([n = next, f = std::forward(on_resolve)]() mutable { + auto np = eggs::invoke(std::forward(f)); + std::move(np) + .then([n](auto &&...nvs) mutable { n.resolve(std::forward(nvs)...); }) + .except([n](std::exception_ptr e) mutable { n.reject(e); }); + }).except([n = next](std::exception_ptr e) mutable { n.reject(e); }); + + return next; + } + + template + auto then_all(ResolveF &&on_resolve) + { + return then([f = std::forward(on_resolve)]() mutable { + auto r = eggs::invoke(std::forward(f)); + return make_all_promise(std::move(r)); + }); + } + + template + auto then_any(ResolveF &&on_resolve) + { + return then([f = std::forward(on_resolve)]() mutable { + auto r = eggs::invoke(std::forward(f)); + return make_any_promise(std::move(r)); + }); + } + + template + auto then_race(ResolveF &&on_resolve) + { + return then([f = std::forward(on_resolve)]() mutable { + auto r = eggs::invoke(std::forward(f)); + return make_race_promise(std::move(r)); + }); + } + + template + auto then_tuple(ResolveF &&on_resolve) + { + return then([f = std::forward(on_resolve)]() mutable { + auto r = eggs::invoke(std::forward(f)); + return make_tuple_promise(std::move(r)); + }); + } + + // + // then + // + + template> + std::enable_if_t, promise> then(ResolveF &&on_resolve) + { + promise next; + + state_->attach( + next, std::forward(on_resolve), + [](std::exception_ptr e) -> ResolveR { std::rethrow_exception(e); }, false); + + return next; + } + + template> + std::enable_if_t, promise> then(ResolveF &&on_resolve, RejectF &&on_reject) + { + promise next; + + state_->attach(next, std::forward(on_resolve), std::forward(on_reject), true); + + return next; + } + + // + // except + // + + template + promise except(RejectF &&on_reject) + { + return then([]() {}, std::forward(on_reject)); + } + + // + // finally + // + + template + promise finally(FinallyF &&on_finally) + { + return then([f = on_finally]() { eggs::invoke(std::move(f)); }, + [f = on_finally](std::exception_ptr e) { + eggs::invoke(std::move(f)); + std::rethrow_exception(e); + }); + } + +private: + class state; + std::shared_ptr state_; + +private: + class state final : private detail::noncopyable { + public: + state() = default; + + void get() + { + sled::MutexLock lock(&mutex_); + // std::unique_lock lock(mutex_); + // cond_var_.wait(lock, [this]() { return status_ != status::pending; }); + cond_var_.Wait(lock, [this]() { return status_ != status::pending; }); + if (status_ == status::rejected) { std::rethrow_exception(exception_); } + assert(status_ == status::resolved); + } + + void wait() const noexcept + { + sled::MutexLock lock(&mutex_); + // std::unique_lock lock(mutex_); + // cond_var_.wait(lock, [this]() { return status_ != status::pending; }); + cond_var_.Wait(lock, [this]() { return status_ != status::pending; }); + } + + template + promise_wait_status wait_for(const std::chrono::duration &timeout_duration) const + { + sled::MutexLock lock(&mutex_); + // std::unique_lock lock(mutex_); + // return cond_var_.wait_for(lock, timeout_duration, [this]() { return status_ != status::pending; }) + return cond_var_.WaitFor(lock, timeout_duration, [this]() { return status_ != status::pending; }) + ? promise_wait_status::no_timeout + : promise_wait_status::timeout; + } + + template + promise_wait_status wait_until(const std::chrono::time_point &timeout_time) const + { + sled::MutexLock lock(&mutex_); + // std::unique_lock lock(mutex_); + // return cond_var_.wait_until(lock, timeout_time, [this]() { return status_ != status::pending; }) + return cond_var_.WaitFor(lock, timeout_time, [this]() { return status_ != status::pending; }) + ? promise_wait_status::no_timeout + : promise_wait_status::timeout; + } + + bool resolve() + { + sled::MutexLock lock(&mutex_); + // std::unique_lock lock(mutex_); + // std::lock_guard guard(mutex_); + if (status_ != status::pending) { return false; } + status_ = status::resolved; + invoke_resolve_handlers_(); + cond_var_.NotifyAll(); + return true; + } + + bool reject(std::exception_ptr e) noexcept + { + sled::MutexLock lock(&mutex_); + // std::lock_guard guard(mutex_); + if (status_ != status::pending) { return false; } + exception_ = e; + status_ = status::rejected; + invoke_reject_handlers_(); + cond_var_.NotifyAll(); + return true; + } + + public: + template + std::enable_if_t::value, void> + attach(promise &next, ResolveF &&on_resolve, RejectF &&on_reject, bool has_reject) + { + auto reject_h = [n = next, f = std::forward(on_reject), has_reject](std::exception_ptr e) mutable { + if (has_reject) { + try { + eggs::invoke(std::forward(f), e); + n.resolve(); + } catch (...) { + n.reject(std::current_exception()); + } + } else { + n.reject(e); + } + }; + + auto resolve_h = [n = next, f = std::forward(on_resolve)]() mutable { + try { + eggs::invoke(std::forward(f)); + n.resolve(); + } catch (...) { + n.reject(std::current_exception()); + } + }; + + sled::MutexLock lock(&mutex_); + // std::lock_guard guard(mutex_); + add_handlers_(std::move(resolve_h), std::move(reject_h)); + } + + template + std::enable_if_t::value, void> + attach(promise &next, ResolveF &&on_resolve, RejectF &&on_reject, bool has_reject) + { + auto reject_h = [n = next, f = std::forward(on_reject), has_reject](std::exception_ptr e) mutable { + if (has_reject) { + try { + auto r = eggs::invoke(std::forward(f), e); + n.resolve(std::move(r)); + } catch (...) { + n.reject(std::current_exception()); + } + } else { + n.reject(e); + } + }; + + auto resolve_h = [n = next, f = std::forward(on_resolve)]() mutable { + try { + auto r = eggs::invoke(std::forward(f)); + n.resolve(std::move(r)); + } catch (...) { + n.reject(std::current_exception()); + } + }; + + sled::MutexLock lock(&mutex_); + // std::lock_guard guard(mutex_); + add_handlers_(std::move(resolve_h), std::move(reject_h)); + } + + private: + template + void add_handlers_(ResolveF &&resolve, RejectF &&reject) + { + if (status_ == status::resolved) { + eggs::invoke(std::forward(resolve)); + } else if (status_ == status::rejected) { + eggs::invoke(std::forward(reject), exception_); + } else { + handlers_.push_back({std::forward(resolve), std::forward(reject)}); + } + } + + void invoke_resolve_handlers_() noexcept + { + for (const auto &h : handlers_) { h.resolve_(); } + handlers_.clear(); + } + + void invoke_reject_handlers_() noexcept + { + for (const auto &h : handlers_) { h.reject_(exception_); } + handlers_.clear(); + } + + private: + enum class status { pending, resolved, rejected }; + + status status_{status::pending}; + std::exception_ptr exception_{nullptr}; + + mutable sled::Mutex mutex_; + mutable sled::ConditionVariable cond_var_; + + // mutable std::mutex mutex_; + // mutable std::condition_variable cond_var_; + + struct handler { + using resolve_t = std::function; + using reject_t = std::function; + + resolve_t resolve_; + reject_t reject_; + }; + + std::vector handlers_; + }; +}; +}// namespace promise_hpp + +namespace promise_hpp { +// +// swap +// + +template +void +swap(promise &l, promise &r) noexcept +{ + l.swap(r); +} + +// +// make_promise +// + +template +promise +make_promise() +{ + return promise(); +} + +template +promise +make_promise(F &&f) +{ + promise result; + + auto resolver = [result](auto &&v) mutable { return result.Resolve(std::forward(v)); }; + + auto rejector = [result](auto &&e) mutable { return result.Reject(std::forward(e)); }; + + try { + eggs::invoke(std::forward(f), std::move(resolver), std::move(rejector)); + } catch (...) { + result.Reject(std::current_exception()); + } + + return result; +} + +// +// make_resolved_promise +// + +inline promise +make_resolved_promise() +{ + promise result; + result.resolve(); + return result; +} + +template +promise> +make_resolved_promise(R &&v) +{ + promise> result; + result.Resolve(std::forward(v)); + return result; +} + +// +// make_rejected_promise +// + +template +promise +make_rejected_promise(E &&e) +{ + promise result; + result.reject(std::forward(e)); + return result; +} + +template +promise +make_rejected_promise(E &&e) +{ + promise result; + result.Reject(std::forward(e)); + return result; +} + +// +// make_all_promise +// + +template::value_type, + typename SubPromiseResult = typename SubPromise::value_type, + typename ResultPromiseValueType = std::vector> +promise +make_all_promise(Iter begin, Iter end) +{ + if (begin == end) { return make_resolved_promise(ResultPromiseValueType()); } + + struct context_t { + std::atomic_size_t success_counter{0u}; + std::vector> results; + + context_t(std::size_t count) : success_counter(count), results(count) {} + }; + + return make_promise([begin, end](auto &&resolver, auto &&rejector) { + std::size_t result_index = 0; + auto context = std::make_shared(std::distance(begin, end)); + for (Iter iter = begin; iter != end; ++iter, ++result_index) { + (*iter) + .then([context, resolver, result_index](auto &&v) mutable { + context->results[result_index] = std::forward(v); + if (!--context->success_counter) { + std::vector results; + results.reserve(context->results.size()); + for (auto &&r : context->results) { results.push_back(std::move(*r)); } + resolver(std::move(results)); + } + }) + .except([rejector](std::exception_ptr e) mutable { rejector(e); }); + } + }); +} + +template +auto +make_all_promise(Container &&container) +{ + return make_all_promise(std::begin(container), std::end(container)); +} + +// +// make_any_promise +// + +template::value_type, + typename SubPromiseResult = typename SubPromise::value_type, + typename ResultPromiseValueType = SubPromiseResult> +promise +make_any_promise(Iter begin, Iter end) +{ + if (begin == end) { return make_rejected_promise(aggregate_exception()); } + + struct context_t { + std::atomic_size_t failure_counter{0u}; + std::vector exceptions; + + context_t(std::size_t count) : failure_counter(count), exceptions(count) {} + }; + + return make_promise([begin, end](auto &&resolver, auto &&rejector) { + std::size_t exception_index = 0; + auto context = std::make_shared(std::distance(begin, end)); + for (Iter iter = begin; iter != end; ++iter, ++exception_index) { + (*iter) + .then([resolver](auto &&v) mutable { resolver(std::forward(v)); }) + .except([context, rejector, exception_index](std::exception_ptr e) mutable { + context->exceptions[exception_index] = e; + if (!--context->failure_counter) { rejector(aggregate_exception(std::move(context->exceptions))); } + }); + } + }); +} + +template +auto +make_any_promise(Container &&container) +{ + return make_any_promise(std::begin(container), std::end(container)); +} + +// +// make_race_promise +// + +template::value_type, + typename SubPromiseResult = typename SubPromise::value_type, + typename ResultPromiseValueType = SubPromiseResult> +promise +make_race_promise(Iter begin, Iter end) +{ + return make_promise([begin, end](auto &&resolver, auto &&rejector) { + for (Iter iter = begin; iter != end; ++iter) { (*iter).then(resolver).except(rejector); } + }); +} + +template +auto +make_race_promise(Container &&container) +{ + return make_race_promise(std::begin(container), std::end(container)); +} + +// +// make_tuple_promise +// + +namespace impl { +template +struct tuple_promise_result_impl {}; + +template +struct tuple_promise_result_impl...>> { + using type = std::tuple; +}; + +template +struct tuple_promise_result { + using type = typename tuple_promise_result_impl>::type; +}; + +template +using tuple_promise_result_t = typename tuple_promise_result::type; + +template +class tuple_promise_context_t final : private detail::noncopyable { +public: + template + bool apply_result(T &&value) + { + std::get(results_) = std::forward(value); + return ++counter_ == sizeof...(ResultTypes); + } + + std::tuple get_results() + { + return get_results_impl(std::make_index_sequence()); + } + +private: + template + std::tuple get_results_impl(std::index_sequence) + { + return {std::move(*std::get(results_))...}; + } + +private: + std::atomic_size_t counter_{0}; + std::tuple...> results_; +}; + +template +using tuple_promise_context_ptr = std::shared_ptr>; + +template +promise +make_tuple_sub_promise_impl(Tuple &&tuple, + Resolver &&resolver, + Rejector &&rejector, + const tuple_promise_context_ptr &context) +{ + return std::get(tuple) + .then([context, resolver](auto &&v) mutable { + if (context->template apply_result(std::forward(v))) { resolver(context->get_results()); } + }) + .except(rejector); +} + +template>> +std::enable_if_t> +make_tuple_promise_impl(Tuple &&, std::index_sequence) +{ + return make_resolved_promise(ResultTuple()); +} + +template>> +std::enable_if_t> +make_tuple_promise_impl(Tuple &&tuple, std::index_sequence) +{ + auto result = promise(); + + auto resolver = [result](auto &&v) mutable { return result.resolve(std::forward(v)); }; + + auto rejector = [result](auto &&e) mutable { return result.reject(std::forward(e)); }; + + try { + auto context = std::make_shared...>>(); + auto promises = std::make_tuple(make_tuple_sub_promise_impl(tuple, resolver, rejector, context)...); + (void) promises; + } catch (...) { + result.reject(std::current_exception()); + } + + return result; +} +}// namespace impl + +template>> +promise +make_tuple_promise(Tuple &&tuple) +{ + return impl::make_tuple_promise_impl(std::forward(tuple), + std::make_index_sequence::value>()); +} +}// namespace promise_hpp + +namespace std { +template +struct hash> final { + std::size_t operator()(const promise_hpp::promise &p) const noexcept { return p.hash(); } +}; +}// namespace std + +namespace sled { + +template +struct IsPromise : promise_hpp::is_promise {}; + +template +struct IsPromiseRet : promise_hpp::is_promise_r {}; + +template +using Promise = promise_hpp::promise; + +template +inline auto +MakePromise() -> Promise +{ + return promise_hpp::make_promise(); +} + +template +inline auto +MakePromise(F &&f) +{ + return promise_hpp::make_promise(std::forward(f)); +} + +inline auto +MakeResolvedPromise() -> Promise +{ + Promise result; + result.resolve(); + return result; +} + +template +inline auto +MakeResolvedPromise() +{ + return promise_hpp::make_resolved_promise(); +} + +template +inline Promise +make_rejected_promise(E &&e) +{ + Promise result; + result.reject(std::forward(e)); + return result; +} + +template +inline auto +MakeRejectedPromise(std::exception_ptr e) +{ + return promise_hpp::make_rejected_promise(e); +} + +template +inline auto +MakeAllPromise(Iter begin, Iter end) +{ + return promise_hpp::make_all_promise(begin, end); +} + +template +inline auto +MakeAllPromise(Container &&container) +{ + return promise_hpp::make_all_promise(std::begin(container), std::end(container)); +} + +}// namespace sled + +#endif// SLED_FUTURES_PROMISE_H diff --git a/include/sled/synchronization/mutex.h b/include/sled/synchronization/mutex.h index 2b59fae..6c01f1d 100644 --- a/include/sled/synchronization/mutex.h +++ b/include/sled/synchronization/mutex.h @@ -134,11 +134,13 @@ public: template inline bool WaitFor(MutexLock &lock, TimeDelta timeout, Predicate &&pred) { + if (timeout.ns() < 0) { return pred(); } + if (timeout == TimeDelta::PlusInfinity()) { cv_.wait(lock.lock_, std::forward(pred)); return true; } else { - return cv_.wait_for(lock.lock_, std::chrono::milliseconds(timeout.ms()), std::forward(pred)); + return cv_.wait_for(lock.lock_, std::chrono::microseconds(timeout.us()), std::forward(pred)); } } diff --git a/include/sled/units/time_delta.h b/include/sled/units/time_delta.h index 8e5c775..9752df5 100644 --- a/include/sled/units/time_delta.h +++ b/include/sled/units/time_delta.h @@ -43,6 +43,12 @@ public: return FromValue(value); } + template + inline TimeDelta(std::chrono::duration duration) + { + *this = Micros(std::chrono::duration_cast(duration)); + } + TimeDelta() = delete; template @@ -69,15 +75,9 @@ public: return ToMultiple<1000, T>(); } - constexpr int64_t seconds_or(int64_t fallback_value) const - { - return ToFractionOr<1000000>(fallback_value); - } + constexpr int64_t seconds_or(int64_t fallback_value) const { return ToFractionOr<1000000>(fallback_value); } - constexpr int64_t ms_or(int64_t fallback_value) const - { - return ToFractionOr<1000>(fallback_value); - } + constexpr int64_t ms_or(int64_t fallback_value) const { return ToFractionOr<1000>(fallback_value); } constexpr int64_t us_or(int64_t fallback_value) const { return ToValueOr(fallback_value); } diff --git a/src/futures/promise_test.cc b/src/futures/promise_test.cc new file mode 100644 index 0000000..0fbcf95 --- /dev/null +++ b/src/futures/promise_test.cc @@ -0,0 +1,21 @@ +#include +#include + +TEST(Promise, Basic) +{ + auto p = sled::Promise(); + auto v = p.Then([](int v) { + EXPECT_EQ(v, 1); + return v + 10; + }) + .Tap([](int v) { + EXPECT_EQ(v, 11); + // no effect + return v + 1; + }) + .Then([](int v) { + EXPECT_EQ(v, 11); + return v + 10; + }); + p.Resolve(1); +}