// Listing metadata: 288 lines, 8.9 KiB, C++
#include "../test.h"
|
|
|
|
namespace detail {
|
|
|
|
template<class Predicate>
|
|
struct liftfilter
|
|
{
|
|
typedef typename std::decay<Predicate>::type test_type;
|
|
test_type test;
|
|
|
|
liftfilter(test_type t)
|
|
: test(t)
|
|
{
|
|
}
|
|
|
|
template<class Subscriber>
|
|
struct filter_observer : public rx::observer_base<typename std::decay<Subscriber>::type::value_type>
|
|
{
|
|
typedef filter_observer<Subscriber> this_type;
|
|
typedef rx::observer_base<typename std::decay<Subscriber>::type::value_type> base_type;
|
|
typedef typename base_type::value_type value_type;
|
|
typedef typename std::decay<Subscriber>::type dest_type;
|
|
typedef rx::observer<value_type, this_type> observer_type;
|
|
dest_type dest;
|
|
test_type test;
|
|
|
|
filter_observer(dest_type d, test_type t)
|
|
: dest(d)
|
|
, test(t)
|
|
{
|
|
}
|
|
void on_next(typename dest_type::value_type v) const {
|
|
bool filtered = false;
|
|
RXCPP_TRY {
|
|
filtered = !test(v);
|
|
} RXCPP_CATCH(...) {
|
|
dest.on_error(rxu::current_exception());
|
|
return;
|
|
}
|
|
if (!filtered) {
|
|
dest.on_next(v);
|
|
}
|
|
}
|
|
void on_error(rxu::error_ptr e) const {
|
|
dest.on_error(e);
|
|
}
|
|
void on_completed() const {
|
|
dest.on_completed();
|
|
}
|
|
|
|
static rx::subscriber<value_type, observer_type> make(const dest_type& d, const test_type& t) {
|
|
return rx::make_subscriber<value_type>(d, observer_type(this_type(d, t)));
|
|
}
|
|
};
|
|
|
|
template<class Subscriber>
|
|
auto operator()(const Subscriber& dest) const
|
|
-> decltype(filter_observer<Subscriber>::make(dest, test)) {
|
|
return filter_observer<Subscriber>::make(dest, test);
|
|
}
|
|
};
|
|
|
|
}
|
|
|
|
namespace {
|
|
|
|
template<class Predicate>
|
|
auto liftfilter(Predicate&& p)
|
|
-> detail::liftfilter<typename std::decay<Predicate>::type> {
|
|
return detail::liftfilter<typename std::decay<Predicate>::type>(std::forward<Predicate>(p));
|
|
}
|
|
|
|
/// Returns true when `x` is a prime number, false otherwise.
///
/// Values below 2 (including negatives) are never prime. Trial division only
/// has to probe divisors up to sqrt(x): any larger factor is paired with a
/// smaller one that has already been tried.
bool IsPrime(int x)
{
    if (x < 2) return false;
    // `i <= x / i` is the sqrt(x) bound written without computing i * i,
    // so it cannot overflow even for x close to INT_MAX.
    for (int i = 2; i <= x / i; ++i)
    {
        if (x % i == 0)
            return false;
    }
    return true;
}
|
|
|
|
}
|
|
|
|
// Member-style `lift` with the hand-written liftfilter operator.
// The source is a hot observable on a test scheduler; `start` is given 400,
// and the expectations below only contain activity up to that tick.
SCENARIO("lift liftfilter stops on disposal", "[where][filter][lift][operators]"){
    GIVEN("a test hot observable of ints"){
        auto scheduler = rxsc::make_test();
        auto worker = scheduler.create_worker();
        const rxsc::test::messages<int> on;

        // Counts how many times the predicate was evaluated.
        long invoked = 0;

        auto xs = scheduler.make_hot_observable({
            on.next(110, 1),
            on.next(180, 2),
            on.next(230, 3),
            on.next(270, 4),
            on.next(340, 5),
            on.next(380, 6),
            on.next(390, 7),
            on.next(450, 8),
            on.next(470, 9),
            on.next(560, 10),
            on.next(580, 11),
            on.completed(600)
        });

        WHEN("filtered to ints that are primes"){

            auto result = worker.start(
                [&xs, &invoked]() {
                    return xs
                        .lift<int>(liftfilter([&invoked](int x) {
                            ++invoked;
                            return IsPrime(x);
                        }))
                        // erase the static type to work around a lambda deduction bug on msvc 2013
                        .as_dynamic();
                },
                400
            );

            THEN("the output only contains primes that arrived before disposal"){
                auto required = rxu::to_vector({
                    on.next(230, 3),
                    on.next(340, 5),
                    on.next(390, 7)
                });
                auto actual = result.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there was one subscription and one unsubscription"){
                auto required = rxu::to_vector({
                    on.subscribe(200, 400)
                });
                auto actual = xs.subscriptions();
                REQUIRE(required == actual);
            }

            THEN("where was called until disposed"){
                // Five source values (230, 270, 340, 380, 390) fall inside the
                // asserted 200-400 subscription window.
                REQUIRE(5 == invoked);
            }
        }
    }
}
|
|
|
|
// Same check as the member-style scenario, but the pipeline is composed with
// the `>>` stream syntax and the `rxo::` operator factories.
SCENARIO("stream lift liftfilter stops on disposal", "[where][filter][lift][stream][operators]"){
    GIVEN("a test hot observable of ints"){
        auto scheduler = rxsc::make_test();
        auto worker = scheduler.create_worker();
        const rxsc::test::messages<int> on;

        // Counts how many times the predicate was evaluated.
        long invoked = 0;

        auto xs = scheduler.make_hot_observable({
            on.next(110, 1),
            on.next(180, 2),
            on.next(230, 3),
            on.next(270, 4),
            on.next(340, 5),
            on.next(380, 6),
            on.next(390, 7),
            on.next(450, 8),
            on.next(470, 9),
            on.next(560, 10),
            on.next(580, 11),
            on.completed(600)
        });

        WHEN("filtered to ints that are primes"){

            auto result = worker.start(
                [&xs, &invoked]() {
                    return xs
                        >> rxo::lift<int>(liftfilter([&invoked](int x) {
                            ++invoked;
                            return IsPrime(x);
                        }))
                        // erase the static type to work around a lambda deduction bug on msvc 2013
                        >> rxo::as_dynamic();
                },
                400
            );

            THEN("the output only contains primes that arrived before disposal"){
                auto required = rxu::to_vector({
                    on.next(230, 3),
                    on.next(340, 5),
                    on.next(390, 7)
                });
                auto actual = result.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there was one subscription and one unsubscription"){
                auto required = rxu::to_vector({
                    on.subscribe(200, 400)
                });
                auto actual = xs.subscriptions();
                REQUIRE(required == actual);
            }

            THEN("where was called until disposed"){
                // Five source values (230, 270, 340, 380, 390) fall inside the
                // asserted 200-400 subscription window.
                REQUIRE(5 == invoked);
            }
        }
    }
}
|
|
|
|
// Variant where the lifted operator is written inline as a lambda that builds
// a dynamic observer around the destination subscriber, instead of using the
// liftfilter helper.
SCENARIO("lift lambda filter stops on disposal", "[where][filter][lift][lambda][operators]"){
    GIVEN("a test hot observable of ints"){
        auto scheduler = rxsc::make_test();
        auto worker = scheduler.create_worker();
        const rxsc::test::messages<int> on;

        // Counts how many times the predicate was evaluated.
        long invoked = 0;

        auto xs = scheduler.make_hot_observable({
            on.next(110, 1),
            on.next(180, 2),
            on.next(230, 3),
            on.next(270, 4),
            on.next(340, 5),
            on.next(380, 6),
            on.next(390, 7),
            on.next(450, 8),
            on.next(470, 9),
            on.next(560, 10),
            on.next(580, 11),
            on.completed(600)
        });

        WHEN("filtered to ints that are primes"){

            auto result = worker.start(
                [&xs, &invoked]() {
                    auto predicate = [&](int x){
                        ++invoked;
                        return IsPrime(x);
                    };
                    return xs
                        .lift<int>([=](rx::subscriber<int> dest){
                            // VS2013 deduction issue requires dynamic (type-forgetting)
                            return rx::make_subscriber<int>(
                                dest,
                                rx::make_observer_dynamic<int>(
                                    [=](int n){
                                        bool pass = false;
                                        RXCPP_TRY {
                                            pass = predicate(n);
                                        } RXCPP_CATCH(...) {
                                            // A throwing predicate is reported downstream;
                                            // the value itself is not forwarded.
                                            dest.on_error(rxu::current_exception());
                                            return;
                                        }
                                        if (pass) {
                                            dest.on_next(n);
                                        }
                                    },
                                    [=](rxu::error_ptr e){
                                        dest.on_error(e);
                                    },
                                    [=](){
                                        dest.on_completed();
                                    }));
                        })
                        // erase the static type to work around a lambda deduction bug on msvc 2013
                        .as_dynamic();
                },
                400
            );

            THEN("the output only contains primes that arrived before disposal"){
                auto required = rxu::to_vector({
                    on.next(230, 3),
                    on.next(340, 5),
                    on.next(390, 7)
                });
                auto actual = result.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there was one subscription and one unsubscription"){
                auto required = rxu::to_vector({
                    on.subscribe(200, 400)
                });
                auto actual = xs.subscriptions();
                REQUIRE(required == actual);
            }

            THEN("where was called until disposed"){
                // Five source values (230, 270, 340, 380, 390) fall inside the
                // asserted 200-400 subscription window.
                REQUIRE(5 == invoked);
            }
        }
    }
}
|
|
|