diff --git a/db/db_impl.cc b/db/db_impl.cc index 7b268ea..dde3711 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -35,6 +35,17 @@ namespace leveldb { +// Information kept for every waiting writer +struct DBImpl::Writer { + Status status; + WriteBatch* batch; + bool sync; + bool done; + port::CondVar cv; + + explicit Writer(port::Mutex* mu) : cv(mu) { } +}; + struct DBImpl::CompactionState { Compaction* const compaction; @@ -113,8 +124,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname) logfile_(NULL), logfile_number_(0), log_(NULL), - logger_(NULL), - logger_cv_(&mutex_), + tmp_batch_(new WriteBatch), bg_compaction_scheduled_(false), manual_compaction_(NULL) { mem_->Ref(); @@ -144,6 +154,7 @@ DBImpl::~DBImpl() { delete versions_; if (mem_ != NULL) mem_->Unref(); if (imm_ != NULL) imm_->Unref(); + delete tmp_batch_; delete log_; delete logfile_; delete table_cache_; @@ -554,13 +565,11 @@ void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) { } Status DBImpl::TEST_CompactMemTable() { - MutexLock l(&mutex_); - LoggerId self; - AcquireLoggingResponsibility(&self); - Status s = MakeRoomForWrite(true /* force compaction */); - ReleaseLoggingResponsibility(&self); + // NULL batch means just wait for earlier writes to be done + Status s = Write(WriteOptions(), NULL); if (s.ok()) { // Wait until the compaction completes + MutexLock l(&mutex_); while (imm_ != NULL && bg_error_.ok()) { bg_cv_.Wait(); } @@ -1094,38 +1103,35 @@ Status DBImpl::Delete(const WriteOptions& options, const Slice& key) { return DB::Delete(options, key); } -// There is at most one thread that is the current logger. This call -// waits until preceding logger(s) have finished and becomes the -// current logger. -void DBImpl::AcquireLoggingResponsibility(LoggerId* self) { - while (logger_ != NULL) { - logger_cv_.Wait(); - } - logger_ = self; -} +Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { + Writer w(&mutex_); + w.batch = my_batch; + w.sync = options.sync; + w.done = false; -void DBImpl::ReleaseLoggingResponsibility(LoggerId* self) { - assert(logger_ == self); - logger_ = NULL; - logger_cv_.SignalAll(); -} - -Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { - Status status; MutexLock l(&mutex_); - LoggerId self; - AcquireLoggingResponsibility(&self); - status = MakeRoomForWrite(false); // May temporarily release lock and wait + writers_.push_back(&w); + while (!w.done && &w != writers_.front()) { + w.cv.Wait(); + } + if (w.done) { + return w.status; + } + + // May temporarily unlock and wait. + Status status = MakeRoomForWrite(my_batch == NULL); uint64_t last_sequence = versions_->LastSequence(); - if (status.ok()) { + Writer* last_writer = &w; + if (status.ok() && my_batch != NULL) { // NULL batch is for compactions + WriteBatch* updates = BuildBatchGroup(&last_writer); WriteBatchInternal::SetSequence(updates, last_sequence + 1); last_sequence += WriteBatchInternal::Count(updates); - // Add to log and apply to memtable. We can release the lock during - // this phase since the "logger_" flag protects against concurrent - // loggers and concurrent writes into mem_. + // Add to log and apply to memtable. We can release the lock + // during this phase since &w is currently responsible for logging + // and protects against concurrent loggers and concurrent writes + // into mem_. { - assert(logger_ == &self); mutex_.Unlock(); status = log_->AddRecord(WriteBatchInternal::Contents(updates)); if (status.ok() && options.sync) { @@ -1135,20 +1141,85 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) { status = WriteBatchInternal::InsertInto(updates, mem_); } mutex_.Lock(); - assert(logger_ == &self); } + if (updates == tmp_batch_) tmp_batch_->Clear(); versions_->SetLastSequence(last_sequence); } - ReleaseLoggingResponsibility(&self); + + while (true) { + Writer* ready = writers_.front(); + writers_.pop_front(); + if (ready != &w) { + ready->status = status; + ready->done = true; + ready->cv.Signal(); + } + if (ready == last_writer) break; + } + + // Notify new head of write queue + if (!writers_.empty()) { + writers_.front()->cv.Signal(); + } + return status; } +// REQUIRES: Writer list must be non-empty +// REQUIRES: First writer must have a non-NULL batch +WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) { + assert(!writers_.empty()); + Writer* first = writers_.front(); + WriteBatch* result = first->batch; + assert(result != NULL); + + size_t size = WriteBatchInternal::ByteSize(first->batch); + + // Allow the group to grow up to a maximum size, but if the + // original write is small, limit the growth so we do not slow + // down the small write too much. + size_t max_size = 1 << 20; + if (size <= (128<<10)) { + max_size = size + (128<<10); + } + + *last_writer = first; + std::deque::iterator iter = writers_.begin(); + ++iter; // Advance past "first" + for (; iter != writers_.end(); ++iter) { + Writer* w = *iter; + if (w->sync && !first->sync) { + // Do not include a sync write into a batch handled by a non-sync write. + break; + } + + if (w->batch != NULL) { + size += WriteBatchInternal::ByteSize(w->batch); + if (size > max_size) { + // Do not make batch too big + break; + } + + // Append to *reuslt + if (result == first->batch) { + // Switch to temporary batch instead of disturbing caller's batch + result = tmp_batch_; + assert(WriteBatchInternal::Count(result) == 0); + WriteBatchInternal::Append(result, first->batch); + } + WriteBatchInternal::Append(result, w->batch); + } + *last_writer = w; + } + return result; +} + // REQUIRES: mutex_ is held -// REQUIRES: this thread is the current logger +// REQUIRES: this thread is currently at the front of the writer queue Status DBImpl::MakeRoomForWrite(bool force) { mutex_.AssertHeld(); - assert(logger_ != NULL); + assert(!writers_.empty()); bool allow_delay = !force; Status s; while (true) { diff --git a/db/db_impl.h b/db/db_impl.h index fc40d1e..e665c0e 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -5,6 +5,7 @@ #ifndef STORAGE_LEVELDB_DB_DB_IMPL_H_ #define STORAGE_LEVELDB_DB_DB_IMPL_H_ +#include #include #include "db/dbformat.h" #include "db/log_writer.h" @@ -59,6 +60,8 @@ class DBImpl : public DB { private: friend class DB; + struct CompactionState; + struct Writer; Iterator* NewInternalIterator(const ReadOptions&, SequenceNumber* latest_snapshot); @@ -85,14 +88,8 @@ class DBImpl : public DB { Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base); - // Only thread is allowed to log at a time. - struct LoggerId { }; // Opaque identifier for logging thread - void AcquireLoggingResponsibility(LoggerId* self); - void ReleaseLoggingResponsibility(LoggerId* self); - Status MakeRoomForWrite(bool force /* compact even if there is room? */); - - struct CompactionState; + WriteBatch* BuildBatchGroup(Writer** last_writer); void MaybeScheduleCompaction(); static void BGWork(void* db); @@ -129,8 +126,11 @@ class DBImpl : public DB { WritableFile* logfile_; uint64_t logfile_number_; log::Writer* log_; - LoggerId* logger_; // NULL, or the id of the current logging thread - port::CondVar logger_cv_; // For threads waiting to log + + // Queue of writers. + std::deque writers_; + WriteBatch* tmp_batch_; + SnapshotList snapshots_; // Set of table files to protect from deletion because they are diff --git a/db/write_batch.cc b/db/write_batch.cc index a0e812f..33f4a42 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -23,6 +23,9 @@ namespace leveldb { +// WriteBatch header has an 8-byte sequence number followed by a 4-byte count. +static const size_t kHeader = 12; + WriteBatch::WriteBatch() { Clear(); } @@ -33,16 +36,16 @@ WriteBatch::Handler::~Handler() { } void WriteBatch::Clear() { rep_.clear(); - rep_.resize(12); + rep_.resize(kHeader); } Status WriteBatch::Iterate(Handler* handler) const { Slice input(rep_); - if (input.size() < 12) { + if (input.size() < kHeader) { return Status::Corruption("malformed WriteBatch (too small)"); } - input.remove_prefix(12); + input.remove_prefix(kHeader); Slice key, value; int found = 0; while (!input.empty()) { @@ -131,8 +134,14 @@ Status WriteBatchInternal::InsertInto(const WriteBatch* b, } void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) { - assert(contents.size() >= 12); + assert(contents.size() >= kHeader); b->rep_.assign(contents.data(), contents.size()); } +void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) { + SetCount(dst, Count(dst) + Count(src)); + assert(src->rep_.size() >= kHeader); + dst->rep_.append(src->rep_.data() + kHeader, src->rep_.size() - kHeader); +} + } // namespace leveldb diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index 49aeb84..4423a7f 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -39,6 +39,8 @@ class WriteBatchInternal { static void SetContents(WriteBatch* batch, const Slice& contents); static Status InsertInto(const WriteBatch* batch, MemTable* memtable); + + static void Append(WriteBatch* dst, const WriteBatch* src); }; } // namespace leveldb diff --git a/db/write_batch_test.cc b/db/write_batch_test.cc index 1ee6d7b..9064e3d 100644 --- a/db/write_batch_test.cc +++ b/db/write_batch_test.cc @@ -18,6 +18,7 @@ static std::string PrintContents(WriteBatch* b) { mem->Ref(); std::string state; Status s = WriteBatchInternal::InsertInto(b, mem); + int count = 0; Iterator* iter = mem->NewIterator(); for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { ParsedInternalKey ikey; @@ -29,11 +30,13 @@ static std::string PrintContents(WriteBatch* b) { state.append(", "); state.append(iter->value().ToString()); state.append(")"); + count++; break; case kTypeDeletion: state.append("Delete("); state.append(ikey.user_key.ToString()); state.append(")"); + count++; break; } state.append("@"); @@ -42,6 +45,8 @@ static std::string PrintContents(WriteBatch* b) { delete iter; if (!s.ok()) { state.append("ParseError()"); + } else if (count != WriteBatchInternal::Count(b)) { + state.append("CountMismatch()"); } mem->Unref(); return state; @@ -82,6 +87,32 @@ TEST(WriteBatchTest, Corruption) { PrintContents(&batch)); } +TEST(WriteBatchTest, Append) { + WriteBatch b1, b2; + WriteBatchInternal::SetSequence(&b1, 200); + WriteBatchInternal::SetSequence(&b2, 300); + WriteBatchInternal::Append(&b1, &b2); + ASSERT_EQ("", + PrintContents(&b1)); + b2.Put("a", "va"); + WriteBatchInternal::Append(&b1, &b2); + ASSERT_EQ("Put(a, va)@200", + PrintContents(&b1)); + b2.Clear(); + b2.Put("b", "vb"); + WriteBatchInternal::Append(&b1, &b2); + ASSERT_EQ("Put(a, va)@200" + "Put(b, vb)@201", + PrintContents(&b1)); + b2.Delete("foo"); + WriteBatchInternal::Append(&b1, &b2); + ASSERT_EQ("Put(a, va)@200" + "Put(b, vb)@202" + "Put(b, vb)@201" + "Delete(foo)@203", + PrintContents(&b1)); +} + } // namespace leveldb int main(int argc, char** argv) {