264 lines
8.2 KiB
C++
264 lines
8.2 KiB
C++
|
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
|
||
|
|
||
|
#pragma once
|
||
|
|
||
|
#if !defined(RXCPP_RX_SYNCHRONIZE_HPP)
|
||
|
#define RXCPP_RX_SYNCHRONIZE_HPP
|
||
|
|
||
|
#include "../rx-includes.hpp"
|
||
|
|
||
|
namespace rxcpp {
|
||
|
|
||
|
namespace subjects {
|
||
|
|
||
|
namespace detail {
|
||
|
|
||
|
template<class T, class Coordination>
|
||
|
class synchronize_observer : public detail::multicast_observer<T>
|
||
|
{
|
||
|
typedef synchronize_observer<T, Coordination> this_type;
|
||
|
typedef detail::multicast_observer<T> base_type;
|
||
|
|
||
|
typedef rxu::decay_t<Coordination> coordination_type;
|
||
|
typedef typename coordination_type::coordinator_type coordinator_type;
|
||
|
typedef typename coordinator_type::template get<subscriber<T>>::type output_type;
|
||
|
|
||
|
struct synchronize_observer_state : public std::enable_shared_from_this<synchronize_observer_state>
|
||
|
{
|
||
|
typedef rxn::notification<T> notification_type;
|
||
|
typedef typename notification_type::type base_notification_type;
|
||
|
typedef std::deque<base_notification_type> queue_type;
|
||
|
|
||
|
struct mode
|
||
|
{
|
||
|
enum type {
|
||
|
Invalid = 0,
|
||
|
Processing,
|
||
|
Empty,
|
||
|
Disposed
|
||
|
};
|
||
|
};
|
||
|
|
||
|
mutable std::mutex lock;
|
||
|
mutable std::condition_variable wake;
|
||
|
mutable queue_type fill_queue;
|
||
|
composite_subscription lifetime;
|
||
|
mutable typename mode::type current;
|
||
|
coordinator_type coordinator;
|
||
|
output_type destination;
|
||
|
|
||
|
void ensure_processing(std::unique_lock<std::mutex>& guard) const {
|
||
|
if (!guard.owns_lock()) {
|
||
|
std::terminate();
|
||
|
}
|
||
|
if (current == mode::Empty) {
|
||
|
current = mode::Processing;
|
||
|
auto keepAlive = this->shared_from_this();
|
||
|
|
||
|
auto drain_queue = [keepAlive, this](const rxsc::schedulable& self){
|
||
|
RXCPP_TRY {
|
||
|
std::unique_lock<std::mutex> guard(lock);
|
||
|
if (!destination.is_subscribed()) {
|
||
|
current = mode::Disposed;
|
||
|
fill_queue.clear();
|
||
|
guard.unlock();
|
||
|
lifetime.unsubscribe();
|
||
|
return;
|
||
|
}
|
||
|
if (fill_queue.empty()) {
|
||
|
current = mode::Empty;
|
||
|
return;
|
||
|
}
|
||
|
auto notification = std::move(fill_queue.front());
|
||
|
fill_queue.pop_front();
|
||
|
guard.unlock();
|
||
|
notification->accept(destination);
|
||
|
self();
|
||
|
} RXCPP_CATCH(...) {
|
||
|
destination.on_error(rxu::current_exception());
|
||
|
std::unique_lock<std::mutex> guard(lock);
|
||
|
current = mode::Empty;
|
||
|
}
|
||
|
};
|
||
|
|
||
|
auto selectedDrain = on_exception(
|
||
|
[&](){return coordinator.act(drain_queue);},
|
||
|
destination);
|
||
|
if (selectedDrain.empty()) {
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
auto processor = coordinator.get_worker();
|
||
|
guard.unlock();
|
||
|
processor.schedule(lifetime, selectedDrain.get());
|
||
|
}
|
||
|
}
|
||
|
|
||
|
synchronize_observer_state(coordinator_type coor, composite_subscription cs, output_type scbr)
|
||
|
: lifetime(std::move(cs))
|
||
|
, current(mode::Empty)
|
||
|
, coordinator(std::move(coor))
|
||
|
, destination(std::move(scbr))
|
||
|
{
|
||
|
}
|
||
|
|
||
|
template<class V>
|
||
|
void on_next(V v) const {
|
||
|
if (lifetime.is_subscribed()) {
|
||
|
std::unique_lock<std::mutex> guard(lock);
|
||
|
fill_queue.push_back(notification_type::on_next(std::move(v)));
|
||
|
ensure_processing(guard);
|
||
|
}
|
||
|
wake.notify_one();
|
||
|
}
|
||
|
void on_error(rxu::error_ptr e) const {
|
||
|
if (lifetime.is_subscribed()) {
|
||
|
std::unique_lock<std::mutex> guard(lock);
|
||
|
fill_queue.push_back(notification_type::on_error(e));
|
||
|
ensure_processing(guard);
|
||
|
}
|
||
|
wake.notify_one();
|
||
|
}
|
||
|
void on_completed() const {
|
||
|
if (lifetime.is_subscribed()) {
|
||
|
std::unique_lock<std::mutex> guard(lock);
|
||
|
fill_queue.push_back(notification_type::on_completed());
|
||
|
ensure_processing(guard);
|
||
|
}
|
||
|
wake.notify_one();
|
||
|
}
|
||
|
};
|
||
|
|
||
|
std::shared_ptr<synchronize_observer_state> state;
|
||
|
|
||
|
public:
|
||
|
synchronize_observer(coordination_type cn, composite_subscription dl, composite_subscription il)
|
||
|
: base_type(dl)
|
||
|
{
|
||
|
auto o = make_subscriber<T>(dl, make_observer_dynamic<T>( *static_cast<base_type*>(this) ));
|
||
|
|
||
|
// creates a worker whose lifetime is the same as the destination subscription
|
||
|
auto coordinator = cn.create_coordinator(dl);
|
||
|
|
||
|
state = std::make_shared<synchronize_observer_state>(std::move(coordinator), std::move(il), std::move(o));
|
||
|
}
|
||
|
|
||
|
subscriber<T> get_subscriber() const {
|
||
|
return make_subscriber<T>(this->get_id(), state->lifetime, observer<T, detail::synchronize_observer<T, Coordination>>(*this)).as_dynamic();
|
||
|
}
|
||
|
|
||
|
template<class V>
|
||
|
void on_next(V v) const {
|
||
|
state->on_next(std::move(v));
|
||
|
}
|
||
|
void on_error(rxu::error_ptr e) const {
|
||
|
state->on_error(e);
|
||
|
}
|
||
|
void on_completed() const {
|
||
|
state->on_completed();
|
||
|
}
|
||
|
};
|
||
|
|
||
|
}
|
||
|
|
||
|
template<class T, class Coordination>
|
||
|
class synchronize
|
||
|
{
|
||
|
detail::synchronize_observer<T, Coordination> s;
|
||
|
|
||
|
public:
|
||
|
explicit synchronize(Coordination cn, composite_subscription cs = composite_subscription())
|
||
|
: s(std::move(cn), std::move(cs), composite_subscription())
|
||
|
{
|
||
|
}
|
||
|
|
||
|
bool has_observers() const {
|
||
|
return s.has_observers();
|
||
|
}
|
||
|
|
||
|
subscriber<T> get_subscriber() const {
|
||
|
return s.get_subscriber();
|
||
|
}
|
||
|
|
||
|
observable<T> get_observable() const {
|
||
|
auto keepAlive = s;
|
||
|
return make_observable_dynamic<T>([=](subscriber<T> o){
|
||
|
keepAlive.add(keepAlive.get_subscriber(), std::move(o));
|
||
|
});
|
||
|
}
|
||
|
};
|
||
|
|
||
|
}
|
||
|
|
||
|
class synchronize_in_one_worker : public coordination_base
|
||
|
{
|
||
|
rxsc::scheduler factory;
|
||
|
|
||
|
class input_type
|
||
|
{
|
||
|
rxsc::worker controller;
|
||
|
rxsc::scheduler factory;
|
||
|
identity_one_worker coordination;
|
||
|
public:
|
||
|
explicit input_type(rxsc::worker w)
|
||
|
: controller(w)
|
||
|
, factory(rxsc::make_same_worker(w))
|
||
|
, coordination(factory)
|
||
|
{
|
||
|
}
|
||
|
inline rxsc::worker get_worker() const {
|
||
|
return controller;
|
||
|
}
|
||
|
inline rxsc::scheduler get_scheduler() const {
|
||
|
return factory;
|
||
|
}
|
||
|
inline rxsc::scheduler::clock_type::time_point now() const {
|
||
|
return factory.now();
|
||
|
}
|
||
|
template<class Observable>
|
||
|
auto in(Observable o) const
|
||
|
-> decltype(o.publish_synchronized(coordination).ref_count()) {
|
||
|
return o.publish_synchronized(coordination).ref_count();
|
||
|
}
|
||
|
template<class Subscriber>
|
||
|
auto out(Subscriber s) const
|
||
|
-> Subscriber {
|
||
|
return s;
|
||
|
}
|
||
|
template<class F>
|
||
|
auto act(F f) const
|
||
|
-> F {
|
||
|
return f;
|
||
|
}
|
||
|
};
|
||
|
|
||
|
public:
|
||
|
|
||
|
explicit synchronize_in_one_worker(rxsc::scheduler sc) : factory(sc) {}
|
||
|
|
||
|
typedef coordinator<input_type> coordinator_type;
|
||
|
|
||
|
inline rxsc::scheduler::clock_type::time_point now() const {
|
||
|
return factory.now();
|
||
|
}
|
||
|
|
||
|
inline coordinator_type create_coordinator(composite_subscription cs = composite_subscription()) const {
|
||
|
auto w = factory.create_worker(std::move(cs));
|
||
|
return coordinator_type(input_type(std::move(w)));
|
||
|
}
|
||
|
};
|
||
|
|
||
|
inline synchronize_in_one_worker synchronize_event_loop() {
|
||
|
static synchronize_in_one_worker r(rxsc::make_event_loop());
|
||
|
return r;
|
||
|
}
|
||
|
|
||
|
inline synchronize_in_one_worker synchronize_new_thread() {
|
||
|
static synchronize_in_one_worker r(rxsc::make_new_thread());
|
||
|
return r;
|
||
|
}
|
||
|
|
||
|
}
|
||
|
|
||
|
#endif
|