317 lines
9.1 KiB
C++
317 lines
9.1 KiB
C++
|
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
|
||
|
|
||
|
#pragma once
|
||
|
|
||
|
#if !defined(RXCPP_RX_COORDINATION_HPP)
|
||
|
#define RXCPP_RX_COORDINATION_HPP
|
||
|
|
||
|
#include "rx-includes.hpp"
|
||
|
|
||
|
namespace rxcpp {
|
||
|
|
||
|
struct tag_coordinator {};
|
||
|
struct coordinator_base {typedef tag_coordinator coordinator_tag;};
|
||
|
|
||
|
template<class T, class C = rxu::types_checked>
|
||
|
struct is_coordinator : public std::false_type {};
|
||
|
|
||
|
template<class T>
|
||
|
struct is_coordinator<T, typename rxu::types_checked_from<typename T::coordinator_tag>::type>
|
||
|
: public std::is_convertible<typename T::coordinator_tag*, tag_coordinator*> {};
|
||
|
|
||
|
struct tag_coordination {};
|
||
|
struct coordination_base {typedef tag_coordination coordination_tag;};
|
||
|
|
||
|
namespace detail {
|
||
|
|
||
|
template<class T, class C = rxu::types_checked>
|
||
|
struct is_coordination : public std::false_type {};
|
||
|
|
||
|
template<class T>
|
||
|
struct is_coordination<T, typename rxu::types_checked_from<typename T::coordination_tag>::type>
|
||
|
: public std::is_convertible<typename T::coordination_tag*, tag_coordination*> {};
|
||
|
|
||
|
}
|
||
|
|
||
|
template<class T, class Decayed = rxu::decay_t<T>>
|
||
|
struct is_coordination : detail::is_coordination<Decayed>
|
||
|
{
|
||
|
};
|
||
|
|
||
|
template<class Coordination, class DecayedCoordination = rxu::decay_t<Coordination>>
|
||
|
using coordination_tag_t = typename DecayedCoordination::coordination_tag;
|
||
|
|
||
|
template<class Input>
|
||
|
class coordinator : public coordinator_base
|
||
|
{
|
||
|
public:
|
||
|
typedef Input input_type;
|
||
|
|
||
|
private:
|
||
|
struct not_supported {typedef not_supported type;};
|
||
|
|
||
|
template<class Observable>
|
||
|
struct get_observable
|
||
|
{
|
||
|
typedef decltype((*(input_type*)nullptr).in((*(Observable*)nullptr))) type;
|
||
|
};
|
||
|
|
||
|
template<class Subscriber>
|
||
|
struct get_subscriber
|
||
|
{
|
||
|
typedef decltype((*(input_type*)nullptr).out((*(Subscriber*)nullptr))) type;
|
||
|
};
|
||
|
|
||
|
template<class F>
|
||
|
struct get_action_function
|
||
|
{
|
||
|
typedef decltype((*(input_type*)nullptr).act((*(F*)nullptr))) type;
|
||
|
};
|
||
|
|
||
|
public:
|
||
|
input_type input;
|
||
|
|
||
|
template<class T>
|
||
|
struct get
|
||
|
{
|
||
|
typedef typename std::conditional<
|
||
|
rxsc::detail::is_action_function<T>::value, get_action_function<T>, typename std::conditional<
|
||
|
is_observable<T>::value, get_observable<T>, typename std::conditional<
|
||
|
is_subscriber<T>::value, get_subscriber<T>, not_supported>::type>::type>::type::type type;
|
||
|
};
|
||
|
|
||
|
coordinator(Input i) : input(i) {}
|
||
|
|
||
|
rxsc::worker get_worker() const {
|
||
|
return input.get_worker();
|
||
|
}
|
||
|
rxsc::scheduler get_scheduler() const {
|
||
|
return input.get_scheduler();
|
||
|
}
|
||
|
|
||
|
template<class Observable>
|
||
|
auto in(Observable o) const
|
||
|
-> typename get_observable<Observable>::type {
|
||
|
return input.in(std::move(o));
|
||
|
static_assert(is_observable<Observable>::value, "can only synchronize observables");
|
||
|
}
|
||
|
|
||
|
template<class Subscriber>
|
||
|
auto out(Subscriber s) const
|
||
|
-> typename get_subscriber<Subscriber>::type {
|
||
|
return input.out(std::move(s));
|
||
|
static_assert(is_subscriber<Subscriber>::value, "can only synchronize subscribers");
|
||
|
}
|
||
|
|
||
|
template<class F>
|
||
|
auto act(F f) const
|
||
|
-> typename get_action_function<F>::type {
|
||
|
return input.act(std::move(f));
|
||
|
static_assert(rxsc::detail::is_action_function<F>::value, "can only synchronize action functions");
|
||
|
}
|
||
|
};
|
||
|
|
||
|
class identity_one_worker : public coordination_base
|
||
|
{
|
||
|
rxsc::scheduler factory;
|
||
|
|
||
|
class input_type
|
||
|
{
|
||
|
rxsc::worker controller;
|
||
|
rxsc::scheduler factory;
|
||
|
public:
|
||
|
explicit input_type(rxsc::worker w)
|
||
|
: controller(w)
|
||
|
, factory(rxsc::make_same_worker(w))
|
||
|
{
|
||
|
}
|
||
|
inline rxsc::worker get_worker() const {
|
||
|
return controller;
|
||
|
}
|
||
|
inline rxsc::scheduler get_scheduler() const {
|
||
|
return factory;
|
||
|
}
|
||
|
inline rxsc::scheduler::clock_type::time_point now() const {
|
||
|
return factory.now();
|
||
|
}
|
||
|
template<class Observable>
|
||
|
auto in(Observable o) const
|
||
|
-> Observable {
|
||
|
return o;
|
||
|
}
|
||
|
template<class Subscriber>
|
||
|
auto out(Subscriber s) const
|
||
|
-> Subscriber {
|
||
|
return s;
|
||
|
}
|
||
|
template<class F>
|
||
|
auto act(F f) const
|
||
|
-> F {
|
||
|
return f;
|
||
|
}
|
||
|
};
|
||
|
|
||
|
public:
|
||
|
|
||
|
explicit identity_one_worker(rxsc::scheduler sc) : factory(sc) {}
|
||
|
|
||
|
typedef coordinator<input_type> coordinator_type;
|
||
|
|
||
|
inline rxsc::scheduler::clock_type::time_point now() const {
|
||
|
return factory.now();
|
||
|
}
|
||
|
|
||
|
inline coordinator_type create_coordinator(composite_subscription cs = composite_subscription()) const {
|
||
|
auto w = factory.create_worker(std::move(cs));
|
||
|
return coordinator_type(input_type(std::move(w)));
|
||
|
}
|
||
|
};
|
||
|
|
||
|
inline identity_one_worker identity_immediate() {
|
||
|
static identity_one_worker r(rxsc::make_immediate());
|
||
|
return r;
|
||
|
}
|
||
|
|
||
|
inline identity_one_worker identity_current_thread() {
|
||
|
static identity_one_worker r(rxsc::make_current_thread());
|
||
|
return r;
|
||
|
}
|
||
|
|
||
|
inline identity_one_worker identity_same_worker(rxsc::worker w) {
|
||
|
return identity_one_worker(rxsc::make_same_worker(w));
|
||
|
}
|
||
|
|
||
|
class serialize_one_worker : public coordination_base
|
||
|
{
|
||
|
rxsc::scheduler factory;
|
||
|
|
||
|
template<class F>
|
||
|
struct serialize_action
|
||
|
{
|
||
|
F dest;
|
||
|
std::shared_ptr<std::mutex> lock;
|
||
|
serialize_action(F d, std::shared_ptr<std::mutex> m)
|
||
|
: dest(std::move(d))
|
||
|
, lock(std::move(m))
|
||
|
{
|
||
|
if (!lock) {
|
||
|
std::terminate();
|
||
|
}
|
||
|
}
|
||
|
auto operator()(const rxsc::schedulable& scbl) const
|
||
|
-> decltype(dest(scbl)) {
|
||
|
std::unique_lock<std::mutex> guard(*lock);
|
||
|
return dest(scbl);
|
||
|
}
|
||
|
};
|
||
|
|
||
|
template<class Observer>
|
||
|
struct serialize_observer
|
||
|
{
|
||
|
typedef serialize_observer<Observer> this_type;
|
||
|
typedef rxu::decay_t<Observer> dest_type;
|
||
|
typedef typename dest_type::value_type value_type;
|
||
|
typedef observer<value_type, this_type> observer_type;
|
||
|
dest_type dest;
|
||
|
std::shared_ptr<std::mutex> lock;
|
||
|
|
||
|
serialize_observer(dest_type d, std::shared_ptr<std::mutex> m)
|
||
|
: dest(std::move(d))
|
||
|
, lock(std::move(m))
|
||
|
{
|
||
|
if (!lock) {
|
||
|
std::terminate();
|
||
|
}
|
||
|
}
|
||
|
void on_next(value_type v) const {
|
||
|
std::unique_lock<std::mutex> guard(*lock);
|
||
|
dest.on_next(v);
|
||
|
}
|
||
|
void on_error(rxu::error_ptr e) const {
|
||
|
std::unique_lock<std::mutex> guard(*lock);
|
||
|
dest.on_error(e);
|
||
|
}
|
||
|
void on_completed() const {
|
||
|
std::unique_lock<std::mutex> guard(*lock);
|
||
|
dest.on_completed();
|
||
|
}
|
||
|
|
||
|
template<class Subscriber>
|
||
|
static subscriber<value_type, observer_type> make(const Subscriber& s, std::shared_ptr<std::mutex> m) {
|
||
|
return make_subscriber<value_type>(s, observer_type(this_type(s.get_observer(), std::move(m))));
|
||
|
}
|
||
|
};
|
||
|
|
||
|
class input_type
|
||
|
{
|
||
|
rxsc::worker controller;
|
||
|
rxsc::scheduler factory;
|
||
|
std::shared_ptr<std::mutex> lock;
|
||
|
public:
|
||
|
explicit input_type(rxsc::worker w, std::shared_ptr<std::mutex> m)
|
||
|
: controller(w)
|
||
|
, factory(rxsc::make_same_worker(w))
|
||
|
, lock(std::move(m))
|
||
|
{
|
||
|
}
|
||
|
inline rxsc::worker get_worker() const {
|
||
|
return controller;
|
||
|
}
|
||
|
inline rxsc::scheduler get_scheduler() const {
|
||
|
return factory;
|
||
|
}
|
||
|
inline rxsc::scheduler::clock_type::time_point now() const {
|
||
|
return factory.now();
|
||
|
}
|
||
|
template<class Observable>
|
||
|
auto in(Observable o) const
|
||
|
-> Observable {
|
||
|
return o;
|
||
|
}
|
||
|
template<class Subscriber>
|
||
|
auto out(const Subscriber& s) const
|
||
|
-> decltype(serialize_observer<decltype(s.get_observer())>::make(s, lock)) {
|
||
|
return serialize_observer<decltype(s.get_observer())>::make(s, lock);
|
||
|
}
|
||
|
template<class F>
|
||
|
auto act(F f) const
|
||
|
-> serialize_action<F> {
|
||
|
return serialize_action<F>(std::move(f), lock);
|
||
|
}
|
||
|
};
|
||
|
|
||
|
public:
|
||
|
|
||
|
explicit serialize_one_worker(rxsc::scheduler sc) : factory(sc) {}
|
||
|
|
||
|
typedef coordinator<input_type> coordinator_type;
|
||
|
|
||
|
inline rxsc::scheduler::clock_type::time_point now() const {
|
||
|
return factory.now();
|
||
|
}
|
||
|
|
||
|
inline coordinator_type create_coordinator(composite_subscription cs = composite_subscription()) const {
|
||
|
auto w = factory.create_worker(std::move(cs));
|
||
|
std::shared_ptr<std::mutex> lock = std::make_shared<std::mutex>();
|
||
|
return coordinator_type(input_type(std::move(w), std::move(lock)));
|
||
|
}
|
||
|
};
|
||
|
|
||
|
inline serialize_one_worker serialize_event_loop() {
|
||
|
static serialize_one_worker r(rxsc::make_event_loop());
|
||
|
return r;
|
||
|
}
|
||
|
|
||
|
inline serialize_one_worker serialize_new_thread() {
|
||
|
static serialize_one_worker r(rxsc::make_new_thread());
|
||
|
return r;
|
||
|
}
|
||
|
|
||
|
inline serialize_one_worker serialize_same_worker(rxsc::worker w) {
|
||
|
return serialize_one_worker(rxsc::make_same_worker(w));
|
||
|
}
|
||
|
|
||
|
}
|
||
|
|
||
|
#endif
|