345 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
		
		
			
		
	
	
			345 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
|   | // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
 | ||
|  | 
 | ||
|  | #pragma once
 | ||
|  | 
 | ||
|  | #if !defined(RXCPP_OPERATORS_RX_ZIP_HPP)
 | ||
|  | #define RXCPP_OPERATORS_RX_ZIP_HPP
 | ||
|  | 
 | ||
|  | #include "../rx-includes.hpp"
 | ||
|  | 
 | ||
|  | /*! \file rx-zip.hpp
 | ||
|  | 
 | ||
|  |     \brief Bring by one item from all given observables and select a value to emit from the new observable that is returned. | ||
|  | 
 | ||
|  |     \tparam AN  types of scheduler (optional), aggregate function (optional), and source observables | ||
|  | 
 | ||
|  |     \param  an  scheduler (optional), aggregation function (optional), and source observables | ||
|  | 
 | ||
|  |     \return  Observable that emits the result of combining the items emitted and brought by one from each of the source observables. | ||
|  | 
 | ||
|  |     If scheduler is omitted, identity_current_thread is used. | ||
|  | 
 | ||
|  |     If aggregation function is omitted, the resulting observable returns tuples of emitted items. | ||
|  | 
 | ||
|  |     \sample | ||
|  | 
 | ||
|  |     Neither scheduler nor aggregation function are present: | ||
|  |     \snippet zip.cpp zip sample | ||
|  |     \snippet output.txt zip sample | ||
|  | 
 | ||
|  |     Only scheduler is present: | ||
|  |     \snippet zip.cpp Coordination zip sample | ||
|  |     \snippet output.txt Coordination zip sample | ||
|  | 
 | ||
|  |     Only aggregation function is present: | ||
|  |     \snippet zip.cpp Selector zip sample | ||
|  |     \snippet output.txt Selector zip sample | ||
|  | 
 | ||
|  |     Both scheduler and aggregation function are present: | ||
|  |     \snippet zip.cpp Coordination+Selector zip sample | ||
|  |     \snippet output.txt Coordination+Selector zip sample | ||
|  | */ | ||
|  | 
 | ||
