feat update

This commit is contained in:
tqcq
2024-03-27 23:16:26 +08:00
parent c5e86092dd
commit ec6f45eb78
45 changed files with 5089 additions and 295 deletions

View File

@ -0,0 +1,77 @@
// Copyright (c) 2015 Amanieu d'Antras
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#ifndef ASYNCXX_H_
# error "Do not include this header directly, include <async++.h> instead."
#endif
namespace async {
namespace detail {
// Queue which holds tasks in FIFO order. Note that this queue is not
// thread-safe and must be protected by a lock.
class fifo_queue {
detail::aligned_array<void*, LIBASYNC_CACHELINE_SIZE> items;
std::size_t head, tail;
public:
fifo_queue()
: items(32), head(0), tail(0) {}
~fifo_queue()
{
// Free any unexecuted tasks
for (std::size_t i = head; i != tail; i = (i + 1) & (items.size() - 1))
task_run_handle::from_void_ptr(items[i]);
}
// Push a task to the end of the queue
void push(task_run_handle t)
{
// Resize queue if it is full
if (head == ((tail + 1) & (items.size() - 1))) {
detail::aligned_array<void*, LIBASYNC_CACHELINE_SIZE> new_items(items.size() * 2);
for (std::size_t i = 0; i != items.size(); i++)
new_items[i] = items[(i + head) & (items.size() - 1)];
head = 0;
tail = items.size() - 1;
items = std::move(new_items);
}
// Push the item
items[tail] = t.to_void_ptr();
tail = (tail + 1) & (items.size() - 1);
}
// Pop a task from the front of the queue
task_run_handle pop()
{
// See if an item is available
if (head == tail)
return task_run_handle();
else {
void* x = items[head];
head = (head + 1) & (items.size() - 1);
return task_run_handle::from_void_ptr(x);
}
}
};
} // namespace detail
} // namespace async

View File

@ -0,0 +1,95 @@
// Copyright (c) 2015 Amanieu d'Antras
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <cstdlib>
#include <memory>
#include <mutex>
#include <numeric>
#include <random>
#include <thread>
#include <type_traits>
#include <vector>
#include <async++.h>
// For posix_memalign/_aligned_malloc
#ifdef _WIN32
# include <malloc.h>
# ifdef __MINGW32__
# define _aligned_malloc __mingw_aligned_malloc
# define _aligned_free __mingw_aligned_free
# endif
#else
# include <stdlib.h>
#endif
// We don't make use of dynamic TLS initialization/destruction so we can just
// use the legacy TLS attributes.
#ifdef __GNUC__
# define THREAD_LOCAL __thread
#elif defined (_MSC_VER)
# define THREAD_LOCAL __declspec(thread)
#else
# define THREAD_LOCAL thread_local
#endif
// GCC, Clang and the Linux version of the Intel compiler and MSVC 2015 support
// thread-safe initialization of function-scope static variables.
#ifdef __GNUC__
# define HAVE_THREAD_SAFE_STATIC
#elif _MSC_VER >= 1900 && !defined(__INTEL_COMPILER)
# define HAVE_THREAD_SAFE_STATIC
#endif
// MSVC deadlocks when joining a thread from a static destructor. Use a
// workaround in that case to avoid the deadlock.
#if defined(_MSC_VER) && _MSC_VER < 1900
# define BROKEN_JOIN_IN_DESTRUCTOR
#endif
// Apple's iOS has no thread local support yet. They claim that they don't want to
// introduce a binary compatility issue when they got a better implementation available.
// Luckily, pthreads supports some kind of "emulation" for that. This detects if the we
// are compiling for iOS and enables the workaround accordingly.
// It is also possible enabling it forcibly by setting the EMULATE_PTHREAD_THREAD_LOCAL
// macro. Obviously, this will only works on platforms with pthread available.
#if __APPLE__
# include "TargetConditionals.h"
# if TARGET_IPHONE_SIMULATOR || TARGET_OS_IPHONE
# define EMULATE_PTHREAD_THREAD_LOCAL
# endif
#endif
// Force symbol visibility to hidden unless explicity exported
#ifndef LIBASYNC_STATIC
#if defined(__GNUC__) && !defined(_WIN32)
# pragma GCC visibility push(hidden)
#endif
#endif
// Include other internal headers
#include "singleton.h"
#include "task_wait_event.h"
#include "fifo_queue.h"
#include "work_steal_queue.h"

