187 lines
		
	
	
		
			5.9 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			187 lines
		
	
	
		
			5.9 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
 | |
| 
 | |
| #pragma once
 | |
| 
 | |
| #if !defined(RXCPP_RX_REPLAYSUBJECT_HPP)
 | |
| #define RXCPP_RX_REPLAYSUBJECT_HPP
 | |
| 
 | |
| #include "../rx-includes.hpp"
 | |
| 
 | |
| namespace rxcpp {
 | |
| 
 | |
| namespace subjects {
 | |
| 
 | |
| namespace detail {
 | |
| 
 | |
| template<class Coordination>
 | |
| struct replay_traits
 | |
| {
 | |
|     typedef rxu::maybe<std::size_t> count_type;
 | |
|     typedef rxu::maybe<rxsc::scheduler::clock_type::duration> period_type;
 | |
|     typedef rxsc::scheduler::clock_type::time_point time_point_type;
 | |
|     typedef rxu::decay_t<Coordination> coordination_type;
 | |
|     typedef typename coordination_type::coordinator_type coordinator_type;
 | |
| };
 | |
| 
 | |
| template<class T, class Coordination>
 | |
| class replay_observer : public detail::multicast_observer<T>
 | |
| {
 | |
|     typedef replay_observer<T, Coordination> this_type;
 | |
|     typedef detail::multicast_observer<T> base_type;
 | |
| 
 | |
|     typedef replay_traits<Coordination> traits;
 | |
|     typedef typename traits::count_type count_type;
 | |
|     typedef typename traits::period_type period_type;
 | |
|     typedef typename traits::time_point_type time_point_type;
 | |
|     typedef typename traits::coordination_type coordination_type;
 | |
|     typedef typename traits::coordinator_type coordinator_type;
 | |
| 
 | |
|     class replay_observer_state : public std::enable_shared_from_this<replay_observer_state>
 | |
|     {
 | |
|         mutable std::mutex lock;
 | |
|         mutable std::list<T> values;
 | |
|         mutable std::list<time_point_type> time_points;
 | |
|         mutable count_type count;
 | |
|         mutable period_type period;
 | |
|         mutable composite_subscription replayLifetime;
 | |
|     public:
 | |
|         mutable coordination_type coordination;
 | |
|         mutable coordinator_type coordinator;
 | |
| 
 | |
|     private:
 | |
|         void remove_oldest() const {
 | |
|             values.pop_front();
 | |
|             if (!period.empty()) {
 | |
|                 time_points.pop_front();
 | |
|             }
 | |
|         }
 | |
| 
 | |
|     public:
 | |
|         ~replay_observer_state(){
 | |
|             replayLifetime.unsubscribe();
 | |
|         }
 | |
|         explicit replay_observer_state(count_type _count, period_type _period, coordination_type _coordination, coordinator_type _coordinator, composite_subscription _replayLifetime)
 | |
|             : count(_count)
 | |
|             , period(_period)
 | |
|             , replayLifetime(_replayLifetime)
 | |
|             , coordination(std::move(_coordination))
 | |
|             , coordinator(std::move(_coordinator))
 | |
|         {
 | |
|         }
 | |
| 
 | |
|         void add(T v) const {
 | |
|             std::unique_lock<std::mutex> guard(lock);
 | |
| 
 | |
|             if (!count.empty()) {
 | |
|                 if (values.size() == count.get())
 | |
|                     remove_oldest();
 | |
|             }
 | |
| 
 | |
|             if (!period.empty()) {
 | |
|                 auto now = coordination.now();
 | |
|                 while (!time_points.empty() && (now - time_points.front() > period.get()))
 | |
|                     remove_oldest();
 | |
|                 time_points.push_back(now);
 | |
|             }
 | |
| 
 | |
|             values.push_back(std::move(v));
 | |
|         }
 | |
|         std::list<T> get() const {
 | |
|             std::unique_lock<std::mutex> guard(lock);
 | |
|             return values;
 | |
|         }
 | |
|     };
 | |
| 
 | |
|     std::shared_ptr<replay_observer_state> state;
 | |
| 
 | |
| public:
 | |
|     replay_observer(count_type count, period_type period, coordination_type coordination, composite_subscription replayLifetime, composite_subscription subscriberLifetime)
 | |
|         : base_type(subscriberLifetime)
 | |
|     {
 | |
|         replayLifetime.add(subscriberLifetime);
 | |
|         auto coordinator = coordination.create_coordinator(replayLifetime);
 | |
|         state = std::make_shared<replay_observer_state>(std::move(count), std::move(period), std::move(coordination), std::move(coordinator), std::move(replayLifetime));
 | |
|     }
 | |
| 
 | |
|     subscriber<T> get_subscriber() const {
 | |
|         return make_subscriber<T>(this->get_id(), this->get_subscription(), observer<T, detail::replay_observer<T, Coordination>>(*this)).as_dynamic();
 | |
|     }
 | |
| 
 | |
|     std::list<T> get_values() const {
 | |
|         return state->get();
 | |
|     }
 | |
| 
 | |
|     coordinator_type& get_coordinator() const {
 | |
|         return state->coordinator;
 | |
|     }
 | |
| 
 | |
|     template<class V>
 | |
|     void on_next(V v) const {
 | |
|         state->add(v);
 | |
|         base_type::on_next(std::move(v));
 | |
|     }
 | |
| };
 | |
| 
 | |
| }
 | |
| 
 | |
| template<class T, class Coordination>
 | |
| class replay
 | |
| {
 | |
|     typedef detail::replay_traits<Coordination> traits;
 | |
|     typedef typename traits::count_type count_type;
 | |
|     typedef typename traits::period_type period_type;
 | |
|     typedef typename traits::time_point_type time_point_type;
 | |
| 
 | |
|     detail::replay_observer<T, Coordination> s;
 | |
| 
 | |
| public:
 | |
|     explicit replay(Coordination cn, composite_subscription cs = composite_subscription())
 | |
|         : s(count_type(), period_type(), cn, cs, composite_subscription{})
 | |
|     {
 | |
|     }
 | |
| 
 | |
|     replay(std::size_t count, Coordination cn, composite_subscription cs = composite_subscription())
 | |
|         : s(count_type(std::move(count)), period_type(), cn, cs, composite_subscription{})
 | |
|     {
 | |
|     }
 | |
| 
 | |
|     replay(rxsc::scheduler::clock_type::duration period, Coordination cn, composite_subscription cs = composite_subscription())
 | |
|         : s(count_type(), period_type(period), cn, cs, composite_subscription{})
 | |
|     {
 | |
|     }
 | |
| 
 | |
|     replay(std::size_t count, rxsc::scheduler::clock_type::duration period, Coordination cn, composite_subscription cs = composite_subscription())
 | |
|         : s(count_type(count), period_type(period), cn, cs, composite_subscription{})
 | |
|     {
 | |
|     }
 | |
| 
 | |
|     bool has_observers() const {
 | |
|         return s.has_observers();
 | |
|     }
 | |
| 
 | |
|     std::list<T> get_values() const {
 | |
|         return s.get_values();
 | |
|     }
 | |
| 
 | |
|     subscriber<T> get_subscriber() const {
 | |
|         return s.get_subscriber();
 | |
|     }
 | |
| 
 | |
|     observable<T> get_observable() const {
 | |
|         auto keepAlive = s;
 | |
|         auto observable = make_observable_dynamic<T>([keepAlive](subscriber<T> o){
 | |
|             for (auto&& value: keepAlive.get_values()) {
 | |
|                 o.on_next(value);
 | |
|             }
 | |
|             keepAlive.add(keepAlive.get_subscriber(), std::move(o));
 | |
|         });
 | |
|         return s.get_coordinator().in(observable);
 | |
|     }
 | |
| };
 | |
| 
 | |
| }
 | |
| 
 | |
| }
 | |
| 
 | |
| #endif
 |