|  | namespace rxcpp { | ||
|  | 
 | ||
|  | namespace operators { | ||
|  | 
 | ||
|  | namespace detail { | ||
|  | 
 | ||
|  | template<class Observable> | ||
|  | struct zip_source_state | ||
|  | { | ||
|  |     using value_type = rxu::value_type_t<Observable>; | ||
|  |     zip_source_state()  | ||
|  |         : completed(false)  | ||
|  |     { | ||
|  |     } | ||
|  |     std::list<value_type> values; | ||
|  |     bool completed; | ||
|  | }; | ||
|  | 
 | ||
|  | struct values_not_empty { | ||
|  |     template<class Observable> | ||
|  |     bool operator()(zip_source_state<Observable>& source) const { | ||
|  |         return !source.values.empty(); | ||
|  |     } | ||
|  | }; | ||
|  | 
 | ||
|  | struct source_completed_values_empty { | ||
|  |     template<class Observable> | ||
|  |     bool operator()(zip_source_state<Observable>& source) const { | ||
|  |         return source.completed && source.values.empty(); | ||
|  |     } | ||
|  | }; | ||
|  | 
 | ||
|  | struct extract_value_front { | ||
|  |     template<class Observable, class Value = rxu::value_type_t<Observable>> | ||
|  |     Value operator()(zip_source_state<Observable>& source) const { | ||
|  |         auto val = std::move(source.values.front()); | ||
|  |         source.values.pop_front(); | ||
|  |         return val; | ||
|  |     } | ||
|  | }; | ||
|  | 
 | ||
|  | template<class... AN> | ||
|  | struct zip_invalid_arguments {}; | ||
|  | 
 | ||
|  | template<class... AN> | ||
|  | struct zip_invalid : public rxo::operator_base<zip_invalid_arguments<AN...>> { | ||
|  |     using type = observable<zip_invalid_arguments<AN...>, zip_invalid<AN...>>; | ||
|  | }; | ||
|  | template<class... AN> | ||
|  | using zip_invalid_t = typename zip_invalid<AN...>::type; | ||
|  | 
 | ||
|  | template<class Selector, class... ObservableN> | ||
|  | struct is_zip_selector_check { | ||
|  |     typedef rxu::decay_t<Selector> selector_type; | ||
|  | 
 | ||
|  |     struct tag_not_valid; | ||
|  |     template<class CS, class... CON> | ||
|  |     static auto check(int) -> decltype((*(CS*)nullptr)((*(typename CON::value_type*)nullptr)...)); | ||
|  |     template<class CS, class... CON> | ||
|  |     static tag_not_valid check(...); | ||
|  | 
 | ||
|  |     using type = decltype(check<selector_type, rxu::decay_t<ObservableN>...>(0)); | ||
|  | 
 | ||
|  |     static const bool value = !std::is_same<type, tag_not_valid>::value; | ||
|  | }; | ||
|  | 
 | ||
|  | template<class Selector, class... ObservableN> | ||
|  | struct invalid_zip_selector { | ||
|  |     static const bool value = false; | ||
|  | }; | ||
|  | 
 | ||
|  | template<class Selector, class... ObservableN> | ||
|  | struct is_zip_selector : public std::conditional< | ||
|  |     is_zip_selector_check<Selector, ObservableN...>::value,  | ||
|  |     is_zip_selector_check<Selector, ObservableN...>,  | ||
|  |     invalid_zip_selector<Selector, ObservableN...>>::type { | ||
|  | }; | ||
|  | 
 | ||
|  | template<class Selector, class... ON> | ||
|  | using result_zip_selector_t = typename is_zip_selector<Selector, ON...>::type; | ||
|  | 
 | ||
|  | template<class Coordination, class Selector, class... ObservableN> | ||
|  | struct zip_traits { | ||
|  |     typedef std::tuple<rxu::decay_t<ObservableN>...> tuple_source_type; | ||
|  |     typedef std::tuple<zip_source_state<ObservableN>...> tuple_source_values_type; | ||
|  | 
 | ||
|  |     typedef rxu::decay_t<Selector> selector_type; | ||
|  |     typedef rxu::decay_t<Coordination> coordination_type; | ||
|  | 
 | ||
|  |     typedef typename is_zip_selector<selector_type, ObservableN...>::type value_type; | ||
|  | }; | ||
|  | 
 | ||
|  | template<class Coordination, class Selector, class... ObservableN> | ||
|  | struct zip : public operator_base<rxu::value_type_t<zip_traits<Coordination, Selector, ObservableN...>>> | ||
|  | { | ||
|  |     typedef zip<Coordination, Selector, ObservableN...> this_type; | ||
|  | 
 | ||
|  |     typedef zip_traits<Coordination, Selector, ObservableN...> traits; | ||
|  | 
 | ||
|  |     typedef typename traits::tuple_source_type tuple_source_type; | ||
|  |     typedef typename traits::tuple_source_values_type tuple_source_values_type; | ||
|  | 
 | ||
|  |     typedef typename traits::selector_type selector_type; | ||
|  | 
 | ||
|  |     typedef typename traits::coordination_type coordination_type; | ||
|  |     typedef typename coordination_type::coordinator_type coordinator_type; | ||
|  | 
 | ||
|  |     struct values | ||
|  |     { | ||
|  |         values(tuple_source_type o, selector_type s, coordination_type sf) | ||
|  |             : source(std::move(o)) | ||
|  |             , selector(std::move(s)) | ||
|  |             , coordination(std::move(sf)) | ||
|  |         { | ||
|  |         } | ||
|  |         tuple_source_type source; | ||
|  |         selector_type selector; | ||
|  |         coordination_type coordination; | ||
|  |     }; | ||
|  |     values initial; | ||
|  | 
 | ||
|  |     zip(coordination_type sf, selector_type s, tuple_source_type ts) | ||
|  |         : initial(std::move(ts), std::move(s), std::move(sf)) | ||
|  |     { | ||
|  |     } | ||
|  | 
 | ||
|  |     template<int Index, class State> | ||
|  |     void subscribe_one(std::shared_ptr<State> state) const { | ||
|  | 
 | ||
|  |         typedef typename std::tuple_element<Index, tuple_source_type>::type::value_type source_value_type; | ||
|  | 
 | ||
|  |         composite_subscription innercs; | ||
|  | 
 | ||
|  |         // when the out observer is unsubscribed all the
 | ||
|  |         // inner subscriptions are unsubscribed as well
 | ||
|  |         state->out.add(innercs); | ||
|  | 
 | ||
|  |         auto source = on_exception( | ||
|  |             [&](){return state->coordinator.in(std::get<Index>(state->source));}, | ||
|  |             state->out); | ||
|  |         if (source.empty()) { | ||
|  |             return; | ||
|  |         } | ||
|  | 
 | ||
|  |         // this subscribe does not share the observer subscription
 | ||
|  |         // so that when it is unsubscribed the observer can be called
 | ||
|  |         // until the inner subscriptions have finished
 | ||
|  |         auto sink = make_subscriber<source_value_type>( | ||
|  |             state->out, | ||
|  |             innercs, | ||
|  |         // on_next
 | ||
|  |             [state](source_value_type st) { | ||
|  |                 auto& values = std::get<Index>(state->pending).values; | ||
|  |                 values.push_back(st); | ||
|  |                 if (rxu::apply_to_each(state->pending, values_not_empty(), rxu::all_values_true())) { | ||
|  |                     auto selectedResult = rxu::apply_to_each(state->pending, extract_value_front(), state->selector); | ||
|  |                     state->out.on_next(selectedResult); | ||
|  |                 } | ||
|  |                 if (rxu::apply_to_each(state->pending, source_completed_values_empty(), rxu::any_value_true())) { | ||
|  |                     state->out.on_completed(); | ||
|  |                 } | ||
|  |             }, | ||
|  |         // on_error
 | ||
|  |             [state](rxu::error_ptr e) { | ||
|  |                 state->out.on_error(e); | ||
|  |             }, | ||
|  |         // on_completed
 | ||
|  |             [state]() { | ||
|  |                 auto& completed = std::get<Index>(state->pending).completed; | ||
|  |                 completed = true; | ||
|  |                 if (--state->pendingCompletions == 0) { | ||
|  |                     state->out.on_completed(); | ||
|  |                 } | ||
|  |             } | ||
|  |         ); | ||
|  |         auto selectedSink = on_exception( | ||
|  |             [&](){return state->coordinator.out(sink);}, | ||
|  |             state->out); | ||
|  |         if (selectedSink.empty()) { | ||
|  |             return; | ||
|  |         } | ||
|  |         source->subscribe(std::move(selectedSink.get())); | ||
|  |     } | ||
|  |     template<class State, int... IndexN> | ||
|  |     void subscribe_all(std::shared_ptr<State> state, rxu::values<int, IndexN...>) const { | ||
|  |         bool subscribed[] = {(subscribe_one<IndexN>(state), true)...}; | ||
|  |         subscribed[0] = (*subscribed); // silence warning
 | ||
|  |     } | ||
|  | 
 | ||
|  |     template<class Subscriber> | ||
|  |     void on_subscribe(Subscriber scbr) const { | ||
|  |         static_assert(is_subscriber<Subscriber>::value, "subscribe must be passed a subscriber"); | ||
|  | 
 | ||
|  |         typedef Subscriber output_type; | ||
|  | 
 | ||
|  |         struct zip_state_type | ||
|  |             : public std::enable_shared_from_this<zip_state_type> | ||
|  |             , public values | ||
|  |         { | ||
|  |             zip_state_type(values i, coordinator_type coor, output_type oarg) | ||
|  |                 : values(std::move(i)) | ||
|  |                 , pendingCompletions(sizeof... (ObservableN)) | ||
|  |                 , valuesSet(0) | ||
|  |                 , coordinator(std::move(coor)) | ||
|  |                 , out(std::move(oarg)) | ||
|  |             { | ||
|  |             } | ||
|  | 
 | ||
|  |             // on_completed on the output must wait until all the
 | ||
|  |             // subscriptions have received on_completed
 | ||
|  |             mutable int pendingCompletions; | ||
|  |             mutable int valuesSet; | ||
|  |             mutable tuple_source_values_type pending; | ||
|  |             coordinator_type coordinator; | ||
|  |             output_type out; | ||
|  |         }; | ||
|  | 
 | ||
|  |         auto coordinator = initial.coordination.create_coordinator(scbr.get_subscription()); | ||
|  | 
 | ||
|  |         // take a copy of the values for each subscription
 | ||
|  |         auto state = std::make_shared<zip_state_type>(initial, std::move(coordinator), std::move(scbr)); | ||
|  | 
 | ||
|  |         subscribe_all(state, typename rxu::values_from<int, sizeof...(ObservableN)>::type()); | ||
|  |     } | ||
|  | }; | ||
|  | 
 | ||
|  | } | ||
|  | 
 | ||
|  | /*! @copydoc rx-zip.hpp
 | ||
|  | */ | ||
|  | template<class... AN> | ||
|  | auto zip(AN&&... an)  | ||
|  |     ->     operator_factory<zip_tag, AN...> { | ||
|  |     return operator_factory<zip_tag, AN...>(std::make_tuple(std::forward<AN>(an)...)); | ||
|  | } | ||
|  | 
 | ||
|  | } | ||
|  | 
 | ||
|  | template<>  | ||
|  | struct member_overload<zip_tag> | ||
|  | { | ||
|  |     template<class Observable, class... ObservableN,  | ||
|  |         class Enabled = rxu::enable_if_all_true_type_t< | ||
|  |             all_observables<Observable, ObservableN...>>, | ||
|  |         class Zip = rxo::detail::zip<identity_one_worker, rxu::detail::pack, rxu::decay_t<Observable>, rxu::decay_t<ObservableN>...>, | ||
|  |         class Value = rxu::value_type_t<Zip>, | ||
|  |         class Result = observable<Value, Zip>> | ||
|  |     static Result member(Observable&& o, ObservableN&&... on) | ||
|  |     { | ||
|  |         return Result(Zip(identity_current_thread(), rxu::pack(), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...))); | ||
|  |     } | ||
|  | 
 | ||
|  |     template<class Observable, class Selector, class... ObservableN, | ||
|  |         class Enabled = rxu::enable_if_all_true_type_t< | ||
|  |             operators::detail::is_zip_selector<Selector, Observable, ObservableN...>, | ||
|  |             all_observables<Observable, ObservableN...>>, | ||
|  |         class ResolvedSelector = rxu::decay_t<Selector>, | ||
|  |         class Zip = rxo::detail::zip<identity_one_worker, ResolvedSelector, rxu::decay_t<Observable>, rxu::decay_t<ObservableN>...>, | ||
|  |         class Value = rxu::value_type_t<Zip>, | ||
|  |         class Result = observable<Value, Zip>> | ||
|  |     static Result member(Observable&& o, Selector&& s, ObservableN&&... on) | ||
|  |     { | ||
|  |         return Result(Zip(identity_current_thread(), std::forward<Selector>(s), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...))); | ||
|  |     } | ||
|  | 
 | ||