View File

@ -0,0 +1,247 @@
// Copyright (c) 2015 Amanieu d'Antras
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#include "internal.h"
// for pthread thread_local emulation
#if defined(EMULATE_PTHREAD_THREAD_LOCAL)
# include <pthread.h>
#endif
namespace async {
namespace detail {
void* aligned_alloc(std::size_t size, std::size_t align)
{
#ifdef _WIN32
void* ptr = _aligned_malloc(size, align);
if (!ptr)
LIBASYNC_THROW(std::bad_alloc());
return ptr;
#else
void* result;
if (posix_memalign(&result, align, size))
LIBASYNC_THROW(std::bad_alloc());
else
return result;
#endif
}
void aligned_free(void* addr) LIBASYNC_NOEXCEPT
{
#ifdef _WIN32
_aligned_free(addr);
#else
free(addr);
#endif
}
// Wait for a task to complete (for threads outside thread pool)
static void generic_wait_handler(task_wait_handle wait_task)
{
// Create an event to wait on
task_wait_event event;
event.init();
// Create a continuation for the task we are waiting for
wait_task.on_finish([&event] {
// Just signal the thread event
event.signal(wait_type::task_finished);
});
// Wait for the event to be set
event.wait();
}
#if defined(EMULATE_PTHREAD_THREAD_LOCAL)
// Wait handler function, per-thread, defaults to generic version
struct pthread_emulation_thread_wait_handler_key_initializer {
pthread_key_t key;
pthread_emulation_thread_wait_handler_key_initializer()
{
pthread_key_create(&key, nullptr);
}
~pthread_emulation_thread_wait_handler_key_initializer()
{
pthread_key_delete(key);
}
};
static pthread_key_t get_thread_wait_handler_key()
{
static pthread_emulation_thread_wait_handler_key_initializer initializer;
return initializer.key;
}
#else
static THREAD_LOCAL wait_handler thread_wait_handler = generic_wait_handler;
#endif
static void set_thread_wait_handler(wait_handler handler)
{
#if defined(EMULATE_PTHREAD_THREAD_LOCAL)
// we need to call this here, because the pthread initializer is lazy,
// this means the it could be null and we need to set it before trying to
// get or set it
pthread_setspecific(get_thread_wait_handler_key(), reinterpret_cast<void*>(handler));
#else
thread_wait_handler = handler;
#endif
}
static wait_handler get_thread_wait_handler()
{
#if defined(EMULATE_PTHREAD_THREAD_LOCAL)
// we need to call this here, because the pthread initializer is lazy,
// this means the it could be null and we need to set it before trying to
// get or set it
wait_handler handler = (wait_handler) pthread_getspecific(get_thread_wait_handler_key());
if(handler == nullptr) {
return generic_wait_handler;
}
return handler;
#else
return thread_wait_handler;
#endif
}
// Wait for a task to complete
void wait_for_task(task_base* wait_task)
{
// Dispatch to the current thread's wait handler
wait_handler thread_wait_handler = get_thread_wait_handler();
thread_wait_handler(task_wait_handle(wait_task));
}
// The default scheduler is just a thread pool which can be configured
// using environment variables.
class default_scheduler_impl: public threadpool_scheduler {
static std::size_t get_num_threads()
{
// Get the requested number of threads from the environment
// If that fails, use the number of CPUs in the system.
std::size_t num_threads;
#ifdef _MSC_VER
char* s;
# ifdef __cplusplus_winrt
// Windows store applications do not support environment variables
s = nullptr;
# else
// MSVC gives an error when trying to use getenv, work around this
// by using _dupenv_s instead.
_dupenv_s(&s, nullptr, "LIBASYNC_NUM_THREADS");
# endif
#else
const char *s = std::getenv("LIBASYNC_NUM_THREADS");
#endif
if (s)
num_threads = std::strtoul(s, nullptr, 10);
else
num_threads = hardware_concurrency();
#if defined(_MSC_VER) && !defined(__cplusplus_winrt)
// Free the string allocated by _dupenv_s
free(s);
#endif
// Make sure the thread count is reasonable
if (num_threads < 1)
num_threads = 1;
return num_threads;
}
public:
default_scheduler_impl()
: threadpool_scheduler(get_num_threads()) {}
};
// Thread scheduler implementation
void thread_scheduler_impl::schedule(task_run_handle t)
{
// A shared_ptr is used here because not all implementations of
// std::thread support move-only objects.
std::thread([](const std::shared_ptr<task_run_handle>& t) {
t->run();
}, std::make_shared<task_run_handle>(std::move(t))).detach();
}
} // namespace detail
threadpool_scheduler& default_threadpool_scheduler()
{
return detail::singleton<detail::default_scheduler_impl>::get_instance();
}
// FIFO scheduler implementation
struct fifo_scheduler::internal_data {
detail::fifo_queue queue;
std::mutex lock;
};
fifo_scheduler::fifo_scheduler()
: impl(new internal_data) {}
fifo_scheduler::~fifo_scheduler() {}
void fifo_scheduler::schedule(task_run_handle t)
{
std::lock_guard<std::mutex> locked(impl->lock);
impl->queue.push(std::move(t));
}
bool fifo_scheduler::try_run_one_task()
{
task_run_handle t;
{
std::lock_guard<std::mutex> locked(impl->lock);
t = impl->queue.pop();
}
if (t) {
t.run();
return true;
}
return false;
}
void fifo_scheduler::run_all_tasks()
{
while (try_run_one_task()) {}
}
std::size_t hardware_concurrency() LIBASYNC_NOEXCEPT
{
// Cache the value because calculating it may be expensive
static std::size_t value = std::thread::hardware_concurrency();
// Always return at least 1 core
return value == 0 ? 1 : value;
}
wait_handler set_thread_wait_handler(wait_handler handler) LIBASYNC_NOEXCEPT
{
wait_handler old = detail::get_thread_wait_handler();
detail::set_thread_wait_handler(handler);
return old;
}
} // namespace async
#ifndef LIBASYNC_STATIC
#if defined(__GNUC__) && !defined(_WIN32)
# pragma GCC visibility pop
#endif
#endif

