// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. #pragma once /*! \file rx-group_by.hpp \brief Return an observable that emits grouped_observables, each of which corresponds to a unique key value and each of which emits those items from the source observable that share that key value. \tparam KeySelector the type of the key extracting function \tparam MarbleSelector the type of the element extracting function \tparam BinaryPredicate the type of the key comparing function \tparam DurationSelector the type of the duration observable function \param ks a function that extracts the key for each item (optional) \param ms a function that extracts the return element for each item (optional) \param p a function that implements comparison of two keys (optional) \return Observable that emits values of grouped_observable type, each of which corresponds to a unique key value and each of which emits those items from the source observable that share that key value. \sample \snippet group_by.cpp group_by full intro \snippet group_by.cpp group_by full sample \snippet output.txt group_by full sample \sample \snippet group_by.cpp group_by sample \snippet output.txt group_by sample */ #if !defined(RXCPP_OPERATORS_RX_GROUP_BY_HPP) #define RXCPP_OPERATORS_RX_GROUP_BY_HPP #include "../rx-includes.hpp" namespace rxcpp { namespace operators { namespace detail { template struct group_by_invalid_arguments {}; template struct group_by_invalid : public rxo::operator_base> { using type = observable, group_by_invalid>; }; template using group_by_invalid_t = typename group_by_invalid::type; template struct is_group_by_selector_for { typedef rxu::decay_t selector_type; typedef T source_value_type; struct tag_not_valid {}; template static auto check(int) -> decltype((*(CS*)nullptr)(*(CV*)nullptr)); template static tag_not_valid check(...); typedef decltype(check(0)) type; static const bool value = !std::is_same::value; }; template struct group_by_traits { typedef T source_value_type; typedef rxu::decay_t source_type; typedef rxu::decay_t key_selector_type; typedef rxu::decay_t marble_selector_type; typedef rxu::decay_t predicate_type; typedef rxu::decay_t duration_selector_type; static_assert(is_group_by_selector_for::value, "group_by KeySelector must be a function with the signature key_type(source_value_type)"); typedef typename is_group_by_selector_for::type key_type; static_assert(is_group_by_selector_for::value, "group_by MarbleSelector must be a function with the signature marble_type(source_value_type)"); typedef typename is_group_by_selector_for::type marble_type; typedef rxsub::subject subject_type; typedef std::map key_subscriber_map_type; typedef grouped_observable grouped_observable_type; }; template struct group_by { typedef group_by_traits traits_type; typedef typename traits_type::key_selector_type key_selector_type; typedef typename traits_type::marble_selector_type marble_selector_type; typedef typename traits_type::marble_type marble_type; typedef typename traits_type::predicate_type predicate_type; typedef typename traits_type::duration_selector_type duration_selector_type; typedef typename traits_type::subject_type subject_type; typedef typename traits_type::key_type key_type; typedef typename traits_type::key_subscriber_map_type group_map_type; typedef std::vector bindings_type; struct group_by_state_type { group_by_state_type(composite_subscription sl, predicate_type p) : source_lifetime(sl) , groups(p) , observers(0) {} composite_subscription source_lifetime; rxsc::worker worker; group_map_type groups; std::atomic observers; }; template static void stopsource(Subscriber&& dest, std::shared_ptr& state) { ++state->observers; dest.add([state](){ if (!state->source_lifetime.is_subscribed()) { return; } --state->observers; if (state->observers == 0) { state->source_lifetime.unsubscribe(); } }); } struct group_by_values { group_by_values(key_selector_type ks, marble_selector_type ms, predicate_type p, duration_selector_type ds) : keySelector(std::move(ks)) , marbleSelector(std::move(ms)) , predicate(std::move(p)) , durationSelector(std::move(ds)) { } mutable key_selector_type keySelector; mutable marble_selector_type marbleSelector; mutable predicate_type predicate; mutable duration_selector_type durationSelector; }; group_by_values initial; group_by(key_selector_type ks, marble_selector_type ms, predicate_type p, duration_selector_type ds) : initial(std::move(ks), std::move(ms), std::move(p), std::move(ds)) { } struct group_by_observable : public rxs::source_base { mutable std::shared_ptr state; subject_type subject; key_type key; group_by_observable(std::shared_ptr st, subject_type s, key_type k) : state(std::move(st)) , subject(std::move(s)) , key(k) { } template void on_subscribe(Subscriber&& o) const { group_by::stopsource(o, state); subject.get_observable().subscribe(std::forward(o)); } key_type on_get_key() { return key; } }; template struct group_by_observer : public group_by_values { typedef group_by_observer this_type; typedef typename traits_type::grouped_observable_type value_type; typedef rxu::decay_t dest_type; typedef observer observer_type; dest_type dest; mutable std::shared_ptr state; group_by_observer(composite_subscription l, dest_type d, group_by_values v) : group_by_values(v) , dest(std::move(d)) , state(std::make_shared(l, group_by_values::predicate)) { group_by::stopsource(dest, state); } void on_next(T v) const { auto selectedKey = on_exception( [&](){ return this->keySelector(v);}, [this](rxu::error_ptr e){on_error(e);}); if (selectedKey.empty()) { return; } auto g = state->groups.find(selectedKey.get()); if (g == state->groups.end()) { if (!dest.is_subscribed()) { return; } auto sub = subject_type(); g = state->groups.insert(std::make_pair(selectedKey.get(), sub.get_subscriber())).first; auto obs = make_dynamic_grouped_observable(group_by_observable(state, sub, selectedKey.get())); auto durationObs = on_exception( [&](){ return this->durationSelector(obs);}, [this](rxu::error_ptr e){on_error(e);}); if (durationObs.empty()) { return; } dest.on_next(obs); composite_subscription duration_sub; auto ssub = state->source_lifetime.add(duration_sub); auto expire_state = state; auto expire_dest = g->second; auto expire = [=]() { auto g = expire_state->groups.find(selectedKey.get()); if (g != expire_state->groups.end()) { expire_state->groups.erase(g); expire_dest.on_completed(); } expire_state->source_lifetime.remove(ssub); }; auto robs = durationObs.get().take(1); duration_sub.add(robs.subscribe( [](const typename decltype(robs)::value_type &){}, [=](rxu::error_ptr) {expire();}, [=](){expire();} )); } auto selectedMarble = on_exception( [&](){ return this->marbleSelector(v);}, [this](rxu::error_ptr e){on_error(e);}); if (selectedMarble.empty()) { return; } g->second.on_next(std::move(selectedMarble.get())); } void on_error(rxu::error_ptr e) const { for(auto& g : state->groups) { g.second.on_error(e); } dest.on_error(e); } void on_completed() const { for(auto& g : state->groups) { g.second.on_completed(); } dest.on_completed(); } static subscriber make(dest_type d, group_by_values v) { auto cs = composite_subscription(); return make_subscriber(cs, observer_type(this_type(cs, std::move(d), std::move(v)))); } }; template auto operator()(Subscriber dest) const -> decltype(group_by_observer::make(std::move(dest), initial)) { return group_by_observer::make(std::move(dest), initial); } }; template class group_by_factory { typedef rxu::decay_t key_selector_type; typedef rxu::decay_t marble_selector_type; typedef rxu::decay_t predicate_type; typedef rxu::decay_t duration_selector_type; key_selector_type keySelector; marble_selector_type marbleSelector; predicate_type predicate; duration_selector_type durationSelector; public: group_by_factory(key_selector_type ks, marble_selector_type ms, predicate_type p, duration_selector_type ds) : keySelector(std::move(ks)) , marbleSelector(std::move(ms)) , predicate(std::move(p)) , durationSelector(std::move(ds)) { } template struct group_by_factory_traits { typedef rxu::value_type_t> value_type; typedef detail::group_by_traits traits_type; typedef detail::group_by group_by_type; }; template auto operator()(Observable&& source) -> decltype(source.template lift::traits_type::grouped_observable_type>(typename group_by_factory_traits::group_by_type(std::move(keySelector), std::move(marbleSelector), std::move(predicate), std::move(durationSelector)))) { return source.template lift::traits_type::grouped_observable_type>(typename group_by_factory_traits::group_by_type(std::move(keySelector), std::move(marbleSelector), std::move(predicate), std::move(durationSelector))); } }; } /*! @copydoc rx-group_by.hpp */ template auto group_by(AN&&... an) -> operator_factory { return operator_factory(std::make_tuple(std::forward(an)...)); } } template<> struct member_overload { template, class Traits = rxo::detail::group_by_traits, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector>, class GroupBy = rxo::detail::group_by, rxu::decay_t, rxu::decay_t, rxu::decay_t, rxu::decay_t>, class Value = typename Traits::grouped_observable_type> static auto member(Observable&& o, KeySelector&& ks, MarbleSelector&& ms, BinaryPredicate&& p, DurationSelector&& ds) -> decltype(o.template lift(GroupBy(std::forward(ks), std::forward(ms), std::forward(p), std::forward(ds)))) { return o.template lift(GroupBy(std::forward(ks), std::forward(ms), std::forward(p), std::forward(ds))); } template>>, class SourceValue = rxu::value_type_t, class Traits = rxo::detail::group_by_traits, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector>, class GroupBy = rxo::detail::group_by, rxu::decay_t, rxu::decay_t, rxu::decay_t, rxu::decay_t>, class Value = typename Traits::grouped_observable_type> static auto member(Observable&& o, KeySelector&& ks, MarbleSelector&& ms, BinaryPredicate&& p) -> decltype(o.template lift(GroupBy(std::forward(ks), std::forward(ms), std::forward(p), rxu::ret>>()))) { return o.template lift(GroupBy(std::forward(ks), std::forward(ms), std::forward(p), rxu::ret>>())); } template>>, class SourceValue = rxu::value_type_t, class Traits = rxo::detail::group_by_traits, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector>, class GroupBy = rxo::detail::group_by, rxu::decay_t, rxu::decay_t, rxu::decay_t, rxu::decay_t>, class Value = typename Traits::grouped_observable_type> static auto member(Observable&& o, KeySelector&& ks, MarbleSelector&& ms) -> decltype(o.template lift(GroupBy(std::forward(ks), std::forward(ms), rxu::less(), rxu::ret>>()))) { return o.template lift(GroupBy(std::forward(ks), std::forward(ms), rxu::less(), rxu::ret>>())); } template, class BinaryPredicate=rxu::less, class DurationSelector=rxu::ret>>, class SourceValue = rxu::value_type_t, class Traits = rxo::detail::group_by_traits, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector>, class GroupBy = rxo::detail::group_by, rxu::decay_t, rxu::decay_t, rxu::decay_t, rxu::decay_t>, class Value = typename Traits::grouped_observable_type> static auto member(Observable&& o, KeySelector&& ks) -> decltype(o.template lift(GroupBy(std::forward(ks), rxu::detail::take_at<0>(), rxu::less(), rxu::ret>>()))) { return o.template lift(GroupBy(std::forward(ks), rxu::detail::take_at<0>(), rxu::less(), rxu::ret>>())); } template, class MarbleSelector=rxu::detail::take_at<0>, class BinaryPredicate=rxu::less, class DurationSelector=rxu::ret>>, class Enabled = rxu::enable_if_all_true_type_t< all_observables>, class SourceValue = rxu::value_type_t, class Traits = rxo::detail::group_by_traits, KeySelector, MarbleSelector, BinaryPredicate, DurationSelector>, class GroupBy = rxo::detail::group_by, rxu::decay_t, rxu::decay_t, rxu::decay_t, rxu::decay_t>, class Value = typename Traits::grouped_observable_type> static auto member(Observable&& o) -> decltype(o.template lift(GroupBy(rxu::detail::take_at<0>(), rxu::detail::take_at<0>(), rxu::less(), rxu::ret>>()))) { return o.template lift(GroupBy(rxu::detail::take_at<0>(), rxu::detail::take_at<0>(), rxu::less(), rxu::ret>>())); } template static operators::detail::group_by_invalid_t member(const AN&...) { std::terminate(); return {}; static_assert(sizeof...(AN) == 10000, "group_by takes (optional KeySelector, optional MarbleSelector, optional BinaryKeyPredicate, optional DurationSelector), KeySelector takes (Observable::value_type) -> KeyValue, MarbleSelector takes (Observable::value_type) -> MarbleValue, BinaryKeyPredicate takes (KeyValue, KeyValue) -> bool, DurationSelector takes (Observable::value_type) -> Observable"); } }; } #endif