// sled/3party/asyncplusplus/include/async++/when_all_any.h
// 2024-03-27 23:16:26 +08:00
//
// 293 lines
// 9.8 KiB
// C++

// Copyright (c) 2015 Amanieu d'Antras
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#ifndef ASYNCXX_H_
# error "Do not include this header directly, include <async++.h> instead."
#endif
namespace async {
// Value produced by the task that when_any returns.
// Kept as a plain aggregate: it is brace-initialised as {index, tasks} and
// value-initialised for the empty case.
template<class Result>
struct when_any_result {
    // Position (within `tasks`) of the task that completed first
    std::size_t index;

    // The tasks that were handed to when_any
    Result tasks;
};
namespace detail {
// State shared by all continuations created by when_all.
//
// The reference count handed to ref_count_base is the number of tasks being
// waited on. Each continuation writes its finished task into `result`; the
// destructor then publishes `result` through `event`.
template<typename Result>
struct when_all_state: public ref_count_base<when_all_state<Result>> {
    // Event whose task is returned to the caller of when_all
    event_task<Result> event;

    // Collected tasks, filled in slot by slot by the continuations
    Result result;

    // count: initial number of references (one per task)
    when_all_state(std::size_t count)
        : ref_count_base<when_all_state<Result>>(count)
    {}

    // Destruction signals the event with everything collected so far.
    // Presumably ref_count_base destroys the object once the last reference
    // is released, i.e. after every continuation has run.
    ~when_all_state()
    {
        event.set(std::move(result));
    }
};
// Execution functions for when_all, for ranges and tuples
template<typename Task, typename Result>
struct when_all_func_range {
std::size_t index;
ref_count_ptr<when_all_state<Result>> state;
when_all_func_range(std::size_t index_, ref_count_ptr<when_all_state<Result>> state_)
: index(index_), state(std::move(state_)) {}
// Copy the completed task object to the shared state. The event is
// automatically signaled when all references are dropped.
void operator()(Task t) const
{
state->result[index] = std::move(t);
}
};
// Continuation functor for the variadic (tuple) form of when_all. The
// destination element of the result tuple is fixed at compile time by `index`.
template<std::size_t index, typename Task, typename Result>
struct when_all_func_tuple {
    // Reference to the shared when_all state
    ref_count_ptr<when_all_state<Result>> state;

    when_all_func_tuple(ref_count_ptr<when_all_state<Result>> shared)
        : state(std::move(shared))
    {}

    // Move the finished task into element `index` of the shared result tuple.
    // Nothing is signaled here: when_all_state sets its event from its
    // destructor.
    void operator()(Task t) const
    {
        std::get<index>(state->result) = std::move(t);
    }
};
// State shared by all continuations created by when_any.
//
// `result` holds a handle to every task passed in; `event` is signaled with
// the index of a finished task together with those handles.
template<typename Result>
struct when_any_state: public ref_count_base<when_any_state<Result>> {
    // Event whose task is returned to the caller of when_any
    event_task<when_any_result<Result>> event;

    // Handles to all tasks that were passed to when_any
    Result result;

    // count: initial number of references (one per task)
    when_any_state(std::size_t count)
        : ref_count_base<when_any_state<Result>>(count)
    {}