View File

@ -0,0 +1,73 @@
// Copyright (c) 2015 Amanieu d'Antras
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
namespace async {
namespace detail {
// Thread-safe singleton wrapper class
#ifdef HAVE_THREAD_SAFE_STATIC
// C++11 guarantees thread safety for static initialization
template<typename T>
class singleton {
public:
static T& get_instance()
{
static T instance;
return instance;
}
};
#else
// Some compilers don't support thread-safe static initialization, so emulate it
template<typename T>
class singleton {
std::mutex lock;
std::atomic<bool> init_flag;
alignas(T) std::uint8_t storage[sizeof(T)];
static singleton instance;
// Use a destructor instead of atexit() because the latter does not work
// properly when the singleton is in a library that is unloaded.
~singleton()
{
if (init_flag.load(std::memory_order_acquire))
reinterpret_cast<T*>(&storage)->~T();
}
public:
static T& get_instance()
{
T* ptr = reinterpret_cast<T*>(&instance.storage);
if (!instance.init_flag.load(std::memory_order_acquire)) {
std::lock_guard<std::mutex> locked(instance.lock);
if (!instance.init_flag.load(std::memory_order_relaxed)) {
new(ptr) T;
instance.init_flag.store(true, std::memory_order_release);
}
}
return *ptr;
}
};
template<typename T> singleton<T> singleton<T>::instance;
#endif
} // namespace detail
} // namespace async

View File

