// Copyright (c) 2015 Amanieu d'Antras // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. #ifndef ASYNCXX_H_ # error "Do not include this header directly, include instead." #endif namespace async { namespace detail { // Task states enum class task_state: unsigned char { pending, // Task has not completed yet locked, // Task is locked (used by event_task to prevent double set) unwrapped, // Task is waiting for an unwrapped task to finish completed, // Task has finished execution and a result is available canceled // Task has been canceled and an exception is available }; // Determine whether a task is in a final state inline bool is_finished(task_state s) { return s == task_state::completed || s == task_state::canceled; } // Virtual function table used to allow dynamic dispatch for task objects. 
// While this is very similar to what a compiler would generate with virtual
// functions, this scheme was found to result in significantly smaller
// generated code size.
struct task_base_vtable {
	// Destroy the function and result
	void (*destroy)(task_base*) LIBASYNC_NOEXCEPT;

	// Run the associated function
	void (*run)(task_base*) LIBASYNC_NOEXCEPT;

	// Cancel the task with an exception
	void (*cancel)(task_base*, std::exception_ptr&&) LIBASYNC_NOEXCEPT;

	// Schedule the task using its scheduler.
	// Invoked by run_continuations() below with `parent` = the task that just
	// finished and `t` = one of its continuations. Unlike the other entries
	// this one is not declared LIBASYNC_NOEXCEPT.
	void (*schedule)(task_base* parent, task_ptr t);
};

// Type-generic base task object
struct task_base_deleter;
struct LIBASYNC_CACHELINE_ALIGN task_base: public ref_count_base {
	// Task state.
	// Published with release ordering by finish() and task_result::cancel_base(),
	// and read with acquire ordering by ready() and wait().
	// NOTE(review): this declaration and the bare `template` keywords in this
	// section have no template argument/parameter lists in this copy; that
	// looks like an extraction artifact -- confirm against the upstream header.
	std::atomic state;

	// Whether get_task() was already called on an event_task.
	// NOTE(review): not initialized by task_base() below; presumably set by the
	// event_task code before use -- confirm.
	bool event_task_got_task;

	// Vector of continuations
	continuation_vector continuations;

	// Virtual function table used for dynamic dispatch
	const task_base_vtable* vtable;

	// Use aligned memory allocation.
	// Allocates through aligned_alloc() with LIBASYNC_CACHELINE_SIZE, matching
	// the LIBASYNC_CACHELINE_ALIGN on the struct (argument order presumably
	// (size, alignment) -- see that helper's definition).
	static void* operator new(std::size_t size)
	{
		return aligned_alloc(size, LIBASYNC_CACHELINE_SIZE);
	}
	static void operator delete(void* ptr)
	{
		aligned_free(ptr);
	}

	// Initialize task state
	task_base()
		: state(task_state::pending) {}

	// Check whether the task is ready and include an acquire barrier if it is
	bool ready() const
	{
		return is_finished(state.load(std::memory_order_acquire));
	}

	// Run a single continuation.
	// Hands `cont` to the scheduler. If scheduling throws, the exception is not
	// propagated; instead it is delivered to `cont` by canceling it through its
	// own vtable.
	template
	void run_continuation(Sched& sched, task_ptr&& cont)
	{
		LIBASYNC_TRY {
			detail::schedule_task(sched, cont);
		} LIBASYNC_CATCH(...) {
			// This is suboptimal, but better than letting the exception leak
			cont->vtable->cancel(cont.get(), std::current_exception());
		}
	}

	// Run all of the task's continuations after it has completed or canceled.
	// The list of continuations is emptied and locked to prevent any further
	// continuations from being added.
	// Each continuation is dispatched through its *own* vtable->schedule, so the
	// scheduler type recorded in that continuation is the one used.
	void run_continuations()
	{
		continuations.flush_and_lock([this](task_ptr t) {
			// Read the vtable pointer before `t` is moved into the call: before
			// C++17 the evaluation order of the callee expression and the
			// arguments is unspecified, so `t->vtable->schedule(this,
			// std::move(t))` could dereference an already moved-from pointer.
			const task_base_vtable* vtable_ptr = t->vtable;
			vtable_ptr->schedule(this, std::move(t));
		});
	}

