feat support operator
All checks were successful
linux-x64-gcc / linux-gcc (Debug) (push) Successful in 1m41s
linux-x64-gcc / linux-gcc (Release) (push) Successful in 1m1s

This commit is contained in:
tqcq 2024-03-26 11:57:10 +08:00
parent d8c5807af6
commit bc80738c27
10 changed files with 330 additions and 288 deletions

View File

@ -3,8 +3,10 @@ BinPackParameters: false
BasedOnStyle: LLVM
AccessModifierOffset: -4
AlignAfterOpenBracket: Align
AlignArrayOfStructures: Left
AlignConsecutiveAssignments: Consecutive
AlignConsecutiveAssignments: None
AlignOperands: DontAlign
AlignOperands: BreakBeforeBinaryOperators
AllowAllArgumentsOnNextLine: false
AllowAllConstructorInitializersOnNextLine: false
AllowAllParametersOfDeclarationOnNextLine: false
@ -31,7 +33,7 @@ BraceWrapping:
IndentBraces: false
SplitEmptyFunction: false
SplitEmptyRecord: true
BreakBeforeBinaryOperators: NonAssignment
BreakBeforeBinaryOperators: All
BreakBeforeTernaryOperators: true
BreakConstructorInitializers: BeforeColon
ConstructorInitializerAllOnOneLineOrOnePerLine: true

View File

@ -1,211 +0,0 @@
#ifndef SLED_FUTURES_DETAIL_BASE_FUTURE_H
#define SLED_FUTURES_DETAIL_BASE_FUTURE_H
#include "sled/log/log.h"
#include "sled/optional.h"
#include "sled/synchronization/mutex.h"
#include <memory>
#include <type_traits>
namespace sled {
namespace {
enum class State {
kCancel,
kPending,
kTimeout,
kError,
kValue,
};
};
template<typename T>
struct FutureState {
mutable sled::Mutex mutex;
mutable sled::ConditionVariable cond_var;
sled::optional<T> value;
std::exception_ptr error;
State state = State::kPending;
void AssertHasValue() const { ASSERT(state == State::kValue, "can't find value"); }
void AssertHasError() const { ASSERT(state == State::kError, "can't find error"); }
void AssertHasTimeout() const { ASSERT(state == State::kTimeout, "can't find timeout"); }
void Wait(sled::MutexLock *lock_ptr = nullptr) const
{
if (lock_ptr) {
if (state != State::kPending) { return; }
cond_var.Wait(*lock_ptr, [this] { return state != State::kPending; });
} else {
sled::MutexLock lock(&mutex);
if (state != State::kPending) { return; }
cond_var.Wait(lock, [this] { return state != State::kPending; });
}
}
void SetError(std::exception_ptr e)
{
sled::MutexLock lock(&mutex);
if (state == State::kCancel) { return; }
ASSERT(state == State::kPending, "state must be kPending");
error = std::move(e);
state = State::kError;
cond_var.NotifyAll();
}
void SetTimeout()
{
sled::MutexLock lock(&mutex);
if (state == State::kCancel) { return; }
ASSERT(state == State::kPending, "state must be kPending");
state = State::kTimeout;
cond_var.NotifyAll();
}
template<typename U = T>
typename std::enable_if<std::is_convertible<U, T>::value>::type SetValue(U &&val)
{
sled::MutexLock lock(&mutex);
if (state == State::kCancel) { return; }
ASSERT(state == State::kPending, "state must be kPending");
value = std::forward<U>(val);
state = State::kValue;
cond_var.NotifyAll();
}
};
template<>
struct FutureState<void> {
mutable sled::Mutex mutex;
mutable sled::ConditionVariable cond_var;
std::exception_ptr error;
State state = State::kPending;
void AssertHasValue() const { ASSERT(state == State::kValue, "can't find value"); }
void AssertHasError() const { ASSERT(state == State::kError, "can't find error"); }
void AssertHasTimeout() const { ASSERT(state == State::kTimeout, "can't find timeout"); }
void Wait(sled::MutexLock *lock_ptr = nullptr) const
{
if (lock_ptr) {
if (state != State::kPending) { return; }
cond_var.Wait(*lock_ptr, [this] { return state != State::kPending; });
} else {
sled::MutexLock lock(&mutex);
if (state != State::kPending) { return; }
cond_var.Wait(lock, [this] { return state != State::kPending; });
}
}
void SetTimeout()
{
sled::MutexLock lock(&mutex);
if (state == State::kCancel) { return; }
ASSERT(state == State::kPending, "state must be kPending");
state = State::kTimeout;
cond_var.NotifyAll();
}
void SetError(std::exception_ptr e)
{
sled::MutexLock lock(&mutex);
if (state == State::kCancel) { return; }
ASSERT(state == State::kPending, "state must be kPending");
error = std::move(e);
state = State::kError;
cond_var.NotifyAll();
}
void SetValue()
{
sled::MutexLock lock(&mutex);
if (state == State::kCancel) { return; }
ASSERT(state == State::kPending, "state must be kPending");
state = State::kValue;
cond_var.NotifyAll();
}
};
template<typename T>
class Future {
public:
// using ValueType = typename std::remove_reference<T>::type;
Future(std::shared_ptr<FutureState<T>> state) : state_(std::move(state)) {}
T Get() const &
{
sled::MutexLock lock(&state_->mutex);
state_->Wait(&lock);
state_->AssertHasValue();
return state_->value.value();
}
T &Get() &
{
sled::MutexLock lock(&state_->mutex);
state_->Wait(&lock);
state_->AssertHasValue();
return state_->value.value();
}
T &&Get() &&
{
sled::MutexLock lock(&state_->mutex);
state_->Wait(&lock);
state_->AssertHasValue();
return std::move(state_->value.value());
}
private:
std::shared_ptr<FutureState<T>> state_;
};
template<>
class Future<void> {
public:
Future(std::shared_ptr<FutureState<void>> state) : state_(std::move(state)) {}
void Wait() const { state_->Wait(); }
void Get() const { Wait(); }
protected:
std::shared_ptr<FutureState<void>> state_;
};
template<typename T>
class Promise {
public:
Promise() : state_(new FutureState<T>()) {}
Future<T> GetFuture() const { return Future<T>(state_); }
template<typename U = T>
typename std::enable_if<!std::is_void<U>::value && std::is_convertible<U, T>::value>::type SetValue(U &&val)
{
state_->SetValue(val);
}
template<typename U = T>
typename std::enable_if<std::is_void<U>::value>::type SetValue()
{
state_->SetValue();
}
void SetError(std::exception_ptr e) { state_->SetError(e); }
void SetTimeout() { state_->SetTimeout(); }
private:
std::shared_ptr<FutureState<T>> state_;
};
}// namespace sled
#endif// SLED_FUTURES_DETAIL_BASE_FUTURE_H