@ -0,0 +1,109 @@
// Copyright (c) 2015 Amanieu d'Antras
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
namespace async {
namespace detail {
// Set of events that an task_wait_event can hold
enum wait_type {
// The task that is being waited on has completed
task_finished = 1,
// A task is available to execute from the scheduler
task_available = 2
};
// OS-supported event object which can be used to wait for either a task to
// finish or for the scheduler to have more work for the current thread.
//
// The event object is lazily initialized to avoid unnecessary API calls.
class task_wait_event {
alignas(std::mutex) std::uint8_t m[sizeof(std::mutex)];
alignas(std::condition_variable) std::uint8_t c[sizeof(std::condition_variable)];
int event_mask;
bool initialized;
std::mutex& mutex()
{
return *reinterpret_cast<std::mutex*>(&m);
}
std::condition_variable& cond()
{
return *reinterpret_cast<std::condition_variable*>(&c);
}
public:
task_wait_event()
: event_mask(0), initialized(false) {}
~task_wait_event()
{
if (initialized) {
mutex().~mutex();
cond().~condition_variable();
}
}
// Initialize the event, must be done before any other functions are called.
void init()
{
if (!initialized) {
new(&m) std::mutex;
new(&c) std::condition_variable;
initialized = true;
}
}
// Wait for an event to occur. Returns the event(s) that occurred. This also
// clears any pending events afterwards.
int wait()
{
std::unique_lock<std::mutex> lock(mutex());
while (event_mask == 0)
cond().wait(lock);
int result = event_mask;
event_mask = 0;
return result;
}
// Check if a specific event is ready
bool try_wait(int event)
{
std::lock_guard<std::mutex> lock(mutex());
int result = event_mask & event;
event_mask &= ~event;
return result != 0;
}
// Signal an event and wake up a sleeping thread
void signal(int event)
{
std::unique_lock<std::mutex> lock(mutex());
event_mask |= event;
// This must be done while holding the lock otherwise we may end up with
// a use-after-free due to a race with wait().
cond().notify_one();
lock.unlock();
}
};
} // namespace detail
} // namespace async

View File

