added group commit; drastically speeds up mult-threaded synchronous write workloads

This commit is contained in:
Sanjay Ghemawat 2012-03-08 16:23:21 -08:00
parent 015d26f8be
commit d79762e273
5 changed files with 162 additions and 49 deletions

View File

@ -35,6 +35,17 @@
namespace leveldb { namespace leveldb {
// Information kept for every waiting writer
struct DBImpl::Writer {
Status status;
WriteBatch* batch;
bool sync;
bool done;
port::CondVar cv;
explicit Writer(port::Mutex* mu) : cv(mu) { }
};
struct DBImpl::CompactionState { struct DBImpl::CompactionState {
Compaction* const compaction; Compaction* const compaction;
@ -113,8 +124,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
logfile_(NULL), logfile_(NULL),
logfile_number_(0), logfile_number_(0),
log_(NULL), log_(NULL),
logger_(NULL), tmp_batch_(new WriteBatch),
logger_cv_(&mutex_),
bg_compaction_scheduled_(false), bg_compaction_scheduled_(false),
manual_compaction_(NULL) { manual_compaction_(NULL) {
mem_->Ref(); mem_->Ref();
@ -144,6 +154,7 @@ DBImpl::~DBImpl() {
delete versions_; delete versions_;
if (mem_ != NULL) mem_->Unref(); if (mem_ != NULL) mem_->Unref();
if (imm_ != NULL) imm_->Unref(); if (imm_ != NULL) imm_->Unref();
delete tmp_batch_;
delete log_; delete log_;
delete logfile_; delete logfile_;
delete table_cache_; delete table_cache_;
@ -554,13 +565,11 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) {
} }
Status DBImpl::TEST_CompactMemTable() { Status DBImpl::TEST_CompactMemTable() {
MutexLock l(&mutex_); // NULL batch means just wait for earlier writes to be done
LoggerId self; Status s = Write(WriteOptions(), NULL);
AcquireLoggingResponsibility(&self);
Status s = MakeRoomForWrite(true /* force compaction */);
ReleaseLoggingResponsibility(&self);
if (s.ok()) { if (s.ok()) {
// Wait until the compaction completes // Wait until the compaction completes
MutexLock l(&mutex_);
while (imm_ != NULL && bg_error_.ok()) { while (imm_ != NULL && bg_error_.ok()) {
bg_cv_.Wait(); bg_cv_.Wait();
} }
@ -1094,38 +1103,35 @@ Status DBImpl::Delete(const WriteOptions& options, const Slice& key) {
return DB::Delete(options, key); return DB::Delete(options, key);
} }
// There is at most one thread that is the current logger. This call Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) {
// waits until preceding logger(s) have finished and becomes the Writer w(&mutex_);
// current logger. w.batch = my_batch;
void DBImpl::AcquireLoggingResponsibility(LoggerId* self) { w.sync = options.sync;
while (logger_ != NULL) { w.done = false;
logger_cv_.Wait();
}
logger_ = self;
}
void DBImpl::ReleaseLoggingResponsibility(LoggerId* self) {
assert(logger_ == self);
logger_ = NULL;
logger_cv_.SignalAll();
}
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
Status status;
MutexLock l(&mutex_); MutexLock l(&mutex_);
LoggerId self; writers_.push_back(&w);
AcquireLoggingResponsibility(&self); while (!w.done && &w != writers_.front()) {
status = MakeRoomForWrite(false); // May temporarily release lock and wait w.cv.Wait();
}
if (w.done) {
return w.status;
}
// May temporarily unlock and wait.
Status status = MakeRoomForWrite(my_batch == NULL);
uint64_t last_sequence = versions_->LastSequence(); uint64_t last_sequence = versions_->LastSequence();
if (status.ok()) { Writer* last_writer = &w;
if (status.ok() && my_batch != NULL) { // NULL batch is for compactions
WriteBatch* updates = BuildBatchGroup(&last_writer);
WriteBatchInternal::SetSequence(updates, last_sequence + 1); WriteBatchInternal::SetSequence(updates, last_sequence + 1);
last_sequence += WriteBatchInternal::Count(updates); last_sequence += WriteBatchInternal::Count(updates);
// Add to log and apply to memtable. We can release the lock during // Add to log and apply to memtable. We can release the lock
// this phase since the "logger_" flag protects against concurrent // during this phase since &w is currently responsible for logging
// loggers and concurrent writes into mem_. // and protects against concurrent loggers and concurrent writes
// into mem_.
{ {
assert(logger_ == &self);
mutex_.Unlock(); mutex_.Unlock();
status = log_->AddRecord(WriteBatchInternal::Contents(updates)); status = log_->AddRecord(WriteBatchInternal::Contents(updates));
if (status.ok() && options.sync) { if (status.ok() && options.sync) {
@ -1135,20 +1141,85 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
status = WriteBatchInternal::InsertInto(updates, mem_); status = WriteBatchInternal::InsertInto(updates, mem_);
} }
mutex_.Lock(); mutex_.Lock();
assert(logger_ == &self);
} }
if (updates == tmp_batch_) tmp_batch_->Clear();
versions_->SetLastSequence(last_sequence); versions_->SetLastSequence(last_sequence);
} }
ReleaseLoggingResponsibility(&self);
while (true) {
Writer* ready = writers_.front();
writers_.pop_front();
if (ready != &w) {
ready->status = status;
ready->done = true;
ready->cv.Signal();
}
if (ready == last_writer) break;
}
// Notify new head of write queue
if (!writers_.empty()) {
writers_.front()->cv.Signal();
}
return status; return status;
} }
// REQUIRES: Writer list must be non-empty
// REQUIRES: First writer must have a non-NULL batch
WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) {
assert(!writers_.empty());
Writer* first = writers_.front();
WriteBatch* result = first->batch;
assert(result != NULL);
size_t size = WriteBatchInternal::ByteSize(first->batch);
// Allow the group to grow up to a maximum size, but if the
// original write is small, limit the growth so we do not slow
// down the small write too much.
size_t max_size = 1 << 20;
if (size <= (128<<10)) {
max_size = size + (128<<10);
}
*last_writer = first;
std::deque<Writer*>::iterator iter = writers_.begin();
++iter; // Advance past "first"
for (; iter != writers_.end(); ++iter) {
Writer* w = *iter;
if (w->sync && !first->sync) {
// Do not include a sync write into a batch handled by a non-sync write.
break;
}
if (w->batch != NULL) {
size += WriteBatchInternal::ByteSize(w->batch);
if (size > max_size) {
// Do not make batch too big
break;
}
// Append to *reuslt
if (result == first->batch) {
// Switch to temporary batch instead of disturbing caller's batch
result = tmp_batch_;
assert(WriteBatchInternal::Count(result) == 0);
WriteBatchInternal::Append(result, first->batch);
}
WriteBatchInternal::Append(result, w->batch);
}
*last_writer = w;
}
return result;
}
// REQUIRES: mutex_ is held // REQUIRES: mutex_ is held
// REQUIRES: this thread is the current logger // REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::MakeRoomForWrite(bool force) { Status DBImpl::MakeRoomForWrite(bool force) {
mutex_.AssertHeld(); mutex_.AssertHeld();
assert(logger_ != NULL); assert(!writers_.empty());
bool allow_delay = !force; bool allow_delay = !force;
Status s; Status s;
while (true) { while (true) {

View File

@ -5,6 +5,7 @@
#ifndef STORAGE_LEVELDB_DB_DB_IMPL_H_ #ifndef STORAGE_LEVELDB_DB_DB_IMPL_H_
#define STORAGE_LEVELDB_DB_DB_IMPL_H_ #define STORAGE_LEVELDB_DB_DB_IMPL_H_
#include <deque>
#include <set> #include <set>
#include "db/dbformat.h" #include "db/dbformat.h"
#include "db/log_writer.h" #include "db/log_writer.h"
@ -59,6 +60,8 @@ class DBImpl : public DB {
private: private:
friend class DB; friend class DB;
struct CompactionState;
struct Writer;
Iterator* NewInternalIterator(const ReadOptions&, Iterator* NewInternalIterator(const ReadOptions&,
SequenceNumber* latest_snapshot); SequenceNumber* latest_snapshot);
@ -85,14 +88,8 @@ class DBImpl : public DB {
Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base); Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base);
// Only thread is allowed to log at a time.
struct LoggerId { }; // Opaque identifier for logging thread
void AcquireLoggingResponsibility(LoggerId* self);
void ReleaseLoggingResponsibility(LoggerId* self);
Status MakeRoomForWrite(bool force /* compact even if there is room? */); Status MakeRoomForWrite(bool force /* compact even if there is room? */);
WriteBatch* BuildBatchGroup(Writer** last_writer);
struct CompactionState;
void MaybeScheduleCompaction(); void MaybeScheduleCompaction();
static void BGWork(void* db); static void BGWork(void* db);
@ -129,8 +126,11 @@ class DBImpl : public DB {
WritableFile* logfile_; WritableFile* logfile_;
uint64_t logfile_number_; uint64_t logfile_number_;
log::Writer* log_; log::Writer* log_;
LoggerId* logger_; // NULL, or the id of the current logging thread
port::CondVar logger_cv_; // For threads waiting to log // Queue of writers.
std::deque<Writer*> writers_;
WriteBatch* tmp_batch_;
SnapshotList snapshots_; SnapshotList snapshots_;
// Set of table files to protect from deletion because they are // Set of table files to protect from deletion because they are

View File

@ -23,6 +23,9 @@
namespace leveldb { namespace leveldb {
// WriteBatch header has an 8-byte sequence number followed by a 4-byte count.
static const size_t kHeader = 12;
WriteBatch::WriteBatch() { WriteBatch::WriteBatch() {
Clear(); Clear();
} }
@ -33,16 +36,16 @@ WriteBatch::Handler::~Handler() { }
void WriteBatch::Clear() { void WriteBatch::Clear() {
rep_.clear(); rep_.clear();
rep_.resize(12); rep_.resize(kHeader);
} }
Status WriteBatch::Iterate(Handler* handler) const { Status WriteBatch::Iterate(Handler* handler) const {
Slice input(rep_); Slice input(rep_);
if (input.size() < 12) { if (input.size() < kHeader) {
return Status::Corruption("malformed WriteBatch (too small)"); return Status::Corruption("malformed WriteBatch (too small)");
} }
input.remove_prefix(12); input.remove_prefix(kHeader);
Slice key, value; Slice key, value;
int found = 0; int found = 0;
while (!input.empty()) { while (!input.empty()) {
@ -131,8 +134,14 @@ Status WriteBatchInternal::InsertInto(const WriteBatch* b,
} }
void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) { void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
assert(contents.size() >= 12); assert(contents.size() >= kHeader);
b->rep_.assign(contents.data(), contents.size()); b->rep_.assign(contents.data(), contents.size());
} }
void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) {
SetCount(dst, Count(dst) + Count(src));
assert(src->rep_.size() >= kHeader);
dst->rep_.append(src->rep_.data() + kHeader, src->rep_.size() - kHeader);
}
} // namespace leveldb } // namespace leveldb

View File

@ -39,6 +39,8 @@ class WriteBatchInternal {
static void SetContents(WriteBatch* batch, const Slice& contents); static void SetContents(WriteBatch* batch, const Slice& contents);
static Status InsertInto(const WriteBatch* batch, MemTable* memtable); static Status InsertInto(const WriteBatch* batch, MemTable* memtable);
static void Append(WriteBatch* dst, const WriteBatch* src);
}; };
} // namespace leveldb } // namespace leveldb

View File

@ -18,6 +18,7 @@ static std::string PrintContents(WriteBatch* b) {
mem->Ref(); mem->Ref();
std::string state; std::string state;
Status s = WriteBatchInternal::InsertInto(b, mem); Status s = WriteBatchInternal::InsertInto(b, mem);
int count = 0;
Iterator* iter = mem->NewIterator(); Iterator* iter = mem->NewIterator();
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
ParsedInternalKey ikey; ParsedInternalKey ikey;
@ -29,11 +30,13 @@ static std::string PrintContents(WriteBatch* b) {
state.append(", "); state.append(", ");
state.append(iter->value().ToString()); state.append(iter->value().ToString());
state.append(")"); state.append(")");
count++;
break; break;
case kTypeDeletion: case kTypeDeletion:
state.append("Delete("); state.append("Delete(");
state.append(ikey.user_key.ToString()); state.append(ikey.user_key.ToString());
state.append(")"); state.append(")");
count++;
break; break;
} }
state.append("@"); state.append("@");
@ -42,6 +45,8 @@ static std::string PrintContents(WriteBatch* b) {
delete iter; delete iter;
if (!s.ok()) { if (!s.ok()) {
state.append("ParseError()"); state.append("ParseError()");
} else if (count != WriteBatchInternal::Count(b)) {
state.append("CountMismatch()");
} }
mem->Unref(); mem->Unref();
return state; return state;
@ -82,6 +87,32 @@ TEST(WriteBatchTest, Corruption) {
PrintContents(&batch)); PrintContents(&batch));
} }
TEST(WriteBatchTest, Append) {
WriteBatch b1, b2;
WriteBatchInternal::SetSequence(&b1, 200);
WriteBatchInternal::SetSequence(&b2, 300);
WriteBatchInternal::Append(&b1, &b2);
ASSERT_EQ("",
PrintContents(&b1));
b2.Put("a", "va");
WriteBatchInternal::Append(&b1, &b2);
ASSERT_EQ("Put(a, va)@200",
PrintContents(&b1));
b2.Clear();
b2.Put("b", "vb");
WriteBatchInternal::Append(&b1, &b2);
ASSERT_EQ("Put(a, va)@200"
"Put(b, vb)@201",
PrintContents(&b1));
b2.Delete("foo");
WriteBatchInternal::Append(&b1, &b2);
ASSERT_EQ("Put(a, va)@200"
"Put(b, vb)@202"
"Put(b, vb)@201"
"Delete(foo)@203",
PrintContents(&b1));
}
} // namespace leveldb } // namespace leveldb
int main(int argc, char** argv) { int main(int argc, char** argv) {