// Copyright (c) 2015 Amanieu d'Antras
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

#ifndef ASYNCXX_H_
# error "Do not include this header directly, include <async++.h> instead."
#endif

namespace async {

// Result type for when_any
template<typename Result>
struct when_any_result {
	// Index of the task that finished first
	std::size_t index;

	// List of tasks that were passed in
	Result tasks;
};

namespace detail {

// Shared state for when_all
template<typename Result>
struct when_all_state: public ref_count_base<when_all_state<Result>> {
	event_task<Result> event;
	Result result;

	when_all_state(std::size_t count)
		: ref_count_base<when_all_state<Result>>(count) {}

	// When all references are dropped, signal the event
	~when_all_state()
	{
		event.set(std::move(result));
	}
};

// Execution functions for when_all, for ranges and tuples
template<typename Task, typename Result>
struct when_all_func_range {
	std::size_t index;
	ref_count_ptr<when_all_state<Result>> state;

	when_all_func_range(std::size_t index_, ref_count_ptr<when_all_state<Result>> state_)
		: index(index_), state(std::move(state_)) {}

	// Copy the completed task object to the shared state. The event is
	// automatically signaled when all references are dropped.
	void operator()(Task t) const
	{
		state->result[index] = std::move(t);
	}
};

template<std::size_t index, typename Task, typename Result>
struct when_all_func_tuple {
	ref_count_ptr<when_all_state<Result>> state;

	when_all_func_tuple(ref_count_ptr<when_all_state<Result>> state_)
		: state(std::move(state_)) {}

	// Copy the completed task object to the shared state. The event is
	// automatically signaled when all references are dropped.
	void operator()(Task t) const
	{
		std::get<index>(state->result) = std::move(t);
	}
};

// Shared state for when_any
template<typename Result>
struct when_any_state: public ref_count_base<when_any_state<Result>> {
	event_task<when_any_result<Result>> event;
	Result result;

	when_any_state(std::size_t count)
		: ref_count_base<when_any_state<Result>>(count) {}

	// Signal the event when the first task reaches here
	void set(std::size_t i)
	{
		event.set({i, std::move(result)});
	}
};

// Execution function for when_any
template<typename Task, typename Result>
struct when_any_func {
	std::size_t index;
	ref_count_ptr<when_any_state<Result>> state;

	when_any_func(std::size_t index_, ref_count_ptr<when_any_state<Result>> state_)
		: index(index_), state(std::move(state_)) {}

	// Simply tell the state that our task has finished, it already has a copy
	// of the task object.
	void operator()(Task) const
	{
		state->set(index);
	}
};

// Internal implementation of when_all for variadic arguments
template<std::size_t index, typename Result>
void when_all_variadic(when_all_state<Result>*) {}
template<std::size_t index, typename Result, typename First, typename... T>
void when_all_variadic(when_all_state<Result>* state, First&& first, T&&... tasks)
{
	typedef typename std::decay<First>::type task_type;

	// Add a continuation to the task
	LIBASYNC_TRY {
		first.then(inline_scheduler(), detail::when_all_func_tuple<index, task_type, Result>(detail::ref_count_ptr<detail::when_all_state<Result>>(state)));
	} LIBASYNC_CATCH(...) {
		// Make sure we don't leak memory if then() throws
		state->remove_ref(sizeof...(T));
		LIBASYNC_RETHROW();
	}

	// Add continuations to remaining tasks
	detail::when_all_variadic<index + 1>(state, std::forward<T>(tasks)...);
}

// Internal implementation of when_any for variadic arguments
template<std::size_t index, typename Result>
void when_any_variadic(when_any_state<Result>*) {}
template<std::size_t index, typename Result, typename First, typename... T>
void when_any_variadic(when_any_state<Result>* state, First&& first, T&&... tasks)
{
	typedef typename std::decay<First>::type task_type;

	// Add a copy of the task to the results because the event may be
	// set before all tasks have finished.
	detail::task_base* t = detail::get_internal_task(first);
	t->add_ref();
	detail::set_internal_task(std::get<index>(state->result), detail::task_ptr(t));

	// Add a continuation to the task
	LIBASYNC_TRY {
		first.then(inline_scheduler(), detail::when_any_func<task_type, Result>(index, detail::ref_count_ptr<detail::when_any_state<Result>>(state)));
	} LIBASYNC_CATCH(...) {
		// Make sure we don't leak memory if then() throws
		state->remove_ref(sizeof...(T));
		LIBASYNC_RETHROW();
	}

	// Add continuations to remaining tasks
	detail::when_any_variadic<index + 1>(state, std::forward<T>(tasks)...);
}

} // namespace detail

// Combine a set of tasks into one task which is signaled when all specified tasks finish
template<typename Iter>
task<std::vector<typename std::decay<typename std::iterator_traits<Iter>::value_type>::type>> when_all(Iter begin, Iter end)
{
	typedef typename std::decay<typename std::iterator_traits<Iter>::value_type>::type task_type;
	typedef std::vector<task_type> result_type;

	// Handle empty ranges
	if (begin == end)
		return make_task(result_type());

	// Create shared state, initialized with the proper reference count
	std::size_t count = std::distance(begin, end);
	auto* state = new detail::when_all_state<result_type>(count);
	state->result.resize(count);
	auto out = state->event.get_task();

	// Add a continuation to each task to add its result to the shared state
	// Last task sets the event result
	for (std::size_t i = 0; begin != end; i++, ++begin) {
		LIBASYNC_TRY {
			(*begin).then(inline_scheduler(), detail::when_all_func_range<task_type, result_type>(i, detail::ref_count_ptr<detail::when_all_state<result_type>>(state)));
		} LIBASYNC_CATCH(...) {
			// Make sure we don't leak memory if then() throws
			state->remove_ref(std::distance(begin, end) - 1);
			LIBASYNC_RETHROW();
		}
	}

	return out;
}

// Combine a set of tasks into one task which is signaled when one of the tasks finishes
template<typename Iter>
task<when_any_result<std::vector<typename std::decay<typename std::iterator_traits<Iter>::value_type>::type>>> when_any(Iter begin, Iter end)
{
	typedef typename std::decay<typename std::iterator_traits<Iter>::value_type>::type task_type;
	typedef std::vector<task_type> result_type;

	// Handle empty ranges
	if (begin == end)
		return make_task(when_any_result<result_type>());

	// Create shared state, initialized with the proper reference count
	std::size_t count = std::distance(begin, end);
	auto* state = new detail::when_any_state<result_type>(count);
	state->result.resize(count);
	auto out = state->event.get_task();

	// Add a continuation to each task to set the event. First one wins.
	for (std::size_t i = 0; begin != end; i++, ++begin) {
		// Add a copy of the task to the results because the event may be
		// set before all tasks have finished.
		detail::task_base* t = detail::get_internal_task(*begin);
		t->add_ref();
		detail::set_internal_task(state->result[i], detail::task_ptr(t));

		LIBASYNC_TRY {
			(*begin).then(inline_scheduler(), detail::when_any_func<task_type, result_type>(i, detail::ref_count_ptr<detail::when_any_state<result_type>>(state)));
		} LIBASYNC_CATCH(...) {
			// Make sure we don't leak memory if then() throws
			state->remove_ref(std::distance(begin, end) - 1);
			LIBASYNC_RETHROW();
		}
	}

	return out;
}

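// Illustrative usage of the range-based when_all/when_any above. This is a
// non-normative sketch, not part of the header: it assumes async::spawn and
// task<T>::get() from the rest of Async++, and it relies on then() consuming
// the original tasks, so the input vector should not be reused afterwards.
//
//     std::vector<async::task<int>> tasks;
//     for (int i = 0; i < 4; i++)
//         tasks.push_back(async::spawn([i] { return i * i; }));
//
//     // The combined task produces the vector of completed task objects
//     async::when_all(tasks.begin(), tasks.end())
//         .then([](std::vector<async::task<int>> done) {
//             for (auto& t : done)
//                 std::cout << t.get() << std::endl;
//         });
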
// when_all wrapper accepting ranges
template<typename T>
decltype(async::when_all(std::begin(std::declval<T>()), std::end(std::declval<T>()))) when_all(T&& tasks)
{
	return async::when_all(std::begin(std::forward<T>(tasks)), std::end(std::forward<T>(tasks)));
}

// when_any wrapper accepting ranges
template<typename T>
decltype(async::when_any(std::begin(std::declval<T>()), std::end(std::declval<T>()))) when_any(T&& tasks)
{
	return async::when_any(std::begin(std::forward<T>(tasks)), std::end(std::forward<T>(tasks)));
}

// when_all with variadic arguments
inline task<std::tuple<>> when_all()
{
	return async::make_task(std::tuple<>());
}
template<typename... T>
task<std::tuple<typename std::decay<T>::type...>> when_all(T&&... tasks)
{
	typedef std::tuple<typename std::decay<T>::type...> result_type;

	// Create shared state
	auto state = new detail::when_all_state<result_type>(sizeof...(tasks));
	auto out = state->event.get_task();

	// Register all the tasks on the event
	detail::when_all_variadic<0>(state, std::forward<T>(tasks)...);

	return out;
}

// when_any with variadic arguments
inline task<when_any_result<std::tuple<>>> when_any()
{
	return async::make_task(when_any_result<std::tuple<>>());
}
template<typename... T>
task<when_any_result<std::tuple<typename std::decay<T>::type...>>> when_any(T&&... tasks)
{
	typedef std::tuple<typename std::decay<T>::type...> result_type;

	// Create shared state
	auto state = new detail::when_any_state<result_type>(sizeof...(tasks));
	auto out = state->event.get_task();

	// Register all the tasks on the event
	detail::when_any_variadic<0>(state, std::forward<T>(tasks)...);

	return out;
}

} // namespace async
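
// Illustrative usage of the variadic when_all/when_any above. Again a
// non-normative sketch assuming async::spawn and task<T>::get() from the rest
// of Async++; the when_any_result members index and tasks are the ones defined
// in this header.
//
//     auto t1 = async::spawn([] { return 42; });
//     auto t2 = async::spawn([] { return std::string("hello"); });
//
//     // The result carries the index of the first task to finish together
//     // with all of the task objects.
//     async::when_any(t1, t2).then(
//         [](async::when_any_result<std::tuple<async::task<int>,
//                                               async::task<std::string>>> r) {
//             if (r.index == 0)
//                 std::cout << std::get<0>(r.tasks).get() << std::endl;
//             else
//                 std::cout << std::get<1>(r.tasks).get() << std::endl;
//         });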