From 181823d4fe0deb24380e9f6771d96aacad512ede Mon Sep 17 00:00:00 2001
From: tqcq <99722391+tqcq@users.noreply.github.com>
Date: Thu, 28 Mar 2024 22:19:28 +0800
Subject: [PATCH] fix async deadlock

---
 .../include/async++/scheduler_fwd.h  | 91 ++++++++++---------
 CMakeLists.txt                       |  3 +
 include/sled/async/async.h           | 12 ++-
 include/sled/synchronization/mutex.h | 68 +++++++-------
 src/async/async.cc                   | 31 +++++--
 src/async/async_test.cc              |  9 ++
 6 files changed, 126 insertions(+), 88 deletions(-)

diff --git a/3party/asyncplusplus/include/async++/scheduler_fwd.h b/3party/asyncplusplus/include/async++/scheduler_fwd.h
index 6e124b0..8b41c58 100644
--- a/3party/asyncplusplus/include/async++/scheduler_fwd.h
+++ b/3party/asyncplusplus/include/async++/scheduler_fwd.h
@@ -19,7 +19,7 @@
 // THE SOFTWARE.
 
 #ifndef ASYNCXX_H_
-# error "Do not include this header directly, include <async++.h> instead."
+#error "Do not include this header directly, include <async++.h> instead."
 #endif
 
 namespace async {
@@ -37,20 +37,22 @@ namespace detail {
 
 // Detect whether an object is a scheduler
 template<typename T, typename = decltype(std::declval<T>().schedule(std::declval<task_run_handle>()))>
-two& is_scheduler_helper(int);
+two &is_scheduler_helper(int);
 template<typename T>
-one& is_scheduler_helper(...);
+one &is_scheduler_helper(...);
+
 template<typename T>
-struct is_scheduler: public std::integral_constant<bool, sizeof(is_scheduler_helper<T>(0)) - 1> {};
+struct is_scheduler : public std::integral_constant<bool, sizeof(is_scheduler_helper<T>(0)) - 1> {};
 
 // Singleton scheduler classes
 class thread_scheduler_impl {
 public:
-	LIBASYNC_EXPORT static void schedule(task_run_handle t);
+    LIBASYNC_EXPORT static void schedule(task_run_handle t);
 };
+
 class inline_scheduler_impl {
 public:
-	static void schedule(task_run_handle t);
+    static void schedule(task_run_handle t);
 };
 
 // Reference counted pointer to task data
@@ -59,93 +61,98 @@ typedef ref_count_ptr<task_base> task_ptr;
 
 // Helper function to schedule a task using a scheduler
 template<typename Sched>
-void schedule_task(Sched& sched, task_ptr t);
+void schedule_task(Sched &sched, task_ptr t);
 
 // Wait for the given task to finish. This will call the wait handler currently
 // active for this thread, which causes the thread to sleep by default.
-LIBASYNC_EXPORT void wait_for_task(task_base* wait_task);
+#ifndef LIBASYNC_CUSTOM_WAIT_FOR_TASK
+LIBASYNC_EXPORT void wait_for_task(task_base *wait_task);
+#endif
 
 // Forward-declaration for data used by threadpool_scheduler
 struct threadpool_data;
-} // namespace detail
+}// namespace detail
 
 // Run a task in the current thread as soon as it is scheduled
-inline detail::inline_scheduler_impl& inline_scheduler()
+inline detail::inline_scheduler_impl &
+inline_scheduler()
 {
-	static detail::inline_scheduler_impl instance;
-	return instance;
+    static detail::inline_scheduler_impl instance;
+    return instance;
 }
 
 // Run a task in a separate thread. Note that this scheduler does not wait for
 // threads to finish at process exit. You must ensure that all threads finish
 // before ending the process.
-inline detail::thread_scheduler_impl& thread_scheduler()
+inline detail::thread_scheduler_impl &
+thread_scheduler()
 {
-	static detail::thread_scheduler_impl instance;
-	return instance;
+    static detail::thread_scheduler_impl instance;
+    return instance;
 }
 
 // Built-in thread pool scheduler with a size that is configurable from the
 // LIBASYNC_NUM_THREADS environment variable. If that variable does not exist
 // then the number of CPUs in the system is used instead.
-LIBASYNC_EXPORT threadpool_scheduler& default_threadpool_scheduler();
+LIBASYNC_EXPORT threadpool_scheduler &default_threadpool_scheduler();
 
 // Default scheduler that is used when one isn't specified. This defaults to
 // default_threadpool_scheduler(), but can be overriden by defining
 // LIBASYNC_CUSTOM_DEFAULT_SCHEDULER before including async++.h. Keep in mind
 // that in that case async::default_scheduler should be declared before
 // including async++.h.
+
 #ifndef LIBASYNC_CUSTOM_DEFAULT_SCHEDULER
-inline threadpool_scheduler& default_scheduler()
+inline threadpool_scheduler &
+default_scheduler()
 {
-	return default_threadpool_scheduler();
+    return default_threadpool_scheduler();
 }
 #endif
 
 // Scheduler that holds a list of tasks which can then be explicitly executed
 // by a thread. Both adding and running tasks are thread-safe operations.
 class fifo_scheduler {
-	struct internal_data;
-	std::unique_ptr<internal_data> impl;
+    struct internal_data;
+    std::unique_ptr<internal_data> impl;
 
 public:
-	LIBASYNC_EXPORT fifo_scheduler();
-	LIBASYNC_EXPORT ~fifo_scheduler();
+    LIBASYNC_EXPORT fifo_scheduler();
+    LIBASYNC_EXPORT ~fifo_scheduler();
 
-	// Add a task to the queue
-	LIBASYNC_EXPORT void schedule(task_run_handle t);
+    // Add a task to the queue
+    LIBASYNC_EXPORT void schedule(task_run_handle t);
 
-	// Try running one task from the queue. Returns false if the queue was empty.
-	LIBASYNC_EXPORT bool try_run_one_task();
+    // Try running one task from the queue. Returns false if the queue was empty.
+    LIBASYNC_EXPORT bool try_run_one_task();
 
-	// Run all tasks in the queue
-	LIBASYNC_EXPORT void run_all_tasks();
+    // Run all tasks in the queue
+    LIBASYNC_EXPORT void run_all_tasks();
 };
 
 // Scheduler that runs tasks in a work-stealing thread pool of the given size.
 // Note that destroying the thread pool before all tasks have completed may
 // result in some tasks not being executed.
 class threadpool_scheduler {
-	std::unique_ptr<detail::threadpool_data> impl;
+    std::unique_ptr<detail::threadpool_data> impl;
 
 public:
-	LIBASYNC_EXPORT threadpool_scheduler(threadpool_scheduler&& other);
+    LIBASYNC_EXPORT threadpool_scheduler(threadpool_scheduler &&other);
 
-	// Create a thread pool with the given number of threads
-	LIBASYNC_EXPORT threadpool_scheduler(std::size_t num_threads);
+    // Create a thread pool with the given number of threads
+    LIBASYNC_EXPORT threadpool_scheduler(std::size_t num_threads);
 
-	// Create a thread pool with the given number of threads. Call `prerun`
+    // Create a thread pool with the given number of threads. Call `prerun`
     // function before execution loop and `postrun` after.
-	LIBASYNC_EXPORT threadpool_scheduler(std::size_t num_threads,
-	                                     std::function<void()>&& prerun_,
-	                                     std::function<void()>&& postrun_);
+    LIBASYNC_EXPORT
+    threadpool_scheduler(std::size_t num_threads, std::function<void()> &&prerun_, std::function<void()> &&postrun_);
 
-	// Destroy the thread pool, tasks that haven't been started are dropped
-	LIBASYNC_EXPORT ~threadpool_scheduler();
+    // Destroy the thread pool, tasks that haven't been started are dropped
+    LIBASYNC_EXPORT ~threadpool_scheduler();
 
-	// Schedule a task to be run in the thread pool
-	LIBASYNC_EXPORT void schedule(task_run_handle t);
+    // Schedule a task to be run in the thread pool
+    LIBASYNC_EXPORT void schedule(task_run_handle t);
 };
 
 namespace detail {
@@ -153,5 +160,5 @@ namespace detail {
 // Work-around for Intel compiler handling decltype poorly in function returns
 typedef std::remove_reference<decltype(default_scheduler())>::type default_scheduler_type;
 
-} // namespace detail
-} // namespace async
+}// namespace detail
+}// namespace async
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 0b25ed6..89fea01 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -141,6 +141,9 @@ if(SLED_BUILD_TESTS)
     src/system/thread_pool_test.cc
     src/rx_test.cc
     src/uri_test.cc)
+  if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
+    target_compile_options(sled_tests PRIVATE -Wthread-safety)
+  endif()
   target_link_libraries(sled_tests PRIVATE sled GTest::gtest GTest::gtest_main)
   add_test(NAME sled_tests COMMAND sled_tests)
 endif(SLED_BUILD_TESTS)
diff --git a/include/sled/async/async.h b/include/sled/async/async.h
index 35f147a..d566fe6 100644
--- a/include/sled/async/async.h
+++ b/include/sled/async/async.h
@@ -3,18 +3,28 @@
 namespace sled {
 class FiberScheduler;
+
 }
 
 namespace async {
 sled::FiberScheduler &default_scheduler();
-}
+class task_base;
+namespace detail {
+void wait_for_task(task_base *wait_task);
+}
+}// namespace async
+
+#define LIBASYNC_CUSTON_EVENT
 #define LIBASYNC_CUSTOM_DEFAULT_SCHEDULER
 #include <async++.h>
 
 namespace sled {
+void SleepWaitHandler(async::task_wait_handle t);
+
 class FiberScheduler {
 public:
+    FiberScheduler();
     void schedule(async::task_run_handle t);
 };
diff --git a/include/sled/synchronization/mutex.h b/include/sled/synchronization/mutex.h
index 6c01f1d..1b029a6 100644
--- a/include/sled/synchronization/mutex.h
+++ b/include/sled/synchronization/mutex.h
@@ -22,7 +22,7 @@ namespace internal {
 
 template<typename T>
 struct HasLockAndUnlock {
     template<typename U,
-             decltype(std::declval<U>().Lock()) * = nullptr,
+             decltype(std::declval<U>().Lock()) * = nullptr,
              decltype(std::declval<U>().Unlock()) * = nullptr>
     static int Test(int);
@@ -33,31 +33,32 @@ struct HasLockAndUnlock {
 };
 };// namespace internal
 
-using Mutex = marl::mutex;
+// using Mutex = marl::mutex;
 
-// class Mutex final {
-// public:
-//     Mutex() = default;
-//     Mutex(const Mutex &) = delete;
-//     Mutex &operator=(const Mutex &) = delete;
-//
-//     inline void Lock() { impl_.lock(); };
-//
-//     inline bool TryLock() { return impl_.try_lock(); }
-//
-//     inline void AssertHeld() {}
-//
-//     inline void Unlock() { impl_.unlock(); }
-//
-// private:
-//     std::mutex impl_;
-//     friend class ConditionVariable;
-// };
-
-class RecursiveMutex final {
+class SLED_LOCKABLE Mutex final {
 public:
-    RecursiveMutex() = default;
-    RecursiveMutex(const RecursiveMutex &) = delete;
+    Mutex() = default;
+    Mutex(const Mutex &) = delete;
+    Mutex &operator=(const Mutex &) = delete;
+
+    inline void Lock() SLED_EXCLUSIVE_LOCK_FUNCTION(impl_) { impl_.lock(); };
+
+    inline bool TryLock() SLED_EXCLUSIVE_TRYLOCK_FUNCTION(true) { return impl_.try_lock(); }
+
+    inline void AssertHeld() SLED_ASSERT_EXCLUSIVE_LOCK(impl_) {}
+
+    inline void Unlock() SLED_UNLOCK_FUNCTION(impl_) { impl_.unlock(); }
+
+private:
+    marl::mutex impl_;
+    friend class ConditionVariable;
+    friend class MutexLock;
+};
+
+class SLED_LOCKABLE RecursiveMutex final {
+public:
+    RecursiveMutex() = default;
+    RecursiveMutex(const RecursiveMutex &) = delete;
     RecursiveMutex &operator=(const RecursiveMutex &) = delete;
 
     inline void Lock() SLED_SHARED_LOCK_FUNCTION() { impl_.lock(); }
@@ -72,17 +73,14 @@ private:
     std::recursive_mutex impl_;
 };
 
-class RecursiveMutexLock final {
+class SLED_SCOPED_CAPABILITY RecursiveMutexLock final {
 public:
-    RecursiveMutexLock(const RecursiveMutexLock &) = delete;
+    RecursiveMutexLock(const RecursiveMutexLock &) = delete;
     RecursiveMutexLock &operator=(const RecursiveMutexLock &) = delete;
 
-    explicit RecursiveMutexLock(RecursiveMutex *mutex) SLED_EXCLUSIVE_LOCK_FUNCTION(mutex) : mutex_(mutex)
-    {
-        mutex->Lock();
-    }
+    explicit RecursiveMutexLock(RecursiveMutex *mutex) SLED_ACQUIRE_SHARED(mutex) : mutex_(mutex) { mutex->Lock(); }
 
-    ~RecursiveMutexLock() SLED_UNLOCK_FUNCTION() { mutex_->Unlock(); }
+    ~RecursiveMutexLock() SLED_RELEASE_SHARED(mutex_) { mutex_->Unlock(); }
 
 private:
     RecursiveMutex *mutex_;
@@ -101,13 +99,13 @@ private:
 //     friend class ConditionVariable;
 // };
 //
-class MutexLock final {
+class SLED_SCOPED_CAPABILITY MutexLock final {
 public:
-    MutexLock(Mutex *mutex) SLED_EXCLUSIVE_LOCK_FUNCTION(mutex) : lock_(*mutex) {}
+    MutexLock(Mutex *mutex) SLED_ACQUIRE(mutex) : lock_(mutex->impl_) {}
 
-    ~MutexLock() SLED_UNLOCK_FUNCTION() = default;
+    ~MutexLock() SLED_RELEASE() = default;
 
-    MutexLock(const MutexLock &) = delete;
+    MutexLock(const MutexLock &) = delete;
     MutexLock &operator=(const MutexLock &) = delete;
 
 private:
diff --git a/src/async/async.cc b/src/async/async.cc
index 90128c3..44bed72 100644
--- a/src/async/async.cc
+++ b/src/async/async.cc
@@ -5,15 +5,6 @@
 // clang-format off
 #include
-// clang-format on
-namespace async {
-sled::FiberScheduler &
-default_scheduler()
-{
-    static sled::FiberScheduler scheduler;
-    return scheduler;
-}
-}// namespace async
 
 namespace sled {
@@ -25,13 +16,33 @@ SleepWaitHandler(async::task_wait_handle t)
     event.Wait(sled::Event::kForever);
 }
 
+FiberScheduler::FiberScheduler()
+{
+}
+
 void
 FiberScheduler::schedule(async::task_run_handle t)
 {
     static ThreadPool thread_pool;
     auto move_on_copy = sled::MakeMoveOnCopy(t);
-    // thread_pool.PostTask([move_on_copy] { move_on_copy.value.run_with_wait_handler(SleepWaitHandler); });
     thread_pool.submit([move_on_copy] { move_on_copy.value.run_with_wait_handler(SleepWaitHandler); });
+    // thread_pool.submit([move_on_copy] { move_on_copy.value.run(); });
 }
 
 }// namespace sled
+
+// clang-format on
+namespace async {
+sled::FiberScheduler &
+default_scheduler()
+{
+    static sled::FiberScheduler scheduler;
+    return scheduler;
+}
+
+void
+detail::wait_for_task(task_base *wait_task)
+{
+    sled::SleepWaitHandler(task_wait_handle(wait_task));
+}
+}// namespace async
diff --git a/src/async/async_test.cc b/src/async/async_test.cc
index 33d3ae7..58ed743 100644
--- a/src/async/async_test.cc
+++ b/src/async/async_test.cc
@@ -28,3 +28,12 @@ TEST(Async, parallel_for)
     // wg.Wait();
     for (int i = 0; i < count; i++) { EXPECT_TRUE(values[i]) << i; }
 }
+
+TEST(Async, parallel_reduce)
+{
+    auto r = async::parallel_reduce(async::irange(1, 5), 0, [](int x, int y) {
+        LOGD("", "{},{}", x, y);
+        return x + y;
+    });
+    LOGD("", "{}", r);
+}
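
Not part of the patch: a minimal caller-side sketch of what the commit wires up, i.e. blocking on async++ tasks that run on the sled::FiberScheduler installed as the default scheduler. The header path <sled/async/async.h>, the standalone main(), and the use of std::cout in place of LOGD are illustrative assumptions, not code from the repository.

// Illustrative sketch only -- not part of the patch. Assumes the public
// header path <sled/async/async.h> and a standalone program linked against
// sled; std::cout stands in for sled's LOGD macro.
#include <sled/async/async.h>

#include <iostream>

int main()
{
    // spawn() queues work on async::default_scheduler(), which async.h above
    // redirects to sled::FiberScheduler; then() chains a continuation.
    auto doubled = async::spawn([] { return 21; }).then([](int v) { return v * 2; });

    // get() blocks the caller until the task finishes; with this patch that
    // wait goes through the detail::wait_for_task override in async.cc,
    // which calls sled::SleepWaitHandler rather than async++'s built-in path.
    std::cout << doubled.get() << "\n";   // 42

    // The same reduction the new async_test.cc case runs: 1 + 2 + 3 + 4 = 10.
    int sum = async::parallel_reduce(async::irange(1, 5), 0,
                                     [](int x, int y) { return x + y; });
    std::cout << sum << "\n";
    return 0;
}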