	// Add a continuation to this task.
	// If this task is still running, `cont` is queued and will be scheduled by
	// run_continuations(); otherwise it is scheduled immediately on `sched`.
	template
	void add_continuation(Sched& sched, task_ptr cont)
	{
		// Check for task completion. A relaxed load is enough here: it is only a
		// fast-path hint, and try_add() below detects a concurrent finish.
		task_state current_state = state.load(std::memory_order_relaxed);
		if (!is_finished(current_state)) {
			// Try to add the task to the continuation list. This can fail only
			// if the task has just finished, in which case we run it directly.
			// NOTE(review): `cont` is used again below after being passed with
			// std::move, so this relies on try_add() leaving it intact when it
			// returns false (e.g. by taking it by rvalue reference) -- confirm
			// in continuation_vector.
			if (continuations.try_add(std::move(cont)))
				return;
		}

		// Otherwise run the continuation directly.
		// The acquire fence pairs with the release store of `state` in finish() /
		// task_result::cancel_base() so the continuation observes this task's
		// result or exception.
		std::atomic_thread_fence(std::memory_order_acquire);
		run_continuation(sched, std::move(cont));
	}

	// Finish the task after it has been executed and the result set.
	// The release store publishes the result before continuations run.
	void finish()
	{
		state.store(task_state::completed, std::memory_order_release);
		run_continuations();
	}

	// Wait for the task to finish executing and return its final state.
	task_state wait()
	{
		task_state s = state.load(std::memory_order_acquire);
		if (!is_finished(s)) {
			wait_for_task(this);
			// NOTE(review): relaxed re-read assumes wait_for_task() already
			// established the necessary happens-before -- confirm.
			s = state.load(std::memory_order_relaxed);
		}
		return s;
	}
};

// Deleter for task_ptr
struct task_base_deleter {
	static void do_delete(task_base* p)
	{
		// Go through the vtable to delete p with its proper type
		p->vtable->destroy(p);
	}
};

// Result type-specific task object
template
struct task_result_holder: public task_base {
	// The members below share storage; which one is live depends on the state:
	// - result: constructed by set_result(), live once the task is completed
	//   (destroyed in ~task_result_holder).
	// - except: constructed by task_result::set_exception(), live once the task
	//   is canceled (destroyed in ~task_result).
	// - sched: read by task_func::schedule() when this task is being scheduled
	//   as a continuation.
	union {
		alignas(Result) std::uint8_t result[sizeof(Result)];
		alignas(std::exception_ptr) std::uint8_t except[sizeof(std::exception_ptr)];

		// Scheduler that should be used to schedule this task. The scheduler
		// type has been erased and is held by vtable->schedule.
		void* sched;
	};

	// Construct the result in place in the union storage.
	template
	void set_result(T&& t)
	{
		new(&result) Result(std::forward(t));
	}

	// Return a result using an lvalue or rvalue reference depending on the task
	// type. The task parameter is not used, it is just there for overload resolution.
	// A task yields an rvalue reference (the result is moved out); a shared_task
	// yields a const lvalue reference.
	template
	Result&& get_result(const task&)
	{
		return std::move(*reinterpret_cast(&result));
	}
	template
	const Result& get_result(const shared_task&)
	{
		return *reinterpret_cast(&result);
	}

	// Destroy the result
	~task_result_holder()
	{
		// Result is only present if the task completed successfully
		if (state.load(std::memory_order_relaxed) == task_state::completed)
			reinterpret_cast(&result)->~Result();
	}
};

// Specialization for references
template
struct task_result_holder: public task_base {
	union {
		// Store as pointer internally
		Result* result;
		alignas(std::exception_ptr) std::uint8_t except[sizeof(std::exception_ptr)];
		void* sched;
	};

	// Only the address is kept, so nothing needs destroying later.
	void set_result(Result& obj)
	{
		result = std::addressof(obj);
	}

	template
	Result& get_result(const task&)
	{
		return *result;
	}
	template
	Result& get_result(const shared_task&)
	{
		return *result;
	}
};

// Specialization for void
template<>
struct task_result_holder: public task_base {
	union {
		alignas(std::exception_ptr) std::uint8_t except[sizeof(std::exception_ptr)];
		void* sched;
	};

	// Nothing to store for a void result.
	void set_result(fake_void) {}

	// Get the result as fake_void so that it can be passed to set_result and
	// continuations
	template
	fake_void get_result(const task&)
	{
		return fake_void();
	}
	template
	fake_void get_result(const shared_task&)
	{
		return fake_void();
	}
};

// Task object that holds a result/exception but no function object.
template
struct task_result: public task_result_holder {
	// Virtual function table for task_result.
	// Only `destroy` is populated (run/cancel/schedule are nullptr), so such a
	// task must never be run, canceled or scheduled through its vtable.
	static const task_base_vtable vtable_impl;
	task_result()
	{
		this->vtable = &vtable_impl;
	}

