#include "../test.h"

#include <rxcpp/operators/rx-concat.hpp>
#include <rxcpp/operators/rx-group_by.hpp>
#include <rxcpp/operators/rx-reduce.hpp>
#include <rxcpp/operators/rx-map.hpp>
#include <rxcpp/operators/rx-merge.hpp>
#include <rxcpp/operators/rx-take.hpp>
#include <rxcpp/operators/rx-start_with.hpp>
#include <rxcpp/operators/rx-observe_on.hpp>

// <algorithm> (std::find_if_not, std::reverse, std::lexicographical_compare) and
// <iomanip> (std::setw, std::setprecision) are used below; include them explicitly
// rather than relying on them arriving indirectly through the headers above.
#include <algorithm>
#include <iomanip>
#include <locale>
#include <sstream>

SCENARIO("range partitioned by group_by across hardware threads to derive pi", "[!hide][pi][group_by][observe_on][long][perf]"){
    GIVEN("a for loop"){
        WHEN("partitioning pi series across all hardware threads"){

            std::atomic_int c;
            c = 0;
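            // pi(k) is the k-th term of the Leibniz series:
            //   pi = 4/1 - 4/3 + 4/5 - 4/7 + ...
            // summing pi(k) for k = 1..N converges (slowly) to pi; c counts how many terms were computed.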
            auto pi = [&](int k) {
                ++c;
                return ( k % 2 == 0 ? -4.0L : 4.0L ) / ( ( 2.0L * k ) - 1.0L );
            };

            using namespace std::chrono;
            auto start = steady_clock::now();

            // share an output thread across all the producer threads
            auto outputthread = rxcpp::observe_on_one_worker(rxcpp::observe_on_new_thread().create_coordinator().get_scheduler());
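
            // each work item describes one chunk of the series: the k values in [first, last]
            // that one producer will feed through pi() and sum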
            struct work
            {
                int index;
                int first;
                int last;
            };

            // use all available hardware threads
            auto total = rxcpp::observable<>::range(0, (2 * std::thread::hardware_concurrency()) - 1).
                map(
                    [](int index){
                        static const int chunk = 100000000 / (2 * std::thread::hardware_concurrency());
                        int first = (chunk * index) + 1;
                        int last = chunk * (index + 1);
                        return work{index, first, last};
                    }).
                group_by(
                    [](work w) -> int {return w.index % std::thread::hardware_concurrency();},
                    [](work w){return w;}).
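                // the key is index % hardware_concurrency, so chunks are dealt round-robin into
                // one group per hardware thread; with 2 * hardware_concurrency chunks each group
                // receives two chunks of the range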
                map(
                    [=](rxcpp::grouped_observable<int, work> onproc) {
                        auto key = onproc.get_key();
                        // share a producer thread across all the ranges in this group of chunks
                        auto producerthread = rxcpp::observe_on_one_worker(rxcpp::observe_on_new_thread().create_coordinator().get_scheduler());
                        return onproc.
                            map(
                                [=](work w){
                                    std::stringstream message;
                                    message << std::setw(3) << w.index << ": range - " << w.first << "-" << w.last;
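
                                    // each chunk becomes a nested observable: range over [first, last] on this
                                    // group's producer thread, map through pi, and sum to a single partial result;
                                    // start_with emits the range description up front so progress is visible
                                    // before the sum arrives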
                                    return rxcpp::observable<>::range(w.first, w.last, producerthread).
                                        map(pi).
                                        sum(). // each thread maps and reduces its contribution to the answer
                                        map(
                                            [=](long double v){
                                                std::stringstream message;
                                                message << key << " on " << std::this_thread::get_id() << " - value: " << std::setprecision(16) << v;
                                                return std::make_tuple(message.str(), v);
                                            }).
                                        start_with(std::make_tuple(message.str(), 0.0L)).
                                        as_dynamic();
                                }).
                            concat(). // only subscribe to one range at a time in this group.
                            observe_on(outputthread).
                            map(rxcpp::util::apply_to(
                                [](std::string message, long double v){
                                    std::cout << message << std::endl;
                                    return v;
                                })).
                            as_dynamic();
                    }).
                merge().
                sum(). // reduces the contributions from all the threads to the answer
                as_blocking().
                last();
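
            // as_blocking().last() parks this thread until the merged sum completes and
            // returns the final value, so total is a long double here, not an observable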

            std::cout << std::setprecision(16) << "Pi: " << total << std::endl;
            auto finish = steady_clock::now();
            auto msElapsed = duration_cast<milliseconds>(finish-start);
            std::cout << "pi using group_by and concat to partition the work : " << c << " calls to pi(), " << msElapsed.count() << "ms elapsed, " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
        }
    }
}

SCENARIO("range partitioned by dividing work across hardware threads to derive pi", "[!hide][pi][observe_on][long][perf]"){
    GIVEN("a for loop"){
        WHEN("partitioning pi series across all hardware threads"){

            std::atomic_int c;
            c = 0;
            auto pi = [&](int k) {
                ++c;
                return ( k % 2 == 0 ? -4.0L : 4.0L ) / ( ( 2.0L * k ) - 1.0L );
            };

            using namespace std::chrono;
            auto start = steady_clock::now();

            struct work
            {
                int index;
                int first;
                int last;
            };

            // use all available hardware threads
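            // unlike the group_by version above, each of the 2 * hardware_concurrency chunks
            // gets its own new thread here, so roughly twice as many threads as cores are created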
            auto total = rxcpp::observable<>::range(0, (2 * std::thread::hardware_concurrency()) - 1).
                map(
                    [](int index){
                        static const int chunk = 100000000 / (2 * std::thread::hardware_concurrency());
                        int first = (chunk * index) + 1;
                        int last = chunk * (index + 1);
                        return work{index, first, last};
                    }).
                map(
                    [=](work w){
                        std::stringstream message;
                        message << std::setw(3) << w.index << ": range - " << w.first << "-" << w.last;

                        // create a new thread for every chunk
                        return rxcpp::observable<>::range(w.first, w.last, rxcpp::observe_on_new_thread()).
                            map(pi).
                            sum(). // each thread maps and reduces its contribution to the answer
                            map(
                                [=](long double v){
                                    std::stringstream message;
                                    message << w.index << " on " << std::this_thread::get_id() << " - value: " << std::setprecision(16) << v;
                                    return std::make_tuple(message.str(), v);
                                }).
                            start_with(std::make_tuple(message.str(), 0.0L)).
                            as_dynamic();
                    }).
                merge(rxcpp::observe_on_new_thread()).
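                // merge takes a coordination here so the per-chunk results are serialized
                // onto one new thread before being printed and summed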
                map(rxcpp::util::apply_to(
                    [](std::string message, long double v){
                        std::cout << message << std::endl;
                        return v;
                    })).
                sum(). // reduces the contributions from all the threads to the answer
                as_blocking().
                last();

            std::cout << std::setprecision(16) << "Pi: " << total << std::endl;
            auto finish = steady_clock::now();
            auto msElapsed = duration_cast<milliseconds>(finish-start);
            std::cout << "pi using division of the whole range to partition the work : " << c << " calls to pi(), " << msElapsed.count() << "ms elapsed, " << c / (msElapsed.count() / 1000.0) << " ops/sec" << std::endl;
        }
    }
}
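
// helpers used by the group_by scenarios below:
// - trim() strips leading and trailing whitespace so "  foo " and "foo" map to the same key
// - tolowerStringLess() is a case-insensitive less-than used as the key comparator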
char whitespace(char c) {
    return std::isspace<char>(c, std::locale::classic());
}

std::string trim(std::string s) {
    auto first = std::find_if_not(s.begin(), s.end(), whitespace);
    auto last = std::find_if_not(s.rbegin(), s.rend(), whitespace);
    if (last != s.rend()) {
        s.erase(s.end() - (last-s.rbegin()), s.end());
    }
    s.erase(s.begin(), first);
    return s;
}

bool tolowerLess(char lhs, char rhs) {
    return std::tolower(lhs, std::locale::classic()) < std::tolower(rhs, std::locale::classic());
}

bool tolowerStringLess(const std::string& lhs, const std::string& rhs) {
    return std::lexicographical_compare(lhs.begin(), lhs.end(), rhs.begin(), rhs.end(), tolowerLess);
}

SCENARIO("group_by", "[group_by][operators]"){
    GIVEN("1 hot observable of strings."){
        auto sc = rxsc::make_test();
        auto w = sc.create_worker();
        const rxsc::test::messages<std::string> on;
        int keyInvoked = 0;
        int marbleInvoked = 0;

        auto xs = sc.make_hot_observable({
            on.next(90, "error"),
            on.next(110, "error"),
            on.next(130, "error"),
            on.next(220, " foo"),
            on.next(240, " FoO "),
            on.next(270, "baR "),
            on.next(310, "foO "),
            on.next(350, " Baz "),
            on.next(360, " qux "),
            on.next(390, " bar"),
            on.next(420, " BAR "),
            on.next(470, "FOO "),
            on.next(480, "baz "),
            on.next(510, " bAZ "),
            on.next(530, " fOo "),
            on.completed(570),
            on.next(580, "error"),
            on.completed(600),
            on.error(650, std::runtime_error("error in completed sequence"))
        });
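
        // the test worker subscribes at 200 and the source completes at 570, so the
        // "error" values before 200 and everything after 570 must never reach the subscriber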

        WHEN("group normalized strings"){

            auto res = w.start(
                [&]() {
                    return xs
                        .group_by(
                            [&](std::string v){
                                ++keyInvoked;
                                return trim(std::move(v));
                            },
                            [&](std::string v){
                                ++marbleInvoked;
                                std::reverse(v.begin(), v.end());
                                return v;
                            },
                            tolowerStringLess)
                        .map([](const rxcpp::grouped_observable<std::string, std::string>& g){return g.get_key();})
                        // forget the type to work around the lambda deduction bug in msvc 2013
                        .as_dynamic();
                }
            );
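
            // keys are compared with tolowerStringLess after trimming, so " foo", " FoO ", "FOO "
            // and " fOo " all land in the group whose key keeps the first-seen spelling "foo";
            // only the first value of each distinct key produces an output below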

            THEN("the output contains one key per group"){
                auto required = rxu::to_vector({
                    on.next(220, "foo"),
                    on.next(270, "baR"),
                    on.next(350, "Baz"),
                    on.next(360, "qux"),
                    on.completed(570)
                });
                auto actual = res.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there was one subscription and one unsubscription to the xs"){
                auto required = rxu::to_vector({
                    on.subscribe(200, 570)
                });
                auto actual = xs.subscriptions();
                REQUIRE(required == actual);
            }

            THEN("key selector was invoked for each value"){
                REQUIRE(12 == keyInvoked);
            }

            THEN("marble selector was invoked for each value"){
                REQUIRE(12 == marbleInvoked);
            }
        }
    }
}

SCENARIO("group_by take 1", "[group_by][take][operators]"){
    GIVEN("1 hot observable of ints."){
        auto sc = rxsc::make_test();
        auto w = sc.create_worker();
        const rxsc::test::messages<long> on;
        int keyInvoked = 0;
        int marbleInvoked = 0;
        int groupEmitted = 0;

        auto xs = sc.make_hot_observable({
            on.next(130, -1),
            on.next(220, 0),
            on.next(240, -1),
            on.next(270, 2),
            on.next(310, -3),
            on.next(350, 4),
            on.next(360, -5),
            on.next(390, 6),
            on.next(420, -7),
            on.next(470, 8),
            on.next(480, -9),
            on.completed(570)
        });

        WHEN("1 group of ints is emitted"){

            auto res = w.start(
                [&]() {
                    return xs
                        | rxo::group_by(
                            [&](long v) {
                                ++keyInvoked;
                                return v % 2;
                            },
                            [&](long v){
                                ++marbleInvoked;
                                return v;
                            })
                        | rxo::take(1)
                        | rxo::map([&](const rxcpp::grouped_observable<long, long>& g) -> rxcpp::observable<long> {
                            ++groupEmitted;
                            return g;
                        })
                        | rxo::merge()
                        // forget the type to work around the lambda deduction bug in msvc 2013
                        | rxo::as_dynamic();
                }
            );
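
            // take(1) keeps only the first group (key 0, created by the even value at 220), so
            // merge() forwards just the even values; the key selector still runs for every source
            // value, which is why keyInvoked is 10 while marbleInvoked is 5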

            THEN("the output contains the values from the first group"){
                auto required = rxu::to_vector({
                    on.next(220, 0),
                    on.next(270, 2),
                    on.next(350, 4),
                    on.next(390, 6),
                    on.next(470, 8),
                    on.completed(570)
                });
                auto actual = res.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there was one subscription and one unsubscription to the xs"){
                auto required = rxu::to_vector({
                    on.subscribe(200, 570)
                });
                auto actual = xs.subscriptions();
                REQUIRE(required == actual);
            }

            THEN("key selector was invoked for each value"){
                REQUIRE(10 == keyInvoked);
            }

            THEN("marble selector was invoked only for values in the taken group"){
                REQUIRE(5 == marbleInvoked);
            }

            THEN("1 group emitted"){
                REQUIRE(1 == groupEmitted);
            }
        }
    }
}

SCENARIO("group_by take 1 take 4", "[group_by][take][operators]"){
    GIVEN("1 hot observable of ints."){
        auto sc = rxsc::make_test();
        auto w = sc.create_worker();
        const rxsc::test::messages<long> on;
        int keyInvoked = 0;
        int marbleInvoked = 0;
        int groupEmitted = 0;

        auto xs = sc.make_hot_observable({
            on.next(130, -1),
            on.next(220, 0),
            on.next(240, -1),
            on.next(270, 2),
            on.next(310, -3),
            on.next(350, 4),
            on.next(360, -5),
            on.next(390, 6),
            on.next(420, -7)
        });

        WHEN("1 group of ints is emitted"){

            auto res = w.start(
                [&]() {
                    return xs
                        .group_by(
                            [&](long v) {
                                ++keyInvoked;
                                return v % 2;
                            },
                            [&](long v){
                                ++marbleInvoked;
                                return v;
                            })
                        .take(1)
                        .map([&](const rxcpp::grouped_observable<long, long>& g) -> rxcpp::observable<long> {
                            ++groupEmitted;
                            return g.take(4);
                        })
                        .merge()
                        // forget the type to work around the lambda deduction bug in msvc 2013
                        .as_dynamic();
                }
            );
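
            // take(1) keeps the first group and take(4) stops that group after four even values,
            // so the whole chain completes at 390 and the source is unsubscribed there even though
            // it never emits on.completed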

            THEN("the output contains the values from the first group"){
                auto required = rxu::to_vector({
                    on.next(220, 0),
                    on.next(270, 2),
                    on.next(350, 4),
                    on.next(390, 6),
                    on.completed(390)
                });
                auto actual = res.get_observer().messages();
                REQUIRE(required == actual);
            }

            THEN("there was one subscription and one unsubscription to the xs"){
                auto required = rxu::to_vector({
                    on.subscribe(200, 390)
                });
                auto actual = xs.subscriptions();
                REQUIRE(required == actual);
            }

            THEN("key selector was invoked for each value"){
                REQUIRE(7 == keyInvoked);
            }

            THEN("marble selector was invoked only for values in the taken group"){
                REQUIRE(4 == marbleInvoked);
            }

            THEN("1 group emitted"){
                REQUIRE(1 == groupEmitted);
            }
        }
    }
}