336 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
		
		
			
		
	
	
			336 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
|   | // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
 | ||
|  | 
 | ||
|  | #pragma once
 | ||
|  | 
 | ||
/*! \file rx-observe_on.hpp

    \brief All values are queued and delivered using the scheduler from the supplied coordination.

    \tparam Coordination  the type of the coordination that supplies the scheduler.

    \param  cn  the coordination whose scheduler is used to notify observers.

    \return  The source observable modified so that its observers are notified on the specified scheduler.

    \sample
    \snippet observe_on.cpp observe_on sample
    \snippet output.txt observe_on sample

    Invoking the rxcpp::observable::subscribe_on operator instead of observe_on gives the following results:
    \snippet output.txt subscribe_on sample
*/
|  | 
 | ||
|  | #if !defined(RXCPP_OPERATORS_RX_OBSERVE_ON_HPP)
 | ||
|  | #define RXCPP_OPERATORS_RX_OBSERVE_ON_HPP
 | ||
|  | 
 | ||
|  | #include "../rx-includes.hpp"
 | ||
|  | 
 | ||
|  | namespace rxcpp { | ||
|  | 
 | ||
|  | namespace operators { | ||
|  | 
 | ||
|  | namespace detail { | ||
|  | 
 | ||
/// Stateless tag type that records, as template arguments, the argument types of an
/// observe_on call that did not match the supported overload.
template<typename... AN>
struct observe_on_invalid_arguments
{
};
|  | 
 | ||
|  | template<class... AN> | ||
|  | struct observe_on_invalid : public rxo::operator_base<observe_on_invalid_arguments<AN...>> { | ||
|  |     using type = observable<observe_on_invalid_arguments<AN...>, observe_on_invalid<AN...>>; | ||
|  | }; | ||
|  | template<class... AN> | ||
|  | using observe_on_invalid_t = typename observe_on_invalid<AN...>::type; | ||
|  | 
 | ||
|  | template<class T, class Coordination> | ||
|  | struct observe_on | ||
|  | { | ||
|  |     typedef rxu::decay_t<T> source_value_type; | ||
|  | 
 | ||
|  |     typedef rxu::decay_t<Coordination> coordination_type; | ||
|  |     typedef typename coordination_type::coordinator_type coordinator_type; | ||
|  | 
 | ||
|  |     coordination_type coordination; | ||
|  | 
 | ||
|  |     observe_on(coordination_type cn) | ||
|  |         : coordination(std::move(cn)) | ||
|  |     { | ||
|  |     } | ||
|  | 
 | ||
|  |     template<class Subscriber> | ||
|  |     struct observe_on_observer | ||
|  |     { | ||
|  |         typedef observe_on_observer<Subscriber> this_type; | ||
|  |         typedef source_value_type value_type; | ||
|  |         typedef rxu::decay_t<Subscriber> dest_type; | ||
|  |         typedef observer<value_type, this_type> observer_type; | ||
|  | 
 | ||
|  |         typedef rxn::notification<T> notification_type; | ||
|  |         typedef typename notification_type::type base_notification_type; | ||
|  |         typedef std::deque<base_notification_type> queue_type; | ||
|  | 
 | ||
|  |         struct mode | ||
|  |         { | ||
|  |             enum type { | ||
|  |                 Invalid = 0, | ||
|  |                 Processing, | ||
|  |                 Empty, | ||
|  |                 Disposed, | ||
|  |                 Errored | ||
|  |             }; | ||
|  |         }; | ||
|  |         struct observe_on_state : std::enable_shared_from_this<observe_on_state> | ||
|  |         { | ||
|  |             mutable std::mutex lock; | ||
|  |             mutable queue_type fill_queue; | ||
|  |             mutable queue_type drain_queue; | ||
|  |             composite_subscription lifetime; | ||
|  |             mutable typename mode::type current; | ||
|  |             coordinator_type coordinator; | ||
|  |             dest_type destination; | ||
|  | 
 | ||
|  |             observe_on_state(dest_type d, coordinator_type coor, composite_subscription cs) | ||
|  |                 : lifetime(std::move(cs)) | ||
|  |                 , current(mode::Empty) | ||
|  |                 , coordinator(std::move(coor)) | ||
|  |                 , destination(std::move(d)) | ||
|  |             { | ||
|  |             } | ||
|  | 
 | ||
|  |             void finish(std::unique_lock<std::mutex>& guard, typename mode::type end) const { | ||
|  |                 if (!guard.owns_lock()) { | ||
|  |                     std::terminate(); | ||
|  |                 } | ||
|  |                 if (current == mode::Errored || current == mode::Disposed) {return;} | ||
|  |                 current = end; | ||
|  |                 queue_type fill_expired; | ||
|  |                 swap(fill_expired, fill_queue); | ||
|  |                 queue_type drain_expired; | ||
|  |                 swap(drain_expired, drain_queue); | ||
|  |                 RXCPP_UNWIND_AUTO([&](){guard.lock();}); | ||
|  |                 guard.unlock(); | ||
|  |                 lifetime.unsubscribe(); | ||
|  |                 destination.unsubscribe(); | ||
|  |             } | ||
|  | 
 | ||
|  |             void ensure_processing(std::unique_lock<std::mutex>& guard) const { | ||
|  |                 if (!guard.owns_lock()) { | ||
|  |                     std::terminate(); | ||
|  |                 } | ||
|  |                 if (current == mode::Empty) { | ||
|  |                     current = mode::Processing; | ||
|  | 
 | ||
|  |                     if (!lifetime.is_subscribed() && fill_queue.empty() && drain_queue.empty()) { | ||
|  |                         finish(guard, mode::Disposed); | ||
|  |                     } | ||
|  | 
 | ||
|  |                     auto keepAlive = this->shared_from_this(); | ||
|  | 
 | ||
|  |                     auto drain = [keepAlive, this](const rxsc::schedulable& self){ | ||
|  |                         using std::swap; | ||
|  |                         RXCPP_TRY { | ||
|  |                             for (;;) { | ||
|  |                                 if (drain_queue.empty() || !destination.is_subscribed()) { | ||
|  |                                     std::unique_lock<std::mutex> guard(lock); | ||
|  |                                     if (!destination.is_subscribed() || | ||
|  |                                         (!lifetime.is_subscribed() && fill_queue.empty() && drain_queue.empty())) { | ||
|  |                                         finish(guard, mode::Disposed); | ||
|  |                                         return; | ||
|  |                                     } | ||
|  |                                     if (drain_queue.empty()) { | ||
|  |                                         if (fill_queue.empty()) { | ||
|  |                                             current = mode::Empty; | ||
|  |                                             return; | ||
|  |                                         } | ||
|  |                                         swap(fill_queue, drain_queue); | ||
|  |                                     } | ||
|  |                                 } | ||
|  |                                 auto notification = std::move(drain_queue.front()); | ||
|  |                                 drain_queue.pop_front(); | ||
|  |                                 notification->accept(destination); | ||
|  |                                 std::unique_lock<std::mutex> guard(lock); | ||
|  |                                 self(); | ||
|  |                                 if (lifetime.is_subscribed()) break; | ||
|  |                             } | ||
|  |                         } | ||
|  |                         RXCPP_CATCH(...) { | ||
|  |                             destination.on_error(rxu::current_exception()); | ||
|  |                             std::unique_lock<std::mutex> guard(lock); | ||
|  |                             finish(guard, mode::Errored); | ||
|  |                         } | ||
|  |                     }; | ||
|  | 
 | ||
|  |                     auto selectedDrain = on_exception( | ||
|  |                         [&](){return coordinator.act(drain);}, | ||
|  |                         destination); | ||
|  |                     if (selectedDrain.empty()) { | ||
|  |                         finish(guard, mode::Errored); | ||
|  |                         return; | ||
|  |                     } | ||
|  | 
 | ||
|  |                     auto processor = coordinator.get_worker(); | ||
|  | 
 | ||
|  |                     RXCPP_UNWIND_AUTO([&](){guard.lock();}); | ||
|  |                     guard.unlock(); | ||
|  | 
 | ||
|  |                     processor.schedule(selectedDrain.get()); | ||
|  |                 } | ||
|  |             } | ||
|  |         }; | ||
|  |         std::shared_ptr<observe_on_state> state; | ||
|  | 
 | ||
|  |         observe_on_observer(dest_type d, coordinator_type coor, composite_subscription cs) | ||
|  |             : state(std::make_shared<observe_on_state>(std::move(d), std::move(coor), std::move(cs))) | ||
|  |         { | ||
|  |         } | ||
|  | 
 | ||
|  |         void on_next(source_value_type v) const { | ||
|  |             std::unique_lock<std::mutex> guard(state->lock); | ||
|  |             if (state->current == mode::Errored || state->current == mode::Disposed) { return; } | ||
|  |             state->fill_queue.push_back(notification_type::on_next(std::move(v))); | ||
|  |             state->ensure_processing(guard); | ||
|  |         } | ||
|  |         void on_error(rxu::error_ptr e) const { | ||
|  |             std::unique_lock<std::mutex> guard(state->lock); | ||
|  |             if (state->current == mode::Errored || state->current == mode::Disposed) { return; } | ||
|  |             state->fill_queue.push_back(notification_type::on_error(e)); | ||
|  |             state->ensure_processing(guard); | ||
|  |         } | ||
|  |         void on_completed() const { | ||
|  |             std::unique_lock<std::mutex> guard(state->lock); | ||
|  |             if (state->current == mode::Errored || state->current == mode::Disposed) { return; } | ||
|  |             state->fill_queue.push_back(notification_type::on_completed()); | ||
|  |             state->ensure_processing(guard); | ||
|  |         } | ||
|  | 
 | ||
|  |         static subscriber<value_type, observer<value_type, this_type>> make(dest_type d, coordination_type cn, composite_subscription cs = composite_subscription()) { | ||
|  |             auto coor = cn.create_coordinator(d.get_subscription()); | ||
|  |             d.add(cs); | ||
|  | 
 | ||
|  |             this_type o(d, std::move(coor), cs); | ||
|  |             auto keepAlive = o.state; | ||
|  |             cs.add([=](){ | ||
|  |                 std::unique_lock<std::mutex> guard(keepAlive->lock); | ||
|  |                 keepAlive->ensure_processing(guard); | ||
|  |             }); | ||
|  | 
 | ||
|  |             return make_subscriber<value_type>(d, cs, make_observer<value_type>(std::move(o))); | ||
|  |         } | ||
|  |     }; | ||
|  | 
 | ||
|  |     template<class Subscriber> | ||
|  |     auto operator()(Subscriber dest) const | ||
|  |         -> decltype(observe_on_observer<decltype(dest.as_dynamic())>::make(dest.as_dynamic(), coordination)) { | ||
|  |         return      observe_on_observer<decltype(dest.as_dynamic())>::make(dest.as_dynamic(), coordination); | ||
|  |     } | ||
|  | }; | ||
|  | 
 | ||
|  | } | ||
|  | 
 | ||
|  | /*! @copydoc rx-observe_on.hpp
 | ||
|  | */ | ||
|  | template<class... AN> | ||
|  | auto observe_on(AN&&... an) | ||
|  |     ->      operator_factory<observe_on_tag, AN...> { | ||
|  |      return operator_factory<observe_on_tag, AN...>(std::make_tuple(std::forward<AN>(an)...)); | ||
|  | } | ||
|  | 
 | ||
|  | } | ||
|  | 
 | ||
|  | template<> | ||
|  | struct member_overload<observe_on_tag> | ||
|  | { | ||
|  |     template<class Observable, class Coordination, | ||
|  |         class Enabled = rxu::enable_if_all_true_type_t< | ||
|  |             is_observable<Observable>, | ||
|  |             is_coordination<Coordination>>, | ||
|  |         class SourceValue = rxu::value_type_t<Observable>, | ||
|  |         class ObserveOn = rxo::detail::observe_on<SourceValue, rxu::decay_t<Coordination>>> | ||
|  |     static auto member(Observable&& o, Coordination&& cn) | ||
|  |         -> decltype(o.template lift<SourceValue>(ObserveOn(std::forward<Coordination>(cn)))) { | ||
|  |         return      o.template lift<SourceValue>(ObserveOn(std::forward<Coordination>(cn))); | ||
|  |     } | ||
|  | 
 | ||
|  |     template<class... AN> | ||
|  |     static operators::detail::observe_on_invalid_t<AN...> member(AN...) { | ||
|  |         std::terminate(); | ||
|  |         return {}; | ||
|  |         static_assert(sizeof...(AN) == 10000, "observe_on takes (Coordination)"); | ||
|  |     } | ||
|  | }; | ||
|  | 
 | ||
|  | class observe_on_one_worker : public coordination_base | ||
|  | { | ||
|  |     rxsc::scheduler factory; | ||
|  | 
 | ||
|  |     class input_type | ||
|  |     { | ||
|  |         rxsc::worker controller; | ||
|  |         rxsc::scheduler factory; | ||
|  |         identity_one_worker coordination; | ||
|  |     public: | ||
|  |         explicit input_type(rxsc::worker w) | ||
|  |             : controller(w) | ||
|  |             , factory(rxsc::make_same_worker(w)) | ||
|  |             , coordination(factory) | ||
|  |         { | ||
|  |         } | ||
|  |         inline rxsc::worker get_worker() const { | ||
|  |             return controller; | ||
|  |         } | ||
|  |         inline rxsc::scheduler get_scheduler() const { | ||
|  |             return factory; | ||
|  |         } | ||
|  |         inline rxsc::scheduler::clock_type::time_point now() const { | ||
|  |             return factory.now(); | ||
|  |         } | ||
|  |         template<class Observable> | ||
|  |         auto in(Observable o) const | ||
|  |             -> decltype(o.observe_on(coordination)) { | ||
|  |             return      o.observe_on(coordination); | ||
|  |         } | ||
|  |         template<class Subscriber> | ||
|  |         auto out(Subscriber s) const | ||
|  |             -> Subscriber { | ||
|  |             return s; | ||
|  |         } | ||
|  |         template<class F> | ||
|  |         auto act(F f) const | ||
|  |             -> F { | ||
|  |             return f; | ||
|  |         } | ||
|  |     }; | ||
|  | 
 | ||
|  | public: | ||
|  | 
 | ||
|  |     explicit observe_on_one_worker(rxsc::scheduler sc) : factory(sc) {} | ||
|  | 
 | ||
|  |     typedef coordinator<input_type> coordinator_type; | ||
|  | 
 | ||
|  |     inline rxsc::scheduler::clock_type::time_point now() const { | ||
|  |         return factory.now(); | ||
|  |     } | ||
|  | 
 | ||
|  |     inline coordinator_type create_coordinator(composite_subscription cs = composite_subscription()) const { | ||
|  |         auto w = factory.create_worker(std::move(cs)); | ||
|  |         return coordinator_type(input_type(std::move(w))); | ||
|  |     } | ||
|  | }; | ||
|  | 
 | ||
|  | inline observe_on_one_worker observe_on_run_loop(const rxsc::run_loop& rl) { | ||
|  |     return observe_on_one_worker(rxsc::make_run_loop(rl)); | ||
|  | } | ||
|  | 
 | ||
|  | inline observe_on_one_worker observe_on_event_loop() { | ||
|  |     static observe_on_one_worker r(rxsc::make_event_loop()); | ||
|  |     return r; | ||
|  | } | ||
|  | 
 | ||
|  | inline observe_on_one_worker observe_on_new_thread() { | ||
|  |     static observe_on_one_worker r(rxsc::make_new_thread()); | ||
|  |     return r; | ||
|  | } | ||
|  | 
 | ||
|  | } | ||
|  | 
 | ||
|  | #endif
 |