	// Destroy the exception
	~task_result()
	{
		// Exception is only present if the task was canceled
		if (this->state.load(std::memory_order_relaxed) == task_state::canceled)
			reinterpret_cast(&this->except)->~exception_ptr();
	}

	// Cancel a task with the given exception.
	// Stores the exception, publishes the canceled state with release ordering,
	// then runs continuations. Does not touch any function object; callers
	// that own one must destroy it themselves (see task_func::cancel).
	void cancel_base(std::exception_ptr&& except_)
	{
		set_exception(std::move(except_));
		this->state.store(task_state::canceled, std::memory_order_release);
		this->run_continuations();
	}

	// Set the exception value of the task (placement-new into the union).
	void set_exception(std::exception_ptr&& except_)
	{
		new(&this->except) std::exception_ptr(std::move(except_));
	}

	// Get the exception a task was canceled with.
	// Only valid once the state is canceled.
	std::exception_ptr& get_exception()
	{
		return *reinterpret_cast(&this->except);
	}

	// Wait and throw the exception if the task was canceled
	void wait_and_throw()
	{
		if (this->wait() == task_state::canceled)
			LIBASYNC_RETHROW_EXCEPTION(get_exception());
	}

	// Delete the task using its proper type
	static void destroy(task_base* t) LIBASYNC_NOEXCEPT
	{
		delete static_cast*>(t);
	}
};
template
const task_base_vtable task_result::vtable_impl = {
	task_result::destroy, // destroy
	nullptr, // run
	nullptr, // cancel
	nullptr // schedule
};

// Class to hold a function object, with empty base class optimization.
// Primary template: the function object is an ordinary data member.
template
struct func_base {
	Func func;

	template
	explicit func_base(F&& f)
		: func(std::forward(f)) {}
	Func& get_func()
	{
		return func;
	}
};
// Specialization selected by an enable_if condition (its argument list is
// missing in this copy; presumably "Func is empty" -- confirm). The function
// object is constructed directly in this object's own storage, so it adds
// no data member.
template
struct func_base::value>::type> {
	template
	explicit func_base(F&& f)
	{
		new(this) Func(std::forward(f));
	}
	~func_base()
	{
		get_func().~Func();
	}
	Func& get_func()
	{
		return *reinterpret_cast(this);
	}
};

// Class to hold a function object and initialize/destroy it at any time.
// The object's lifetime is managed manually: init_func() constructs it in raw
// aligned storage and destroy_func() ends it; nothing here tracks whether it
// is currently alive, so callers must pair the two calls exactly once.
template
struct func_holder {
	alignas(Func) std::uint8_t func[sizeof(Func)];

	Func& get_func()
	{
		return *reinterpret_cast(&func);
	}
	template
	void init_func(Args&&... args)
	{
		new(&func) Func(std::forward(args)...);
	}
	void destroy_func()
	{
		get_func().~Func();
	}
};
// Specialization (condition presumably "Func is empty", as for func_base):
// the function object lives in this object's own storage instead of a buffer.
template
struct func_holder::value>::type> {
	Func& get_func()
	{
		return *reinterpret_cast(this);
	}
	template
	void init_func(Args&&... args)
	{
		new(this) Func(std::forward(args)...);
	}
	void destroy_func()
	{
		get_func().~Func();
	}
};

// Task object with an associated function object.
// func_holder is used as a base class (public, since that is the default for
// a struct) so that an empty Func can benefit from the empty base
// optimization and not take up space.
template
struct task_func: public task_result, func_holder {
	// Virtual function table for task_func
	static const task_base_vtable vtable_impl;
	template
	explicit task_func(Args&&... args)
	{
		this->vtable = &vtable_impl;
		this->init_func(std::forward(args)...);
	}

	// Run the stored function.
	// The stored object is an execution function that receives the task
	// itself; the execution functions in this file destroy that object
	// (destroy_func) before returning on success. Any exception escaping it
	// cancels the task instead.
	static void run(task_base* t) LIBASYNC_NOEXCEPT
	{
		LIBASYNC_TRY {
			// Dispatch to execution function
			static_cast*>(t)->get_func()(t);
		} LIBASYNC_CATCH(...) {
			cancel(t, std::current_exception());
		}
	}

	// Cancel the task.
	// Must only be called while the function object is still alive, since it
	// is destroyed unconditionally here.
	static void cancel(task_base* t, std::exception_ptr&& except) LIBASYNC_NOEXCEPT
	{
		// Destroy the function object when canceling since it won't be
		// used anymore.
		static_cast*>(t)->destroy_func();
		static_cast*>(t)->cancel_base(std::move(except));
	}

