fix async deadlock
All checks were successful
linux-x64-gcc / linux-gcc (Release) (push) Successful in 56s
linux-x64-gcc / linux-gcc (Debug) (push) Successful in 3m36s

This commit is contained in:
tqcq 2024-03-28 22:19:28 +08:00
parent fd46ca62ae
commit 181823d4fe
6 changed files with 126 additions and 88 deletions

View File

@@ -19,7 +19,7 @@
 // THE SOFTWARE.
 #ifndef ASYNCXX_H_
-# error "Do not include this header directly, include <async++.h> instead."
+#error "Do not include this header directly, include <async++.h> instead."
 #endif
 namespace async {
@@ -37,20 +37,22 @@ namespace detail {
 // Detect whether an object is a scheduler
 template<typename T, typename = decltype(std::declval<T>().schedule(std::declval<task_run_handle>()))>
-two& is_scheduler_helper(int);
+two &is_scheduler_helper(int);
 template<typename T>
-one& is_scheduler_helper(...);
+one &is_scheduler_helper(...);
 template<typename T>
-struct is_scheduler: public std::integral_constant<bool, sizeof(is_scheduler_helper<T>(0)) - 1> {};
+struct is_scheduler : public std::integral_constant<bool, sizeof(is_scheduler_helper<T>(0)) - 1> {};
 // Singleton scheduler classes
 class thread_scheduler_impl {
 public:
 LIBASYNC_EXPORT static void schedule(task_run_handle t);
 };
 class inline_scheduler_impl {
 public:
 static void schedule(task_run_handle t);
 };
 // Reference counted pointer to task data
@@ -59,93 +61,98 @@ typedef ref_count_ptr<task_base> task_ptr;
 // Helper function to schedule a task using a scheduler
 template<typename Sched>
-void schedule_task(Sched& sched, task_ptr t);
+void schedule_task(Sched &sched, task_ptr t);
 // Wait for the given task to finish. This will call the wait handler currently
 // active for this thread, which causes the thread to sleep by default.
-LIBASYNC_EXPORT void wait_for_task(task_base* wait_task);
+#ifndef LIBASYNC_CUSTOM_WAIT_FOR_TASK
+LIBASYNC_EXPORT void wait_for_task(task_base *wait_task);
+#endif
 // Forward-declaration for data used by threadpool_scheduler
 struct threadpool_data;
-} // namespace detail
+}// namespace detail
 // Run a task in the current thread as soon as it is scheduled
-inline detail::inline_scheduler_impl& inline_scheduler()
+inline detail::inline_scheduler_impl &
+inline_scheduler()
 {
 static detail::inline_scheduler_impl instance;
 return instance;
 }
 // Run a task in a separate thread. Note that this scheduler does not wait for
 // threads to finish at process exit. You must ensure that all threads finish
 // before ending the process.
-inline detail::thread_scheduler_impl& thread_scheduler()
+inline detail::thread_scheduler_impl &
+thread_scheduler()
 {
 static detail::thread_scheduler_impl instance;
 return instance;
 }
 // Built-in thread pool scheduler with a size that is configurable from the
 // LIBASYNC_NUM_THREADS environment variable. If that variable does not exist
 // then the number of CPUs in the system is used instead.
-LIBASYNC_EXPORT threadpool_scheduler& default_threadpool_scheduler();
+LIBASYNC_EXPORT threadpool_scheduler &default_threadpool_scheduler();
 // Default scheduler that is used when one isn't specified. This defaults to
 // default_threadpool_scheduler(), but can be overriden by defining
 // LIBASYNC_CUSTOM_DEFAULT_SCHEDULER before including async++.h. Keep in mind
 // that in that case async::default_scheduler should be declared before
 // including async++.h.
 #ifndef LIBASYNC_CUSTOM_DEFAULT_SCHEDULER
-inline threadpool_scheduler& default_scheduler()
+inline threadpool_scheduler &
+default_scheduler()
 {
 return default_threadpool_scheduler();
 }
 #endif
 // Scheduler that holds a list of tasks which can then be explicitly executed
 // by a thread. Both adding and running tasks are thread-safe operations.
 class fifo_scheduler {
 struct internal_data;
 std::unique_ptr<internal_data> impl;
 public:
 LIBASYNC_EXPORT fifo_scheduler();
 LIBASYNC_EXPORT ~fifo_scheduler();
 // Add a task to the queue
 LIBASYNC_EXPORT void schedule(task_run_handle t);
 // Try running one task from the queue. Returns false if the queue was empty.
 LIBASYNC_EXPORT bool try_run_one_task();
 // Run all tasks in the queue
 LIBASYNC_EXPORT void run_all_tasks();
 };
 // Scheduler that runs tasks in a work-stealing thread pool of the given size.
 // Note that destroying the thread pool before all tasks have completed may
 // result in some tasks not being executed.
 class threadpool_scheduler {
 std::unique_ptr<detail::threadpool_data> impl;
 public:
-LIBASYNC_EXPORT threadpool_scheduler(threadpool_scheduler&& other);
+LIBASYNC_EXPORT threadpool_scheduler(threadpool_scheduler &&other);
 // Create a thread pool with the given number of threads
 LIBASYNC_EXPORT threadpool_scheduler(std::size_t num_threads);
 // Create a thread pool with the given number of threads. Call `prerun`
 // function before execution loop and `postrun` after.
-LIBASYNC_EXPORT threadpool_scheduler(std::size_t num_threads,
-std::function<void()>&& prerun_,
-std::function<void()>&& postrun_);
+LIBASYNC_EXPORT
+threadpool_scheduler(std::size_t num_threads, std::function<void()> &&prerun_, std::function<void()> &&postrun_);
 // Destroy the thread pool, tasks that haven't been started are dropped
 LIBASYNC_EXPORT ~threadpool_scheduler();
 // Schedule a task to be run in the thread pool
 LIBASYNC_EXPORT void schedule(task_run_handle t);
 };
 namespace detail {
@@ -153,5 +160,5 @@ namespace detail {
 // Work-around for Intel compiler handling decltype poorly in function returns
 typedef std::remove_reference<decltype(::async::default_scheduler())>::type default_scheduler_type;
-} // namespace detail
-} // namespace async
+}// namespace detail
+}// namespace async
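Taken together, the header now exposes two override points: the existing LIBASYNC_CUSTOM_DEFAULT_SCHEDULER macro and the new LIBASYNC_CUSTOM_WAIT_FOR_TASK guard around detail::wait_for_task. A minimal consumption sketch, assuming the new macro is meant to be defined by the embedder when it supplies its own wait handler; "MyScheduler" is a placeholder name, and the sled headers later in this commit follow the same pattern with sled::FiberScheduler:

// Sketch only: how an embedding project can replace both the default scheduler
// and the blocking-wait hook. "MyScheduler" is hypothetical; the sled files in
// this commit use sled::FiberScheduler instead.
#define LIBASYNC_CUSTOM_DEFAULT_SCHEDULER
#define LIBASYNC_CUSTOM_WAIT_FOR_TASK    // assumption: suppresses the built-in wait_for_task declaration

namespace async {
class task_base;
class MyScheduler;                        // hypothetical scheduler with schedule(task_run_handle)
MyScheduler &default_scheduler();         // must be declared before including <async++.h>
namespace detail {
void wait_for_task(task_base *wait_task); // embedder-provided wait handler
}// namespace detail
}// namespace async

#include <async++.h>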

View File

@@ -141,6 +141,9 @@ if(SLED_BUILD_TESTS)
 src/system/thread_pool_test.cc
 src/rx_test.cc
 src/uri_test.cc)
+if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
+    target_compile_options(sled_tests PRIVATE -Wthread-safety)
+endif()
 target_link_libraries(sled_tests PRIVATE sled GTest::gtest GTest::gtest_main)
 add_test(NAME sled_tests COMMAND sled_tests)
 endif(SLED_BUILD_TESTS)
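-Wthread-safety enables Clang's thread-safety analysis for the test target, so the capability annotations added to sled::Mutex later in this commit are actually enforced at compile time. A rough illustration of the class of bug it reports; the include path and function are illustrative, not from the repository:

// Illustrative only: with the SLED_* annotations below, Clang's -Wthread-safety
// warns when a Lock() is not matched by an Unlock() on every path.
#include "sled/synchronization/mutex.h"   // assumed header path

static sled::Mutex g_mutex;

void Broken(bool early_exit)
{
    g_mutex.Lock();
    if (early_exit) return;   // warning: mutex 'g_mutex' is still held at the end of function
    g_mutex.Unlock();
}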

View File

@@ -3,18 +3,28 @@
 namespace sled {
 class FiberScheduler;
 }
 namespace async {
 sled::FiberScheduler &default_scheduler();
-}
+class task_base;
+namespace detail {
+void wait_for_task(task_base *wait_task);
+}
+}// namespace async
+#define LIBASYNC_CUSTON_EVENT
 #define LIBASYNC_CUSTOM_DEFAULT_SCHEDULER
 #include <async++.h>
 namespace sled {
+void SleepWaitHandler(async::task_wait_handle t);
 class FiberScheduler {
 public:
+FiberScheduler();
 void schedule(async::task_run_handle t);
 };
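With LIBASYNC_CUSTOM_DEFAULT_SCHEDULER defined here, ordinary async++ calls pick up FiberScheduler automatically. A small usage sketch; the include name is a placeholder for this header's real path:

// Sketch: tasks spawned without an explicit scheduler go through
// async::default_scheduler(), which this header redirects to sled::FiberScheduler.
#include "scheduler.h"   // placeholder for this header's actual path

int Run()
{
    auto t = async::spawn([] { return 21; })          // scheduled via FiberScheduler::schedule()
                 .then([](int v) { return v * 2; });  // continuation uses the same default scheduler
    return t.get();                                   // blocking wait; see the custom wait_for_task below
}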

View File

@@ -22,7 +22,7 @@ namespace internal {
 template<typename T>
 struct HasLockAndUnlock {
 template<typename U,
 decltype(std::declval<U>().Lock()) * = nullptr,
 decltype(std::declval<U>().Unlock()) * = nullptr>
 static int Test(int);
@@ -33,31 +33,32 @@
 };
 }// namespace internal
-using Mutex = marl::mutex;
-// class Mutex final {
-// public:
-// Mutex() = default;
-// Mutex(const Mutex &) = delete;
-// Mutex &operator=(const Mutex &) = delete;
-//
-// inline void Lock() { impl_.lock(); };
-//
-// inline bool TryLock() { return impl_.try_lock(); }
-//
-// inline void AssertHeld() {}
-//
-// inline void Unlock() { impl_.unlock(); }
-//
-// private:
-// std::mutex impl_;
-// friend class ConditionVariable;
-// };
-class RecursiveMutex final {
-public:
-RecursiveMutex() = default;
-RecursiveMutex(const RecursiveMutex &) = delete;
+// using Mutex = marl::mutex;
+class SLED_LOCKABLE Mutex final {
+public:
+Mutex() = default;
+Mutex(const Mutex &) = delete;
+Mutex &operator=(const Mutex &) = delete;
+inline void Lock() SLED_EXCLUSIVE_LOCK_FUNCTION(impl_) { impl_.lock(); };
+inline bool TryLock() SLED_EXCLUSIVE_TRYLOCK_FUNCTION(true) { return impl_.try_lock(); }
+inline void AssertHeld() SLED_ASSERT_EXCLUSIVE_LOCK(impl_) {}
+inline void Unlock() SLED_UNLOCK_FUNCTION(impl_) { impl_.unlock(); }
+private:
+marl::mutex impl_;
+friend class ConditionVariable;
+friend class MutexLock;
+};
+class SLED_LOCKABLE RecursiveMutex final {
+public:
+RecursiveMutex() = default;
+RecursiveMutex(const RecursiveMutex &) = delete;
 RecursiveMutex &operator=(const RecursiveMutex &) = delete;
 inline void Lock() SLED_SHARED_LOCK_FUNCTION() { impl_.lock(); }
@@ -72,17 +73,14 @@
 std::recursive_mutex impl_;
 };
-class RecursiveMutexLock final {
+class SLED_SCOPED_CAPABILITY RecursiveMutexLock final {
 public:
 RecursiveMutexLock(const RecursiveMutexLock &) = delete;
 RecursiveMutexLock &operator=(const RecursiveMutexLock &) = delete;
-explicit RecursiveMutexLock(RecursiveMutex *mutex) SLED_EXCLUSIVE_LOCK_FUNCTION(mutex) : mutex_(mutex)
-{
-mutex->Lock();
-}
+explicit RecursiveMutexLock(RecursiveMutex *mutex) SLED_ACQUIRE_SHARED(mutex) : mutex_(mutex) { mutex->Lock(); }
-~RecursiveMutexLock() SLED_UNLOCK_FUNCTION() { mutex_->Unlock(); }
+~RecursiveMutexLock() SLED_RELEASE_SHARED(mutex_) { mutex_->Unlock(); }
 private:
 RecursiveMutex *mutex_;
@@ -101,13 +99,13 @@
 // friend class ConditionVariable;
 // };
 //
-class MutexLock final {
+class SLED_SCOPED_CAPABILITY MutexLock final {
 public:
-MutexLock(Mutex *mutex) SLED_EXCLUSIVE_LOCK_FUNCTION(mutex) : lock_(*mutex) {}
+MutexLock(Mutex *mutex) SLED_ACQUIRE(mutex) : lock_(mutex->impl_) {}
-~MutexLock() SLED_UNLOCK_FUNCTION() = default;
+~MutexLock() SLED_RELEASE() = default;
 MutexLock(const MutexLock &) = delete;
 MutexLock &operator=(const MutexLock &) = delete;
 private:
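For reference, a minimal sketch of how the annotated types are meant to be used together; the surrounding class and the header path are illustrative, not part of this commit:

// Illustrative use of the annotated sled::Mutex with the RAII MutexLock;
// SLED_SCOPED_CAPABILITY lets Clang check that acquire and release stay balanced.
#include "sled/synchronization/mutex.h"   // assumed header path

class Counter {
public:
    int Increment()
    {
        sled::MutexLock lock(&mutex_);   // SLED_ACQUIRE in the constructor, SLED_RELEASE in the destructor
        return ++value_;
    }

private:
    sled::Mutex mutex_;
    int value_ = 0;
};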

View File

@@ -5,15 +5,6 @@
 // clang-format off
 #include <async++.h>
-// clang-format on
-namespace async {
-sled::FiberScheduler &
-default_scheduler()
-{
-static sled::FiberScheduler scheduler;
-return scheduler;
-}
-}// namespace async
 namespace sled {
@@ -25,13 +16,33 @@ SleepWaitHandler(async::task_wait_handle t)
 event.Wait(sled::Event::kForever);
 }
+FiberScheduler::FiberScheduler()
+{
+}
 void
 FiberScheduler::schedule(async::task_run_handle t)
 {
 static ThreadPool thread_pool;
 auto move_on_copy = sled::MakeMoveOnCopy(t);
-// thread_pool.PostTask([move_on_copy] { move_on_copy.value.run_with_wait_handler(SleepWaitHandler); });
 thread_pool.submit([move_on_copy] { move_on_copy.value.run_with_wait_handler(SleepWaitHandler); });
-// thread_pool.submit([move_on_copy] { move_on_copy.value.run(); });
 }
 }// namespace sled
+// clang-format on
+namespace async {
+sled::FiberScheduler &
+default_scheduler()
+{
+static sled::FiberScheduler scheduler;
+return scheduler;
+}
+void
+detail::wait_for_task(task_base *wait_task)
+{
+sled::SleepWaitHandler(task_wait_handle(wait_task));
+}
+}// namespace async
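This is the piece that addresses the deadlock in the commit title: a blocking wait no longer goes through async++'s built-in handler but through sled's event-based SleepWaitHandler. A sketch of the affected call path; the snippet itself is illustrative and the include name is a placeholder:

// Sketch: task.get() on an unfinished task ends up in the wait_for_task()
// defined above, which parks the caller on a sled::Event via SleepWaitHandler
// instead of async++'s default wait handler.
#include "scheduler.h"   // placeholder for the header that sets up FiberScheduler

int WaitExample()
{
    auto task = async::spawn([] { return 1 + 1; });
    return task.get();   // get() -> detail::wait_for_task() -> sled::SleepWaitHandler() -> Event::Wait()
}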

View File

@@ -28,3 +28,12 @@ TEST(Async, parallel_for)
 // wg.Wait();
 for (int i = 0; i < count; i++) { EXPECT_TRUE(values[i]) << i; }
 }
+TEST(Async, parallel_reduce)
+{
+    auto r = async::parallel_reduce(async::irange(1, 5), 0, [](int x, int y) {
+        LOGD("", "{},{}", x, y);
+        return x + y;
+    });
+    LOGD("", "{}", r);
+}