// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. #pragma once /*! \file rx-on_error_resume_next.hpp \brief If an error occurs, take the result from the Selector and subscribe to that instead. \tparam Selector the actual type of a function of the form `observable(rxu::error_ptr)` \param s the function of the form `observable(rxu::error_ptr)` \return Observable that emits the items from the source observable and switches to a new observable on error. \sample \snippet on_error_resume_next.cpp on_error_resume_next sample \snippet output.txt on_error_resume_next sample */ #if !defined(RXCPP_OPERATORS_RX_ON_ERROR_RESUME_NEXT_HPP) #define RXCPP_OPERATORS_RX_ON_ERROR_RESUME_NEXT_HPP #include "../rx-includes.hpp" namespace rxcpp { namespace operators { namespace detail { template struct on_error_resume_next_invalid_arguments {}; template struct on_error_resume_next_invalid : public rxo::operator_base> { using type = observable, on_error_resume_next_invalid>; }; template using on_error_resume_next_invalid_t = typename on_error_resume_next_invalid::type; template struct on_error_resume_next { typedef rxu::decay_t value_type; typedef rxu::decay_t select_type; typedef decltype((*(select_type*)nullptr)(rxu::error_ptr())) fallback_type; select_type selector; on_error_resume_next(select_type s) : selector(std::move(s)) { } template struct on_error_resume_next_observer { typedef on_error_resume_next_observer this_type; typedef rxu::decay_t value_type; typedef rxu::decay_t select_type; typedef decltype((*(select_type*)nullptr)(rxu::error_ptr())) fallback_type; typedef rxu::decay_t dest_type; typedef observer observer_type; dest_type dest; composite_subscription lifetime; select_type selector; on_error_resume_next_observer(dest_type d, composite_subscription cs, select_type s) : dest(std::move(d)) , lifetime(std::move(cs)) , selector(std::move(s)) { dest.add(lifetime); } void on_next(value_type v) const { dest.on_next(std::move(v)); } void on_error(rxu::error_ptr e) const { auto selected = on_exception( [&](){ return this->selector(std::move(e));}, dest); if (selected.empty()) { return; } selected->subscribe(dest); } void on_completed() const { dest.on_completed(); } static subscriber make(dest_type d, select_type s) { auto cs = composite_subscription(); return make_subscriber(cs, observer_type(this_type(std::move(d), cs, std::move(s)))); } }; template auto operator()(Subscriber dest) const -> decltype(on_error_resume_next_observer::make(std::move(dest), selector)) { return on_error_resume_next_observer::make(std::move(dest), selector); } }; } /*! @copydoc rx-on_error_resume_next.hpp */ template auto on_error_resume_next(AN&&... an) -> operator_factory { return operator_factory(std::make_tuple(std::forward(an)...)); } /*! @copydoc rx-on_error_resume_next.hpp */ template auto switch_on_error(AN&&... an) -> operator_factory { return operator_factory(std::make_tuple(std::forward(an)...)); } } template<> struct member_overload { template, class OnErrorResumeNext = rxo::detail::on_error_resume_next>, class Value = rxu::value_type_t> static auto member(Observable&& o, Selector&& p) -> decltype(o.template lift(OnErrorResumeNext(std::forward(p)))) { return o.template lift(OnErrorResumeNext(std::forward(p))); } template static operators::detail::on_error_resume_next_invalid_t member(const AN&...) { std::terminate(); return {}; static_assert(sizeof...(AN) == 10000, "on_error_resume_next takes (Selector)"); } }; } #endif