sled/3party/rxcpp/operators/rx-retry-repeat-common.hpp

154 lines
5.0 KiB
C++
Raw Permalink Normal View History

2024-03-14 20:50:17 +08:00
#pragma once
/*! \file rx-retry-repeat-common.hpp
\brief Implementation commonalities between retry and repeat operators abstracted away from rx-retry.hpp and rx-repeat.hpp files. Should be used only from rx-retry.hpp and rx-repeat.hpp
*/
#include "../rx-includes.hpp"
namespace rxcpp {
namespace operators {
namespace detail {
namespace retry_repeat_common {
// Structure to perform general retry/repeat operations on state
template <class Values, class Subscriber, class EventHandlers, class T>
struct state_type : public std::enable_shared_from_this<state_type<Values, Subscriber, EventHandlers, T>>,
public Values {
typedef Subscriber output_type;
state_type(const Values& i, const output_type& oarg)
: Values(i),
source_lifetime(composite_subscription::empty()),
out(oarg) {
}
void do_subscribe() {
auto state = this->shared_from_this();
state->out.remove(state->lifetime_token);
state->source_lifetime.unsubscribe();
state->source_lifetime = composite_subscription();
state->lifetime_token = state->out.add(state->source_lifetime);
state->source.subscribe(
state->out,
state->source_lifetime,
// on_next
[state](T t) {
state->out.on_next(t);
},
// on_error
[state](rxu::error_ptr e) {
EventHandlers::on_error(state, e);
},
// on_completed
[state]() {
EventHandlers::on_completed(state);
}
);
}
composite_subscription source_lifetime;
output_type out;
composite_subscription::weak_subscription lifetime_token;
};
// Finite case (explicitely limited with the number of times)
template <class EventHandlers, class T, class Observable, class Count>
struct finite : public operator_base<T> {
typedef rxu::decay_t<Observable> source_type;
typedef rxu::decay_t<Count> count_type;
struct values {
values(source_type s, count_type t)
: source(std::move(s)),
remaining_(std::move(t)) {
}
inline bool completed_predicate() const {
// Return true if we are completed
return remaining_ <= 0;
}
inline void update() {
// Decrement counter
--remaining_;
}
source_type source;
private:
// Counter to hold number of times remaining to complete
count_type remaining_;
};
finite(source_type s, count_type t)
: initial_(std::move(s), std::move(t)) {
}
template<class Subscriber>
void on_subscribe(const Subscriber& s) const {
typedef state_type<values, Subscriber, EventHandlers, T> state_t;
// take a copy of the values for each subscription
auto state = std::make_shared<state_t>(initial_, s);
if (initial_.completed_predicate()) {
// return completed
state->out.on_completed();
} else {
// start the first iteration
state->do_subscribe();
}
}
private:
values initial_;
};
// Infinite case
template <class EventHandlers, class T, class Observable>
struct infinite : public operator_base<T> {
typedef rxu::decay_t<Observable> source_type;
struct values {
values(source_type s)
: source(std::move(s)) {
}
static inline bool completed_predicate() {
// Infinite never completes
return false;
}
static inline void update() {
// Infinite does not need to update state
}
source_type source;
};
infinite(source_type s) : initial_(std::move(s)) {
}
template<class Subscriber>
void on_subscribe(const Subscriber& s) const {
typedef state_type<values, Subscriber, EventHandlers, T> state_t;
// take a copy of the values for each subscription
auto state = std::make_shared<state_t>(initial_, s);
// start the first iteration
state->do_subscribe();
}
private:
values initial_;
};
}
}
}
}