// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. #pragma once /*! \file rx-distinct_until_changed.hpp \brief For each item from this observable, filter out consequentially repeated values and emit only changes from the new observable that is returned. \tparam BinaryPredicate (optional) the type of the value comparing function. The signature should be equivalent to the following: bool pred(const T1& a, const T2& b); \param pred (optional) the function that implements comparison of two values. \return Observable that emits those items from the source observable that are distinct from their immediate predecessors. \sample \snippet distinct_until_changed.cpp distinct_until_changed sample \snippet output.txt distinct_until_changed sample */ #if !defined(RXCPP_OPERATORS_RX_DISTINCT_UNTIL_CHANGED_HPP) #define RXCPP_OPERATORS_RX_DISTINCT_UNTIL_CHANGED_HPP #include "../rx-includes.hpp" namespace rxcpp { namespace operators { namespace detail { template struct distinct_until_changed_invalid_arguments {}; template struct distinct_until_changed_invalid : public rxo::operator_base> { using type = observable, distinct_until_changed_invalid>; }; template using distinct_until_changed_invalid_t = typename distinct_until_changed_invalid::type; template struct distinct_until_changed { typedef rxu::decay_t source_value_type; typedef rxu::decay_t predicate_type; predicate_type pred; distinct_until_changed(predicate_type p) : pred(std::move(p)) { } template struct distinct_until_changed_observer { typedef distinct_until_changed_observer this_type; typedef source_value_type value_type; typedef rxu::decay_t dest_type; typedef observer observer_type; dest_type dest; predicate_type pred; mutable rxu::detail::maybe remembered; distinct_until_changed_observer(dest_type d, predicate_type pred) : dest(std::move(d)) , pred(std::move(pred)) { } void on_next(source_value_type v) const { if (remembered.empty() || !pred(v, remembered.get())) { remembered.reset(v); dest.on_next(v); } } void on_error(rxu::error_ptr e) const { dest.on_error(e); } void on_completed() const { dest.on_completed(); } static subscriber make(dest_type d, predicate_type p) { return make_subscriber(d, this_type(d, std::move(p))); } }; template auto operator()(Subscriber dest) const -> decltype(distinct_until_changed_observer::make(std::move(dest), pred)) { return distinct_until_changed_observer::make(std::move(dest), pred); } }; } /*! @copydoc rx-distinct_until_changed.hpp */ template auto distinct_until_changed(AN&&... an) -> operator_factory { return operator_factory(std::make_tuple(std::forward(an)...)); } } template<> struct member_overload { template, class Enabled = rxu::enable_if_all_true_type_t< is_observable>, class DistinctUntilChanged = rxo::detail::distinct_until_changed>> static auto member(Observable&& o) -> decltype(o.template lift(DistinctUntilChanged(rxu::equal_to<>()))) { return o.template lift(DistinctUntilChanged(rxu::equal_to<>())); } template, class Enabled = rxu::enable_if_all_true_type_t< is_observable>, class DistinctUntilChanged = rxo::detail::distinct_until_changed> static auto member(Observable&& o, BinaryPredicate&& pred) -> decltype(o.template lift(DistinctUntilChanged(std::forward(pred)))) { return o.template lift(DistinctUntilChanged(std::forward(pred))); } template static operators::detail::distinct_until_changed_invalid_t member(AN...) { std::terminate(); return {}; static_assert(sizeof...(AN) == 10000, "distinct_until_changed takes (optional BinaryPredicate)"); } }; } #endif