@ -0,0 +1,448 @@
// Copyright (c) 2015 Amanieu d'Antras
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#include "internal.h"
// For GetProcAddress and GetModuleHandle
#ifdef _WIN32
#include <windows.h>
#endif
// for pthread thread_local emulation
#if defined(EMULATE_PTHREAD_THREAD_LOCAL)
# include <pthread.h>
#endif
namespace async {
namespace detail {
// Per-thread data, aligned to cachelines to avoid false sharing
struct LIBASYNC_CACHELINE_ALIGN thread_data_t {
work_steal_queue queue;
std::minstd_rand rng;
std::thread handle;
};
// Internal data used by threadpool_scheduler
struct threadpool_data {
threadpool_data(std::size_t num_threads)
: thread_data(num_threads), shutdown(false), num_waiters(0), waiters(new task_wait_event*[num_threads]) {}
threadpool_data(std::size_t num_threads, std::function<void()>&& prerun_, std::function<void()>&& postrun_)
: thread_data(num_threads), shutdown(false), num_waiters(0), waiters(new task_wait_event*[num_threads]),
prerun(std::move(prerun_)), postrun(std::move(postrun_)) {}
// Mutex protecting everything except thread_data
std::mutex lock;
// Array of per-thread data
aligned_array<thread_data_t> thread_data;
// Global queue for tasks from outside the pool
fifo_queue public_queue;
// Shutdown request indicator
bool shutdown;
// List of threads waiting for tasks to run. num_waiters needs to be atomic
// because it is sometimes read outside the mutex.
std::atomic<std::size_t> num_waiters;
std::unique_ptr<task_wait_event*[]> waiters;
// Pre/Post run functions.
std::function<void()> prerun;
std::function<void()> postrun;
#ifdef BROKEN_JOIN_IN_DESTRUCTOR
// Shutdown complete event, used instead of thread::join()
std::size_t shutdown_num_threads;
std::condition_variable shutdown_complete_event;
#endif
};
// this wrapper encapsulates both the owning_threadpool pointer and the thread id.
// this is done to improve performance on the emulated thread_local reducing the number
// of calls to "pthread_getspecific"
struct threadpool_data_wrapper {
threadpool_data* owning_threadpool;
std::size_t thread_id;
threadpool_data_wrapper(threadpool_data* owning_threadpool, std::size_t thread_id):
owning_threadpool(owning_threadpool), thread_id(thread_id) { }
};
#if defined(EMULATE_PTHREAD_THREAD_LOCAL)
struct pthread_emulation_threadpool_data_initializer {
pthread_key_t key;
pthread_emulation_threadpool_data_initializer()
{
pthread_key_create(&key, [](void* wrapper_ptr) {
threadpool_data_wrapper* wrapper = static_cast<threadpool_data_wrapper*>(wrapper_ptr);
delete wrapper;
});
}
~pthread_emulation_threadpool_data_initializer()
{
pthread_key_delete(key);
}
};
static pthread_key_t get_local_threadpool_data_key()
{
static pthread_emulation_threadpool_data_initializer initializer;
return initializer.key;
}
#else
// Thread pool this thread belongs to, or null if not in pool
static THREAD_LOCAL threadpool_data* owning_threadpool = nullptr;
// Current thread's index in the pool
static THREAD_LOCAL std::size_t thread_id;
#endif
static void create_threadpool_data(threadpool_data* owning_threadpool_, std::size_t thread_id_)
{
#if defined(EMULATE_PTHREAD_THREAD_LOCAL)
// the memory allocated here gets deallocated by the lambda declared on the key creation
pthread_setspecific(get_local_threadpool_data_key(), new threadpool_data_wrapper(owning_threadpool_, thread_id_));
#else
owning_threadpool = owning_threadpool_;
thread_id = thread_id_;
#endif
}
static threadpool_data_wrapper get_threadpool_data_wrapper()
{
#if defined(EMULATE_PTHREAD_THREAD_LOCAL)
threadpool_data_wrapper* wrapper = static_cast<threadpool_data_wrapper*>(pthread_getspecific(get_local_threadpool_data_key()));
if(wrapper == nullptr) {
// if, for some reason, the wrapper is not set, this won't cause a crash
return threadpool_data_wrapper(nullptr, 0);
}
return *wrapper;
#else
return threadpool_data_wrapper(owning_threadpool, thread_id);
#endif
}
// Try to steal a task from another thread's queue
static task_run_handle steal_task(threadpool_data* impl, std::size_t thread_id)
{
// Make a list of victim thread ids and shuffle it
std::vector<std::size_t> victims(impl->thread_data.size());
std::iota(victims.begin(), victims.end(), 0);
std::shuffle(victims.begin(), victims.end(), impl->thread_data[thread_id].rng);
// Try to steal from another thread
for (std::size_t i: victims) {
// Don't try to steal from ourself
if (i == thread_id)
continue;
if (task_run_handle t = impl->thread_data[i].queue.steal())
return t;
}
// No tasks found, but we might have missed one if it was just added. In
// practice this doesn't really matter since it will be handled by another
// thread.
return task_run_handle();
}
// Main task stealing loop which is used by worker threads when they have
// nothing to do.
static void thread_task_loop(threadpool_data* impl, std::size_t thread_id, task_wait_handle wait_task)
{
// Get our thread's data
thread_data_t& current_thread = impl->thread_data[thread_id];
// Flag indicating if we have added a continuation to the task
bool added_continuation = false;
// Event to wait on
task_wait_event event;
// Loop while waiting for the task to complete
while (true) {
// Check if the task has finished. If we have added a continuation, we
// need to make sure the event has been signaled, otherwise the other
// thread may try to signal it after we have freed it.
if (wait_task && (added_continuation ? event.try_wait(wait_type::task_finished) : wait_task.ready()))
return;
// Try to get a task from the local queue
if (task_run_handle t = current_thread.queue.pop()) {
t.run();
continue;
}
// Stealing loop
while (true) {
// Try to steal a task
if (task_run_handle t = steal_task(impl, thread_id)) {
t.run();
break;
}
// Try to fetch from the public queue
std::unique_lock<std::mutex> locked(impl->lock);
if (task_run_handle t = impl->public_queue.pop()) {
// Don't hold the lock while running the task
locked.unlock();
t.run();
break;
}
// If shutting down and we don't have a task to wait for, return.
if (!wait_task && impl->shutdown) {
#ifdef BROKEN_JOIN_IN_DESTRUCTOR
// Notify once all worker threads have exited
if (--impl->shutdown_num_threads == 0)
impl->shutdown_complete_event.notify_one();
#endif
return;
}
// Initialize the event object
event.init();
// No tasks found, so sleep until something happens.
// If a continuation has not been added yet, add it.
if (wait_task && !added_continuation) {
// Create a continuation for the task we are waiting for
wait_task.on_finish([&event] {
// Signal the thread's event
event.signal(wait_type::task_finished);
});
added_continuation = true;
}
// Add our thread to the list of waiting threads
size_t num_waiters_val = impl->num_waiters.load(std::memory_order_relaxed);
impl->waiters[num_waiters_val] = &event;
impl->num_waiters.store(num_waiters_val + 1, std::memory_order_relaxed);
// Wait for our event to be signaled when a task is scheduled or
// the task we are waiting for has completed.
locked.unlock();
int events = event.wait();
locked.lock();
// Remove our thread from the list of waiting threads
num_waiters_val = impl->num_waiters.load(std::memory_order_relaxed);
for (std::size_t i = 0; i < num_waiters_val; i++) {
if (impl->waiters[i] == &event) {
if (i != num_waiters_val - 1)
std::swap(impl->waiters[i], impl->waiters[num_waiters_val - 1]);
impl->num_waiters.store(num_waiters_val - 1, std::memory_order_relaxed);
break;
}
}
// Check again if the task has finished. We have added a
// continuation at this point, so we need to check that the
// continuation has finished signaling the event.
if (wait_task && (events & wait_type::task_finished))
return;
}
}
}
// Wait for a task to complete (for worker threads inside thread pool)
static void threadpool_wait_handler(task_wait_handle wait_task)
{
threadpool_data_wrapper wrapper = get_threadpool_data_wrapper();
thread_task_loop(wrapper.owning_threadpool, wrapper.thread_id, wait_task);
}
// Worker thread main loop
static void worker_thread(threadpool_data* owning_threadpool, std::size_t thread_id)
{
// store on the local thread data
create_threadpool_data(owning_threadpool, thread_id);
// Set the wait handler so threads from the pool do useful work while
// waiting for another task to finish.
set_thread_wait_handler(threadpool_wait_handler);
// Seed the random number generator with our id. This gives each thread a
// different steal order.
owning_threadpool->thread_data[thread_id].rng.seed(static_cast<std::minstd_rand::result_type>(thread_id));
// Prerun hook
if (owning_threadpool->prerun) owning_threadpool->prerun();
// Main loop, runs until the shutdown signal is recieved
thread_task_loop(owning_threadpool, thread_id, task_wait_handle());
// Postrun hook
if (owning_threadpool->postrun) owning_threadpool->postrun();
}
// Recursive function to spawn all worker threads in parallel
static void recursive_spawn_worker_thread(threadpool_data* impl, std::size_t index, std::size_t threads)
{
// If we are down to one thread, go to the worker main loop
if (threads == 1)
worker_thread(impl, index);
else {
// Split thread range into 2 sub-ranges
std::size_t mid = index + threads / 2;
// Spawn a thread for half of the range
impl->thread_data[mid].handle = std::thread(recursive_spawn_worker_thread, impl, mid, threads - threads / 2);
#ifdef BROKEN_JOIN_IN_DESTRUCTOR
impl->thread_data[mid].handle.detach();
#endif
// Tail-recurse to handle our half of the range
recursive_spawn_worker_thread(impl, index, threads / 2);
}
}
} // namespace detail
threadpool_scheduler::threadpool_scheduler(threadpool_scheduler&& other)
: impl(std::move(other.impl)) {}
threadpool_scheduler::threadpool_scheduler(std::size_t num_threads)
: impl(new detail::threadpool_data(num_threads))
{
// Start worker threads
impl->thread_data[0].handle = std::thread(detail::recursive_spawn_worker_thread, impl.get(), 0, num_threads);
#ifdef BROKEN_JOIN_IN_DESTRUCTOR
impl->thread_data[0].handle.detach();
#endif
}
threadpool_scheduler::threadpool_scheduler(std::size_t num_threads,
std::function<void()>&& prerun,
std::function<void()>&& postrun)
: impl(new detail::threadpool_data(num_threads, std::move(prerun), std::move(postrun)))
{
// Start worker threads
impl->thread_data[0].handle = std::thread(detail::recursive_spawn_worker_thread, impl.get(), 0, num_threads);
#ifdef BROKEN_JOIN_IN_DESTRUCTOR
impl->thread_data[0].handle.detach();
#endif
}
// Wait for all currently running tasks to finish
threadpool_scheduler::~threadpool_scheduler()
{
if (!impl) return;
#ifdef _WIN32
// Windows kills all threads except one on process exit before calling
// global destructors in DLLs. Waiting for dead threads to exit will likely
// result in deadlocks, so we just exit early if we detect that the process
// is exiting.
auto RtlDllShutdownInProgress = reinterpret_cast<BOOLEAN(WINAPI *)()>(GetProcAddress(GetModuleHandleW(L"ntdll.dll"), "RtlDllShutdownInProgress"));
if (RtlDllShutdownInProgress && RtlDllShutdownInProgress()) {
# ifndef BROKEN_JOIN_IN_DESTRUCTOR
// We still need to detach the thread handles otherwise the std::thread
// destructor will throw an exception.
for (std::size_t i = 0; i < impl->thread_data.size(); i++) {
try {
impl->thread_data[i].handle.detach();
} catch (...) {}
}
# endif
return;
}
#endif
{
std::unique_lock<std::mutex> locked(impl->lock);
// Signal shutdown
impl->shutdown = true;
// Wake up any sleeping threads
size_t num_waiters_val = impl->num_waiters.load(std::memory_order_relaxed);
for (std::size_t i = 0; i < num_waiters_val; i++)
impl->waiters[i]->signal(detail::wait_type::task_available);
impl->num_waiters.store(0, std::memory_order_relaxed);
#ifdef BROKEN_JOIN_IN_DESTRUCTOR
// Wait for the threads to exit
impl->shutdown_num_threads = impl->thread_data.size();
impl->shutdown_complete_event.wait(locked);
#endif
}
#ifndef BROKEN_JOIN_IN_DESTRUCTOR
// Wait for the threads to exit
for (std::size_t i = 0; i < impl->thread_data.size(); i++)
impl->thread_data[i].handle.join();
#endif
}
// Schedule a task on the thread pool
void threadpool_scheduler::schedule(task_run_handle t)
{
detail::threadpool_data_wrapper wrapper = detail::get_threadpool_data_wrapper();
// Check if we are in the thread pool
if (wrapper.owning_threadpool == impl.get()) {
// Push the task onto our task queue
impl->thread_data[wrapper.thread_id].queue.push(std::move(t));
// If there are no sleeping threads, just return. We check outside the
// lock to avoid locking overhead in the fast path.
if (impl->num_waiters.load(std::memory_order_relaxed) == 0)
return;
// Get a thread to wake up from the list
std::lock_guard<std::mutex> locked(impl->lock);
// Check again if there are waiters
size_t num_waiters_val = impl->num_waiters.load(std::memory_order_relaxed);
if (num_waiters_val == 0)
return;
// Pop a thread from the list and wake it up
impl->waiters[num_waiters_val - 1]->signal(detail::wait_type::task_available);
impl->num_waiters.store(num_waiters_val - 1, std::memory_order_relaxed);
} else {
std::lock_guard<std::mutex> locked(impl->lock);
// Push task onto the public queue
impl->public_queue.push(std::move(t));
// Wake up a sleeping thread
size_t num_waiters_val = impl->num_waiters.load(std::memory_order_relaxed);
if (num_waiters_val == 0)
return;
impl->waiters[num_waiters_val - 1]->signal(detail::wait_type::task_available);
impl->num_waiters.store(num_waiters_val - 1, std::memory_order_relaxed);
}
}
} // namespace async
#ifndef LIBASYNC_STATIC
#if defined(__GNUC__) && !defined(_WIN32)
# pragma GCC visibility pop
#endif
#endif

