// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. #pragma once /*! \file rx-element_at.hpp \brief Pulls an item located at a specified index location in the sequence of items and emits that item as its own sole emission. \param index the index of the element to return. \return An observable that emit an item located at a specified index location. \sample \snippet element_at.cpp element_at sample \snippet output.txt element_at sample */ #if !defined(RXCPP_OPERATORS_RX_ELEMENT_AT_HPP) #define RXCPP_OPERATORS_RX_ELEMENT_AT_HPP #include "../rx-includes.hpp" namespace rxcpp { namespace operators { namespace detail { template struct element_at_invalid_arguments {}; template struct element_at_invalid : public rxo::operator_base> { using type = observable, element_at_invalid>; }; template using element_at_invalid_t = typename element_at_invalid::type; template struct element_at { typedef rxu::decay_t source_value_type; struct element_at_values { element_at_values(int i) : index(i) { } int index; }; element_at_values initial; element_at(int i) : initial(i) { } template struct element_at_observer : public element_at_values { typedef element_at_observer this_type; typedef source_value_type value_type; typedef rxu::decay_t dest_type; typedef observer observer_type; dest_type dest; mutable int current; element_at_observer(dest_type d, element_at_values v) : element_at_values(v), dest(d), current(0) { } void on_next(source_value_type v) const { if (current++ == this->index) { dest.on_next(v); dest.on_completed(); } } void on_error(rxu::error_ptr e) const { dest.on_error(e); } void on_completed() const { if(current <= this->index) { dest.on_error(rxu::make_error_ptr(std::range_error("index is out of bounds"))); } } static subscriber make(dest_type d, element_at_values v) { return make_subscriber(d, this_type(d, v)); } }; template auto operator()(Subscriber dest) const -> decltype(element_at_observer::make(std::move(dest), initial)) { return element_at_observer::make(std::move(dest), initial); } }; } /*! @copydoc rx-element_at.hpp */ template auto element_at(AN&&... an) -> operator_factory { return operator_factory(std::make_tuple(std::forward(an)...)); } } template<> struct member_overload { template >, class SourceValue = rxu::value_type_t, class element_at = rxo::detail::element_at> static auto member(Observable&& o, int index) -> decltype(o.template lift(element_at(index))) { return o.template lift(element_at(index)); } template static operators::detail::element_at_invalid_t member(const AN...) { std::terminate(); return {}; static_assert(sizeof...(AN) == 10000, "element_at takes (required int)"); } }; } #endif