Create WorkerThread, an abstraction to perform some work on an interval.

This was extracted from CrashReportUploadThread and will be re-used for the
database pruning thread.

BUG=crashpad:22
R=mark@chromium.org

Review URL: https://codereview.chromium.org/1526563003 .
This commit is contained in:
Robert Sesek 2016-01-04 17:10:58 -05:00
parent b0394744cc
commit 6d829e9af7
7 changed files with 349 additions and 73 deletions

View File

@ -33,7 +33,6 @@
#include "util/net/http_multipart_builder.h"
#include "util/net/http_transport.h"
#include "util/stdlib/map_insert.h"
#include "util/thread/thread.h"
namespace crashpad {
@ -137,78 +136,30 @@ class CallRecordUploadAttempt {
} // namespace
namespace internal {
class CrashReportUploadHelperThread final : public Thread {
public:
explicit CrashReportUploadHelperThread(CrashReportUploadThread* self)
: self_(self) {}
~CrashReportUploadHelperThread() override {}
void ThreadMain() override {
self_->ThreadMain();
}
private:
CrashReportUploadThread* self_;
DISALLOW_COPY_AND_ASSIGN(CrashReportUploadHelperThread);
};
} // namespace internal
CrashReportUploadThread::CrashReportUploadThread(CrashReportDatabase* database,
const std::string& url)
: url_(url),
database_(database),
semaphore_(0),
thread_(),
running_(false) {
// Check for pending reports every 15 minutes, even in the absence of a
// signal from the handler thread. This allows for failed uploads to be
// retried periodically, and for pending reports written by other
// processes to be recognized.
thread_(15 * 60, this) {
}
CrashReportUploadThread::~CrashReportUploadThread() {
DCHECK(!running_);
DCHECK(!thread_);
}
void CrashReportUploadThread::Start() {
DCHECK(!running_);
DCHECK(!thread_);
running_ = true;
thread_.reset(new internal::CrashReportUploadHelperThread(this));
thread_->Start();
thread_.Start(0);
}
void CrashReportUploadThread::Stop() {
DCHECK(running_);
DCHECK(thread_);
if (!running_) {
return;
}
running_ = false;
semaphore_.Signal();
thread_->Join();
thread_.reset();
thread_.Stop();
}
void CrashReportUploadThread::ReportPending() {
semaphore_.Signal();
}
void CrashReportUploadThread::ThreadMain() {
while (running_) {
ProcessPendingReports();
// Check for pending reports every 15 minutes, even in the absence of a
// signal from the handler thread. This allows for failed uploads to be
// retried periodically, and for pending reports written by other processes
// to be recognized.
semaphore_.TimedWait(15 * 60);
}
thread_.DoWorkNow();
}
void CrashReportUploadThread::ProcessPendingReports() {
@ -226,7 +177,7 @@ void CrashReportUploadThread::ProcessPendingReports() {
// Respect Stop() being called after at least one attempt to process a
// report.
if (!running_) {
if (!thread_.is_running()) {
return;
}
}
@ -377,4 +328,8 @@ CrashReportUploadThread::UploadResult CrashReportUploadThread::UploadReport(
return UploadResult::kSuccess;
}
void CrashReportUploadThread::DoWork(const WorkerThread* thread) {
ProcessPendingReports();
}
} // namespace crashpad

View File

@ -21,14 +21,10 @@
#include "base/memory/scoped_ptr.h"
#include "client/crash_report_database.h"
#include "util/synchronization/semaphore.h"
#include "util/thread/worker_thread.h"
namespace crashpad {
namespace internal {
class CrashReportUploadHelperThread;
} // namespace internal
//! \brief A thread that processes pending crash reports in a
//! CrashReportDatabase by uploading them or marking them as completed
//! without upload, as desired.
@ -42,7 +38,7 @@ class CrashReportUploadHelperThread;
//! catches reports that are added without a ReportPending() signal being
//! caught. This may happen if crash reports are added to the database by other
//! processes.
class CrashReportUploadThread {
class CrashReportUploadThread : public WorkerThread::Delegate {
public:
//! \brief Constructs a new object.
//!
@ -79,8 +75,6 @@ class CrashReportUploadThread {
void ReportPending();
private:
friend internal::CrashReportUploadHelperThread;
//! \brief The result code from UploadReport().
enum class UploadResult {
//! \brief The crash report was uploaded successfully.
@ -101,10 +95,6 @@ class CrashReportUploadThread {
kRetry,
};
//! \brief Calls ProcessPendingReports() in response to ReportPending() having
//! been called on any thread, as well as periodically on a timer.
void ThreadMain();
//! \brief Obtains all pending reports from the database, and calls
//! ProcessPendingReport() to process each one.
void ProcessPendingReports();
@ -138,11 +128,14 @@ class CrashReportUploadThread {
UploadResult UploadReport(const CrashReportDatabase::Report* report,
std::string* response_body);
// WorkerThread::Delegate:
//! \brief Calls ProcessPendingReports() in response to ReportPending() having
//! been called on any thread, as well as periodically on a timer.
void DoWork(const WorkerThread* thread) override;
std::string url_;
WorkerThread thread_;
CrashReportDatabase* database_; // weak
Semaphore semaphore_; // TODO(mark): Use a condition variable instead?
scoped_ptr<internal::CrashReportUploadHelperThread> thread_;
bool running_;
};
} // namespace crashpad

View File

@ -0,0 +1,95 @@
// Copyright 2015 The Crashpad Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "util/thread/worker_thread.h"
#include "base/logging.h"
#include "util/synchronization/semaphore.h"
#include "util/thread/thread.h"
namespace crashpad {
namespace internal {
class WorkerThreadImpl final : public Thread {
public:
WorkerThreadImpl(WorkerThread* self, double initial_work_delay)
: semaphore_(0),
initial_work_delay_(initial_work_delay),
self_(self) {}
~WorkerThreadImpl() {}
void ThreadMain() override {
if (initial_work_delay_ > 0)
semaphore_.TimedWait(initial_work_delay_);
while (self_->running_) {
self_->delegate_->DoWork(self_);
semaphore_.TimedWait(self_->work_interval_);
}
}
void SignalSemaphore() {
semaphore_.Signal();
}
private:
// TODO(mark): Use a condition variable instead?
Semaphore semaphore_;
double initial_work_delay_;
WorkerThread* self_; // Weak, owns this.
};
} // namespace internal
WorkerThread::WorkerThread(double work_interval,
WorkerThread::Delegate* delegate)
: work_interval_(work_interval),
delegate_(delegate),
impl_(),
running_(false) {}
WorkerThread::~WorkerThread() {
DCHECK(!running_);
}
void WorkerThread::Start(double initial_work_delay) {
DCHECK(!impl_);
DCHECK(!running_);
running_ = true;
impl_.reset(new internal::WorkerThreadImpl(this, initial_work_delay));
impl_->Start();
}
void WorkerThread::Stop() {
DCHECK(running_);
DCHECK(impl_);
if (!running_)
return;
running_ = false;
impl_->SignalSemaphore();
impl_->Join();
impl_.reset();
}
void WorkerThread::DoWorkNow() {
DCHECK(running_);
impl_->SignalSemaphore();
}
} // namespace crashpad

View File

@ -0,0 +1,93 @@
// Copyright 2015 The Crashpad Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef CRASHPAD_UTIL_THREAD_WORKER_THREAD_H_
#define CRASHPAD_UTIL_THREAD_WORKER_THREAD_H_
#include "base/basictypes.h"
#include "base/memory/scoped_ptr.h"
namespace crashpad {
namespace internal {
class WorkerThreadImpl;
} // namespace internal
//! \brief A WorkerThread executes its Delegate's DoWork method repeatedly on a
//! dedicated thread at a set time interval.
class WorkerThread {
public:
//! \brief An interface for doing work on a WorkerThread.
class Delegate {
public:
//! \brief The work function executed by the WorkerThread every work
//! interval.
virtual void DoWork(const WorkerThread* thread) = 0;
protected:
virtual ~Delegate() {}
};
//! \brief Creates a new WorkerThread that is not yet running.
//!
//! \param[in] work_interval The time interval in seconds at which the \a
//! delegate runs. The interval counts from the completion of
//! Delegate::DoWork() to the next invocation.
//! \param[in] delegate The work delegate to invoke every interval.
WorkerThread(double work_interval, Delegate* delegate);
~WorkerThread();
//! \brief Starts the worker thread.
//!
//! This may not be called if the thread is_running().
//!
//! \param[in] initial_work_delay The amount of time in seconds to wait
//! before invoking the \a delegate for the first time. Pass `0` for
//! no delay.
void Start(double initial_work_delay);
//! \brief Stops the worker thread from running.
//!
//! This may only be called if the thread is_running().
//!
//! If the work function is currently executing, this will not interrupt it.
//! This method stops any future work from occurring. This method is safe
//! to call from any thread with the exception of the worker thread itself,
//! as this joins the thread.
void Stop();
//! \brief Interrupts a \a work_interval to execute the work function
//! immediately. This invokes Delegate::DoWork() on the thread, without
//! waiting for the current \a work_interval to expire. After the
//! delegate is invoked, the WorkerThread will start waiting for a new
//! \a work_interval.
void DoWorkNow();
//! \return `true` if the thread is running, `false` if it is not.
bool is_running() const { return running_; }
private:
friend class internal::WorkerThreadImpl;
double work_interval_;
Delegate* delegate_; // weak
scoped_ptr<internal::WorkerThreadImpl> impl_;
bool running_;
DISALLOW_COPY_AND_ASSIGN(WorkerThread);
};
} // namespace crashpad
#endif // CRASHPAD_UTIL_THREAD_WORKER_THREAD_H_

View File

@ -0,0 +1,137 @@
// Copyright 2015 The Crashpad Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "util/thread/worker_thread.h"
#include "gtest/gtest.h"
#include "util/misc/clock.h"
#include "util/synchronization/semaphore.h"
namespace crashpad {
namespace test {
namespace {
const uint64_t kNanosecondsPerSecond = 1E9;
class WorkDelegate : public WorkerThread::Delegate {
public:
WorkDelegate() {}
~WorkDelegate() {}
void DoWork(const WorkerThread* thread) override {
if (++work_count_ == waiting_for_count_)
semaphore_.Signal();
}
//! \brief Suspends the calling thread until the DoWork() has been called
//! the specified number of times.
void WaitForWorkCount(int times) {
waiting_for_count_ = times;
semaphore_.Wait();
}
int work_count() const { return work_count_; }
private:
Semaphore semaphore_{0};
int work_count_ = 0;
int waiting_for_count_ = -1;
DISALLOW_COPY_AND_ASSIGN(WorkDelegate);
};
TEST(WorkerThread, DoWork) {
WorkDelegate delegate;
WorkerThread thread(0.05, &delegate);
uint64_t start = ClockMonotonicNanoseconds();
thread.Start(0);
EXPECT_TRUE(thread.is_running());
delegate.WaitForWorkCount(2);
thread.Stop();
EXPECT_FALSE(thread.is_running());
EXPECT_GE(1 * kNanosecondsPerSecond, ClockMonotonicNanoseconds() - start);
}
TEST(WorkerThread, StopBeforeDoWork) {
WorkDelegate delegate;
WorkerThread thread(1, &delegate);
thread.Start(15);
thread.Stop();
EXPECT_EQ(0, delegate.work_count());
}
TEST(WorkerThread, Restart) {
WorkDelegate delegate;
WorkerThread thread(0.05, &delegate);
thread.Start(0);
EXPECT_TRUE(thread.is_running());
delegate.WaitForWorkCount(1);
thread.Stop();
ASSERT_FALSE(thread.is_running());
thread.Start(0);
delegate.WaitForWorkCount(2);
thread.Stop();
ASSERT_FALSE(thread.is_running());
}
TEST(WorkerThread, DoWorkNow) {
WorkDelegate delegate;
WorkerThread thread(100, &delegate);
thread.Start(0);
EXPECT_TRUE(thread.is_running());
uint64_t start = ClockMonotonicNanoseconds();
delegate.WaitForWorkCount(1);
EXPECT_EQ(1, delegate.work_count());
thread.DoWorkNow();
delegate.WaitForWorkCount(2);
thread.Stop();
EXPECT_EQ(2, delegate.work_count());
EXPECT_GE(100 * kNanosecondsPerSecond, ClockMonotonicNanoseconds() - start);
}
TEST(WorkerThread, DoWorkNowAtStart) {
WorkDelegate delegate;
WorkerThread thread(100, &delegate);
uint64_t start = ClockMonotonicNanoseconds();
thread.Start(100);
EXPECT_TRUE(thread.is_running());
thread.DoWorkNow();
delegate.WaitForWorkCount(1);
EXPECT_EQ(1, delegate.work_count());
EXPECT_GE(100 * kNanosecondsPerSecond, ClockMonotonicNanoseconds() - start);
thread.Stop();
EXPECT_FALSE(thread.is_running());
}
} // namespace
} // namespace test
} // namespace crashpad

View File

@ -152,6 +152,8 @@
'thread/thread_log_messages.h',
'thread/thread_posix.cc',
'thread/thread_win.cc',
'thread/worker_thread.cc',
'thread/worker_thread.h',
'win/address_types.h',
'win/capture_context.asm',
'win/capture_context.h',

View File

@ -81,6 +81,7 @@
'synchronization/semaphore_test.cc',
'thread/thread_log_messages_test.cc',
'thread/thread_test.cc',
'thread/worker_thread_test.cc',
'win/capture_context_test.cc',
'win/command_line_test.cc',
'win/critical_section_with_debug_info_test.cc',