1808 lines
62 KiB
C++
1808 lines
62 KiB
C++
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
|
|
|
|
#pragma once
|
|
|
|
#if !defined(RXCPP_RX_OBSERVABLE_HPP)
|
|
#define RXCPP_RX_OBSERVABLE_HPP
|
|
|
|
#include "rx-includes.hpp"
|
|
|
|
#ifdef __GNUG__
|
|
#define EXPLICIT_THIS this->
|
|
#else
|
|
#define EXPLICIT_THIS
|
|
#endif
|
|
|
|
namespace rxcpp {
|
|
|
|
namespace detail {
|
|
|
|
template<class Subscriber, class T>
|
|
struct has_on_subscribe_for
|
|
{
|
|
struct not_void {};
|
|
template<class CS, class CT>
|
|
static auto check(int) -> decltype((*(CT*)nullptr).on_subscribe(*(CS*)nullptr));
|
|
template<class CS, class CT>
|
|
static not_void check(...);
|
|
|
|
typedef decltype(check<rxu::decay_t<Subscriber>, T>(0)) detail_result;
|
|
static const bool value = std::is_same<detail_result, void>::value;
|
|
};
|
|
|
|
}
|
|
|
|
template<class T>
|
|
class dynamic_observable
|
|
: public rxs::source_base<T>
|
|
{
|
|
struct state_type
|
|
: public std::enable_shared_from_this<state_type>
|
|
{
|
|
typedef std::function<void(subscriber<T>)> onsubscribe_type;
|
|
|
|
onsubscribe_type on_subscribe;
|
|
};
|
|
std::shared_ptr<state_type> state;
|
|
|
|
template<class U>
|
|
friend bool operator==(const dynamic_observable<U>&, const dynamic_observable<U>&);
|
|
|
|
template<class SO>
|
|
void construct(SO&& source, rxs::tag_source&&) {
|
|
rxu::decay_t<SO> so = std::forward<SO>(source);
|
|
state->on_subscribe = [so](subscriber<T> o) mutable {
|
|
so.on_subscribe(std::move(o));
|
|
};
|
|
}
|
|
|
|
struct tag_function {};
|
|
template<class F>
|
|
void construct(F&& f, tag_function&&) {
|
|
state->on_subscribe = std::forward<F>(f);
|
|
}
|
|
|
|
public:
|
|
|
|
typedef tag_dynamic_observable dynamic_observable_tag;
|
|
|
|
dynamic_observable()
|
|
{
|
|
}
|
|
|
|
template<class SOF>
|
|
explicit dynamic_observable(SOF&& sof, typename std::enable_if<!is_dynamic_observable<SOF>::value, void**>::type = 0)
|
|
: state(std::make_shared<state_type>())
|
|
{
|
|
construct(std::forward<SOF>(sof),
|
|
typename std::conditional<rxs::is_source<SOF>::value || rxo::is_operator<SOF>::value, rxs::tag_source, tag_function>::type());
|
|
}
|
|
|
|
void on_subscribe(subscriber<T> o) const {
|
|
state->on_subscribe(std::move(o));
|
|
}
|
|
|
|
template<class Subscriber>
|
|
typename std::enable_if<is_subscriber<Subscriber>::value, void>::type
|
|
on_subscribe(Subscriber o) const {
|
|
state->on_subscribe(o.as_dynamic());
|
|
}
|
|
};
|
|
|
|
template<class T>
|
|
inline bool operator==(const dynamic_observable<T>& lhs, const dynamic_observable<T>& rhs) {
|
|
return lhs.state == rhs.state;
|
|
}
|
|
template<class T>
|
|
inline bool operator!=(const dynamic_observable<T>& lhs, const dynamic_observable<T>& rhs) {
|
|
return !(lhs == rhs);
|
|
}
|
|
|
|
template<class T, class Source>
|
|
observable<T> make_observable_dynamic(Source&& s) {
|
|
return observable<T>(dynamic_observable<T>(std::forward<Source>(s)));
|
|
}
|
|
|
|
namespace detail {
|
|
template<bool Selector, class Default, class SO>
|
|
struct resolve_observable;
|
|
|
|
template<class Default, class SO>
|
|
struct resolve_observable<true, Default, SO>
|
|
{
|
|
typedef typename SO::type type;
|
|
typedef typename type::value_type value_type;
|
|
static const bool value = true;
|
|
typedef observable<value_type, type> observable_type;
|
|
template<class... AN>
|
|
static observable_type make(const Default&, AN&&... an) {
|
|
return observable_type(type(std::forward<AN>(an)...));
|
|
}
|
|
};
|
|
template<class Default, class SO>
|
|
struct resolve_observable<false, Default, SO>
|
|
{
|
|
static const bool value = false;
|
|
typedef Default observable_type;
|
|
template<class... AN>
|
|
static observable_type make(const observable_type& that, const AN&...) {
|
|
return that;
|
|
}
|
|
};
|
|
template<class SO>
|
|
struct resolve_observable<true, void, SO>
|
|
{
|
|
typedef typename SO::type type;
|
|
typedef typename type::value_type value_type;
|
|
static const bool value = true;
|
|
typedef observable<value_type, type> observable_type;
|
|
template<class... AN>
|
|
static observable_type make(AN&&... an) {
|
|
return observable_type(type(std::forward<AN>(an)...));
|
|
}
|
|
};
|
|
template<class SO>
|
|
struct resolve_observable<false, void, SO>
|
|
{
|
|
static const bool value = false;
|
|
typedef void observable_type;
|
|
template<class... AN>
|
|
static observable_type make(const AN&...) {
|
|
}
|
|
};
|
|
|
|
}
|
|
|
|
template<class Selector, class Default, template<class... TN> class SO, class... AN>
|
|
struct defer_observable
|
|
: public detail::resolve_observable<Selector::value, Default, rxu::defer_type<SO, AN...>>
|
|
{
|
|
};
|
|
|
|
/*!
|
|
\brief a source of values whose methods block until all values have been emitted. subscribe or use one of the operator methods that reduce the values emitted to a single value.
|
|
|
|
\ingroup group-observable
|
|
|
|
*/
|
|
template<class T, class Observable>
|
|
class blocking_observable
|
|
{
|
|
template<class Obsvbl, class... ArgN>
|
|
static auto blocking_subscribe(const Obsvbl& source, bool do_rethrow, ArgN&&... an)
|
|
-> void {
|
|
std::mutex lock;
|
|
std::condition_variable wake;
|
|
bool disposed = false;
|
|
rxu::error_ptr error;
|
|
|
|
auto dest = make_subscriber<T>(std::forward<ArgN>(an)...);
|
|
|
|
// keep any error to rethrow at the end.
|
|
auto scbr = make_subscriber<T>(
|
|
dest,
|
|
[&](T t){dest.on_next(t);},
|
|
[&](rxu::error_ptr e){
|
|
if (do_rethrow) {
|
|
error = e;
|
|
} else {
|
|
dest.on_error(e);
|
|
}
|
|
},
|
|
[&](){dest.on_completed();}
|
|
);
|
|
|
|
auto cs = scbr.get_subscription();
|
|
cs.add(
|
|
[&](){
|
|
std::unique_lock<std::mutex> guard(lock);
|
|
disposed = true;
|
|
wake.notify_one();
|
|
});
|
|
|
|
source.subscribe(std::move(scbr));
|
|
|
|
std::unique_lock<std::mutex> guard(lock);
|
|
wake.wait(guard,
|
|
[&](){
|
|
return disposed;
|
|
});
|
|
|
|
if (error) {rxu::rethrow_exception(error);}
|
|
}
|
|
|
|
public:
|
|
typedef rxu::decay_t<Observable> observable_type;
|
|
observable_type source;
|
|
~blocking_observable()
|
|
{
|
|
}
|
|
blocking_observable(observable_type s) : source(std::move(s)) {}
|
|
|
|
///
|
|
/// `subscribe` will cause this observable to emit values to the provided subscriber.
|
|
///
|
|
/// \return void
|
|
///
|
|
/// \param an... - the arguments are passed to make_subscriber().
|
|
///
|
|
/// callers must provide enough arguments to make a subscriber.
|
|
/// overrides are supported. thus
|
|
/// `subscribe(thesubscriber, composite_subscription())`
|
|
/// will take `thesubscriber.get_observer()` and the provided
|
|
/// subscription and subscribe to the new subscriber.
|
|
/// the `on_next`, `on_error`, `on_completed` methods can be supplied instead of an observer
|
|
/// if a subscription or subscriber is not provided then a new subscription will be created.
|
|
///
|
|
template<class... ArgN>
|
|
auto subscribe(ArgN&&... an) const
|
|
-> void {
|
|
return blocking_subscribe(source, false, std::forward<ArgN>(an)...);
|
|
}
|
|
|
|
///
|
|
/// `subscribe_with_rethrow` will cause this observable to emit values to the provided subscriber.
|
|
///
|
|
/// \note If the source observable calls on_error, the raised exception is rethrown by this method.
|
|
///
|
|
/// \note If the source observable calls on_error, the `on_error` method on the subscriber will not be called.
|
|
///
|
|
/// \return void
|
|
///
|
|
/// \param an... - the arguments are passed to make_subscriber().
|
|
///
|
|
/// callers must provide enough arguments to make a subscriber.
|
|
/// overrides are supported. thus
|
|
/// `subscribe(thesubscriber, composite_subscription())`
|
|
/// will take `thesubscriber.get_observer()` and the provided
|
|
/// subscription and subscribe to the new subscriber.
|
|
/// the `on_next`, `on_error`, `on_completed` methods can be supplied instead of an observer
|
|
/// if a subscription or subscriber is not provided then a new subscription will be created.
|
|
///
|
|
template<class... ArgN>
|
|
auto subscribe_with_rethrow(ArgN&&... an) const
|
|
-> void {
|
|
return blocking_subscribe(source, true, std::forward<ArgN>(an)...);
|
|
}
|
|
|
|
/*! Return the first item emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items.
|
|
|
|
\return The first item emitted by this blocking_observable.
|
|
|
|
\note If the source observable calls on_error, the raised exception is rethrown by this method.
|
|
|
|
\sample
|
|
When the source observable emits at least one item:
|
|
\snippet blocking_observable.cpp blocking first sample
|
|
\snippet output.txt blocking first sample
|
|
|
|
When the source observable is empty:
|
|
\snippet blocking_observable.cpp blocking first empty sample
|
|
\snippet output.txt blocking first empty sample
|
|
*/
|
|
template<class... AN>
|
|
auto first(AN**...) -> delayed_type_t<T, AN...> const {
|
|
rxu::maybe<T> result;
|
|
composite_subscription cs;
|
|
subscribe_with_rethrow(
|
|
cs,
|
|
[&](T v){result.reset(v); cs.unsubscribe();});
|
|
if (result.empty())
|
|
rxu::throw_exception(rxcpp::empty_error("first() requires a stream with at least one value"));
|
|
return result.get();
|
|
static_assert(sizeof...(AN) == 0, "first() was passed too many arguments.");
|
|
}
|
|
|
|
/*! Return the last item emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items.
|
|
|
|
\return The last item emitted by this blocking_observable.
|
|
|
|
\note If the source observable calls on_error, the raised exception is rethrown by this method.
|
|
|
|
\sample
|
|
When the source observable emits at least one item:
|
|
\snippet blocking_observable.cpp blocking last sample
|
|
\snippet output.txt blocking last sample
|
|
|
|
When the source observable is empty:
|
|
\snippet blocking_observable.cpp blocking last empty sample
|
|
\snippet output.txt blocking last empty sample
|
|
*/
|
|
template<class... AN>
|
|
auto last(AN**...) -> delayed_type_t<T, AN...> const {
|
|
rxu::maybe<T> result;
|
|
subscribe_with_rethrow(
|
|
[&](T v){result.reset(v);});
|
|
if (result.empty())
|
|
rxu::throw_exception(rxcpp::empty_error("last() requires a stream with at least one value"));
|
|
return result.get();
|
|
static_assert(sizeof...(AN) == 0, "last() was passed too many arguments.");
|
|
}
|
|
|
|
/*! Return the total number of items emitted by this blocking_observable.
|
|
|
|
\return The total number of items emitted by this blocking_observable.
|
|
|
|
\sample
|
|
\snippet blocking_observable.cpp blocking count sample
|
|
\snippet output.txt blocking count sample
|
|
|
|
When the source observable calls on_error:
|
|
\snippet blocking_observable.cpp blocking count error sample
|
|
\snippet output.txt blocking count error sample
|
|
*/
|
|
int count() const {
|
|
int result = 0;
|
|
source.count().as_blocking().subscribe_with_rethrow(
|
|
[&](int v){result = v;});
|
|
return result;
|
|
}
|
|
|
|
/*! Return the sum of all items emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items.
|
|
|
|
\return The sum of all items emitted by this blocking_observable.
|
|
|
|
\sample
|
|
When the source observable emits at least one item:
|
|
\snippet blocking_observable.cpp blocking sum sample
|
|
\snippet output.txt blocking sum sample
|
|
|
|
When the source observable is empty:
|
|
\snippet blocking_observable.cpp blocking sum empty sample
|
|
\snippet output.txt blocking sum empty sample
|
|
|
|
When the source observable calls on_error:
|
|
\snippet blocking_observable.cpp blocking sum error sample
|
|
\snippet output.txt blocking sum error sample
|
|
*/
|
|
T sum() const {
|
|
return source.sum().as_blocking().last();
|
|
}
|
|
|
|
/*! Return the average value of all items emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items.
|
|
|
|
\return The average value of all items emitted by this blocking_observable.
|
|
|
|
\sample
|
|
When the source observable emits at least one item:
|
|
\snippet blocking_observable.cpp blocking average sample
|
|
\snippet output.txt blocking average sample
|
|
|
|
When the source observable is empty:
|
|
\snippet blocking_observable.cpp blocking average empty sample
|
|
\snippet output.txt blocking average empty sample
|
|
|
|
When the source observable calls on_error:
|
|
\snippet blocking_observable.cpp blocking average error sample
|
|
\snippet output.txt blocking average error sample
|
|
*/
|
|
double average() const {
|
|
return source.average().as_blocking().last();
|
|
}
|
|
|
|
/*! Return the max of all items emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items.
|
|
|
|
\return The max of all items emitted by this blocking_observable.
|
|
|
|
\sample
|
|
When the source observable emits at least one item:
|
|
\snippet blocking_observable.cpp blocking max sample
|
|
\snippet output.txt blocking max sample
|
|
|
|
When the source observable is empty:
|
|
\snippet blocking_observable.cpp blocking max empty sample
|
|
\snippet output.txt blocking max empty sample
|
|
|
|
When the source observable calls on_error:
|
|
\snippet blocking_observable.cpp blocking max error sample
|
|
\snippet output.txt blocking max error sample
|
|
*/
|
|
T max() const {
|
|
return source.max().as_blocking().last();
|
|
}
|
|
|
|
/*! Return the min of all items emitted by this blocking_observable, or throw an std::runtime_error exception if it emits no items.
|
|
|
|
\return The min of all items emitted by this blocking_observable.
|
|
|
|
\sample
|
|
When the source observable emits at least one item:
|
|
\snippet blocking_observable.cpp blocking min sample
|
|
\snippet output.txt blocking min sample
|
|
|
|
When the source observable is empty:
|
|
\snippet blocking_observable.cpp blocking min empty sample
|
|
\snippet output.txt blocking min empty sample
|
|
|
|
When the source observable calls on_error:
|
|
\snippet blocking_observable.cpp blocking min error sample
|
|
\snippet output.txt blocking min error sample
|
|
*/
|
|
T min() const {
|
|
return source.min().as_blocking().last();
|
|
}
|
|
};
|
|
|
|
namespace detail {
|
|
|
|
template<class SourceOperator, class Subscriber>
|
|
struct safe_subscriber
|
|
{
|
|
safe_subscriber(SourceOperator& so, Subscriber& o) : so(std::addressof(so)), o(std::addressof(o)) {}
|
|
|
|
void subscribe() {
|
|
RXCPP_TRY {
|
|
so->on_subscribe(*o);
|
|
} RXCPP_CATCH(...) {
|
|
if (!o->is_subscribed()) {
|
|
rxu::rethrow_current_exception();
|
|
}
|
|
o->on_error(rxu::make_error_ptr(rxu::current_exception()));
|
|
o->unsubscribe();
|
|
}
|
|
}
|
|
|
|
void operator()(const rxsc::schedulable&) {
|
|
subscribe();
|
|
}
|
|
|
|
SourceOperator* so;
|
|
Subscriber* o;
|
|
};
|
|
|
|
}
|
|
|
|
template<>
|
|
class observable<void, void>;
|
|
|
|
/*!
|
|
\defgroup group-observable Observables
|
|
|
|
\brief These are the set of observable classes in rxcpp.
|
|
|
|
\class rxcpp::observable
|
|
|
|
\ingroup group-observable group-core
|
|
|
|
\brief a source of values. subscribe or use one of the operator methods that return a new observable, which uses this observable as a source.
|
|
|
|
\par Some code
|
|
This sample will observable::subscribe() to values from a observable<void, void>::range().
|
|
|
|
\sample
|
|
\snippet range.cpp range sample
|
|
\snippet output.txt range sample
|
|
|
|
*/
|
|
template<class T, class SourceOperator>
|
|
class observable
|
|
: public observable_base<T>
|
|
{
|
|
static_assert(std::is_same<T, typename SourceOperator::value_type>::value, "SourceOperator::value_type must be the same as T in observable<T, SourceOperator>");
|
|
|
|
typedef observable<T, SourceOperator> this_type;
|
|
|
|
public:
|
|
typedef rxu::decay_t<SourceOperator> source_operator_type;
|
|
mutable source_operator_type source_operator;
|
|
|
|
private:
|
|
|
|
template<class U, class SO>
|
|
friend class observable;
|
|
|
|
template<class U, class SO>
|
|
friend bool operator==(const observable<U, SO>&, const observable<U, SO>&);
|
|
|
|
template<class Subscriber>
|
|
auto detail_subscribe(Subscriber o) const
|
|
-> composite_subscription {
|
|
|
|
typedef rxu::decay_t<Subscriber> subscriber_type;
|
|
|
|
static_assert(is_subscriber<subscriber_type>::value, "subscribe must be passed a subscriber");
|
|
static_assert(std::is_same<typename source_operator_type::value_type, T>::value && std::is_convertible<T*, typename subscriber_type::value_type*>::value, "the value types in the sequence must match or be convertible");
|
|
static_assert(detail::has_on_subscribe_for<subscriber_type, source_operator_type>::value, "inner must have on_subscribe method that accepts this subscriber ");
|
|
|
|
trace_activity().subscribe_enter(*this, o);
|
|
|
|
if (!o.is_subscribed()) {
|
|
trace_activity().subscribe_return(*this);
|
|
return o.get_subscription();
|
|
}
|
|
|
|
detail::safe_subscriber<source_operator_type, subscriber_type> subscriber(source_operator, o);
|
|
|
|
// make sure to let current_thread take ownership of the thread as early as possible.
|
|
if (rxsc::current_thread::is_schedule_required()) {
|
|
const auto& sc = rxsc::make_current_thread();
|
|
sc.create_worker(o.get_subscription()).schedule(subscriber);
|
|
} else {
|
|
// current_thread already owns this thread.
|
|
subscriber.subscribe();
|
|
}
|
|
|
|
trace_activity().subscribe_return(*this);
|
|
return o.get_subscription();
|
|
}
|
|
|
|
public:
|
|
typedef T value_type;
|
|
|
|
static_assert(rxo::is_operator<source_operator_type>::value || rxs::is_source<source_operator_type>::value, "observable must wrap an operator or source");
|
|
|
|
~observable()
|
|
{
|
|
}
|
|
|
|
observable()
|
|
{
|
|
}
|
|
|
|
explicit observable(const source_operator_type& o)
|
|
: source_operator(o)
|
|
{
|
|
}
|
|
explicit observable(source_operator_type&& o)
|
|
: source_operator(std::move(o))
|
|
{
|
|
}
|
|
|
|
/// implicit conversion between observables of the same value_type
|
|
template<class SO>
|
|
observable(const observable<T, SO>& o)
|
|
: source_operator(o.source_operator)
|
|
{}
|
|
/// implicit conversion between observables of the same value_type
|
|
template<class SO>
|
|
observable(observable<T, SO>&& o)
|
|
: source_operator(std::move(o.source_operator))
|
|
{}
|
|
|
|
#if 0
|
|
template<class I>
|
|
void on_subscribe(observer<T, I> o) const {
|
|
source_operator.on_subscribe(o);
|
|
}
|
|
#endif
|
|
|
|
/*! @copydoc rxcpp::operators::as_dynamic
|
|
*/
|
|
template<class... AN>
|
|
observable<T> as_dynamic(AN**...) const {
|
|
return *this;
|
|
static_assert(sizeof...(AN) == 0, "as_dynamic() was passed too many arguments.");
|
|
}
|
|
|
|
/*! @copydoc rx-ref_count.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto ref_count(AN... an) const // ref_count(ConnectableObservable&&)
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(ref_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(ref_count_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rxcpp::operators::as_blocking
|
|
*/
|
|
template<class... AN>
|
|
blocking_observable<T, this_type> as_blocking(AN**...) const {
|
|
return blocking_observable<T, this_type>(*this);
|
|
static_assert(sizeof...(AN) == 0, "as_blocking() was passed too many arguments.");
|
|
}
|
|
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
|
|
///
|
|
/// takes any function that will take this observable and produce a result value.
|
|
/// this is intended to allow externally defined operators, that use subscribe,
|
|
/// to be connected into the expression.
|
|
///
|
|
template<class OperatorFactory>
|
|
auto op(OperatorFactory&& of) const
|
|
-> decltype(of(*(const this_type*)nullptr)) {
|
|
return of(*this);
|
|
static_assert(is_operator_factory_for<this_type, OperatorFactory>::value, "Function passed for op() must have the signature Result(SourceObservable)");
|
|
}
|
|
|
|
/*! @copydoc rx-lift.hpp
|
|
*/
|
|
template<class ResultType, class Operator>
|
|
auto lift(Operator&& op) const
|
|
-> observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>> {
|
|
return observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>(
|
|
rxo::detail::lift_operator<ResultType, source_operator_type, Operator>(source_operator, std::forward<Operator>(op)));
|
|
static_assert(detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value, "Function passed for lift() must have the signature subscriber<...>(subscriber<T, ...>)");
|
|
}
|
|
|
|
///
|
|
/// takes any function that will take a subscriber for this observable and produce a subscriber.
|
|
/// this is intended to allow externally defined operators, that use make_subscriber, to be connected
|
|
/// into the expression.
|
|
///
|
|
template<class ResultType, class Operator>
|
|
auto lift_if(Operator&& op) const
|
|
-> typename std::enable_if<detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value,
|
|
observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>>::type {
|
|
return observable<rxu::value_type_t<rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>, rxo::detail::lift_operator<ResultType, source_operator_type, Operator>>(
|
|
rxo::detail::lift_operator<ResultType, source_operator_type, Operator>(source_operator, std::forward<Operator>(op)));
|
|
}
|
|
///
|
|
/// takes any function that will take a subscriber for this observable and produce a subscriber.
|
|
/// this is intended to allow externally defined operators, that use make_subscriber, to be connected
|
|
/// into the expression.
|
|
///
|
|
template<class ResultType, class Operator>
|
|
auto lift_if(Operator&&) const
|
|
-> typename std::enable_if<!detail::is_lift_function_for<T, subscriber<ResultType>, Operator>::value,
|
|
decltype(rxs::from<ResultType>())>::type {
|
|
return rxs::from<ResultType>();
|
|
}
|
|
/// \endcond
|
|
|
|
/*! @copydoc rx-subscribe.hpp
|
|
*/
|
|
template<class... ArgN>
|
|
auto subscribe(ArgN&&... an) const
|
|
-> composite_subscription {
|
|
return detail_subscribe(make_subscriber<T>(std::forward<ArgN>(an)...));
|
|
}
|
|
|
|
/*! @copydoc rx-all.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto all(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(all_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(all_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rxcpp::operators::is_empty
|
|
*/
|
|
template<class... AN>
|
|
auto is_empty(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(is_empty_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(is_empty_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-any.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto any(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(any_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(any_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rxcpp::operators::exists
|
|
*/
|
|
template<class... AN>
|
|
auto exists(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(exists_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(exists_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rxcpp::operators::contains
|
|
*/
|
|
template<class... AN>
|
|
auto contains(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(contains_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(contains_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-filter.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto filter(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(filter_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(filter_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-switch_if_empty.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto switch_if_empty(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(switch_if_empty_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(switch_if_empty_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rxcpp::operators::default_if_empty
|
|
*/
|
|
template<class... AN>
|
|
auto default_if_empty(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(default_if_empty_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(default_if_empty_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-sequence_equal.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto sequence_equal(AN... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(sequence_equal_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(sequence_equal_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-tap.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto tap(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(tap_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(tap_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-time_interval.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto time_interval(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(time_interval_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(time_interval_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-timeout.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto timeout(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(timeout_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(timeout_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-timestamp.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto timestamp(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(timestamp_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(timestamp_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-finally.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto finally(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(finally_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(finally_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-on_error_resume_next.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto on_error_resume_next(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(on_error_resume_next_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(on_error_resume_next_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-on_error_resume_next.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto switch_on_error(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(on_error_resume_next_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(on_error_resume_next_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-map.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto map(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(map_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-map.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto transform(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(map_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-debounce.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto debounce(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(debounce_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(debounce_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-delay.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto delay(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(delay_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(delay_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-distinct.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto distinct(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(distinct_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(distinct_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-distinct_until_changed.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto distinct_until_changed(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(distinct_until_changed_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(distinct_until_changed_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-element_at.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto element_at(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(element_at_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(element_at_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-window.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto window(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(window_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(window_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-window_time.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto window_with_time(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(window_with_time_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(window_with_time_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-window_time_count.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto window_with_time_or_count(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(window_with_time_or_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(window_with_time_or_count_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-window_toggle.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto window_toggle(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(window_toggle_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(window_toggle_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-buffer_count.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto buffer(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(buffer_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(buffer_count_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-buffer_time.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto buffer_with_time(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(buffer_with_time_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(buffer_with_time_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-buffer_time_count.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto buffer_with_time_or_count(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(buffer_with_time_or_count_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(buffer_with_time_or_count_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-switch_on_next.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto switch_on_next(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(switch_on_next_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(switch_on_next_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-merge.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto merge(AN... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(merge_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(merge_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-merge_delay_error.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto merge_delay_error(AN... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(merge_delay_error_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(merge_delay_error_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-amb.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto amb(AN... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(amb_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(amb_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-flat_map.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto flat_map(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(flat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(flat_map_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-flat_map.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto merge_transform(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(flat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(flat_map_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-concat.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto concat(AN... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(concat_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(concat_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-concat_map.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto concat_map(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(concat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(concat_map_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-concat_map.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto concat_transform(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(concat_map_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(concat_map_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-with_latest_from.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto with_latest_from(AN... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(with_latest_from_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(with_latest_from_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
|
|
/*! @copydoc rx-combine_latest.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto combine_latest(AN... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(combine_latest_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(combine_latest_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-zip.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto zip(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(zip_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(zip_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-group_by.hpp
|
|
*/
|
|
template<class... AN>
|
|
inline auto group_by(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(group_by_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(group_by_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-ignore_elements.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto ignore_elements(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(ignore_elements_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(ignore_elements_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-muticast.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto multicast(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(multicast_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(multicast_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-publish.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto publish(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(publish_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(publish_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rxcpp::operators::publish_synchronized
|
|
*/
|
|
template<class... AN>
|
|
auto publish_synchronized(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(publish_synchronized_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(publish_synchronized_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-replay.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto replay(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(replay_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(replay_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-subscribe_on.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto subscribe_on(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(subscribe_on_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(subscribe_on_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-observe_on.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto observe_on(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(observe_on_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(observe_on_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-reduce.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto reduce(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(reduce_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(reduce_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-reduce.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto accumulate(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(reduce_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(reduce_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rxcpp::operators::first
|
|
*/
|
|
template<class... AN>
|
|
auto first(AN**...) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(delayed_type<first_tag, AN...>::value(), *(this_type*)nullptr))
|
|
/// \endcond
|
|
{
|
|
return observable_member(delayed_type<first_tag, AN...>::value(), *this);
|
|
static_assert(sizeof...(AN) == 0, "first() was passed too many arguments.");
|
|
}
|
|
|
|
/*! @copydoc rxcpp::operators::last
|
|
*/
|
|
template<class... AN>
|
|
auto last(AN**...) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(delayed_type<last_tag, AN...>::value(), *(this_type*)nullptr))
|
|
/// \endcond
|
|
{
|
|
return observable_member(delayed_type<last_tag, AN...>::value(), *this);
|
|
static_assert(sizeof...(AN) == 0, "last() was passed too many arguments.");
|
|
}
|
|
|
|
/*! @copydoc rxcpp::operators::count
|
|
*/
|
|
template<class... AN>
|
|
auto count(AN**...) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(delayed_type<reduce_tag, AN...>::value(), *(this_type*)nullptr, 0, rxu::count(), identity_for<int>()))
|
|
/// \endcond
|
|
{
|
|
return observable_member(delayed_type<reduce_tag, AN...>::value(), *this, 0, rxu::count(), identity_for<int>());
|
|
static_assert(sizeof...(AN) == 0, "count() was passed too many arguments.");
|
|
}
|
|
|
|
/*! @copydoc rxcpp::operators::sum
|
|
*/
|
|
template<class... AN>
|
|
auto sum(AN**...) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(delayed_type<sum_tag, AN...>::value(), *(this_type*)nullptr))
|
|
/// \endcond
|
|
{
|
|
return observable_member(delayed_type<sum_tag, AN...>::value(), *this);
|
|
static_assert(sizeof...(AN) == 0, "sum() was passed too many arguments.");
|
|
}
|
|
|
|
/*! @copydoc rxcpp::operators::average
|
|
*/
|
|
template<class... AN>
|
|
auto average(AN**...) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(delayed_type<average_tag, AN...>::value(), *(this_type*)nullptr))
|
|
/// \endcond
|
|
{
|
|
return observable_member(delayed_type<average_tag, AN...>::value(), *this);
|
|
static_assert(sizeof...(AN) == 0, "average() was passed too many arguments.");
|
|
}
|
|
|
|
/*! @copydoc rxcpp::operators::max
|
|
*/
|
|
template<class... AN>
|
|
auto max(AN**...) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(delayed_type<max_tag, AN...>::value(), *(this_type*)nullptr))
|
|
/// \endcond
|
|
{
|
|
return observable_member(delayed_type<max_tag, AN...>::value(), *this);
|
|
static_assert(sizeof...(AN) == 0, "max() was passed too many arguments.");
|
|
}
|
|
|
|
/*! @copydoc rxcpp::operators::min
|
|
*/
|
|
template<class... AN>
|
|
auto min(AN**...) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(delayed_type<min_tag, AN...>::value(), *(this_type*)nullptr))
|
|
/// \endcond
|
|
{
|
|
return observable_member(delayed_type<min_tag, AN...>::value(), *this);
|
|
static_assert(sizeof...(AN) == 0, "min() was passed too many arguments.");
|
|
}
|
|
|
|
/*! @copydoc rx-scan.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto scan(AN... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(scan_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(scan_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-sample_time.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto sample_with_time(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(sample_with_time_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(sample_with_time_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-skip.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto skip(AN... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(skip_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(skip_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-skip.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto skip_while(AN... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(skip_while_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(skip_while_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-skip_last.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto skip_last(AN... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(skip_last_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(skip_last_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-skip_until.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto skip_until(AN... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(skip_until_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(skip_until_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-take.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto take(AN... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(take_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(take_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-take_last.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto take_last(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(take_last_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(take_last_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-take_until.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto take_until(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(take_until_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(take_until_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-take_while.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto take_while(AN&&... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(take_while_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(take_while_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-repeat.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto repeat(AN... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(repeat_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(repeat_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-retry.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto retry(AN... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(retry_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(retry_tag{}, *(this_type*)this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-start_with.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto start_with(AN... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(start_with_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(start_with_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
|
|
/*! @copydoc rx-pairwise.hpp
|
|
*/
|
|
template<class... AN>
|
|
auto pairwise(AN... an) const
|
|
/// \cond SHOW_SERVICE_MEMBERS
|
|
-> decltype(observable_member(pairwise_tag{}, *(this_type*)nullptr, std::forward<AN>(an)...))
|
|
/// \endcond
|
|
{
|
|
return observable_member(pairwise_tag{}, *this, std::forward<AN>(an)...);
|
|
}
|
|
};
|
|
|
|
template<class T, class SourceOperator>
|
|
inline bool operator==(const observable<T, SourceOperator>& lhs, const observable<T, SourceOperator>& rhs) {
|
|
return lhs.source_operator == rhs.source_operator;
|
|
}
|
|
template<class T, class SourceOperator>
|
|
inline bool operator!=(const observable<T, SourceOperator>& lhs, const observable<T, SourceOperator>& rhs) {
|
|
return !(lhs == rhs);
|
|
}
|
|
|
|
/*!
|
|
\defgroup group-core Basics
|
|
|
|
\brief These are the core classes that combine to represent a set of values emitted over time that can be cancelled.
|
|
|
|
\class rxcpp::observable<void, void>
|
|
|
|
\brief typed as ```rxcpp::observable<>```, this is a collection of factory methods that return an observable.
|
|
|
|
\ingroup group-core
|
|
|
|
\par Create a new type of observable
|
|
|
|
\sample
|
|
\snippet create.cpp Create sample
|
|
\snippet output.txt Create sample
|
|
|
|
\par Create an observable that emits a range of values
|
|
|
|
\sample
|
|
\snippet range.cpp range sample
|
|
\snippet output.txt range sample
|
|
|
|
\par Create an observable that emits nothing / generates an error / immediately completes
|
|
|
|
\sample
|
|
\snippet never.cpp never sample
|
|
\snippet output.txt never sample
|
|
\snippet error.cpp error sample
|
|
\snippet output.txt error sample
|
|
\snippet empty.cpp empty sample
|
|
\snippet output.txt empty sample
|
|
|
|
\par Create an observable that generates new observable for each subscriber
|
|
|
|
\sample
|
|
\snippet defer.cpp defer sample
|
|
\snippet output.txt defer sample
|
|
|
|
\par Create an observable that emits items every specified interval of time
|
|
|
|
\sample
|
|
\snippet interval.cpp interval sample
|
|
\snippet output.txt interval sample
|
|
|
|
\par Create an observable that emits items in the specified interval of time
|
|
|
|
\sample
|
|
\snippet timer.cpp duration timer sample
|
|
\snippet output.txt duration timer sample
|
|
|
|
\par Create an observable that emits all items from a collection
|
|
|
|
\sample
|
|
\snippet iterate.cpp iterate sample
|
|
\snippet output.txt iterate sample
|
|
|
|
\par Create an observable that emits a set of specified items
|
|
|
|
\sample
|
|
\snippet from.cpp from sample
|
|
\snippet output.txt from sample
|
|
|
|
\par Create an observable that emits a single item
|
|
|
|
\sample
|
|
\snippet just.cpp just sample
|
|
\snippet output.txt just sample
|
|
|
|
\par Create an observable that emits a set of items and then subscribes to another observable
|
|
|
|
\sample
|
|
\snippet start_with.cpp full start_with sample
|
|
\snippet output.txt full start_with sample
|
|
|
|
\par Create an observable that generates a new observable based on a generated resource for each subscriber
|
|
|
|
\sample
|
|
\snippet scope.cpp scope sample
|
|
\snippet output.txt scope sample
|
|
|
|
*/
|
|
template<>
|
|
class observable<void, void>
|
|
{
|
|
~observable();
|
|
public:
|
|
/*! @copydoc rx-create.hpp
|
|
*/
|
|
template<class T, class OnSubscribe>
|
|
static auto create(OnSubscribe os)
|
|
-> decltype(rxs::create<T>(std::move(os))) {
|
|
return rxs::create<T>(std::move(os));
|
|
}
|
|
|
|
/*! @copydoc rx-range.hpp
|
|
*/
|
|
template<class T>
|
|
static auto range(T first = 0, T last = std::numeric_limits<T>::max(), std::ptrdiff_t step = 1)
|
|
-> decltype(rxs::range<T>(first, last, step, identity_current_thread())) {
|
|
return rxs::range<T>(first, last, step, identity_current_thread());
|
|
}
|
|
/*! @copydoc rx-range.hpp
|
|
*/
|
|
template<class T, class Coordination>
|
|
static auto range(T first, T last, std::ptrdiff_t step, Coordination cn)
|
|
-> decltype(rxs::range<T>(first, last, step, std::move(cn))) {
|
|
return rxs::range<T>(first, last, step, std::move(cn));
|
|
}
|
|
/*! @copydoc rx-range.hpp
|
|
*/
|
|
template<class T, class Coordination>
|
|
static auto range(T first, T last, Coordination cn)
|
|
-> decltype(rxs::range<T>(first, last, std::move(cn))) {
|
|
return rxs::range<T>(first, last, std::move(cn));
|
|
}
|
|
/*! @copydoc rx-range.hpp
|
|
*/
|
|
template<class T, class Coordination>
|
|
static auto range(T first, Coordination cn)
|
|
-> decltype(rxs::range<T>(first, std::move(cn))) {
|
|
return rxs::range<T>(first, std::move(cn));
|
|
}
|
|
|
|
/*! @copydoc rx-never.hpp
|
|
*/
|
|
template<class T>
|
|
static auto never()
|
|
-> decltype(rxs::never<T>()) {
|
|
return rxs::never<T>();
|
|
}
|
|
|
|
/*! @copydoc rx-defer.hpp
|
|
*/
|
|
template<class ObservableFactory>
|
|
static auto defer(ObservableFactory of)
|
|
-> decltype(rxs::defer(std::move(of))) {
|
|
return rxs::defer(std::move(of));
|
|
}
|
|
|
|
/*! @copydoc rx-interval.hpp
|
|
*/
|
|
template<class... AN>
|
|
static auto interval(rxsc::scheduler::clock_type::duration period, AN**...)
|
|
-> decltype(rxs::interval(period)) {
|
|
return rxs::interval(period);
|
|
static_assert(sizeof...(AN) == 0, "interval(period) was passed too many arguments.");
|
|
}
|
|
/*! @copydoc rx-interval.hpp
|
|
*/
|
|
template<class Coordination>
|
|
static auto interval(rxsc::scheduler::clock_type::duration period, Coordination cn)
|
|
-> decltype(rxs::interval(period, std::move(cn))) {
|
|
return rxs::interval(period, std::move(cn));
|
|
}
|
|
/*! @copydoc rx-interval.hpp
|
|
*/
|
|
template<class... AN>
|
|
static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, AN**...)
|
|
-> decltype(rxs::interval(initial, period)) {
|
|
return rxs::interval(initial, period);
|
|
static_assert(sizeof...(AN) == 0, "interval(initial, period) was passed too many arguments.");
|
|
}
|
|
/*! @copydoc rx-interval.hpp
|
|
*/
|
|
template<class Coordination>
|
|
static auto interval(rxsc::scheduler::clock_type::time_point initial, rxsc::scheduler::clock_type::duration period, Coordination cn)
|
|
-> decltype(rxs::interval(initial, period, std::move(cn))) {
|
|
return rxs::interval(initial, period, std::move(cn));
|
|
}
|
|
|
|
/*! @copydoc rx-timer.hpp
|
|
*/
|
|
template<class... AN>
|
|
static auto timer(rxsc::scheduler::clock_type::time_point at, AN**...)
|
|
-> decltype(rxs::timer(at)) {
|
|
return rxs::timer(at);
|
|
static_assert(sizeof...(AN) == 0, "timer(at) was passed too many arguments.");
|
|
}
|
|
/*! @copydoc rx-timer.hpp
|
|
*/
|
|
template<class... AN>
|
|
static auto timer(rxsc::scheduler::clock_type::duration after, AN**...)
|
|
-> decltype(rxs::timer(after)) {
|
|
return rxs::timer(after);
|
|
static_assert(sizeof...(AN) == 0, "timer(after) was passed too many arguments.");
|
|
}
|
|
/*! @copydoc rx-timer.hpp
|
|
*/
|
|
template<class Coordination>
|
|
static auto timer(rxsc::scheduler::clock_type::time_point when, Coordination cn)
|
|
-> decltype(rxs::timer(when, std::move(cn))) {
|
|
return rxs::timer(when, std::move(cn));
|
|
}
|
|
/*! @copydoc rx-timer.hpp
|
|
*/
|
|
template<class Coordination>
|
|
static auto timer(rxsc::scheduler::clock_type::duration when, Coordination cn)
|
|
-> decltype(rxs::timer(when, std::move(cn))) {
|
|
return rxs::timer(when, std::move(cn));
|
|
}
|
|
|
|
/*! @copydoc rx-iterate.hpp
|
|
*/
|
|
template<class Collection>
|
|
static auto iterate(Collection c)
|
|
-> decltype(rxs::iterate(std::move(c), identity_current_thread())) {
|
|
return rxs::iterate(std::move(c), identity_current_thread());
|
|
}
|
|
/*! @copydoc rx-iterate.hpp
|
|
*/
|
|
template<class Collection, class Coordination>
|
|
static auto iterate(Collection c, Coordination cn)
|
|
-> decltype(rxs::iterate(std::move(c), std::move(cn))) {
|
|
return rxs::iterate(std::move(c), std::move(cn));
|
|
}
|
|
|
|
/*! @copydoc rxcpp::sources::from()
|
|
*/
|
|
template<class T>
|
|
static auto from()
|
|
-> decltype( rxs::from<T>()) {
|
|
return rxs::from<T>();
|
|
}
|
|
/*! @copydoc rxcpp::sources::from(Coordination cn)
|
|
*/
|
|
template<class T, class Coordination>
|
|
static auto from(Coordination cn)
|
|
-> typename std::enable_if<is_coordination<Coordination>::value,
|
|
decltype( rxs::from<T>(std::move(cn)))>::type {
|
|
return rxs::from<T>(std::move(cn));
|
|
}
|
|
/*! @copydoc rxcpp::sources::from(Value0 v0, ValueN... vn)
|
|
*/
|
|
template<class Value0, class... ValueN>
|
|
static auto from(Value0 v0, ValueN... vn)
|
|
-> typename std::enable_if<!is_coordination<Value0>::value,
|
|
decltype( rxs::from(v0, vn...))>::type {
|
|
return rxs::from(v0, vn...);
|
|
}
|
|
/*! @copydoc rxcpp::sources::from(Coordination cn, Value0 v0, ValueN... vn)
|
|
*/
|
|
template<class Coordination, class Value0, class... ValueN>
|
|
static auto from(Coordination cn, Value0 v0, ValueN... vn)
|
|
-> typename std::enable_if<is_coordination<Coordination>::value,
|
|
decltype( rxs::from(std::move(cn), v0, vn...))>::type {
|
|
return rxs::from(std::move(cn), v0, vn...);
|
|
}
|
|
|
|
/*! @copydoc rxcpp::sources::just(Value0 v0)
|
|
*/
|
|
template<class T>
|
|
static auto just(T v)
|
|
-> decltype(rxs::just(std::move(v))) {
|
|
return rxs::just(std::move(v));
|
|
}
|
|
/*! @copydoc rxcpp::sources::just(Value0 v0, Coordination cn)
|
|
*/
|
|
template<class T, class Coordination>
|
|
static auto just(T v, Coordination cn)
|
|
-> decltype(rxs::just(std::move(v), std::move(cn))) {
|
|
return rxs::just(std::move(v), std::move(cn));
|
|
}
|
|
|
|
/*! @copydoc rxcpp::sources::start_with(Observable o, Value0 v0, ValueN... vn)
|
|
*/
|
|
template<class Observable, class Value0, class... ValueN>
|
|
static auto start_with(Observable o, Value0 v0, ValueN... vn)
|
|
-> decltype(rxs::start_with(std::move(o), std::move(v0), std::move(vn)...)) {
|
|
return rxs::start_with(std::move(o), std::move(v0), std::move(vn)...);
|
|
}
|
|
|
|
/*! @copydoc rx-empty.hpp
|
|
*/
|
|
template<class T>
|
|
static auto empty()
|
|
-> decltype(from<T>()) {
|
|
return from<T>();
|
|
}
|
|
/*! @copydoc rx-empty.hpp
|
|
*/
|
|
template<class T, class Coordination>
|
|
static auto empty(Coordination cn)
|
|
-> decltype(from<T>(std::move(cn))) {
|
|
return from<T>(std::move(cn));
|
|
}
|
|
|
|
/*! @copydoc rx-error.hpp
|
|
*/
|
|
template<class T, class Exception>
|
|
static auto error(Exception&& e)
|
|
-> decltype(rxs::error<T>(std::forward<Exception>(e))) {
|
|
return rxs::error<T>(std::forward<Exception>(e));
|
|
}
|
|
/*! @copydoc rx-error.hpp
|
|
*/
|
|
template<class T, class Exception, class Coordination>
|
|
static auto error(Exception&& e, Coordination cn)
|
|
-> decltype(rxs::error<T>(std::forward<Exception>(e), std::move(cn))) {
|
|
return rxs::error<T>(std::forward<Exception>(e), std::move(cn));
|
|
}
|
|
|
|
/*! @copydoc rx-scope.hpp
|
|
*/
|
|
template<class ResourceFactory, class ObservableFactory>
|
|
static auto scope(ResourceFactory rf, ObservableFactory of)
|
|
-> decltype(rxs::scope(std::move(rf), std::move(of))) {
|
|
return rxs::scope(std::move(rf), std::move(of));
|
|
}
|
|
};
|
|
|
|
}
|
|
|
|
//
|
|
// support range() >> filter() >> subscribe() syntax
|
|
// '>>' is spelled 'stream'
|
|
//
|
|
template<class T, class SourceOperator, class OperatorFactory>
|
|
auto operator >> (const rxcpp::observable<T, SourceOperator>& source, OperatorFactory&& of)
|
|
-> decltype(source.op(std::forward<OperatorFactory>(of))) {
|
|
return source.op(std::forward<OperatorFactory>(of));
|
|
}
|
|
|
|
//
|
|
// support range() | filter() | subscribe() syntax
|
|
// '|' is spelled 'pipe'
|
|
//
|
|
template<class T, class SourceOperator, class OperatorFactory>
|
|
auto operator | (const rxcpp::observable<T, SourceOperator>& source, OperatorFactory&& of)
|
|
-> decltype(source.op(std::forward<OperatorFactory>(of))) {
|
|
return source.op(std::forward<OperatorFactory>(of));
|
|
}
|
|
|
|
#endif
|