View File

@ -47,6 +47,7 @@ struct DelayOperation {
template<typename S>
struct DelaySender {
using result_t = typename S::result_t;
using this_type = DelaySender<S>;
S sender;
sled::TimeDelta delta;
@ -55,6 +56,12 @@ struct DelaySender {
{
return {sender.Connect(DelayReceiver<R>{receiver, delta})};
}
template<typename Lazy>
friend ContinueResultT<this_type, Lazy> operator|(this_type sender, Lazy lazy)
{
return lazy.Continue(sender);
}
};
template<typename S>
@ -64,6 +71,22 @@ Delay(S sender, sled::TimeDelta const &delta)
return {sender, delta};
}
struct DelayLazy {
sled::TimeDelta delta;
template<typename S>
DelaySender<S> Continue(S sender) const
{
return {sender, delta};
}
};
inline DelayLazy
Delay(sled::TimeDelta const &delta)
{
return {delta};
}
}// namespace detail
}// namespace sled
#endif// SLED_FUTURES_DETAIL_DELAY_H

View File

@ -0,0 +1,103 @@
#ifndef SLED_FUTURES_DETAIL_FUTURE_H
#define SLED_FUTURES_DETAIL_FUTURE_H
#include "sled/synchronization/mutex.h"
#include "traits.h"
namespace sled {
namespace detail {
namespace {
enum class State {
kPending,
kError,
kValue,
};
template<typename T>
struct FutureState {
sled::Mutex mutex;
sled::ConditionVariable cv;
State state = State::kPending;
union {
T value;
std::exception_ptr error;
};
};
}// namespace
template<typename R>
struct FutureOperation {
std::shared_ptr<FutureState<typename R::result_t>> state;
R receiver;
mutable bool stopped = false;
void Start()
{
if (stopped) { return; }
sled::MutexLock lock(&state->mutex);
state->cv.Wait(lock, [&] { return state->state != State::kPending; });
if (state->state == State::kValue) {
receiver.SetValue(std::move(state->value));
} else {
receiver.SetError(state->error);
}
}
void Stop()
{
if (stopped) { return; }
stopped = true;
receiver.SetStopped();
}
};
template<typename T>
struct FutureSender {
using result_t = T;
using this_type = FutureSender<T>;
std::shared_ptr<FutureState<T>> state;
template<typename R>
FutureOperation<R> Connect(R receiver)
{
return {state, receiver};
}
template<typename Lazy>
friend ContinueResultT<this_type, Lazy> operator|(this_type sender, Lazy lazy)
{
return lazy.Continue(sender);
}
};
template<typename T>
struct Promise {
std::shared_ptr<FutureState<T>> state;
Promise() : state(std::make_shared<FutureState<T>>()) {}
template<typename U = T>
typename std::enable_if<std::is_convertible<U, T>::value>::type SetValue(U &&val)
{
sled::MutexLock lock(&state->mutex);
state->value = std::forward<T>(val);
state->state = State::kValue;
state->cv.NotifyAll();
}
void SetError(std::exception_ptr e)
{
sled::MutexLock lock(&state->mutex);
state->error = e;
state->state = State::kError;
state->cv.NotifyAll();
}
FutureSender<T> GetFuture() { return {state}; }
};
}// namespace detail
}// namespace sled
#endif// SLED_FUTURES_DETAIL_FUTURE_H