View File

@ -0,0 +1,186 @@
// Copyright (c) 2015 Amanieu d'Antras
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
#ifndef ASYNCXX_H_
# error "Do not include this header directly, include <async++.h> instead."
#endif
namespace async {
namespace detail {
// Chase-Lev work stealing deque
//
// Dynamic Circular Work-Stealing Deque
// http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.170.1097&rep=rep1&type=pdf
//
// Correct and Efficient Work-Stealing for Weak Memory Models
// http://www.di.ens.fr/~zappa/readings/ppopp13.pdf
class work_steal_queue {
// Circular array of void*
class circular_array {
detail::aligned_array<void*, LIBASYNC_CACHELINE_SIZE> items;
std::unique_ptr<circular_array> previous;
public:
circular_array(std::size_t n)
: items(n) {}
std::size_t size() const
{
return items.size();
}
void* get(std::size_t index)
{
return items[index & (size() - 1)];
}
void put(std::size_t index, void* x)
{
items[index & (size() - 1)] = x;
}
// Growing the array returns a new circular_array object and keeps a
// linked list of all previous arrays. This is done because other threads
// could still be accessing elements from the smaller arrays.
circular_array* grow(std::size_t top, std::size_t bottom)
{
circular_array* new_array = new circular_array(size() * 2);
new_array->previous.reset(this);
for (std::size_t i = top; i != bottom; i++)
new_array->put(i, get(i));
return new_array;
}
};
std::atomic<circular_array*> array;
std::atomic<std::size_t> top, bottom;
// Convert a 2's complement unsigned value to a signed value. We need to do
// this because (b - t) may not always be positive.
static std::ptrdiff_t to_signed(std::size_t x)
{
// Unsigned to signed conversion is implementation-defined if the value
// doesn't fit, so we convert manually.
static_assert(static_cast<std::size_t>(PTRDIFF_MAX) + 1 == static_cast<std::size_t>(PTRDIFF_MIN), "Wrong integer wrapping behavior");
if (x > static_cast<std::size_t>(PTRDIFF_MAX))
return static_cast<std::ptrdiff_t>(x - static_cast<std::size_t>(PTRDIFF_MIN)) + PTRDIFF_MIN;
else
return static_cast<std::ptrdiff_t>(x);
}
public:
work_steal_queue()
: array(new circular_array(32)), top(0), bottom(0) {}
~work_steal_queue()
{
// Free any unexecuted tasks
std::size_t b = bottom.load(std::memory_order_relaxed);
std::size_t t = top.load(std::memory_order_relaxed);
circular_array* a = array.load(std::memory_order_relaxed);
for (std::size_t i = t; i != b; i++)
task_run_handle::from_void_ptr(a->get(i));
delete a;
}
// Push a task to the bottom of this thread's queue
void push(task_run_handle x)
{
std::size_t b = bottom.load(std::memory_order_relaxed);
std::size_t t = top.load(std::memory_order_acquire);
circular_array* a = array.load(std::memory_order_relaxed);
// Grow the array if it is full
if (to_signed(b - t) >= to_signed(a->size())) {
a = a->grow(t, b);
array.store(a, std::memory_order_release);
}
// Note that we only convert to void* here in case grow throws due to
// lack of memory.
a->put(b, x.to_void_ptr());
std::atomic_thread_fence(std::memory_order_release);
bottom.store(b + 1, std::memory_order_relaxed);
}
// Pop a task from the bottom of this thread's queue
task_run_handle pop()
{
std::size_t b = bottom.load(std::memory_order_relaxed);
// Early exit if queue is empty
std::size_t t = top.load(std::memory_order_relaxed);
if (to_signed(b - t) <= 0)
return task_run_handle();
// Make sure bottom is stored before top is read
bottom.store(--b, std::memory_order_relaxed);
std::atomic_thread_fence(std::memory_order_seq_cst);
t = top.load(std::memory_order_relaxed);
// If the queue is empty, restore bottom and exit
if (to_signed(b - t) < 0) {
bottom.store(b + 1, std::memory_order_relaxed);
return task_run_handle();
}
// Fetch the element from the queue
circular_array* a = array.load(std::memory_order_relaxed);
void* x = a->get(b);
// If this was the last element in the queue, check for races
if (b == t) {
if (!top.compare_exchange_strong(t, t + 1, std::memory_order_seq_cst, std::memory_order_relaxed)) {
bottom.store(b + 1, std::memory_order_relaxed);
return task_run_handle();
}
bottom.store(b + 1, std::memory_order_relaxed);
}
return task_run_handle::from_void_ptr(x);
}
// Steal a task from the top of this thread's queue
task_run_handle steal()
{
// Loop while the compare_exchange fails. This is still lock-free because
// a fail means that another thread has sucessfully stolen a task.
while (true) {
// Make sure top is read before bottom
std::size_t t = top.load(std::memory_order_acquire);
std::atomic_thread_fence(std::memory_order_seq_cst);
std::size_t b = bottom.load(std::memory_order_acquire);
// Exit if the queue is empty
if (to_signed(b - t) <= 0)
return task_run_handle();
// Fetch the element from the queue
circular_array* a = array.load(std::memory_order_consume);
void* x = a->get(t);
// Attempt to increment top
if (top.compare_exchange_weak(t, t + 1, std::memory_order_seq_cst, std::memory_order_relaxed))
return task_run_handle::from_void_ptr(x);
}
}
};
} // namespace detail
} // namespace async