    // Called by each task's continuation with that task's index; the intent
    // is that the first caller's value is the one the event keeps.
    //
    // NOTE(review): `result` is moved into a fresh when_any_result on every
    // call, not only the first one. Confirm that event_task::set() ignores
    // later calls and that this repeated move is safe when several tasks
    // finish concurrently.
    void set(std::size_t winner)
    {
        event.set({winner, std::move(result)});
    }
};
// Execution function for when_any
template<typename Task, typename Result>
struct when_any_func {
std::size_t index;
ref_count_ptr<when_any_state<Result>> state;
when_any_func(std::size_t index_, ref_count_ptr<when_any_state<Result>> state_)
: index(index_), state(std::move(state_)) {}
// Simply tell the state that our task has finished, it already has a copy
// of the task object.
void operator()(Task) const
{
state->set(index);
}
};
// Recursive helper behind the variadic when_all: attaches one continuation
// per task, with `index` counting the position in the result tuple.

// Terminating overload: no tasks left.
template<std::size_t index, typename Result>
void when_all_variadic(when_all_state<Result>*) {}

// Attach a continuation to `first`, then recurse on the remaining tasks.
template<std::size_t index, typename Result, typename First, typename... T>
void when_all_variadic(when_all_state<Result>* state, First&& first, T&&... tasks)
{
    using task_type = typename std::decay<First>::type;
    using state_ptr = detail::ref_count_ptr<detail::when_all_state<Result>>;
    using continuation_type = detail::when_all_func_tuple<index, task_type, Result>;

    LIBASYNC_TRY {
        // The continuation runs on the inline scheduler and stores the
        // finished task in element `index` of the shared result.
        first.then(inline_scheduler(), continuation_type(state_ptr(state)));
    } LIBASYNC_CATCH(...) {
        // then() failed: drop the references reserved for the tasks that
        // will now never get a continuation, so the state is not leaked.
        state->remove_ref(sizeof...(T));
        LIBASYNC_RETHROW();
    }

    // Continue with the next tuple element
    detail::when_all_variadic<index + 1>(state, std::forward<T>(tasks)...);
}
// Internal implementation of when_any for variadic arguments
template<std::size_t index, typename Result>
void when_any_variadic(when_any_state<Result>*) {}
template<std::size_t index, typename Result, typename First, typename... T>
void when_any_variadic(when_any_state<Result>* state, First&& first, T&&... tasks)
{
typedef typename std::decay<First>::type task_type;
// Add a copy of the task to the results because the event may be
// set before all tasks have finished.
detail::task_base* t = detail::get_internal_task(first);
t->add_ref();
detail::set_internal_task(std::get<index>(state->result), detail::task_ptr(t));
// Add a continuation to the task
LIBASYNC_TRY {
first.then(inline_scheduler(), detail::when_any_func<task_type, Result>(index, detail::ref_count_ptr<detail::when_any_state<Result>>(state)));
} LIBASYNC_CATCH(...) {
// Make sure we don't leak memory if then() throws
state->remove_ref(sizeof...(T));
LIBASYNC_RETHROW();
}
// Add continuations to remaining tasks
detail::when_any_variadic<index + 1>(state, std::forward<T>(tasks)...);
}
} // namespace detail
// when_all for an iterator range: returns a task that yields a vector of the
// input tasks once every one of them has finished.
// An empty range yields an already-made task holding an empty vector.
template<typename Iter>
task<std::vector<typename std::decay<typename std::iterator_traits<Iter>::value_type>::type>> when_all(Iter begin, Iter end)
{
    using task_type = typename std::decay<typename std::iterator_traits<Iter>::value_type>::type;
    using result_type = std::vector<task_type>;
    using state_type = detail::when_all_state<result_type>;
    using continuation_type = detail::when_all_func_range<task_type, result_type>;

    // Nothing to wait for
    if (begin == end)
        return make_task(result_type());

    // The shared state starts with one reference per task and a result
    // vector with one slot per task.
    const std::size_t total = std::distance(begin, end);
    state_type* state = new state_type(total);
    state->result.resize(total);
    auto out = state->event.get_task();

    // Give every task a continuation that stores it in its own slot. The
    // event is set by the state's destructor, not by the continuations.
    std::size_t slot = 0;
    while (begin != end) {
        LIBASYNC_TRY {
            (*begin).then(inline_scheduler(), continuation_type(slot, detail::ref_count_ptr<state_type>(state)));
        } LIBASYNC_CATCH(...) {
            // then() failed: drop the references reserved for the tasks
            // after this one so the state is not leaked.
            state->remove_ref(std::distance(begin, end) - 1);
            LIBASYNC_RETHROW();
        }
        ++slot;
        ++begin;
    }

    return out;
}
// Combine a set of tasks into one task which is signaled when one of the
// tasks finishes. The resulting value carries the index of that task and a
// vector with a handle to every task in the range.
// An empty range yields an already-made task holding a value-initialised
// when_any_result.
template<typename Iter>
task<when_any_result<std::vector<typename std::decay<typename std::iterator_traits<Iter>::value_type>::type>>> when_any(Iter begin, Iter end)
{
    typedef typename std::decay<typename std::iterator_traits<Iter>::value_type>::type task_type;
    typedef std::vector<task_type> result_type;

    // Handle empty ranges
    if (begin == end)
        return make_task(when_any_result<result_type>());

    // Create shared state, initialized with the proper reference count
    std::size_t count = std::distance(begin, end);
    auto* state = new detail::when_any_state<result_type>(count);
    state->result.resize(count);
    auto out = state->event.get_task();

    // Pass 1: add a copy of every task to the results. This must be complete
    // before any continuation is attached: a continuation may run as soon as
    // it is attached, and when_any_state::set() then moves state->result
    // out. Writing state->result[i] after that point would index into a
    // moved-from vector and would leave later tasks out of the result.
    {
        std::size_t i = 0;
        for (Iter it = begin; it != end; ++it, ++i) {
            detail::task_base* t = detail::get_internal_task(*it);
            t->add_ref();
            detail::set_internal_task(state->result[i], detail::task_ptr(t));
        }
    }

    // Pass 2: add a continuation to each task to set the event. First one wins.
    for (std::size_t i = 0; begin != end; i++, ++begin) {
        LIBASYNC_TRY {
            (*begin).then(inline_scheduler(), detail::when_any_func<task_type, result_type>(i, detail::ref_count_ptr<detail::when_any_state<result_type>>(state)));
        } LIBASYNC_CATCH(...) {
            // Make sure we don't leak memory if then() throws: drop the
            // references reserved for the tasks after this one.
            state->remove_ref(std::distance(begin, end) - 1);
            LIBASYNC_RETHROW();
        }
    }

    return out;
}
// Convenience overload of when_all for anything std::begin/std::end accept:
// forwards the range's iterators to the iterator-pair overload.
template<typename T>
decltype(async::when_all(std::begin(std::declval<T>()), std::end(std::declval<T>()))) when_all(T&& tasks)
{
    auto first = std::begin(std::forward<T>(tasks));
    auto last = std::end(std::forward<T>(tasks));
    return async::when_all(first, last);
}
// Convenience overload of when_any for anything std::begin/std::end accept:
// forwards the range's iterators to the iterator-pair overload.
template<typename T>
decltype(async::when_any(std::begin(std::declval<T>()), std::end(std::declval<T>()))) when_any(T&& tasks)
{
    auto first = std::begin(std::forward<T>(tasks));
    auto last = std::end(std::forward<T>(tasks));
    return async::when_any(first, last);
}
// Variadic when_all, zero-argument case: there is nothing to wait for, so
// return a task made directly from an empty tuple.
inline task<std::tuple<>> when_all()
{
    typedef std::tuple<> result_type;
    return async::make_task(result_type());
}
// Variadic when_all: returns a task that yields a tuple of the (decayed)
// argument tasks once all of them have finished.
template<typename... T>
task<std::tuple<typename std::decay<T>::type...>> when_all(T&&... tasks)
{
    using result_type = std::tuple<typename std::decay<T>::type...>;
    using state_type = detail::when_all_state<result_type>;

    // Shared state with one reference per task
    state_type* state = new state_type(sizeof...(T));

    // Fetch the task to return before any continuation is attached
    auto out = state->event.get_task();

    // Attach a continuation to every argument, starting at tuple element 0
    detail::when_all_variadic<0>(state, std::forward<T>(tasks)...);

    return out;
}
// Variadic when_any, zero-argument case: return a task made directly from a
// value-initialised when_any_result over an empty tuple.
inline task<when_any_result<std::tuple<>>> when_any()
{
    typedef when_any_result<std::tuple<>> result_type;
    return async::make_task(result_type());
}
// Variadic when_any: returns a task that is signaled when one of the argument
// tasks finishes. Its value holds that task's index and a tuple of the
// (decayed) argument tasks.
template<typename... T>
task<when_any_result<std::tuple<typename std::decay<T>::type...>>> when_any(T&&... tasks)
{
    using result_type = std::tuple<typename std::decay<T>::type...>;
    using state_type = detail::when_any_state<result_type>;

    // Shared state with one reference per task
    state_type* state = new state_type(sizeof...(T));

    // Fetch the task to return before any continuation is attached
    auto out = state->event.get_task();

    // Register every argument on the event, starting at tuple element 0
    detail::when_any_variadic<0>(state, std::forward<T>(tasks)...);

    return out;
}
} // namespace async