0
0
mirror of https://github.com/yse/easy_profiler.git synced 2024-12-26 08:01:51 +08:00

#0 [Gui] Added background jobs to thread pool and removed detached threads from code

This commit is contained in:
Victor Zarubkin 2018-02-27 21:41:32 +03:00
parent b07de42a48
commit bd1d8b71cf
6 changed files with 151 additions and 107 deletions

View File

@ -79,18 +79,9 @@
#include <QByteArray>
#include <QDebug>
#include <QApplication>
#include <thread>
#include "blocks_tree_widget.h"
#include "globals.h"
#ifdef _WIN32
#include <Windows.h>
#ifdef __MINGW32__
#include <processthreadsapi.h>
#endif
#endif
#include "thread_pool.h"
#ifdef max
#undef max
@ -436,23 +427,18 @@ void BlocksTreeWidget::clearSilent(bool _global)
m_items.clear();
m_roots.clear();
::std::vector<QTreeWidgetItem*> topLevelItems;
topLevelItems.reserve(static_cast<size_t>(topLevelItemCount()));
for (int i = topLevelItemCount() - 1; i >= 0; --i)
topLevelItems.push_back(takeTopLevelItem(i));
auto deleter_thread = ::std::thread([](decltype(topLevelItems) _items)
if (topLevelItemCount() != 0)
{
#ifdef _WIN32
SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_LOWEST);
#endif
std::vector<QTreeWidgetItem*> topLevelItems;
topLevelItems.reserve(static_cast<size_t>(topLevelItemCount()));
for (int i = topLevelItemCount() - 1; i >= 0; --i)
topLevelItems.push_back(takeTopLevelItem(i));
for (auto item : _items)
delete item;
}, ::std::move(topLevelItems));
deleter_thread.detach();
ThreadPool::instance().backgroundJob([=] {
for (auto item : topLevelItems)
delete item;
});
}
//clear();

View File

@ -70,19 +70,10 @@
#include <QSplitter>
#include <QVariant>
#include <QTimer>
#include <thread>
#include "descriptors_tree_widget.h"
#include "arbitrary_value_inspector.h"
#include "globals.h"
#ifdef _WIN32
#include <Windows.h>
#ifdef __MINGW32__
#include <processthreadsapi.h>
#endif
#endif
#include "thread_pool.h"
#ifdef max
#undef max
@ -362,30 +353,26 @@ void DescriptorsTreeWidget::clearSilent(bool _global)
m_highlightItems.clear();
m_items.clear();
::std::vector<QTreeWidgetItem*> topLevelItems;
topLevelItems.reserve(topLevelItemCount());
for (int i = topLevelItemCount() - 1; i >= 0; --i)
if (topLevelItemCount() != 0)
{
const bool expanded = !_global && topLevelItem(i)->isExpanded();
auto item = takeTopLevelItem(i);
if (expanded)
m_expandedFilesTemp.insert(item->text(DESC_COL_FILE_LINE).toStdString());
topLevelItems.push_back(item);
::std::vector<QTreeWidgetItem*> topLevelItems;
topLevelItems.reserve(static_cast<size_t>(topLevelItemCount()));
for (int i = topLevelItemCount() - 1; i >= 0; --i)
{
const bool expanded = !_global && topLevelItem(i)->isExpanded();
auto item = takeTopLevelItem(i);
if (expanded)
m_expandedFilesTemp.insert(item->text(DESC_COL_FILE_LINE).toStdString());
topLevelItems.push_back(item);
}
ThreadPool::instance().backgroundJob([=] {
for (auto item : topLevelItems)
delete item;
});
}
auto deleter_thread = ::std::thread([](decltype(topLevelItems) _items)
{
#ifdef _WIN32
SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_LOWEST);
#endif
for (auto item : _items)
delete item;
}, ::std::move(topLevelItems));
deleter_thread.detach();
//clear();
}

View File

