feat add just,retry,then,via
All checks were successful
linux-x64-gcc / linux-gcc (Debug) (push) Successful in 54s
linux-x64-gcc / linux-gcc (Release) (push) Successful in 1m57s

This commit is contained in:
tqcq 2024-03-25 22:55:30 +08:00
parent 4077bebfc7
commit 53ecd8dd13
7 changed files with 496 additions and 31 deletions

View File

@ -1,63 +1,208 @@
#ifndef SLED_FUTURES_DETAIL_BASE_FUTURE_H #ifndef SLED_FUTURES_DETAIL_BASE_FUTURE_H
#define SLED_FUTURES_DETAIL_BASE_FUTURE_H #define SLED_FUTURES_DETAIL_BASE_FUTURE_H
#include "sled/any.h" #include "sled/log/log.h"
#include "sled/optional.h"
#include "sled/synchronization/mutex.h" #include "sled/synchronization/mutex.h"
#include <memory> #include <memory>
#include <type_traits> #include <type_traits>
namespace sled { namespace sled {
template<typename T> namespace {
class Promise; enum class State {
kCancel,
kPending,
kTimeout,
kError,
kValue,
};
};
template<typename T> template<typename T>
class FutureState { struct FutureState {
public: mutable sled::Mutex mutex;
T Get() mutable sled::ConditionVariable cond_var;
sled::optional<T> value;
std::exception_ptr error;
State state = State::kPending;
void AssertHasValue() const { ASSERT(state == State::kValue, "can't find value"); }
void AssertHasError() const { ASSERT(state == State::kError, "can't find error"); }
void AssertHasTimeout() const { ASSERT(state == State::kTimeout, "can't find timeout"); }
void Wait(sled::MutexLock *lock_ptr = nullptr) const
{ {
sled::MutexLock lock(&mutex_); if (lock_ptr) {
cv_.Wait(&mutex_, [this]() { return done_; }); if (state != State::kPending) { return; }
return sled::any_cast<T>(value_); cond_var.Wait(*lock_ptr, [this] { return state != State::kPending; });
} else {
sled::MutexLock lock(&mutex);
if (state != State::kPending) { return; }
cond_var.Wait(lock, [this] { return state != State::kPending; });
}
} }
void SetError(std::exception_ptr e) void SetError(std::exception_ptr e)
{ {
sled::MutexLock lock(&mutex_); sled::MutexLock lock(&mutex);
value_ = e; if (state == State::kCancel) { return; }
done_ = true; ASSERT(state == State::kPending, "state must be kPending");
}; error = std::move(e);
state = State::kError;
cond_var.NotifyAll();
}
template<typename U = T> void SetTimeout()
typename std::enable_if<!std::is_void<T>::value && std::is_convertible<U, T>::value>::type SetValue(U &&value)
{ {
sled::MutexLock lock(&mutex_); sled::MutexLock lock(&mutex);
value_ = static_cast<T>(std::forward<U>(value)); if (state == State::kCancel) { return; }
done_ = true; ASSERT(state == State::kPending, "state must be kPending");
state = State::kTimeout;
cond_var.NotifyAll();
} }
template<typename U = T> template<typename U = T>
typename std::enable_if<std::is_void<T>::value>::type SetValue(U &&value) typename std::enable_if<std::is_convertible<U, T>::value>::type SetValue(U &&val)
{ {
sled::MutexLock lock(&mutex_); sled::MutexLock lock(&mutex);
done_ = true; if (state == State::kCancel) { return; }
ASSERT(state == State::kPending, "state must be kPending");
value = std::forward<U>(val);
state = State::kValue;
cond_var.NotifyAll();
}
};
template<>
struct FutureState<void> {
mutable sled::Mutex mutex;
mutable sled::ConditionVariable cond_var;
std::exception_ptr error;
State state = State::kPending;
void AssertHasValue() const { ASSERT(state == State::kValue, "can't find value"); }
void AssertHasError() const { ASSERT(state == State::kError, "can't find error"); }
void AssertHasTimeout() const { ASSERT(state == State::kTimeout, "can't find timeout"); }
void Wait(sled::MutexLock *lock_ptr = nullptr) const
{
if (lock_ptr) {
if (state != State::kPending) { return; }
cond_var.Wait(*lock_ptr, [this] { return state != State::kPending; });
} else {
sled::MutexLock lock(&mutex);
if (state != State::kPending) { return; }
cond_var.Wait(lock, [this] { return state != State::kPending; });
}
} }
private: void SetTimeout()
sled::Mutex mutex_; {
sled::ConditionVariable cv_; sled::MutexLock lock(&mutex);
sled::any value_; if (state == State::kCancel) { return; }
bool done_{false}; ASSERT(state == State::kPending, "state must be kPending");
state = State::kTimeout;
cond_var.NotifyAll();
}
void SetError(std::exception_ptr e)
{
sled::MutexLock lock(&mutex);
if (state == State::kCancel) { return; }
ASSERT(state == State::kPending, "state must be kPending");
error = std::move(e);
state = State::kError;
cond_var.NotifyAll();
}
void SetValue()
{
sled::MutexLock lock(&mutex);
if (state == State::kCancel) { return; }
ASSERT(state == State::kPending, "state must be kPending");
state = State::kValue;
cond_var.NotifyAll();
}
}; };
template<typename T> template<typename T>
class BaseFuture { class Future {
public: public:
template<typename T> // using ValueType = typename std::remove_reference<T>::type;
T Get() const Future(std::shared_ptr<FutureState<T>> state) : state_(std::move(state)) {}
T Get() const &
{ {
return state_->Get(); sled::MutexLock lock(&state_->mutex);
state_->Wait(&lock);
state_->AssertHasValue();
return state_->value.value();
} }
T &Get() &
{
sled::MutexLock lock(&state_->mutex);
state_->Wait(&lock);
state_->AssertHasValue();
return state_->value.value();
}
T &&Get() &&
{
sled::MutexLock lock(&state_->mutex);
state_->Wait(&lock);
state_->AssertHasValue();
return std::move(state_->value.value());
}
private:
std::shared_ptr<FutureState<T>> state_;
};
template<>
class Future<void> {
public:
Future(std::shared_ptr<FutureState<void>> state) : state_(std::move(state)) {}
void Wait() const { state_->Wait(); }
void Get() const { Wait(); }
protected:
std::shared_ptr<FutureState<void>> state_;
};
template<typename T>
class Promise {
public:
Promise() : state_(new FutureState<T>()) {}
Future<T> GetFuture() const { return Future<T>(state_); }
template<typename U = T>
typename std::enable_if<!std::is_void<U>::value && std::is_convertible<U, T>::value>::type SetValue(U &&val)
{
state_->SetValue(val);
}
template<typename U = T>
typename std::enable_if<std::is_void<U>::value>::type SetValue()
{
state_->SetValue();
}
void SetError(std::exception_ptr e) { state_->SetError(e); }
void SetTimeout() { state_->SetTimeout(); }
private: private:
std::shared_ptr<FutureState<T>> state_; std::shared_ptr<FutureState<T>> state_;
}; };

View File

@ -0,0 +1,39 @@
#ifndef SLED_FUTURES_DETAIL_JUST_H
#define SLED_FUTURES_DETAIL_JUST_H
#include <memory>
namespace sled {
namespace detail {
template<typename T, typename R>
struct JustOperation {
T value;
R receiver;
void Start() { receiver.SetValue(std::move(value)); }
void Stop() { receiver.SetStopped(); }
};
template<typename T>
struct JustSender {
T value;
template<typename R>
JustOperation<T, R> Connect(R receiver)
{
return {value, receiver};
}
};
template<typename T>
JustSender<T>
Just(T value)
{
return {value};
}
}// namespace detail
}// namespace sled
#endif// SLED_FUTURES_DETAIL_JUST_H

View File

@ -0,0 +1,120 @@
#ifndef SLED_FUTURES_DETAIL_RETRY_H
#define SLED_FUTURES_DETAIL_RETRY_H
#include "sled/log/log.h"
#include "sled/synchronization/mutex.h"
#include "traits.h"
#include <memory>
namespace sled {
namespace detail {
namespace {
struct RetryState {
enum State { kPending, kDone, kRetry };
sled::Mutex mutex;
sled::ConditionVariable cv;
int retry_count = 0;
State state = kPending;
};
}// namespace
template<typename R>
struct RetryReceiver {
std::shared_ptr<RetryState> state;
R receiver;
bool stopped = false;
template<typename U>
void SetValue(U &&val)
{
{
sled::MutexLock lock(&state->mutex);
if (stopped) { return; }
state->state = RetryState::kDone;
state->cv.NotifyAll();
}
receiver.SetValue(std::forward<U>(val));
}
void SetError(std::exception_ptr e)
{
// notify
{
sled::MutexLock lock(&state->mutex);
if (stopped) { return; }
if (state->retry_count > 0) {
--state->retry_count;
state->state = RetryState::kRetry;
return;
} else {
state->state = RetryState::kDone;
state->cv.NotifyAll();
}
}
receiver.SetError(e);
}
void SetStopped()
{
{
sled::MutexLock lock(&state->mutex);
if (stopped) { return; }
stopped = true;
state->state = RetryState::kDone;
state->cv.NotifyAll();
}
receiver.SetStopped();
}
};
template<typename S, typename R>
struct RetryOperation {
int retry_count;
std::shared_ptr<RetryState> state;
ConnectResultT<S, R> op;
void Start()
{
{
sled::MutexLock lock(&state->mutex);
state->retry_count = retry_count;
state->state = RetryState::kPending;
}
do {
op.Start();
sled::MutexLock lock(&state->mutex);
state->cv.Wait(lock, [this] { return state->state != RetryState::kPending; });
if (state->state == RetryState::kDone) { break; }
state->state = RetryState::kPending;
} while (true);
}
void Stop() { op.Stop(); }
};
template<typename S>
struct RetrySender {
S sender;
int retry_count;
template<typename R>
RetryOperation<S, RetryReceiver<R>> Connect(R receiver)
{
auto state = std::make_shared<RetryState>();
auto op = sender.Connect(RetryReceiver<R>{state, receiver});
return {retry_count, state, op};
}
};
template<typename S>
RetrySender<S>
Retry(S sender, int retry_count)
{
return {sender, retry_count};
}
}// namespace detail
}// namespace sled
#endif// SLED_FUTURES_DETAIL_RETRY_H

View File

@ -0,0 +1,71 @@
#ifndef SLED_FUTURES_DETAIL_THEN_H
#define SLED_FUTURES_DETAIL_THEN_H
#include "traits.h"
#include <memory>
namespace sled {
namespace detail {
template<typename R, typename F>
struct ThenReceiver {
R receiver;
F func;
bool stopped = false;
template<typename U>
void SetValue(U &&val)
{
if (stopped) { return; }
try {
receiver.SetValue(func(std::forward<U>(val)));
} catch (...) {
SetError(std::current_exception());
}
}
void SetError(std::exception_ptr e)
{
if (stopped) { return; }
receiver.SetError(e);
}
void SetStopped()
{
if (stopped) { return; }
stopped = true;
receiver.SetStopped();
}
};
template<typename S, typename R>
struct ThenOperation {
ConnectResultT<S, R> op;
void Start() { op.Start(); }
void Stop() { op.Stop(); }
};
template<typename S, typename F>
struct ThenSender {
S sender;
F func;
template<typename R>
ThenOperation<S, ThenReceiver<R, F>> Connect(R receiver)
{
return {sender.Connect(ThenReceiver<R, F>{receiver, func})};
}
};
template<typename S, typename F>
ThenSender<S, F>
Then(S sender, F &&func)
{
return {sender, std::forward<F>(func)};
}
}// namespace detail
}// namespace sled
#endif// SLED_FUTURES_DETAIL_THEN_H

View File

@ -0,0 +1,18 @@
#ifndef SLED_FUTURES_DETAIL_TRAITS_H
#define SLED_FUTURES_DETAIL_TRAITS_H
#include <type_traits>
namespace sled {
namespace detail {
template<typename S, typename R>
struct ConnectResult {
typedef decltype(std::declval<S>().Connect(std::declval<R>())) type;
};
template<typename S, typename R>
using ConnectResultT = typename ConnectResult<S, R>::type;
}// namespace detail
}// namespace sled
#endif// SLED_FUTURES_DETAIL_TRAITS_H

View File

@ -0,0 +1,72 @@
#ifndef SLED_FUTURES_DETAIL_VIA_H
#define SLED_FUTURES_DETAIL_VIA_H
#include "traits.h"
#include <exception>
#include <functional>
#include <memory>
namespace sled {
namespace detail {
template<typename R, typename F>
struct ViaReceiver {
R receiver;
F schedule;
bool stopped = false;
template<typename U>
void SetValue(U &&val)
{
if (stopped) { return; }
try {
auto func = std::bind(&R::SetValue, &receiver, std::forward<U>(val));
schedule(std::move(func));
} catch (...) {
SetError(std::current_exception());
}
}
void SetError(std::exception_ptr e)
{
if (stopped) { return; }
receiver.SetError(e);
}
void SetStopped()
{
if (stopped) { return; }
stopped = true;
receiver.SetStopped();
}
};
template<typename S, typename R>
struct ViaOperation {
ConnectResultT<S, R> op;
void Start() { op.Start(); }
void Stop() { op.Stop(); }
};
template<typename S, typename F>
struct ViaSender {
S sender;
F schedule;
template<typename R>
ViaOperation<S, ViaReceiver<R, F>> Connect(R receiver)
{
return {sender.Connect(ViaReceiver<R, F>{receiver, schedule})};
}
};
template<typename S, typename F>
ViaSender<S, F>
Via(S sender, F &&schedule)
{
return {sender, std::forward<F>(schedule)};
}
}// namespace detail
}// namespace sled
#endif// SLED_FUTURES_DETAIL_VIA_H

View File

@ -11,7 +11,7 @@
#include "sled/filesystem/temporary_file.h" #include "sled/filesystem/temporary_file.h"
// futures // futures
#include "sled/futures/promise.h" // #include "sled/futures/promise.h"
// lang // lang
#include "lang/attributes.h" #include "lang/attributes.h"