// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. #pragma once /*! \file rx-window.hpp \brief Return an observable that emits connected, non-overlapping windows, each containing at most count items from the source observable. If the skip parameter is set, return an observable that emits windows every skip items containing at most count items from the source observable. \param count the maximum size of each window before it should be completed \param skip how many items need to be skipped before starting a new window \return Observable that emits connected, non-overlapping windows, each containing at most count items from the source observable. If the skip parameter is set, return an Observable that emits windows every skip items containing at most count items from the source observable. \sample \snippet window.cpp window count+skip sample \snippet output.txt window count+skip sample \sample \snippet window.cpp window count sample \snippet output.txt window count sample */ #if !defined(RXCPP_OPERATORS_RX_WINDOW_HPP) #define RXCPP_OPERATORS_RX_WINDOW_HPP #include "../rx-includes.hpp" namespace rxcpp { namespace operators { namespace detail { template struct window_invalid_arguments {}; template struct window_invalid : public rxo::operator_base> { using type = observable, window_invalid>; }; template using window_invalid_t = typename window_invalid::type; template struct window { typedef rxu::decay_t source_value_type; typedef observable value_type; struct window_values { window_values(int c, int s) : count(c) , skip(s) { } int count; int skip; }; window_values initial; window(int count, int skip) : initial(count, skip) { } template struct window_observer : public window_values { typedef window_observer this_type; typedef rxu::decay_t value_type; typedef rxu::decay_t dest_type; typedef observer observer_type; dest_type dest; mutable int cursor; mutable std::deque> subj; window_observer(dest_type d, window_values v) : window_values(v) , dest(std::move(d)) , cursor(0) { subj.push_back(rxcpp::subjects::subject()); dest.on_next(subj[0].get_observable().as_dynamic()); } void on_next(T v) const { for (auto s : subj) { s.get_subscriber().on_next(v); } int c = cursor - this->count + 1; if (c >= 0 && c % this->skip == 0) { subj[0].get_subscriber().on_completed(); subj.pop_front(); } if (++cursor % this->skip == 0) { subj.push_back(rxcpp::subjects::subject()); dest.on_next(subj[subj.size() - 1].get_observable().as_dynamic()); } } void on_error(rxu::error_ptr e) const { for (auto s : subj) { s.get_subscriber().on_error(e); } dest.on_error(e); } void on_completed() const { for (auto s : subj) { s.get_subscriber().on_completed(); } dest.on_completed(); } static subscriber make(dest_type d, window_values v) { auto cs = d.get_subscription(); return make_subscriber(std::move(cs), observer_type(this_type(std::move(d), std::move(v)))); } }; template auto operator()(Subscriber dest) const -> decltype(window_observer::make(std::move(dest), initial)) { return window_observer::make(std::move(dest), initial); } }; } /*! @copydoc rx-window.hpp */ template auto window(AN&&... an) -> operator_factory { return operator_factory(std::make_tuple(std::forward(an)...)); } } template<> struct member_overload { template>, class SourceValue = rxu::value_type_t, class Window = rxo::detail::window, class Value = rxu::value_type_t> static auto member(Observable&& o, int count, int skip) -> decltype(o.template lift(Window(count, skip))) { return o.template lift(Window(count, skip)); } template>, class SourceValue = rxu::value_type_t, class Window = rxo::detail::window, class Value = rxu::value_type_t> static auto member(Observable&& o, int count) -> decltype(o.template lift(Window(count, count))) { return o.template lift(Window(count, count)); } template static operators::detail::window_invalid_t member(AN...) { std::terminate(); return {}; static_assert(sizeof...(AN) == 10000, "window takes (Count, optional Skip)"); } }; } #endif