@ -56,6 +56,33 @@
#include <iterator>
#endif
#ifdef _WIN32
// For including SetThreadPriority()
# include <Windows.h>
# ifdef __MINGW32__
# include <processthreadsapi.h>
# endif
#else
// For including pthread_setschedprio()
# include <pthread.h>
#endif
void setLowestThreadPriority()
{
#ifdef _WIN32
SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_LOWEST);
#else
pthread_attr_t attr;
if (pthread_attr_init(&attr) == 0)
{
int policy = 0;
if (pthread_attr_getschedpolicy(&attr, &policy) == 0)
pthread_setschedprio(pthread_self(), sched_get_priority_min(policy));
pthread_attr_destroy(&attr);
}
#endif
}
ThreadPool& ThreadPool::instance()
{
static ThreadPool pool;
@ -64,67 +91,79 @@ ThreadPool& ThreadPool::instance()
ThreadPool::ThreadPool()
{
const auto threadsCount = std::max(std::thread::hardware_concurrency(), 2U);
m_threads.reserve(std::thread::hardware_concurrency() + 1);
m_threads.reserve(threadsCount);
std::generate_n(std::back_inserter(m_threads), threadsCount, [this] {
return std::thread(&ThreadPool::work, this);
// N threads for main tasks
std::generate_n(std::back_inserter(m_threads), std::thread::hardware_concurrency(), [this] {
return std::thread(&ThreadPool::tasksWorker, this);
});
// One thread for background jobs
m_threads.emplace_back(&ThreadPool::jobsWorker, this);
}
ThreadPool::~ThreadPool()
{
m_interrupt.store(true, std::memory_order_release);
m_cv.notify_all();
m_tasks.cv.notify_all();
m_backgroundJobs.cv.notify_all();
for (auto& thread : m_threads)
thread.join();
}
void ThreadPool::backgroundJob(std::function<void()>&& func)
{
m_backgroundJobs.mutex.lock();
m_backgroundJobs.queue.push_back(std::move(func));
m_backgroundJobs.mutex.unlock();
m_backgroundJobs.cv.notify_one();
}
void ThreadPool::enqueue(ThreadPoolTask& task)
{
m_mutex.lock();
m_tasks.emplace_back(task);
m_mutex.unlock();
m_cv.notify_one();
m_tasks.mutex.lock();
m_tasks.queue.emplace_back(task);
m_tasks.mutex.unlock();
m_tasks.cv.notify_one();
}
void ThreadPool::dequeue(ThreadPoolTask& task)
{
const std::lock_guard<std::mutex> lock(m_mutex);
const std::lock_guard<std::mutex> lock(m_tasks.mutex);
if (task.status() != TaskStatus::Enqueued)
return;
for (auto it = m_tasks.begin(); it != m_tasks.end(); ++it)
for (auto it = m_tasks.queue.begin(); it != m_tasks.queue.end(); ++it)
{
if (&it->get() == &task)
{
m_tasks.erase(it);
m_tasks.queue.erase(it);
break;
}
}
}
void ThreadPool::work()
void ThreadPool::tasksWorker()
{
while (true)
{
std::unique_lock<std::mutex> lock(m_mutex);
m_cv.wait(lock, [this] { return !m_tasks.empty() || m_interrupt.load(std::memory_order_acquire); });
std::unique_lock<std::mutex> lock(m_tasks.mutex);
m_tasks.cv.wait(lock, [this] { return !m_tasks.queue.empty() || m_interrupt.load(std::memory_order_acquire); });
if (m_interrupt.load(std::memory_order_acquire))
break;
while (true) // execute all available tasks
{
if (m_tasks.empty())
if (m_tasks.queue.empty())
break; // the lock will be released on the outer loop new iteration
auto& task = m_tasks.front().get();
auto& task = m_tasks.queue.front().get();
task.setStatus(TaskStatus::Processing);
m_tasks.pop_front();
m_tasks.queue.pop_front();
// unlock to permit tasks execution for other worker threads
// unlock to permit tasks execution for other worker threads and for adding new tasks
lock.unlock();
// execute task
@ -135,3 +174,37 @@ void ThreadPool::work()
}
}
}
void ThreadPool::jobsWorker()
{
setLowestThreadPriority(); // Background thread has lowest priority
while (true)
{
std::unique_lock<std::mutex> lock(m_backgroundJobs.mutex);
m_backgroundJobs.cv.wait(lock, [this] {
return !m_backgroundJobs.queue.empty() || m_interrupt.load(std::memory_order_acquire);
});
if (m_interrupt.load(std::memory_order_acquire))
break;
while (true) // execute all available tasks
{
if (m_backgroundJobs.queue.empty())
break; // the lock will be released on the outer loop new iteration
auto job = std::move(m_backgroundJobs.queue.front());
m_backgroundJobs.queue.pop_front();
// unlock to permit adding new jobs while executing current job
lock.unlock();
// execute job
job();
// lock again to check if there are new jobs in the queue
lock.lock();
}
}
}

View File

@ -56,15 +56,23 @@
#include <deque>
#include <condition_variable>
#include <thread>
#include <functional>
class ThreadPool EASY_FINAL
{
friend ThreadPoolTask;
template <class T>
struct Jobs
{
std::deque<T> queue;
std::mutex mutex;
std::condition_variable cv;
};
Jobs<std::reference_wrapper<ThreadPoolTask> > m_tasks;
Jobs<std::function<void()> > m_backgroundJobs;
std::vector<std::thread> m_threads;
std::deque<std::reference_wrapper<ThreadPoolTask> > m_tasks;
std::mutex m_mutex;
std::condition_variable m_cv;
std::atomic_bool m_interrupt;
ThreadPool();
@ -75,11 +83,14 @@ public:
static ThreadPool& instance();
void backgroundJob(std::function<void()>&& func);
private:
void enqueue(ThreadPoolTask& task);
void dequeue(ThreadPoolTask& task);
void work();
void tasksWorker();
void jobsWorker();
}; // end of class ThreadPool.

View File

@ -64,18 +64,18 @@ ThreadPoolTask::~ThreadPoolTask()
void ThreadPoolTask::enqueue(std::function<void()>&& func, std::atomic_bool& interruptFlag)
{
dequeue();
setStatus(TaskStatus::Enqueued);
m_interrupt = & interruptFlag;
m_interrupt->store(false, std::memory_order_release);
m_func = std::move(func);
setStatus(TaskStatus::Enqueued);
ThreadPool::instance().enqueue(*this);
}
void ThreadPoolTask::dequeue()
{
if (m_interrupt == nullptr)
if (m_interrupt == nullptr || status() == TaskStatus::Finished)
return;
m_interrupt->store(true, std::memory_order_release);

View File

@ -57,16 +57,7 @@
#include "tree_widget_loader.h"
#include "tree_widget_item.h"
#include "globals.h"
#include <thread>
#ifdef _WIN32
#include <Windows.h>
#ifdef __MINGW32__
#include <processthreadsapi.h>
#endif
#endif
#include "thread_pool.h"
#ifdef max
#undef max
@ -141,25 +132,21 @@ void TreeWidgetLoader::interrupt(bool _wait)
m_bDone.store(false, std::memory_order_release);
m_progress.store(0, std::memory_order_release);
if (!_wait)
if (!m_topLevelItems.empty())
{
auto deleter_thread = ::std::thread([](decltype(m_topLevelItems) _items)
if (!_wait)
{
#ifdef _WIN32
SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_LOWEST);
#endif
for (auto item : _items)
auto items = std::move(m_topLevelItems);
ThreadPool::instance().backgroundJob([=] {
for (auto item : items)
delete item.second;
});
}
else
{
for (auto item : m_topLevelItems)
delete item.second;
}, ::std::move(m_topLevelItems));
deleter_thread.detach();
}
else
{
for (auto item : m_topLevelItems)
delete item.second;
}
}
m_items.clear();