Commit b385bca8 authored by tqcq's avatar tqcq
Browse files

feat add future

parent b6e7c12a
Loading
Loading
Loading
Loading
+3 −0
Original line number Diff line number Diff line
@@ -61,6 +61,8 @@ target_sources(
          src/sled/debugging/demangle.cc
          src/sled/debugging/symbolize.cc
          src/sled/event_bus/event_bus.cc
          src/sled/futures/future.cc
          src/sled/futures/internal/failure_handling.cc
          src/sled/filesystem/path.cc
          src/sled/log/log.cc
          src/sled/network/async_resolver.cc
@@ -210,6 +212,7 @@ if(SLED_BUILD_TESTS)
  sled_add_test(NAME sled_inja_test SRCS src/sled/nonstd/inja_test.cc)
  sled_add_test(NAME sled_fsm_test SRCS src/sled/nonstd/fsm_test.cc)
  sled_add_test(NAME sled_timestamp_test SRCS src/sled/units/timestamp_test.cc)
  sled_add_test(NAME sled_future_test SRCS src/sled/futures/future_test.cc)
  sled_add_test(
    NAME sled_cache_test SRCS src/sled/cache/lru_cache_test.cc
    src/sled/cache/fifo_cache_test.cc src/sled/cache/expire_cache_test.cc)
+1 −1
Original line number Diff line number Diff line
@@ -11,7 +11,7 @@ TEST_SUITE("Async")
            CHECK_EQ(value, 126);
            return value;
        });
        task1.wait();
        // task1.wait();
        CHECK_EQ(126, task1.get());
    }

src/sled/futures/detail/delay.h

deleted100644 → 0
+0 −92
Original line number Diff line number Diff line
#ifndef SLED_FUTURES_DETAIL_DELAY_H
#define SLED_FUTURES_DETAIL_DELAY_H

#include "sled/units/time_delta.h"
#include "traits.h"
#include <exception>

namespace sled {
namespace detail {

template<typename R>
struct DelayReceiver {
    R receiver;
    sled::TimeDelta delta;
    bool stopped = false;

    template<typename U>
    void SetValue(U &&val)
    {
        if (stopped) { return; }
        receiver.SetValue(std::forward<U>(val));
    }

    void SetError(std::exception_ptr e)
    {
        if (stopped) { return; }
        receiver.SetError(e);
    }

    void SetStopped()
    {
        if (stopped) { return; }
        stopped = true;
        receiver.SetStopped();
    }
};

template<typename S, typename R>
struct DelayOperation {
    ConnectResultT<S, R> op;

    void Start() { op.Start(); }

    void Stop() { op.Stop(); }
};

template<typename S>
struct DelaySender {
    using result_t  = typename S::result_t;
    using this_type = DelaySender<S>;
    S sender;
    sled::TimeDelta delta;

    template<typename R>
    DelayOperation<S, DelayReceiver<R>> Connect(R receiver)
    {
        return {sender.Connect(DelayReceiver<R>{receiver, delta})};
    }

    template<typename Lazy>
    friend ContinueResultT<this_type, Lazy> operator|(this_type sender, Lazy lazy)
    {
        return lazy.Continue(sender);
    }
};

template<typename S>
DelaySender<S>
Delay(S sender, sled::TimeDelta const &delta)
{
    return {sender, delta};
}

struct DelayLazy {
    sled::TimeDelta delta;

    template<typename S>
    DelaySender<S> Continue(S sender) const
    {
        return {sender, delta};
    }
};

inline DelayLazy
Delay(sled::TimeDelta const &delta)
{
    return {delta};
}

}// namespace detail
}// namespace sled
#endif//  SLED_FUTURES_DETAIL_DELAY_H

src/sled/futures/detail/future.h

deleted100644 → 0
+0 −103
Original line number Diff line number Diff line
#ifndef SLED_FUTURES_DETAIL_FUTURE_H
#define SLED_FUTURES_DETAIL_FUTURE_H
#include "sled/synchronization/mutex.h"
#include "traits.h"

namespace sled {
namespace detail {

namespace {
enum class State {
    kPending,
    kError,
    kValue,
};

template<typename T>
struct FutureState {

    sled::Mutex mutex;
    sled::ConditionVariable cv;
    State state = State::kPending;

    union {
        T value;
        std::exception_ptr error;
    };
};
}// namespace

template<typename R>
struct FutureOperation {
    std::shared_ptr<FutureState<typename R::result_t>> state;
    R receiver;
    mutable bool stopped = false;

    void Start()
    {
        if (stopped) { return; }
        sled::MutexLock lock(&state->mutex);
        state->cv.Wait(lock, [&] { return state->state != State::kPending; });
        if (state->state == State::kValue) {
            receiver.SetValue(std::move(state->value));
        } else {
            receiver.SetError(state->error);
        }
    }

    void Stop()
    {
        if (stopped) { return; }
        stopped = true;
        receiver.SetStopped();
    }
};

template<typename T>
struct FutureSender {
    using result_t  = T;
    using this_type = FutureSender<T>;
    std::shared_ptr<FutureState<T>> state;

    template<typename R>
    FutureOperation<R> Connect(R receiver)
    {
        return {state, receiver};
    }

    template<typename Lazy>
    friend ContinueResultT<this_type, Lazy> operator|(this_type sender, Lazy lazy)
    {
        return lazy.Continue(sender);
    }
};

template<typename T>
struct Promise {
    std::shared_ptr<FutureState<T>> state;

    Promise() : state(std::make_shared<FutureState<T>>()) {}

    template<typename U = T>
    typename std::enable_if<std::is_convertible<U, T>::value>::type SetValue(U &&val)
    {
        sled::MutexLock lock(&state->mutex);
        state->value = std::forward<T>(val);
        state->state = State::kValue;
        state->cv.NotifyAll();
    }

    void SetError(std::exception_ptr e)
    {
        sled::MutexLock lock(&state->mutex);
        state->error = e;
        state->state = State::kError;
        state->cv.NotifyAll();
    }

    FutureSender<T> GetFuture() { return {state}; }
};

}// namespace detail
}// namespace sled
#endif// SLED_FUTURES_DETAIL_FUTURE_H

src/sled/futures/detail/just.h

deleted100644 → 0
+0 −48
Original line number Diff line number Diff line
#ifndef SLED_FUTURES_DETAIL_JUST_H
#define SLED_FUTURES_DETAIL_JUST_H

#include "traits.h"
#include <memory>

namespace sled {
namespace detail {

template<typename T, typename R>
struct JustOperation {
    T value;
    R receiver;

    void Start() { receiver.SetValue(std::forward<T>(value)); }

    void Stop() { receiver.SetStopped(); }
};

template<typename T>
struct JustSender {
    using result_t  = T;
    using this_type = JustSender<T>;
    T value;

    template<typename R>
    JustOperation<T, R> Connect(R receiver)
    {
        return {std::forward<T>(value), receiver};
    }

    template<typename Lazy>
    friend ContinueResultT<this_type, Lazy> operator|(this_type sender, Lazy lazy)
    {
        return lazy.Continue(sender);
    }
};

template<typename T>
JustSender<T>
Just(T &&value)
{
    return {std::forward<T>(value)};
}

}// namespace detail
}// namespace sled
#endif//  SLED_FUTURES_DETAIL_JUST_H
Loading