View File

@ -12,7 +12,7 @@ struct JustOperation {
T value;
R receiver;
void Start() { receiver.SetValue(std::move(value)); }
void Start() { receiver.SetValue(std::forward<T>(value)); }
void Stop() { receiver.SetStopped(); }
};
@ -20,6 +20,7 @@ struct JustOperation {
template<typename T>
struct JustSender {
using result_t = T;
using this_type = JustSender<T>;
T value;
template<typename R>
@ -27,6 +28,12 @@ struct JustSender {
{
return {std::forward<T>(value), receiver};
}
template<typename Lazy>
friend ContinueResultT<this_type, Lazy> operator|(this_type sender, Lazy lazy)
{
return lazy.Continue(sender);
}
};
template<typename T>

View File

@ -0,0 +1,114 @@
#ifndef SLED_FUTURES_DETAIL_VIA_H
#define SLED_FUTURES_DETAIL_VIA_H
#include "traits.h"
#include <exception>
#include <functional>
#include <memory>
namespace sled {
namespace detail {
template<typename R, typename F>
struct OnReceiver {
R receiver;
F schedule;
bool stopped = false;
template<typename U>
typename std::enable_if<std::is_rvalue_reference<U>::value
|| (!std::is_copy_assignable<U>::value && !std::is_copy_constructible<U>::value)>::type
SetValue(U &&val)
{
static_assert(std::is_rvalue_reference<decltype(val)>::value, "U must be an rvalue reference");
if (stopped) { return; }
try {
auto moved = make_move_on_copy(val);
schedule([this, moved]() mutable { receiver.SetValue(std::move(moved.value)); });
} catch (...) {
SetError(std::current_exception());
}
}
template<typename U>
typename std::enable_if<!std::is_rvalue_reference<U>::value
&& (std::is_copy_assignable<U>::value || std::is_copy_constructible<U>::value)>::type
SetValue(U &&val)
{
if (stopped) { return; }
try {
schedule([this, val] { receiver.SetValue(val); });
} catch (...) {
SetError(std::current_exception());
}
}
void SetError(std::exception_ptr e)
{
if (stopped) { return; }
receiver.SetError(e);
}
void SetStopped()
{
if (stopped) { return; }
stopped = true;
receiver.SetStopped();
}
};
template<typename S, typename R>
struct OnOperation {
ConnectResultT<S, R> op;
void Start() { op.Start(); }
void Stop() { op.Stop(); }
};
template<typename S, typename F>
struct OnSender {
using result_t = typename S::result_t;
using this_type = OnSender<S, F>;
S sender;
F schedule;
template<typename R>
OnOperation<S, OnReceiver<R, F>> Connect(R receiver)
{
return {sender.Connect(OnReceiver<R, F>{receiver, schedule})};
}
template<typename Lazy>
friend ContinueResultT<this_type, Lazy> operator|(this_type sender, Lazy lazy)
{
return lazy.Continue(sender);
}
};
template<typename F>
struct OnLazy {
F func;
template<typename S>
OnSender<S, F> Continue(S sender) const
{
return {sender, func};
}
};
template<typename S, typename F>
OnSender<S, F>
On(S sender, F &&schedule)
{
return {sender, std::forward<F>(schedule)};
}
template<typename F>
OnLazy<F>
On(F &&schedule)
{
return {schedule};
}
}// namespace detail
}// namespace sled
#endif// SLED_FUTURES_DETAIL_VIA_H

View File

@ -97,6 +97,7 @@ struct RetryOperation {
template<typename S>
struct RetrySender {
using result_t = typename S::result_t;
using this_type = RetrySender<S>;
S sender;
int retry_count;
@ -107,6 +108,12 @@ struct RetrySender {
auto op = sender.Connect(RetryReceiver<R>{state, receiver});
return {retry_count, state, op};
}
template<typename Lazy>
friend ContinueResultT<this_type, Lazy> operator|(this_type sender, Lazy lazy)
{
return lazy.Continue(sender);
}
};
template<typename S>
@ -116,6 +123,22 @@ Retry(S sender, int retry_count)
return {sender, retry_count};
}
struct RetryLazy {
int retry_count;
template<typename S>
RetrySender<S> Continue(S sender) const
{
return {sender, retry_count};
}
};
inline RetryLazy
Retry(int retry_count)
{
return {retry_count};
}
}// namespace detail
}// namespace sled
#endif// SLED_FUTURES_DETAIL_RETRY_H

