From bd1d8b71cff16b46723e4fda59550e7c3808a95a Mon Sep 17 00:00:00 2001 From: Victor Zarubkin Date: Tue, 27 Feb 2018 21:41:32 +0300 Subject: [PATCH] #0 [Gui] Added background jobs to thread pool and removed detached threads from code --- profiler_gui/blocks_tree_widget.cpp | 36 +++----- profiler_gui/descriptors_tree_widget.cpp | 49 ++++------ profiler_gui/thread_pool.cpp | 111 +++++++++++++++++++---- profiler_gui/thread_pool.h | 19 +++- profiler_gui/thread_pool_task.cpp | 4 +- profiler_gui/tree_widget_loader.cpp | 39 +++----- 6 files changed, 151 insertions(+), 107 deletions(-) diff --git a/profiler_gui/blocks_tree_widget.cpp b/profiler_gui/blocks_tree_widget.cpp index 18b1fd3..4048f9b 100644 --- a/profiler_gui/blocks_tree_widget.cpp +++ b/profiler_gui/blocks_tree_widget.cpp @@ -79,18 +79,9 @@ #include #include #include -#include #include "blocks_tree_widget.h" #include "globals.h" - -#ifdef _WIN32 -#include - -#ifdef __MINGW32__ -#include -#endif - -#endif +#include "thread_pool.h" #ifdef max #undef max @@ -436,23 +427,18 @@ void BlocksTreeWidget::clearSilent(bool _global) m_items.clear(); m_roots.clear(); - ::std::vector topLevelItems; - topLevelItems.reserve(static_cast(topLevelItemCount())); - for (int i = topLevelItemCount() - 1; i >= 0; --i) - topLevelItems.push_back(takeTopLevelItem(i)); - - auto deleter_thread = ::std::thread([](decltype(topLevelItems) _items) + if (topLevelItemCount() != 0) { -#ifdef _WIN32 - SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_LOWEST); -#endif + std::vector topLevelItems; + topLevelItems.reserve(static_cast(topLevelItemCount())); + for (int i = topLevelItemCount() - 1; i >= 0; --i) + topLevelItems.push_back(takeTopLevelItem(i)); - for (auto item : _items) - delete item; - - }, ::std::move(topLevelItems)); - - deleter_thread.detach(); + ThreadPool::instance().backgroundJob([=] { + for (auto item : topLevelItems) + delete item; + }); + } //clear(); diff --git a/profiler_gui/descriptors_tree_widget.cpp b/profiler_gui/descriptors_tree_widget.cpp index 926a52f..3e5ea27 100644 --- a/profiler_gui/descriptors_tree_widget.cpp +++ b/profiler_gui/descriptors_tree_widget.cpp @@ -70,19 +70,10 @@ #include #include #include -#include #include "descriptors_tree_widget.h" #include "arbitrary_value_inspector.h" #include "globals.h" - -#ifdef _WIN32 -#include - -#ifdef __MINGW32__ -#include -#endif - -#endif +#include "thread_pool.h" #ifdef max #undef max @@ -362,30 +353,26 @@ void DescriptorsTreeWidget::clearSilent(bool _global) m_highlightItems.clear(); m_items.clear(); - ::std::vector topLevelItems; - topLevelItems.reserve(topLevelItemCount()); - for (int i = topLevelItemCount() - 1; i >= 0; --i) + if (topLevelItemCount() != 0) { - const bool expanded = !_global && topLevelItem(i)->isExpanded(); - auto item = takeTopLevelItem(i); - if (expanded) - m_expandedFilesTemp.insert(item->text(DESC_COL_FILE_LINE).toStdString()); - topLevelItems.push_back(item); + ::std::vector topLevelItems; + topLevelItems.reserve(static_cast(topLevelItemCount())); + + for (int i = topLevelItemCount() - 1; i >= 0; --i) + { + const bool expanded = !_global && topLevelItem(i)->isExpanded(); + auto item = takeTopLevelItem(i); + if (expanded) + m_expandedFilesTemp.insert(item->text(DESC_COL_FILE_LINE).toStdString()); + topLevelItems.push_back(item); + } + + ThreadPool::instance().backgroundJob([=] { + for (auto item : topLevelItems) + delete item; + }); } - auto deleter_thread = ::std::thread([](decltype(topLevelItems) _items) - { -#ifdef _WIN32 - SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_LOWEST); -#endif - - for (auto item : _items) - delete item; - - }, ::std::move(topLevelItems)); - - deleter_thread.detach(); - //clear(); } diff --git a/profiler_gui/thread_pool.cpp b/profiler_gui/thread_pool.cpp index 50fb8f9..7b605f4 100644 --- a/profiler_gui/thread_pool.cpp +++ b/profiler_gui/thread_pool.cpp @@ -56,6 +56,33 @@ #include #endif +#ifdef _WIN32 +// For including SetThreadPriority() +# include +# ifdef __MINGW32__ +# include +# endif +#else +// For including pthread_setschedprio() +# include +#endif + +void setLowestThreadPriority() +{ +#ifdef _WIN32 + SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_LOWEST); +#else + pthread_attr_t attr; + if (pthread_attr_init(&attr) == 0) + { + int policy = 0; + if (pthread_attr_getschedpolicy(&attr, &policy) == 0) + pthread_setschedprio(pthread_self(), sched_get_priority_min(policy)); + pthread_attr_destroy(&attr); + } +#endif +} + ThreadPool& ThreadPool::instance() { static ThreadPool pool; @@ -64,67 +91,79 @@ ThreadPool& ThreadPool::instance() ThreadPool::ThreadPool() { - const auto threadsCount = std::max(std::thread::hardware_concurrency(), 2U); + m_threads.reserve(std::thread::hardware_concurrency() + 1); - m_threads.reserve(threadsCount); - std::generate_n(std::back_inserter(m_threads), threadsCount, [this] { - return std::thread(&ThreadPool::work, this); + // N threads for main tasks + std::generate_n(std::back_inserter(m_threads), std::thread::hardware_concurrency(), [this] { + return std::thread(&ThreadPool::tasksWorker, this); }); + + // One thread for background jobs + m_threads.emplace_back(&ThreadPool::jobsWorker, this); } ThreadPool::~ThreadPool() { m_interrupt.store(true, std::memory_order_release); - m_cv.notify_all(); + m_tasks.cv.notify_all(); + m_backgroundJobs.cv.notify_all(); for (auto& thread : m_threads) thread.join(); } +void ThreadPool::backgroundJob(std::function&& func) +{ + m_backgroundJobs.mutex.lock(); + m_backgroundJobs.queue.push_back(std::move(func)); + m_backgroundJobs.mutex.unlock(); + m_backgroundJobs.cv.notify_one(); +} + void ThreadPool::enqueue(ThreadPoolTask& task) { - m_mutex.lock(); - m_tasks.emplace_back(task); - m_mutex.unlock(); - m_cv.notify_one(); + m_tasks.mutex.lock(); + m_tasks.queue.emplace_back(task); + m_tasks.mutex.unlock(); + m_tasks.cv.notify_one(); } void ThreadPool::dequeue(ThreadPoolTask& task) { - const std::lock_guard lock(m_mutex); + const std::lock_guard lock(m_tasks.mutex); if (task.status() != TaskStatus::Enqueued) return; - for (auto it = m_tasks.begin(); it != m_tasks.end(); ++it) + for (auto it = m_tasks.queue.begin(); it != m_tasks.queue.end(); ++it) { if (&it->get() == &task) { - m_tasks.erase(it); + m_tasks.queue.erase(it); break; } } } -void ThreadPool::work() +void ThreadPool::tasksWorker() { while (true) { - std::unique_lock lock(m_mutex); - m_cv.wait(lock, [this] { return !m_tasks.empty() || m_interrupt.load(std::memory_order_acquire); }); + std::unique_lock lock(m_tasks.mutex); + m_tasks.cv.wait(lock, [this] { return !m_tasks.queue.empty() || m_interrupt.load(std::memory_order_acquire); }); if (m_interrupt.load(std::memory_order_acquire)) break; while (true) // execute all available tasks { - if (m_tasks.empty()) + if (m_tasks.queue.empty()) break; // the lock will be released on the outer loop new iteration - auto& task = m_tasks.front().get(); + auto& task = m_tasks.queue.front().get(); task.setStatus(TaskStatus::Processing); - m_tasks.pop_front(); + m_tasks.queue.pop_front(); - // unlock to permit tasks execution for other worker threads + // unlock to permit tasks execution for other worker threads and for adding new tasks lock.unlock(); // execute task @@ -135,3 +174,37 @@ void ThreadPool::work() } } } + +void ThreadPool::jobsWorker() +{ + setLowestThreadPriority(); // Background thread has lowest priority + + while (true) + { + std::unique_lock lock(m_backgroundJobs.mutex); + m_backgroundJobs.cv.wait(lock, [this] { + return !m_backgroundJobs.queue.empty() || m_interrupt.load(std::memory_order_acquire); + }); + + if (m_interrupt.load(std::memory_order_acquire)) + break; + + while (true) // execute all available tasks + { + if (m_backgroundJobs.queue.empty()) + break; // the lock will be released on the outer loop new iteration + + auto job = std::move(m_backgroundJobs.queue.front()); + m_backgroundJobs.queue.pop_front(); + + // unlock to permit adding new jobs while executing current job + lock.unlock(); + + // execute job + job(); + + // lock again to check if there are new jobs in the queue + lock.lock(); + } + } +} diff --git a/profiler_gui/thread_pool.h b/profiler_gui/thread_pool.h index 00c38e1..1f4968d 100644 --- a/profiler_gui/thread_pool.h +++ b/profiler_gui/thread_pool.h @@ -56,15 +56,23 @@ #include #include #include +#include class ThreadPool EASY_FINAL { friend ThreadPoolTask; + template + struct Jobs + { + std::deque queue; + std::mutex mutex; + std::condition_variable cv; + }; + + Jobs > m_tasks; + Jobs > m_backgroundJobs; std::vector m_threads; - std::deque > m_tasks; - std::mutex m_mutex; - std::condition_variable m_cv; std::atomic_bool m_interrupt; ThreadPool(); @@ -75,11 +83,14 @@ public: static ThreadPool& instance(); + void backgroundJob(std::function&& func); + private: void enqueue(ThreadPoolTask& task); void dequeue(ThreadPoolTask& task); - void work(); + void tasksWorker(); + void jobsWorker(); }; // end of class ThreadPool. diff --git a/profiler_gui/thread_pool_task.cpp b/profiler_gui/thread_pool_task.cpp index f1ce45e..1efcde7 100644 --- a/profiler_gui/thread_pool_task.cpp +++ b/profiler_gui/thread_pool_task.cpp @@ -64,18 +64,18 @@ ThreadPoolTask::~ThreadPoolTask() void ThreadPoolTask::enqueue(std::function&& func, std::atomic_bool& interruptFlag) { dequeue(); + setStatus(TaskStatus::Enqueued); m_interrupt = & interruptFlag; m_interrupt->store(false, std::memory_order_release); m_func = std::move(func); - setStatus(TaskStatus::Enqueued); ThreadPool::instance().enqueue(*this); } void ThreadPoolTask::dequeue() { - if (m_interrupt == nullptr) + if (m_interrupt == nullptr || status() == TaskStatus::Finished) return; m_interrupt->store(true, std::memory_order_release); diff --git a/profiler_gui/tree_widget_loader.cpp b/profiler_gui/tree_widget_loader.cpp index 76be3c7..3247803 100644 --- a/profiler_gui/tree_widget_loader.cpp +++ b/profiler_gui/tree_widget_loader.cpp @@ -57,16 +57,7 @@ #include "tree_widget_loader.h" #include "tree_widget_item.h" #include "globals.h" -#include - -#ifdef _WIN32 -#include - -#ifdef __MINGW32__ -#include -#endif - -#endif +#include "thread_pool.h" #ifdef max #undef max @@ -141,25 +132,21 @@ void TreeWidgetLoader::interrupt(bool _wait) m_bDone.store(false, std::memory_order_release); m_progress.store(0, std::memory_order_release); - if (!_wait) + if (!m_topLevelItems.empty()) { - auto deleter_thread = ::std::thread([](decltype(m_topLevelItems) _items) + if (!_wait) { -#ifdef _WIN32 - SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_LOWEST); -#endif - - for (auto item : _items) + auto items = std::move(m_topLevelItems); + ThreadPool::instance().backgroundJob([=] { + for (auto item : items) + delete item.second; + }); + } + else + { + for (auto item : m_topLevelItems) delete item.second; - - }, ::std::move(m_topLevelItems)); - - deleter_thread.detach(); - } - else - { - for (auto item : m_topLevelItems) - delete item.second; + } } m_items.clear();