From 73d5834eceee8efa9a8ccfec77dc096a9e8ba18a Mon Sep 17 00:00:00 2001 From: costan Date: Tue, 11 Sep 2018 10:39:08 -0700 Subject: [PATCH] Rework threading in env_posix.cc. This commit replaces the use of pthreads in the POSIX port with std::thread and port::Mutex + port::CondVar. This is intended to simplify porting the env to a different platform. The indirect use of pthreads in PosixLogger is replaced with std::thread::id(), based on an approach prototyped by @cmumfordx@. The pthreads dependency in CMakeFiles is not removed, because some C++ standard library implementations must be linked against pthreads for std::thread use. Figuring out this dependency is left for future work. Switching away from pthreads also fixes https://github.com/google/leveldb/issues/381 ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=212478311 --- util/env_posix.cc | 142 +++++++++++++++++++------------------------- util/posix_logger.h | 31 ++++++---- 2 files changed, 81 insertions(+), 92 deletions(-) diff --git a/util/env_posix.cc b/util/env_posix.cc index b201c5b..d6b0d61 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -6,7 +6,6 @@ #include #include #include -#include #include #include #include @@ -19,9 +18,10 @@ #include #include -#include #include +#include #include +#include #include "leveldb/env.h" #include "leveldb/slice.h" @@ -600,20 +600,13 @@ class PosixEnv : public Env { return Status::OK(); } - static uint64_t gettid() { - pthread_t tid = pthread_self(); - uint64_t thread_id = 0; - memcpy(&thread_id, &tid, std::min(sizeof(thread_id), sizeof(tid))); - return thread_id; - } - virtual Status NewLogger(const std::string& fname, Logger** result) { FILE* f = fopen(fname.c_str(), "w"); if (f == nullptr) { *result = nullptr; return PosixError(fname, errno); } else { - *result = new PosixLogger(f, &PosixEnv::gettid); + *result = new PosixLogger(f); return Status::OK(); } } @@ -629,29 +622,33 @@ class PosixEnv : public Env { } private: - void PthreadCall(const char* label, int result) { - if (result != 0) { - fprintf(stderr, "pthread %s: %s\n", label, strerror(result)); - abort(); - } + void BackgroundThreadMain(); + + static void BackgroundThreadEntryPoint(PosixEnv* env) { + env->BackgroundThreadMain(); } - // BGThread() is the body of the background thread - void BGThread(); - static void* BGThreadWrapper(void* arg) { - reinterpret_cast(arg)->BGThread(); - return nullptr; - } + // Stores the work item data in a Schedule() call. + // + // Instances are constructed on the thread calling Schedule() and used on the + // background thread. + // + // This structure is thread-safe beacuse it is immutable. + struct BackgroundWorkItem { + explicit BackgroundWorkItem(void (*function)(void* arg), void* arg) + : function(function), arg(arg) {} - pthread_mutex_t mu_; - pthread_cond_t bgsignal_; - pthread_t bgthread_; - bool started_bgthread_; + void (* const function)(void*); + void* const arg; + }; - // Entry per Schedule() call - struct BGItem { void* arg; void (*function)(void*); }; - typedef std::deque BGQueue; - BGQueue queue_; + + port::Mutex background_work_mutex_; + port::CondVar background_work_cv_ GUARDED_BY(background_work_mutex_); + bool started_background_thread_ GUARDED_BY(background_work_mutex_); + + std::queue background_work_queue_ + GUARDED_BY(background_work_mutex_); PosixLockTable locks_; Limiter mmap_limit_; @@ -687,79 +684,60 @@ static intptr_t MaxOpenFiles() { } PosixEnv::PosixEnv() - : started_bgthread_(false), + : background_work_cv_(&background_work_mutex_), + started_background_thread_(false), mmap_limit_(MaxMmaps()), fd_limit_(MaxOpenFiles()) { - PthreadCall("mutex_init", pthread_mutex_init(&mu_, nullptr)); - PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, nullptr)); } -void PosixEnv::Schedule(void (*function)(void*), void* arg) { - PthreadCall("lock", pthread_mutex_lock(&mu_)); +void PosixEnv::Schedule( + void (*background_work_function)(void* background_work_arg), + void* background_work_arg) { + MutexLock lock(&background_work_mutex_); - // Start background thread if necessary - if (!started_bgthread_) { - started_bgthread_ = true; - PthreadCall( - "create thread", - pthread_create(&bgthread_, nullptr, &PosixEnv::BGThreadWrapper, this)); + // Start the background thread, if we haven't done so already. + if (!started_background_thread_) { + started_background_thread_ = true; + std::thread background_thread(PosixEnv::BackgroundThreadEntryPoint, this); + background_thread.detach(); } - // If the queue is currently empty, the background thread may currently be - // waiting. - if (queue_.empty()) { - PthreadCall("signal", pthread_cond_signal(&bgsignal_)); + // If the queue is empty, the background thread may be waiting for work. + if (background_work_queue_.empty()) { + background_work_cv_.Signal(); } - // Add to priority queue - queue_.push_back(BGItem()); - queue_.back().function = function; - queue_.back().arg = arg; - - PthreadCall("unlock", pthread_mutex_unlock(&mu_)); + background_work_queue_.emplace(background_work_function, background_work_arg); } -void PosixEnv::BGThread() { +void PosixEnv::BackgroundThreadMain() { while (true) { - // Wait until there is an item that is ready to run - PthreadCall("lock", pthread_mutex_lock(&mu_)); - while (queue_.empty()) { - PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_)); + background_work_mutex_.Lock(); + + // Wait until there is work to be done. + while (background_work_queue_.empty()) { + background_work_cv_.Wait(); } - void (*function)(void*) = queue_.front().function; - void* arg = queue_.front().arg; - queue_.pop_front(); + assert(!background_work_queue_.empty()); + auto background_work_function = + background_work_queue_.front().function; + void* background_work_arg = background_work_queue_.front().arg; + background_work_queue_.pop(); - PthreadCall("unlock", pthread_mutex_unlock(&mu_)); - (*function)(arg); + background_work_mutex_.Unlock(); + background_work_function(background_work_arg); } } -namespace { -struct StartThreadState { - void (*user_function)(void*); - void* arg; -}; -} -static void* StartThreadWrapper(void* arg) { - StartThreadState* state = reinterpret_cast(arg); - state->user_function(state->arg); - delete state; - return nullptr; -} - -void PosixEnv::StartThread(void (*function)(void* arg), void* arg) { - pthread_t t; - StartThreadState* state = new StartThreadState; - state->user_function = function; - state->arg = arg; - PthreadCall("start thread", - pthread_create(&t, nullptr, &StartThreadWrapper, state)); -} - } // namespace +void PosixEnv::StartThread(void (*thread_main)(void* thread_main_arg), + void* thread_main_arg) { + std::thread new_thread(thread_main, thread_main_arg); + new_thread.detach(); +} + static pthread_once_t once = PTHREAD_ONCE_INIT; static Env* default_env; static void InitDefaultEnv() { default_env = new PosixEnv; } diff --git a/util/posix_logger.h b/util/posix_logger.h index a01a4fe..28b290e 100644 --- a/util/posix_logger.h +++ b/util/posix_logger.h @@ -11,10 +11,11 @@ #include #include -#include #include #include #include +#include +#include #include "leveldb/env.h" @@ -22,7 +23,10 @@ namespace leveldb { class PosixLogger final : public Logger { public: - PosixLogger(FILE* fp, uint64_t (*gettid)()) : fp_(fp), gettid_(gettid) { + // Creates a logger that writes to the given file. + // + // The PosixLogger instance takes ownership of the file handle. + explicit PosixLogger(std::FILE* fp) : fp_(fp) { assert(fp != nullptr); } @@ -38,7 +42,14 @@ class PosixLogger final : public Logger { struct std::tm now_components; ::localtime_r(&now_seconds, &now_components); - const uint64_t thread_id = (*gettid_)(); + // Record the thread ID. + constexpr const int kMaxThreadIdSize = 32; + std::ostringstream thread_stream; + thread_stream << std::this_thread::get_id(); + std::string thread_id = thread_stream.str(); + if (thread_id.size() > kMaxThreadIdSize) { + thread_id.resize(kMaxThreadIdSize); + } // We first attempt to print into a stack-allocated buffer. If this attempt // fails, we make a second attempt with a dynamically allocated buffer. @@ -57,7 +68,7 @@ class PosixLogger final : public Logger { // Print the header into the buffer. int buffer_offset = snprintf( buffer, buffer_size, - "%04d/%02d/%02d-%02d:%02d:%02d.%06d %" PRIx64 " ", + "%04d/%02d/%02d-%02d:%02d:%02d.%06d %s", now_components.tm_year + 1900, now_components.tm_mon + 1, now_components.tm_mday, @@ -65,12 +76,13 @@ class PosixLogger final : public Logger { now_components.tm_min, now_components.tm_sec, static_cast(now_timeval.tv_usec), - thread_id); + thread_id.c_str()); - // The header can be at most 48 characters (10 date + 15 time + 3 spacing - // + 20 thread ID), which should fit comfortably into the static buffer. - assert(buffer_offset <= 48); - static_assert(48 < kStackBufferSize, + // The header can be at most 28 characters (10 date + 15 time + + // 3 spacing) plus the thread ID, which should fit comfortably into the + // static buffer. + assert(buffer_offset <= 28 + kMaxThreadIdSize); + static_assert(28 + kMaxThreadIdSize < kStackBufferSize, "stack-allocated buffer may not fit the message header"); assert(buffer_offset < buffer_size); @@ -120,7 +132,6 @@ class PosixLogger final : public Logger { private: std::FILE* const fp_; - uint64_t (* const gettid_)(); // Return the thread id for the current thread. }; } // namespace leveldb