|  |     template<class Coordination, class Observable, class... ObservableN,  | ||
|  |         class Enabled = rxu::enable_if_all_true_type_t< | ||
|  |             is_coordination<Coordination>, | ||
|  |             all_observables<Observable, ObservableN...>>, | ||
|  |         class Zip = rxo::detail::zip<Coordination, rxu::detail::pack, rxu::decay_t<Observable>, rxu::decay_t<ObservableN>...>, | ||
|  |         class Value = rxu::value_type_t<Zip>, | ||
|  |         class Result = observable<Value, Zip>> | ||
|  |     static Result member(Observable&& o, Coordination&& cn, ObservableN&&... on) | ||
|  |     { | ||
|  |         return Result(Zip(std::forward<Coordination>(cn), rxu::pack(), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...))); | ||
|  |     } | ||
|  | 
 | ||
|  |     template<class Coordination, class Selector, class Observable, class... ObservableN, | ||
|  |         class Enabled = rxu::enable_if_all_true_type_t< | ||
|  |             is_coordination<Coordination>, | ||
|  |             operators::detail::is_zip_selector<Selector, Observable, ObservableN...>, | ||
|  |             all_observables<Observable, ObservableN...>>, | ||
|  |         class ResolvedSelector = rxu::decay_t<Selector>, | ||
|  |         class Zip = rxo::detail::zip<Coordination, ResolvedSelector, rxu::decay_t<Observable>, rxu::decay_t<ObservableN>...>, | ||
|  |         class Value = rxu::value_type_t<Zip>, | ||
|  |         class Result = observable<Value, Zip>> | ||
|  |     static Result member(Observable&& o, Coordination&& cn, Selector&& s, ObservableN&&... on) | ||
|  |     { | ||
|  |         return Result(Zip(std::forward<Coordination>(cn), std::forward<Selector>(s), std::make_tuple(std::forward<Observable>(o), std::forward<ObservableN>(on)...))); | ||
|  |     } | ||
|  | 
 | ||
|  |     template<class... AN> | ||
|  |     static operators::detail::zip_invalid_t<AN...> member(const AN&...) { | ||
|  |         std::terminate(); | ||
|  |         return {}; | ||
|  |         static_assert(sizeof...(AN) == 10000, "zip takes (optional Coordination, optional Selector, required Observable, optional Observable...), Selector takes (Observable::value_type...)"); | ||
|  |     }  | ||
|  | }; | ||
|  | 
 | ||
|  | } | ||
|  | 
 | ||
|  | #endif
 |