134 lines
4.2 KiB
C++
134 lines
4.2 KiB
C++
|
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
|
||
|
|
||
|
#pragma once
|
||
|
|
||
|
#if !defined(RXCPP_SOURCES_RX_ERROR_HPP)
|
||
|
#define RXCPP_SOURCES_RX_ERROR_HPP
|
||
|
|
||
|
#include "../rx-includes.hpp"
|
||
|
|
||
|
/*! \file rx-error.hpp
|
||
|
|
||
|
\brief Returns an observable that sends no items to observer and immediately generates an error, on the specified scheduler.
|
||
|
|
||
|
\tparam T the type of (not) emitted items
|
||
|
\tparam Exception the type of the error
|
||
|
\tparam Coordination the type of the scheduler (optional)
|
||
|
|
||
|
\param e the error to be passed to observers
|
||
|
\param cn the scheduler to use for scheduling the items (optional)
|
||
|
|
||
|
\return Observable that sends no items to observer and immediately generates an error.
|
||
|
|
||
|
\sample
|
||
|
\snippet error.cpp error sample
|
||
|
\snippet output.txt error sample
|
||
|
|
||
|
\sample
|
||
|
\snippet error.cpp threaded error sample
|
||
|
\snippet output.txt threaded error sample
|
||
|
*/
|
||
|
|
||
|
namespace rxcpp {
|
||
|
|
||
|
namespace sources {
|
||
|
|
||
|
namespace detail {
|
||
|
|
||
|
template<class T, class Coordination>
|
||
|
struct error : public source_base<T>
|
||
|
{
|
||
|
typedef error<T, Coordination> this_type;
|
||
|
|
||
|
typedef rxu::decay_t<Coordination> coordination_type;
|
||
|
|
||
|
typedef typename coordination_type::coordinator_type coordinator_type;
|
||
|
|
||
|
struct error_initial_type
|
||
|
{
|
||
|
error_initial_type(rxu::error_ptr e, coordination_type cn)
|
||
|
: exception(e)
|
||
|
, coordination(std::move(cn))
|
||
|
{
|
||
|
}
|
||
|
rxu::error_ptr exception;
|
||
|
coordination_type coordination;
|
||
|
};
|
||
|
error_initial_type initial;
|
||
|
|
||
|
error(rxu::error_ptr e, coordination_type cn)
|
||
|
: initial(e, std::move(cn))
|
||
|
{
|
||
|
}
|
||
|
|
||
|
template<class Subscriber>
|
||
|
void on_subscribe(Subscriber o) const {
|
||
|
|
||
|
// creates a worker whose lifetime is the same as this subscription
|
||
|
auto coordinator = initial.coordination.create_coordinator(o.get_subscription());
|
||
|
auto controller = coordinator.get_worker();
|
||
|
auto exception = initial.exception;
|
||
|
|
||
|
auto producer = [=](const rxsc::schedulable&){
|
||
|
auto& dest = o;
|
||
|
if (!dest.is_subscribed()) {
|
||
|
// terminate loop
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
dest.on_error(exception);
|
||
|
// o is unsubscribed
|
||
|
};
|
||
|
auto selectedProducer = on_exception(
|
||
|
[&](){return coordinator.act(producer);},
|
||
|
o);
|
||
|
if (selectedProducer.empty()) {
|
||
|
return;
|
||
|
}
|
||
|
controller.schedule(selectedProducer.get());
|
||
|
}
|
||
|
};
|
||
|
|
||
|
struct throw_ptr_tag{};
|
||
|
struct throw_instance_tag{};
|
||
|
|
||
|
template <class T, class Coordination>
|
||
|
auto make_error(throw_ptr_tag&&, rxu::error_ptr exception, Coordination cn)
|
||
|
-> observable<T, error<T, Coordination>> {
|
||
|
return observable<T, error<T, Coordination>>(error<T, Coordination>(std::move(exception), std::move(cn)));
|
||
|
}
|
||
|
|
||
|
template <class T, class E, class Coordination>
|
||
|
auto make_error(throw_instance_tag&&, E e, Coordination cn)
|
||
|
-> observable<T, error<T, Coordination>> {
|
||
|
rxu::error_ptr ep = rxu::make_error_ptr(e);
|
||
|
return observable<T, error<T, Coordination>>(error<T, Coordination>(std::move(ep), std::move(cn)));
|
||
|
}
|
||
|
|
||
|
}
|
||
|
|
||
|
}
|
||
|
|
||
|
namespace sources {
|
||
|
|
||
|
/*! @copydoc rx-error.hpp
|
||
|
*/
|
||
|
template<class T, class E>
|
||
|
auto error(E e)
|
||
|
-> decltype(detail::make_error<T>(typename std::conditional<std::is_same<rxu::error_ptr, rxu::decay_t<E>>::value, detail::throw_ptr_tag, detail::throw_instance_tag>::type(), std::move(e), identity_immediate())) {
|
||
|
return detail::make_error<T>(typename std::conditional<std::is_same<rxu::error_ptr, rxu::decay_t<E>>::value, detail::throw_ptr_tag, detail::throw_instance_tag>::type(), std::move(e), identity_immediate());
|
||
|
}
|
||
|
/*! @copydoc rx-error.hpp
|
||
|
*/
|
||
|
template<class T, class E, class Coordination>
|
||
|
auto error(E e, Coordination cn)
|
||
|
-> decltype(detail::make_error<T>(typename std::conditional<std::is_same<rxu::error_ptr, rxu::decay_t<E>>::value, detail::throw_ptr_tag, detail::throw_instance_tag>::type(), std::move(e), std::move(cn))) {
|
||
|
return detail::make_error<T>(typename std::conditional<std::is_same<rxu::error_ptr, rxu::decay_t<E>>::value, detail::throw_ptr_tag, detail::throw_instance_tag>::type(), std::move(e), std::move(cn));
|
||
|
}
|
||
|
|
||
|
}
|
||
|
|
||
|
}
|
||
|
|
||
|
#endif
|