336 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			336 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
 | |
| 
 | |
| #pragma once
 | |
| 
 | |
| /*! \file rx-observe_on.hpp
 | |
| 
 | |
|     \brief All values are queued and delivered using the scheduler from the supplied coordination.
 | |
| 
 | |
|     \tparam Coordination  the type of the scheduler.
 | |
| 
 | |
|     \param  cn  the scheduler to notify observers on.
 | |
| 
 | |
|     \return  The source observable modified so that its observers are notified on the specified scheduler.
 | |
| 
 | |
|     \sample
 | |
|     \snippet observe_on.cpp observe_on sample
 | |
|     \snippet output.txt observe_on sample
 | |
| 
 | |
|     Invoking rxcpp::observable::subscribe_on operator, instead of observe_on, gives following results:
 | |
|     \snippet output.txt subscribe_on sample
 | |
| */
 | |
| 
 | |
| #if !defined(RXCPP_OPERATORS_RX_OBSERVE_ON_HPP)
 | |
| #define RXCPP_OPERATORS_RX_OBSERVE_ON_HPP
 | |
| 
 | |
| #include "../rx-includes.hpp"
 | |
| 
 | |
| namespace rxcpp {
 | |
| 
 | |
| namespace operators {
 | |
| 
 | |
| namespace detail {
 | |
| 
 | |
| template<class... AN>
 | |
| struct observe_on_invalid_arguments {};
 | |
| 
 | |
| template<class... AN>
 | |
| struct observe_on_invalid : public rxo::operator_base<observe_on_invalid_arguments<AN...>> {
 | |
|     using type = observable<observe_on_invalid_arguments<AN...>, observe_on_invalid<AN...>>;
 | |
| };
 | |
| template<class... AN>
 | |
| using observe_on_invalid_t = typename observe_on_invalid<AN...>::type;
 | |
| 
 | |
| template<class T, class Coordination>
 | |
| struct observe_on
 | |
| {
 | |
|     typedef rxu::decay_t<T> source_value_type;
 | |
| 
 | |
|     typedef rxu::decay_t<Coordination> coordination_type;
 | |
|     typedef typename coordination_type::coordinator_type coordinator_type;
 | |
| 
 | |
|     coordination_type coordination;
 | |
| 
 | |
|     observe_on(coordination_type cn)
 | |
|         : coordination(std::move(cn))
 | |
|     {
 | |
|     }
 | |
| 
 | |
|     template<class Subscriber>
 | |
|     struct observe_on_observer
 | |
|     {
 | |
|         typedef observe_on_observer<Subscriber> this_type;
 | |
|         typedef source_value_type value_type;
 | |
|         typedef rxu::decay_t<Subscriber> dest_type;
 | |
|         typedef observer<value_type, this_type> observer_type;
 | |
| 
 | |
|         typedef rxn::notification<T> notification_type;
 | |
|         typedef typename notification_type::type base_notification_type;
 | |
|         typedef std::deque<base_notification_type> queue_type;
 | |
| 
 | |
|         struct mode
 | |
|         {
 | |
|             enum type {
 | |
|                 Invalid = 0,
 | |
|                 Processing,
 | |
|                 Empty,
 | |
|                 Disposed,
 | |
|                 Errored
 | |
|             };
 | |
|         };
 | |
|         struct observe_on_state : std::enable_shared_from_this<observe_on_state>
 | |
|         {
 | |
|             mutable std::mutex lock;
 | |
|             mutable queue_type fill_queue;
 | |
|             mutable queue_type drain_queue;
 | |
|             composite_subscription lifetime;
 | |
|             mutable typename mode::type current;
 | |
|             coordinator_type coordinator;
 | |
|             dest_type destination;
 | |
| 
 | |
|             observe_on_state(dest_type d, coordinator_type coor, composite_subscription cs)
 | |
|                 : lifetime(std::move(cs))
 | |
|                 , current(mode::Empty)
 | |
|                 , coordinator(std::move(coor))
 | |
|                 , destination(std::move(d))
 | |
|             {
 | |
|             }
 | |
| 
 | |
|             void finish(std::unique_lock<std::mutex>& guard, typename mode::type end) const {
 | |
|                 if (!guard.owns_lock()) {
 | |
|                     std::terminate();
 | |
|                 }
 | |
|                 if (current == mode::Errored || current == mode::Disposed) {return;}
 | |
|                 current = end;
 | |
|                 queue_type fill_expired;
 | |
|                 swap(fill_expired, fill_queue);
 | |
|                 queue_type drain_expired;
 | |
|                 swap(drain_expired, drain_queue);
 | |
|                 RXCPP_UNWIND_AUTO([&](){guard.lock();});
 | |
|                 guard.unlock();
 | |
|                 lifetime.unsubscribe();
 | |
|                 destination.unsubscribe();
 | |
|             }
 | |
| 
 | |
|             void ensure_processing(std::unique_lock<std::mutex>& guard) const {
 | |
|                 if (!guard.owns_lock()) {
 | |
|                     std::terminate();
 | |
|                 }
 | |
|                 if (current == mode::Empty) {
 | |
|                     current = mode::Processing;
 | |
| 
 | |
|                     if (!lifetime.is_subscribed() && fill_queue.empty() && drain_queue.empty()) {
 | |
|                         finish(guard, mode::Disposed);
 | |
|                     }
 | |
| 
 | |
|                     auto keepAlive = this->shared_from_this();
 | |
| 
 | |
|                     auto drain = [keepAlive, this](const rxsc::schedulable& self){
 | |
|                         using std::swap;
 | |
|                         RXCPP_TRY {
 | |
|                             for (;;) {
 | |
|                                 if (drain_queue.empty() || !destination.is_subscribed()) {
 | |
|                                     std::unique_lock<std::mutex> guard(lock);
 | |
|                                     if (!destination.is_subscribed() ||
 | |
|                                         (!lifetime.is_subscribed() && fill_queue.empty() && drain_queue.empty())) {
 | |
|                                         finish(guard, mode::Disposed);
 | |
|                                         return;
 | |
|                                     }
 | |
|                                     if (drain_queue.empty()) {
 | |
|                                         if (fill_queue.empty()) {
 | |
|                                             current = mode::Empty;
 | |
|                                             return;
 | |
|                                         }
 | |
|                                         swap(fill_queue, drain_queue);
 | |
|                                     }
 | |
|                                 }
 | |
|                                 auto notification = std::move(drain_queue.front());
 | |
|                                 drain_queue.pop_front();
 | |
|                                 notification->accept(destination);
 | |
|                                 std::unique_lock<std::mutex> guard(lock);
 | |
|                                 self();
 | |
|                                 if (lifetime.is_subscribed()) break;
 | |
|                             }
 | |
|                         }
 | |
|                         RXCPP_CATCH(...) {
 | |
|                             destination.on_error(rxu::current_exception());
 | |
|                             std::unique_lock<std::mutex> guard(lock);
 | |
|                             finish(guard, mode::Errored);
 | |
|                         }
 | |
|                     };
 | |
| 
 | |
|                     auto selectedDrain = on_exception(
 | |
|                         [&](){return coordinator.act(drain);},
 | |
|                         destination);
 | |
|                     if (selectedDrain.empty()) {
 | |
|                         finish(guard, mode::Errored);
 | |
|                         return;
 | |
|                     }
 | |
| 
 | |
|                     auto processor = coordinator.get_worker();
 | |
| 
 | |
|                     RXCPP_UNWIND_AUTO([&](){guard.lock();});
 | |
|                     guard.unlock();
 | |
| 
 | |
|                     processor.schedule(selectedDrain.get());
 | |
|                 }
 | |
|             }
 | |
|         };
 | |
|         std::shared_ptr<observe_on_state> state;
 | |
| 
 | |
|         observe_on_observer(dest_type d, coordinator_type coor, composite_subscription cs)
 | |
|             : state(std::make_shared<observe_on_state>(std::move(d), std::move(coor), std::move(cs)))
 | |
|         {
 | |
|         }
 | |
| 
 | |
|         void on_next(source_value_type v) const {
 | |
|             std::unique_lock<std::mutex> guard(state->lock);
 | |
|             if (state->current == mode::Errored || state->current == mode::Disposed) { return; }
 | |
|             state->fill_queue.push_back(notification_type::on_next(std::move(v)));
 | |
|             state->ensure_processing(guard);
 | |
|         }
 | |
|         void on_error(rxu::error_ptr e) const {
 | |
|             std::unique_lock<std::mutex> guard(state->lock);
 | |
|             if (state->current == mode::Errored || state->current == mode::Disposed) { return; }
 | |
|             state->fill_queue.push_back(notification_type::on_error(e));
 | |
|             state->ensure_processing(guard);
 | |
|         }
 | |
|         void on_completed() const {
 | |
|             std::unique_lock<std::mutex> guard(state->lock);
 | |
|             if (state->current == mode::Errored || state->current == mode::Disposed) { return; }
 | |
|             state->fill_queue.push_back(notification_type::on_completed());
 | |
|             state->ensure_processing(guard);
 | |
|         }
 | |
| 
 | |
|         static subscriber<value_type, observer<value_type, this_type>> make(dest_type d, coordination_type cn, composite_subscription cs = composite_subscription()) {
 | |
|             auto coor = cn.create_coordinator(d.get_subscription());
 | |
|             d.add(cs);
 | |
| 
 | |
|             this_type o(d, std::move(coor), cs);
 | |
|             auto keepAlive = o.state;
 | |
|             cs.add([=](){
 | |
|                 std::unique_lock<std::mutex> guard(keepAlive->lock);
 | |
|                 keepAlive->ensure_processing(guard);
 | |
|             });
 | |
| 
 | |
|             return make_subscriber<value_type>(d, cs, make_observer<value_type>(std::move(o)));
 | |
|         }
 | |
|     };
 | |
| 
 | |
|     template<class Subscriber>
 | |
|     auto operator()(Subscriber dest) const
 | |
|         -> decltype(observe_on_observer<decltype(dest.as_dynamic())>::make(dest.as_dynamic(), coordination)) {
 | |
|         return      observe_on_observer<decltype(dest.as_dynamic())>::make(dest.as_dynamic(), coordination);
 | |
|     }
 | |
| };
 | |
| 
 | |
| }
 | |
| 
 | |
| /*! @copydoc rx-observe_on.hpp
 | |
| */
 | |
| template<class... AN>
 | |
| auto observe_on(AN&&... an)
 | |
|     ->      operator_factory<observe_on_tag, AN...> {
 | |
|      return operator_factory<observe_on_tag, AN...>(std::make_tuple(std::forward<AN>(an)...));
 | |
| }
 | |
| 
 | |
| }
 | |
| 
 | |
| template<>
 | |
| struct member_overload<observe_on_tag>
 | |
| {
 | |
|     template<class Observable, class Coordination,
 | |
|         class Enabled = rxu::enable_if_all_true_type_t<
 | |
|             is_observable<Observable>,
 | |
|             is_coordination<Coordination>>,
 | |
|         class SourceValue = rxu::value_type_t<Observable>,
 | |
|         class ObserveOn = rxo::detail::observe_on<SourceValue, rxu::decay_t<Coordination>>>
 | |
|     static auto member(Observable&& o, Coordination&& cn)
 | |
|         -> decltype(o.template lift<SourceValue>(ObserveOn(std::forward<Coordination>(cn)))) {
 | |
|         return      o.template lift<SourceValue>(ObserveOn(std::forward<Coordination>(cn)));
 | |
|     }
 | |
| 
 | |
|     template<class... AN>
 | |
|     static operators::detail::observe_on_invalid_t<AN...> member(AN...) {
 | |
|         std::terminate();
 | |
|         return {};
 | |
|         static_assert(sizeof...(AN) == 10000, "observe_on takes (Coordination)");
 | |
|     }
 | |
| };
 | |
| 
 | |
| class observe_on_one_worker : public coordination_base
 | |
| {
 | |
|     rxsc::scheduler factory;
 | |
| 
 | |
|     class input_type
 | |
|     {
 | |
|         rxsc::worker controller;
 | |
|         rxsc::scheduler factory;
 | |
|         identity_one_worker coordination;
 | |
|     public:
 | |
|         explicit input_type(rxsc::worker w)
 | |
|             : controller(w)
 | |
|             , factory(rxsc::make_same_worker(w))
 | |
|             , coordination(factory)
 | |
|         {
 | |
|         }
 | |
|         inline rxsc::worker get_worker() const {
 | |
|             return controller;
 | |
|         }
 | |
|         inline rxsc::scheduler get_scheduler() const {
 | |
|             return factory;
 | |
|         }
 | |
|         inline rxsc::scheduler::clock_type::time_point now() const {
 | |
|             return factory.now();
 | |
|         }
 | |
|         template<class Observable>
 | |
|         auto in(Observable o) const
 | |
|             -> decltype(o.observe_on(coordination)) {
 | |
|             return      o.observe_on(coordination);
 | |
|         }
 | |
|         template<class Subscriber>
 | |
|         auto out(Subscriber s) const
 | |
|             -> Subscriber {
 | |
|             return s;
 | |
|         }
 | |
|         template<class F>
 | |
|         auto act(F f) const
 | |
|             -> F {
 | |
|             return f;
 | |
|         }
 | |
|     };
 | |
| 
 | |
| public:
 | |
| 
 | |
|     explicit observe_on_one_worker(rxsc::scheduler sc) : factory(sc) {}
 | |
| 
 | |
|     typedef coordinator<input_type> coordinator_type;
 | |
| 
 | |
|     inline rxsc::scheduler::clock_type::time_point now() const {
 | |
|         return factory.now();
 | |
|     }
 | |
| 
 | |
|     inline coordinator_type create_coordinator(composite_subscription cs = composite_subscription()) const {
 | |
|         auto w = factory.create_worker(std::move(cs));
 | |
|         return coordinator_type(input_type(std::move(w)));
 | |
|     }
 | |
| };
 | |
| 
 | |
| inline observe_on_one_worker observe_on_run_loop(const rxsc::run_loop& rl) {
 | |
|     return observe_on_one_worker(rxsc::make_run_loop(rl));
 | |
| }
 | |
| 
 | |
| inline observe_on_one_worker observe_on_event_loop() {
 | |
|     static observe_on_one_worker r(rxsc::make_event_loop());
 | |
|     return r;
 | |
| }
 | |
| 
 | |
| inline observe_on_one_worker observe_on_new_thread() {
 | |
|     static observe_on_one_worker r(rxsc::make_new_thread());
 | |
|     return r;
 | |
| }
 | |
| 
 | |
| }
 | |
| 
 | |
| #endif
 |