	// Schedule a continuation task using its scheduler.
	// `t` is a task of this same task_func type; its `sched` union member holds
	// a type-erased pointer that is cast back to Sched here.
	static void schedule(task_base* parent, task_ptr t)
	{
		void* sched = static_cast*>(t.get())->sched;
		parent->run_continuation(*static_cast(sched), std::move(t));
	}

	// Free the function
	~task_func()
	{
		// If the task hasn't completed yet, destroy the function object. Note
		// that an unwrapped task has already destroyed its function object.
		// (Completed and canceled tasks destroyed it too, in their execution
		// function or in cancel().)
		if (this->state.load(std::memory_order_relaxed) == task_state::pending)
			this->destroy_func();
	}

	// Delete the task using its proper type
	static void destroy(task_base* t) LIBASYNC_NOEXCEPT
	{
		delete static_cast*>(t);
	}
};
template
const task_base_vtable task_func::vtable_impl = {
	task_func::destroy, // destroy
	task_func::run, // run
	task_func::cancel, // cancel
	task_func::schedule // schedule
};

// Helper functions to access the internal_task member of a task object, which
// avoids us having to specify half of the functions in the detail namespace
// as friend. Also, internal_task is downcast to the appropriate task_result<>.
// NOTE(review): template parameter/argument lists are missing throughout this
// section in this copy (bare `template`, `static_cast*>`, `std::forward(p)`);
// that looks like an extraction artifact -- confirm against upstream.
template
typename Task::internal_task_type* get_internal_task(const Task& t)
{
	return static_cast(t.internal_task.get());
}
template
void set_internal_task(Task& t, task_ptr p)
{
	t.internal_task = std::move(p);
}

// Common code for task unwrapping.
// Continuation attached to a child task: when the child finishes, its result
// (or exception) is forwarded into the parent task. Holding parent_task as a
// task_ptr keeps the parent alive until this runs.
template
struct unwrapped_func {
	explicit unwrapped_func(task_ptr t)
		: parent_task(std::move(t)) {}
	void operator()(Child child_task) const
	{
		// Forward completion state and result to parent task
		task_result* parent = static_cast*>(parent_task.get());
		LIBASYNC_TRY {
			if (get_internal_task(child_task)->state.load(std::memory_order_relaxed) == task_state::completed) {
				parent->set_result(get_internal_task(child_task)->get_result(child_task));
				parent->finish();
			} else {
				// We don't call the generic cancel function here because
				// the function of the parent task has already been destroyed.
				// The child's exception_ptr is copied; the child keeps its own.
				parent->cancel_base(std::exception_ptr(get_internal_task(child_task)->get_exception()));
			}
		} LIBASYNC_CATCH(...) {
			// If the copy/move constructor of the result threw, propagate the exception
			parent->cancel_base(std::current_exception());
		}
	}
	task_ptr parent_task;
};

// Called when a task's function returned another task (`child_task`): the
// parent completes only when the child does.
template
void unwrapped_finish(task_base* parent_base, Child child_task)
{
	// Destroy the parent task's function since it has been executed.
	// Switching the state away from `pending` first also stops ~task_func from
	// destroying the function object a second time.
	parent_base->state.store(task_state::unwrapped, std::memory_order_relaxed);
	static_cast*>(parent_base)->destroy_func();

	// Set up a continuation on the child to set the result of the parent.
	// add_ref() takes an extra reference that the task_ptr built from the raw
	// pointer then owns (presumably task_ptr adopts without incrementing --
	// confirm in ref_count_base).
	LIBASYNC_TRY {
		parent_base->add_ref();
		child_task.then(inline_scheduler(), unwrapped_func(task_ptr(parent_base)));
	} LIBASYNC_CATCH(...) {
		// Use cancel_base here because the function object is already destroyed.
		static_cast*>(parent_base)->cancel_base(std::current_exception());
	}
}

// Execution functions for root tasks:
// - With and without task unwrapping
// Without unwrapping: run the function, store its result, destroy this
// function object, then mark the task completed. Nothing touches `this`
// after destroy_func(), because that call ends this object's lifetime.
template
struct root_exec_func: private func_base {
	template
	explicit root_exec_func(F&& f)
		: func_base(std::forward(f)) {}
	void operator()(task_base* t)
	{
		static_cast*>(t)->set_result(detail::invoke_fake_void(std::move(this->get_func())));
		static_cast*>(t)->destroy_func();
		t->finish();
	}
};
// With unwrapping: the function returns a task whose outcome becomes this
// task's outcome (see unwrapped_finish, which also destroys this object).
template
struct root_exec_func: private func_base {
	template
	explicit root_exec_func(F&& f)
		: func_base(std::forward(f)) {}
	void operator()(task_base* t)
	{
		unwrapped_finish(t, std::move(this->get_func())());
	}
};

