154 lines
5.0 KiB
C++
154 lines
5.0 KiB
C++
#pragma once
|
|
|
|
/*! \file rx-retry-repeat-common.hpp
|
|
|
|
\brief Implementation commonalities between retry and repeat operators abstracted away from rx-retry.hpp and rx-repeat.hpp files. Should be used only from rx-retry.hpp and rx-repeat.hpp
|
|
|
|
*/
|
|
|
|
#include "../rx-includes.hpp"
|
|
|
|
namespace rxcpp {
|
|
namespace operators {
|
|
namespace detail {
|
|
|
|
namespace retry_repeat_common {
|
|
// Structure to perform general retry/repeat operations on state
|
|
template <class Values, class Subscriber, class EventHandlers, class T>
|
|
struct state_type : public std::enable_shared_from_this<state_type<Values, Subscriber, EventHandlers, T>>,
|
|
public Values {
|
|
|
|
typedef Subscriber output_type;
|
|
state_type(const Values& i, const output_type& oarg)
|
|
: Values(i),
|
|
source_lifetime(composite_subscription::empty()),
|
|
out(oarg) {
|
|
}
|
|
|
|
void do_subscribe() {
|
|
auto state = this->shared_from_this();
|
|
|
|
state->out.remove(state->lifetime_token);
|
|
state->source_lifetime.unsubscribe();
|
|
|
|
state->source_lifetime = composite_subscription();
|
|
state->lifetime_token = state->out.add(state->source_lifetime);
|
|
|
|
state->source.subscribe(
|
|
state->out,
|
|
state->source_lifetime,
|
|
// on_next
|
|
[state](T t) {
|
|
state->out.on_next(t);
|
|
},
|
|
// on_error
|
|
[state](rxu::error_ptr e) {
|
|
EventHandlers::on_error(state, e);
|
|
},
|
|
// on_completed
|
|
[state]() {
|
|
EventHandlers::on_completed(state);
|
|
}
|
|
);
|
|
}
|
|
|
|
composite_subscription source_lifetime;
|
|
output_type out;
|
|
composite_subscription::weak_subscription lifetime_token;
|
|
};
|
|
|
|
// Finite case (explicitely limited with the number of times)
|
|
template <class EventHandlers, class T, class Observable, class Count>
|
|
struct finite : public operator_base<T> {
|
|
typedef rxu::decay_t<Observable> source_type;
|
|
typedef rxu::decay_t<Count> count_type;
|
|
|
|
struct values {
|
|
values(source_type s, count_type t)
|
|
: source(std::move(s)),
|
|
remaining_(std::move(t)) {
|
|
}
|
|
|
|
inline bool completed_predicate() const {
|
|
// Return true if we are completed
|
|
return remaining_ <= 0;
|
|
}
|
|
|
|
inline void update() {
|
|
// Decrement counter
|
|
--remaining_;
|
|
}
|
|
|
|
source_type source;
|
|
|
|
private:
|
|
// Counter to hold number of times remaining to complete
|
|
count_type remaining_;
|
|
};
|
|
|
|
finite(source_type s, count_type t)
|
|
: initial_(std::move(s), std::move(t)) {
|
|
}
|
|
|
|
template<class Subscriber>
|
|
void on_subscribe(const Subscriber& s) const {
|
|
typedef state_type<values, Subscriber, EventHandlers, T> state_t;
|
|
// take a copy of the values for each subscription
|
|
auto state = std::make_shared<state_t>(initial_, s);
|
|
if (initial_.completed_predicate()) {
|
|
// return completed
|
|
state->out.on_completed();
|
|
} else {
|
|
// start the first iteration
|
|
state->do_subscribe();
|
|
}
|
|
}
|
|
|
|
private:
|
|
values initial_;
|
|
};
|
|
|
|
// Infinite case
|
|
template <class EventHandlers, class T, class Observable>
|
|
struct infinite : public operator_base<T> {
|
|
typedef rxu::decay_t<Observable> source_type;
|
|
|
|
struct values {
|
|
values(source_type s)
|
|
: source(std::move(s)) {
|
|
}
|
|
|
|
static inline bool completed_predicate() {
|
|
// Infinite never completes
|
|
return false;
|
|
}
|
|
|
|
static inline void update() {
|
|
// Infinite does not need to update state
|
|
}
|
|
|
|
source_type source;
|
|
};
|
|
|
|
infinite(source_type s) : initial_(std::move(s)) {
|
|
}
|
|
|
|
template<class Subscriber>
|
|
void on_subscribe(const Subscriber& s) const {
|
|
typedef state_type<values, Subscriber, EventHandlers, T> state_t;
|
|
// take a copy of the values for each subscription
|
|
auto state = std::make_shared<state_t>(initial_, s);
|
|
// start the first iteration
|
|
state->do_subscribe();
|
|
}
|
|
|
|
private:
|
|
values initial_;
|
|
};
|
|
|
|
|
|
}
|
|
}
|
|
}
|
|
}
|