View File

@ -50,6 +50,7 @@ struct ThenOperation {
template<typename S, typename F>
struct ThenSender {
using result_t = invoke_result_t<F, typename decay_t<S>::result_t>;
using this_type = ThenSender<S, F>;
S sender;
F func;
@ -58,6 +59,12 @@ struct ThenSender {
{
return {sender.Connect(ThenReceiver<R, F>{receiver, func})};
}
template<typename Lazy>
friend ContinueResultT<this_type, Lazy> operator|(this_type sender, Lazy lazy)
{
return lazy.Continue(sender);
}
};
template<typename S, typename F>
@ -67,6 +74,24 @@ Then(S &&sender, F &&func)
return {std::forward<S>(sender), std::forward<F>(func)};
}
template<typename F>
struct ThenLazy {
F func;
template<typename S>
ThenSender<S, F> Continue(S sender) const
{
return {sender, func};
}
};
template<typename F>
ThenLazy<F>
Then(F &&func)
{
return {func};
}
}// namespace detail
}// namespace sled
#endif// SLED_FUTURES_DETAIL_THEN_H

View File

@ -3,6 +3,7 @@
#include "sled/exec/detail/invoke_result.h"
#include <memory>
#include <type_traits>
namespace sled {
@ -15,12 +16,41 @@ struct ConnectResult {
template<typename S, typename R>
using ConnectResultT = typename ConnectResult<S, R>::type;
template<typename S, typename Lazy>
struct ContinueResult {
typedef decltype(std::declval<Lazy>().Continue(std::declval<S>())) type;
};
template<typename S, typename Lazy>
using ContinueResultT = typename ContinueResult<S, Lazy>::type;
template<typename F, typename... Args>
using invoke_result_t = eggs::invoke_result_t<F, Args...>;
template<typename T>
using decay_t = typename std::decay<T>::type;
template<typename T>
struct move_on_copy {
using type = typename std::remove_reference<T>::type;
move_on_copy(type &&value) : value(std::move(value)) {}
move_on_copy(const move_on_copy &other) : value(std::move(other.value)) {}
move_on_copy(move_on_copy &&) = delete;
move_on_copy &operator=(const move_on_copy &) = delete;
mutable type value;
};
template<typename T>
move_on_copy<T>
make_move_on_copy(T &&value)
{
return {std::move<T>(value)};
}
}// namespace detail
}// namespace sled
#endif// SLED_FUTURES_DETAIL_TRAITS_H

View File

@ -1,74 +0,0 @@
#ifndef SLED_FUTURES_DETAIL_VIA_H
#define SLED_FUTURES_DETAIL_VIA_H
#include "traits.h"
#include <exception>
#include <functional>
#include <memory>
namespace sled {
namespace detail {
template<typename R, typename F>
struct ViaReceiver {
R receiver;
F schedule;
bool stopped = false;
template<typename U>
void SetValue(U &&val)
{
if (stopped) { return; }
try {
// auto func = std::bind(&R::SetValue, &receiver, std::forward<U>(val));
// schedule(std::move(func));
schedule([this, val]() mutable { receiver.SetValue(std::move(val)); });
} catch (...) {
SetError(std::current_exception());
}
}
void SetError(std::exception_ptr e)
{
if (stopped) { return; }
receiver.SetError(e);
}
void SetStopped()
{
if (stopped) { return; }
stopped = true;
receiver.SetStopped();
}
};
template<typename S, typename R>
struct ViaOperation {
ConnectResultT<S, R> op;
void Start() { op.Start(); }
void Stop() { op.Stop(); }
};
template<typename S, typename F>
struct ViaSender {
using result_t = typename S::result_t;
S sender;
F schedule;
template<typename R>
ViaOperation<S, ViaReceiver<R, F>> Connect(R receiver)
{
return {sender.Connect(ViaReceiver<R, F>{receiver, schedule})};
}
};
template<typename S, typename F>
ViaSender<S, F>
Via(S sender, F &&schedule)
{
return {sender, std::forward<F>(schedule)};
}
}// namespace detail
}// namespace sled
#endif// SLED_FUTURES_DETAIL_VIA_H