// Execution functions for continuation tasks:
// - With and without task unwrapping
// - For void, value-based and task-based continuations
// In the value- and void-based variants a canceled parent cancels the
// continuation without calling the user function. The exception_ptr copy is
// built before task_func::cancel() runs, which matters because cancel()
// destroys this function object, including its `parent` member.
// The relaxed load of the parent's state presumably relies on the ordering
// already established when the continuation was scheduled (see
// task_base::add_continuation) -- confirm.

// Task-based, no unwrapping: the user function receives the parent task itself.
template
struct continuation_exec_func: private func_base {
	template
	continuation_exec_func(F&& f, P&& p)
		: func_base(std::forward(f)), parent(std::forward(p)) {}
	void operator()(task_base* t)
	{
		static_cast*>(t)->set_result(detail::invoke_fake_void(std::move(this->get_func()), std::move(parent)));
		static_cast*>(t)->destroy_func();
		t->finish();
	}
	Parent parent;
};
// Value-based, no unwrapping: the user function receives the parent's result.
template
struct continuation_exec_func: private func_base {
	template
	continuation_exec_func(F&& f, P&& p)
		: func_base(std::forward(f)), parent(std::forward(p)) {}
	void operator()(task_base* t)
	{
		if (get_internal_task(parent)->state.load(std::memory_order_relaxed) == task_state::canceled)
			task_func::cancel(t, std::exception_ptr(get_internal_task(parent)->get_exception()));
		else {
			static_cast*>(t)->set_result(detail::invoke_fake_void(std::move(this->get_func()), get_internal_task(parent)->get_result(parent)));
			static_cast*>(t)->destroy_func();
			t->finish();
		}
	}
	Parent parent;
};
// Void-based, no unwrapping: the user function receives fake_void().
template
struct continuation_exec_func: private func_base {
	template
	continuation_exec_func(F&& f, P&& p)
		: func_base(std::forward(f)), parent(std::forward(p)) {}
	void operator()(task_base* t)
	{
		if (get_internal_task(parent)->state.load(std::memory_order_relaxed) == task_state::canceled)
			task_func::cancel(t, std::exception_ptr(get_internal_task(parent)->get_exception()));
		else {
			static_cast*>(t)->set_result(detail::invoke_fake_void(std::move(this->get_func()), fake_void()));
			static_cast*>(t)->destroy_func();
			t->finish();
		}
	}
	Parent parent;
};
// Task-based with unwrapping.
template
struct continuation_exec_func: private func_base {
	template
	continuation_exec_func(F&& f, P&& p)
		: func_base(std::forward(f)), parent(std::forward(p)) {}
	void operator()(task_base* t)
	{
		unwrapped_finish(t, detail::invoke_fake_void(std::move(this->get_func()), std::move(parent)));
	}
	Parent parent;
};
// Value-based with unwrapping.
template
struct continuation_exec_func: private func_base {
	template
	continuation_exec_func(F&& f, P&& p)
		: func_base(std::forward(f)), parent(std::forward(p)) {}
	void operator()(task_base* t)
	{
		if (get_internal_task(parent)->state.load(std::memory_order_relaxed) == task_state::canceled)
			task_func::cancel(t, std::exception_ptr(get_internal_task(parent)->get_exception()));
		else
			unwrapped_finish(t, detail::invoke_fake_void(std::move(this->get_func()), get_internal_task(parent)->get_result(parent)));
	}
	Parent parent;
};
// Void-based with unwrapping.
template
struct continuation_exec_func: private func_base {
	template
	continuation_exec_func(F&& f, P&& p)
		: func_base(std::forward(f)), parent(std::forward(p)) {}
	void operator()(task_base* t)
	{
		if (get_internal_task(parent)->state.load(std::memory_order_relaxed) == task_state::canceled)
			task_func::cancel(t, std::exception_ptr(get_internal_task(parent)->get_exception()));
		else
			unwrapped_finish(t, detail::invoke_fake_void(std::move(this->get_func()), fake_void()));
	}
	Parent parent;
};

} // namespace detail
} // namespace async