sled/3party/rxcpp/rx-coordination.hpp
2024-03-14 20:50:17 +08:00

317 lines
9.1 KiB
C++

// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
#pragma once
#if !defined(RXCPP_RX_COORDINATION_HPP)
#define RXCPP_RX_COORDINATION_HPP
#include "rx-includes.hpp"
namespace rxcpp {
struct tag_coordinator {};
struct coordinator_base {typedef tag_coordinator coordinator_tag;};
template<class T, class C = rxu::types_checked>
struct is_coordinator : public std::false_type {};
template<class T>
struct is_coordinator<T, typename rxu::types_checked_from<typename T::coordinator_tag>::type>
: public std::is_convertible<typename T::coordinator_tag*, tag_coordinator*> {};
struct tag_coordination {};
struct coordination_base {typedef tag_coordination coordination_tag;};
namespace detail {
template<class T, class C = rxu::types_checked>
struct is_coordination : public std::false_type {};
template<class T>
struct is_coordination<T, typename rxu::types_checked_from<typename T::coordination_tag>::type>
: public std::is_convertible<typename T::coordination_tag*, tag_coordination*> {};
}
template<class T, class Decayed = rxu::decay_t<T>>
struct is_coordination : detail::is_coordination<Decayed>
{
};
template<class Coordination, class DecayedCoordination = rxu::decay_t<Coordination>>
using coordination_tag_t = typename DecayedCoordination::coordination_tag;
template<class Input>
class coordinator : public coordinator_base
{
public:
typedef Input input_type;
private:
struct not_supported {typedef not_supported type;};
template<class Observable>
struct get_observable
{
typedef decltype((*(input_type*)nullptr).in((*(Observable*)nullptr))) type;
};
template<class Subscriber>
struct get_subscriber
{
typedef decltype((*(input_type*)nullptr).out((*(Subscriber*)nullptr))) type;
};
template<class F>
struct get_action_function
{
typedef decltype((*(input_type*)nullptr).act((*(F*)nullptr))) type;
};
public:
input_type input;
template<class T>
struct get
{
typedef typename std::conditional<
rxsc::detail::is_action_function<T>::value, get_action_function<T>, typename std::conditional<
is_observable<T>::value, get_observable<T>, typename std::conditional<
is_subscriber<T>::value, get_subscriber<T>, not_supported>::type>::type>::type::type type;
};
coordinator(Input i) : input(i) {}
rxsc::worker get_worker() const {
return input.get_worker();
}
rxsc::scheduler get_scheduler() const {
return input.get_scheduler();
}
template<class Observable>
auto in(Observable o) const
-> typename get_observable<Observable>::type {
return input.in(std::move(o));
static_assert(is_observable<Observable>::value, "can only synchronize observables");
}
template<class Subscriber>
auto out(Subscriber s) const
-> typename get_subscriber<Subscriber>::type {
return input.out(std::move(s));
static_assert(is_subscriber<Subscriber>::value, "can only synchronize subscribers");
}
template<class F>
auto act(F f) const
-> typename get_action_function<F>::type {
return input.act(std::move(f));
static_assert(rxsc::detail::is_action_function<F>::value, "can only synchronize action functions");
}
};
class identity_one_worker : public coordination_base
{
rxsc::scheduler factory;
class input_type
{
rxsc::worker controller;
rxsc::scheduler factory;
public:
explicit input_type(rxsc::worker w)
: controller(w)
, factory(rxsc::make_same_worker(w))
{
}
inline rxsc::worker get_worker() const {
return controller;
}
inline rxsc::scheduler get_scheduler() const {
return factory;
}
inline rxsc::scheduler::clock_type::time_point now() const {
return factory.now();
}
template<class Observable>
auto in(Observable o) const
-> Observable {
return o;
}
template<class Subscriber>
auto out(Subscriber s) const
-> Subscriber {
return s;
}
template<class F>
auto act(F f) const
-> F {
return f;
}
};
public:
explicit identity_one_worker(rxsc::scheduler sc) : factory(sc) {}
typedef coordinator<input_type> coordinator_type;
inline rxsc::scheduler::clock_type::time_point now() const {
return factory.now();
}
inline coordinator_type create_coordinator(composite_subscription cs = composite_subscription()) const {
auto w = factory.create_worker(std::move(cs));
return coordinator_type(input_type(std::move(w)));
}
};
inline identity_one_worker identity_immediate() {
static identity_one_worker r(rxsc::make_immediate());
return r;
}
inline identity_one_worker identity_current_thread() {
static identity_one_worker r(rxsc::make_current_thread());
return r;
}
inline identity_one_worker identity_same_worker(rxsc::worker w) {
return identity_one_worker(rxsc::make_same_worker(w));
}
class serialize_one_worker : public coordination_base
{
rxsc::scheduler factory;
template<class F>
struct serialize_action
{
F dest;
std::shared_ptr<std::mutex> lock;
serialize_action(F d, std::shared_ptr<std::mutex> m)
: dest(std::move(d))
, lock(std::move(m))
{
if (!lock) {
std::terminate();
}
}
auto operator()(const rxsc::schedulable& scbl) const
-> decltype(dest(scbl)) {
std::unique_lock<std::mutex> guard(*lock);
return dest(scbl);
}
};
template<class Observer>
struct serialize_observer
{
typedef serialize_observer<Observer> this_type;
typedef rxu::decay_t<Observer> dest_type;
typedef typename dest_type::value_type value_type;
typedef observer<value_type, this_type> observer_type;
dest_type dest;
std::shared_ptr<std::mutex> lock;
serialize_observer(dest_type d, std::shared_ptr<std::mutex> m)
: dest(std::move(d))
, lock(std::move(m))
{
if (!lock) {
std::terminate();
}
}
void on_next(value_type v) const {
std::unique_lock<std::mutex> guard(*lock);
dest.on_next(v);
}
void on_error(rxu::error_ptr e) const {
std::unique_lock<std::mutex> guard(*lock);
dest.on_error(e);
}
void on_completed() const {
std::unique_lock<std::mutex> guard(*lock);
dest.on_completed();
}
template<class Subscriber>
static subscriber<value_type, observer_type> make(const Subscriber& s, std::shared_ptr<std::mutex> m) {
return make_subscriber<value_type>(s, observer_type(this_type(s.get_observer(), std::move(m))));
}
};
class input_type
{
rxsc::worker controller;
rxsc::scheduler factory;
std::shared_ptr<std::mutex> lock;
public:
explicit input_type(rxsc::worker w, std::shared_ptr<std::mutex> m)
: controller(w)
, factory(rxsc::make_same_worker(w))
, lock(std::move(m))
{
}
inline rxsc::worker get_worker() const {
return controller;
}
inline rxsc::scheduler get_scheduler() const {
return factory;
}
inline rxsc::scheduler::clock_type::time_point now() const {
return factory.now();
}
template<class Observable>
auto in(Observable o) const
-> Observable {
return o;
}
template<class Subscriber>
auto out(const Subscriber& s) const
-> decltype(serialize_observer<decltype(s.get_observer())>::make(s, lock)) {
return serialize_observer<decltype(s.get_observer())>::make(s, lock);
}
template<class F>
auto act(F f) const
-> serialize_action<F> {
return serialize_action<F>(std::move(f), lock);
}
};
public:
explicit serialize_one_worker(rxsc::scheduler sc) : factory(sc) {}
typedef coordinator<input_type> coordinator_type;
inline rxsc::scheduler::clock_type::time_point now() const {
return factory.now();
}
inline coordinator_type create_coordinator(composite_subscription cs = composite_subscription()) const {
auto w = factory.create_worker(std::move(cs));
std::shared_ptr<std::mutex> lock = std::make_shared<std::mutex>();
return coordinator_type(input_type(std::move(w), std::move(lock)));
}
};
inline serialize_one_worker serialize_event_loop() {
static serialize_one_worker r(rxsc::make_event_loop());
return r;
}
inline serialize_one_worker serialize_new_thread() {
static serialize_one_worker r(rxsc::make_new_thread());
return r;
}
inline serialize_one_worker serialize_same_worker(rxsc::worker w) {
return serialize_one_worker(rxsc::make_same_worker(w));
}
}
#endif