LevelDB 1.13
Fix issues 77, 87, 182, 190. Additionally, fix the bug described in https://groups.google.com/d/msg/leveldb/yL6h1mAOc20/vLU64RylIdMJ where a large contiguous keyspace of deleted data was not getting compacted. Also fix a bug where options.max_open_files was not getting clamped properly.
This commit is contained in:
parent
5bd76dc10d
commit
748539c183
6
Makefile
6
Makefile
@ -31,6 +31,7 @@ TESTHARNESS = ./util/testharness.o $(TESTUTIL)
|
|||||||
|
|
||||||
TESTS = \
|
TESTS = \
|
||||||
arena_test \
|
arena_test \
|
||||||
|
autocompact_test \
|
||||||
bloom_test \
|
bloom_test \
|
||||||
c_test \
|
c_test \
|
||||||
cache_test \
|
cache_test \
|
||||||
@ -70,7 +71,7 @@ SHARED = $(SHARED1)
|
|||||||
else
|
else
|
||||||
# Update db.h if you change these.
|
# Update db.h if you change these.
|
||||||
SHARED_MAJOR = 1
|
SHARED_MAJOR = 1
|
||||||
SHARED_MINOR = 12
|
SHARED_MINOR = 13
|
||||||
SHARED1 = libleveldb.$(PLATFORM_SHARED_EXT)
|
SHARED1 = libleveldb.$(PLATFORM_SHARED_EXT)
|
||||||
SHARED2 = $(SHARED1).$(SHARED_MAJOR)
|
SHARED2 = $(SHARED1).$(SHARED_MAJOR)
|
||||||
SHARED3 = $(SHARED1).$(SHARED_MAJOR).$(SHARED_MINOR)
|
SHARED3 = $(SHARED1).$(SHARED_MAJOR).$(SHARED_MINOR)
|
||||||
@ -114,6 +115,9 @@ leveldbutil: db/leveldb_main.o $(LIBOBJECTS)
|
|||||||
arena_test: util/arena_test.o $(LIBOBJECTS) $(TESTHARNESS)
|
arena_test: util/arena_test.o $(LIBOBJECTS) $(TESTHARNESS)
|
||||||
$(CXX) $(LDFLAGS) util/arena_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
|
$(CXX) $(LDFLAGS) util/arena_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
|
||||||
|
|
||||||
|
# Link the autocompact_test binary from its object file, the library objects
# and the test harness objects.
autocompact_test: db/autocompact_test.o $(LIBOBJECTS) $(TESTHARNESS)
	$(CXX) $(LDFLAGS) db/autocompact_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
|
||||||
|
|
||||||
bloom_test: util/bloom_test.o $(LIBOBJECTS) $(TESTHARNESS)
|
bloom_test: util/bloom_test.o $(LIBOBJECTS) $(TESTHARNESS)
|
||||||
$(CXX) $(LDFLAGS) util/bloom_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
|
$(CXX) $(LDFLAGS) util/bloom_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
|
||||||
|
|
||||||
|
118
db/autocompact_test.cc
Normal file
118
db/autocompact_test.cc
Normal file
@ -0,0 +1,118 @@
|
|||||||
|
// Copyright (c) 2013 The LevelDB Authors. All rights reserved.
|
||||||
|
// Use of this source code is governed by a BSD-style license that can be
|
||||||
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||||
|
|
||||||
|
#include "leveldb/db.h"
|
||||||
|
#include "db/db_impl.h"
|
||||||
|
#include "leveldb/cache.h"
|
||||||
|
#include "util/testharness.h"
|
||||||
|
#include "util/testutil.h"
|
||||||
|
|
||||||
|
namespace leveldb {
|
||||||
|
|
||||||
|
class AutoCompactTest {
|
||||||
|
public:
|
||||||
|
std::string dbname_;
|
||||||
|
Cache* tiny_cache_;
|
||||||
|
Options options_;
|
||||||
|
DB* db_;
|
||||||
|
|
||||||
|
AutoCompactTest() {
|
||||||
|
dbname_ = test::TmpDir() + "/autocompact_test";
|
||||||
|
tiny_cache_ = NewLRUCache(100);
|
||||||
|
options_.block_cache = tiny_cache_;
|
||||||
|
DestroyDB(dbname_, options_);
|
||||||
|
options_.create_if_missing = true;
|
||||||
|
options_.compression = kNoCompression;
|
||||||
|
ASSERT_OK(DB::Open(options_, dbname_, &db_));
|
||||||
|
}
|
||||||
|
|
||||||
|
~AutoCompactTest() {
|
||||||
|
delete db_;
|
||||||
|
DestroyDB(dbname_, Options());
|
||||||
|
delete tiny_cache_;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string Key(int i) {
|
||||||
|
char buf[100];
|
||||||
|
snprintf(buf, sizeof(buf), "key%06d", i);
|
||||||
|
return std::string(buf);
|
||||||
|
}
|
||||||
|
|
||||||
|
uint64_t Size(const Slice& start, const Slice& limit) {
|
||||||
|
Range r(start, limit);
|
||||||
|
uint64_t size;
|
||||||
|
db_->GetApproximateSizes(&r, 1, &size);
|
||||||
|
return size;
|
||||||
|
}
|
||||||
|
|
||||||
|
void DoReads(int n);
|
||||||
|
};
|
||||||
|
|
||||||
|
// Sizing of the test data set: kCount values of kValueSize bytes each,
// kTotalSize bytes of values in total.
static const int kValueSize = 200 * 1024;
static const int kTotalSize = 100 * 1024 * 1024;
static const int kCount = kTotalSize / kValueSize;
|
||||||
|
|
||||||
|
// Read through the first n keys repeatedly and check that they get
|
||||||
|
// compacted (verified by checking the size of the key space).
|
||||||
|
void AutoCompactTest::DoReads(int n) {
|
||||||
|
std::string value(kValueSize, 'x');
|
||||||
|
DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
|
||||||
|
|
||||||
|
// Fill database
|
||||||
|
for (int i = 0; i < kCount; i++) {
|
||||||
|
ASSERT_OK(db_->Put(WriteOptions(), Key(i), value));
|
||||||
|
}
|
||||||
|
ASSERT_OK(dbi->TEST_CompactMemTable());
|
||||||
|
|
||||||
|
// Delete everything
|
||||||
|
for (int i = 0; i < kCount; i++) {
|
||||||
|
ASSERT_OK(db_->Delete(WriteOptions(), Key(i)));
|
||||||
|
}
|
||||||
|
ASSERT_OK(dbi->TEST_CompactMemTable());
|
||||||
|
|
||||||
|
// Get initial measurement of the space we will be reading.
|
||||||
|
const int64_t initial_size = Size(Key(0), Key(n));
|
||||||
|
const int64_t initial_other_size = Size(Key(n), Key(kCount));
|
||||||
|
|
||||||
|
// Read until size drops significantly.
|
||||||
|
std::string limit_key = Key(n);
|
||||||
|
for (int read = 0; true; read++) {
|
||||||
|
ASSERT_LT(read, 100) << "Taking too long to compact";
|
||||||
|
Iterator* iter = db_->NewIterator(ReadOptions());
|
||||||
|
for (iter->SeekToFirst();
|
||||||
|
iter->Valid() && iter->key().ToString() < limit_key;
|
||||||
|
iter->Next()) {
|
||||||
|
// Drop data
|
||||||
|
}
|
||||||
|
delete iter;
|
||||||
|
// Wait a little bit to allow any triggered compactions to complete.
|
||||||
|
Env::Default()->SleepForMicroseconds(1000000);
|
||||||
|
uint64_t size = Size(Key(0), Key(n));
|
||||||
|
fprintf(stderr, "iter %3d => %7.3f MB [other %7.3f MB]\n",
|
||||||
|
read+1, size/1048576.0, Size(Key(n), Key(kCount))/1048576.0);
|
||||||
|
if (size <= initial_size/10) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify that the size of the key space not touched by the reads
|
||||||
|
// is pretty much unchanged.
|
||||||
|
const int64_t final_other_size = Size(Key(n), Key(kCount));
|
||||||
|
ASSERT_LE(final_other_size, initial_other_size + 1048576);
|
||||||
|
ASSERT_GE(final_other_size, initial_other_size/5 - 1048576);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Runs DoReads over all kCount keys.
TEST(AutoCompactTest, ReadAll) {
  DoReads(kCount);
}
|
||||||
|
|
||||||
|
// Runs DoReads over the first kCount/2 keys only.
TEST(AutoCompactTest, ReadHalf) {
  DoReads(kCount/2);
}
|
||||||
|
|
||||||
|
} // namespace leveldb
|
||||||
|
|
||||||
|
int main(int argc, char** argv) {
|
||||||
|
return leveldb::test::RunAllTests();
|
||||||
|
}
|
@ -35,6 +35,7 @@ class CorruptionTest {
|
|||||||
CorruptionTest() {
|
CorruptionTest() {
|
||||||
tiny_cache_ = NewLRUCache(100);
|
tiny_cache_ = NewLRUCache(100);
|
||||||
options_.env = &env_;
|
options_.env = &env_;
|
||||||
|
options_.block_cache = tiny_cache_;
|
||||||
dbname_ = test::TmpDir() + "/db_test";
|
dbname_ = test::TmpDir() + "/db_test";
|
||||||
DestroyDB(dbname_, options_);
|
DestroyDB(dbname_, options_);
|
||||||
|
|
||||||
@ -50,17 +51,14 @@ class CorruptionTest {
|
|||||||
delete tiny_cache_;
|
delete tiny_cache_;
|
||||||
}
|
}
|
||||||
|
|
||||||
Status TryReopen(Options* options = NULL) {
|
Status TryReopen() {
|
||||||
delete db_;
|
delete db_;
|
||||||
db_ = NULL;
|
db_ = NULL;
|
||||||
Options opt = (options ? *options : options_);
|
return DB::Open(options_, dbname_, &db_);
|
||||||
opt.env = &env_;
|
|
||||||
opt.block_cache = tiny_cache_;
|
|
||||||
return DB::Open(opt, dbname_, &db_);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void Reopen(Options* options = NULL) {
|
void Reopen() {
|
||||||
ASSERT_OK(TryReopen(options));
|
ASSERT_OK(TryReopen());
|
||||||
}
|
}
|
||||||
|
|
||||||
void RepairDB() {
|
void RepairDB() {
|
||||||
@ -92,6 +90,10 @@ class CorruptionTest {
|
|||||||
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
|
for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
|
||||||
uint64_t key;
|
uint64_t key;
|
||||||
Slice in(iter->key());
|
Slice in(iter->key());
|
||||||
|
if (in == "" || in == "~") {
|
||||||
|
// Ignore boundary keys.
|
||||||
|
continue;
|
||||||
|
}
|
||||||
if (!ConsumeDecimalNumber(&in, &key) ||
|
if (!ConsumeDecimalNumber(&in, &key) ||
|
||||||
!in.empty() ||
|
!in.empty() ||
|
||||||
key < next_expected) {
|
key < next_expected) {
|
||||||
@ -233,7 +235,7 @@ TEST(CorruptionTest, TableFile) {
|
|||||||
dbi->TEST_CompactRange(1, NULL, NULL);
|
dbi->TEST_CompactRange(1, NULL, NULL);
|
||||||
|
|
||||||
Corrupt(kTableFile, 100, 1);
|
Corrupt(kTableFile, 100, 1);
|
||||||
Check(99, 99);
|
Check(90, 99);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(CorruptionTest, TableFileIndexData) {
|
TEST(CorruptionTest, TableFileIndexData) {
|
||||||
@ -299,7 +301,7 @@ TEST(CorruptionTest, CompactionInputError) {
|
|||||||
ASSERT_EQ(1, Property("leveldb.num-files-at-level" + NumberToString(last)));
|
ASSERT_EQ(1, Property("leveldb.num-files-at-level" + NumberToString(last)));
|
||||||
|
|
||||||
Corrupt(kTableFile, 100, 1);
|
Corrupt(kTableFile, 100, 1);
|
||||||
Check(9, 9);
|
Check(5, 9);
|
||||||
|
|
||||||
// Force compactions by writing lots of values
|
// Force compactions by writing lots of values
|
||||||
Build(10000);
|
Build(10000);
|
||||||
@ -307,32 +309,23 @@ TEST(CorruptionTest, CompactionInputError) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
TEST(CorruptionTest, CompactionInputErrorParanoid) {
|
TEST(CorruptionTest, CompactionInputErrorParanoid) {
|
||||||
Options options;
|
options_.paranoid_checks = true;
|
||||||
options.paranoid_checks = true;
|
options_.write_buffer_size = 512 << 10;
|
||||||
options.write_buffer_size = 1048576;
|
Reopen();
|
||||||
Reopen(&options);
|
|
||||||
DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
|
DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
|
||||||
|
|
||||||
// Fill levels >= 1 so memtable compaction outputs to level 1
|
// Make multiple inputs so we need to compact.
|
||||||
for (int level = 1; level < config::kNumLevels; level++) {
|
for (int i = 0; i < 2; i++) {
|
||||||
dbi->Put(WriteOptions(), "", "begin");
|
Build(10);
|
||||||
dbi->Put(WriteOptions(), "~", "end");
|
|
||||||
dbi->TEST_CompactMemTable();
|
dbi->TEST_CompactMemTable();
|
||||||
|
Corrupt(kTableFile, 100, 1);
|
||||||
|
env_.SleepForMicroseconds(100000);
|
||||||
}
|
}
|
||||||
|
dbi->CompactRange(NULL, NULL);
|
||||||
|
|
||||||
Build(10);
|
// Write must fail because of corrupted table
|
||||||
dbi->TEST_CompactMemTable();
|
|
||||||
ASSERT_EQ(1, Property("leveldb.num-files-at-level0"));
|
|
||||||
|
|
||||||
Corrupt(kTableFile, 100, 1);
|
|
||||||
Check(9, 9);
|
|
||||||
|
|
||||||
// Write must eventually fail because of corrupted table
|
|
||||||
Status s;
|
|
||||||
std::string tmp1, tmp2;
|
std::string tmp1, tmp2;
|
||||||
for (int i = 0; i < 10000 && s.ok(); i++) {
|
Status s = db_->Put(WriteOptions(), Key(5, &tmp1), Value(5, &tmp2));
|
||||||
s = db_->Put(WriteOptions(), Key(i, &tmp1), Value(i, &tmp2));
|
|
||||||
}
|
|
||||||
ASSERT_TRUE(!s.ok()) << "write did not fail in corrupted paranoid db";
|
ASSERT_TRUE(!s.ok()) << "write did not fail in corrupted paranoid db";
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -113,14 +113,14 @@ Options SanitizeOptions(const std::string& dbname,
|
|||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
DBImpl::DBImpl(const Options& options, const std::string& dbname)
|
DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
|
||||||
: env_(options.env),
|
: env_(raw_options.env),
|
||||||
internal_comparator_(options.comparator),
|
internal_comparator_(raw_options.comparator),
|
||||||
internal_filter_policy_(options.filter_policy),
|
internal_filter_policy_(raw_options.filter_policy),
|
||||||
options_(SanitizeOptions(
|
options_(SanitizeOptions(dbname, &internal_comparator_,
|
||||||
dbname, &internal_comparator_, &internal_filter_policy_, options)),
|
&internal_filter_policy_, raw_options)),
|
||||||
owns_info_log_(options_.info_log != options.info_log),
|
owns_info_log_(options_.info_log != raw_options.info_log),
|
||||||
owns_cache_(options_.block_cache != options.block_cache),
|
owns_cache_(options_.block_cache != raw_options.block_cache),
|
||||||
dbname_(dbname),
|
dbname_(dbname),
|
||||||
db_lock_(NULL),
|
db_lock_(NULL),
|
||||||
shutting_down_(NULL),
|
shutting_down_(NULL),
|
||||||
@ -130,6 +130,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
|
|||||||
logfile_(NULL),
|
logfile_(NULL),
|
||||||
logfile_number_(0),
|
logfile_number_(0),
|
||||||
log_(NULL),
|
log_(NULL),
|
||||||
|
seed_(0),
|
||||||
tmp_batch_(new WriteBatch),
|
tmp_batch_(new WriteBatch),
|
||||||
bg_compaction_scheduled_(false),
|
bg_compaction_scheduled_(false),
|
||||||
manual_compaction_(NULL),
|
manual_compaction_(NULL),
|
||||||
@ -138,7 +139,7 @@ DBImpl::DBImpl(const Options& options, const std::string& dbname)
|
|||||||
has_imm_.Release_Store(NULL);
|
has_imm_.Release_Store(NULL);
|
||||||
|
|
||||||
// Reserve ten files or so for other uses and give the rest to TableCache.
|
// Reserve ten files or so for other uses and give the rest to TableCache.
|
||||||
const int table_cache_size = options.max_open_files - kNumNonTableCacheFiles;
|
const int table_cache_size = options_.max_open_files - kNumNonTableCacheFiles;
|
||||||
table_cache_ = new TableCache(dbname_, &options_, table_cache_size);
|
table_cache_ = new TableCache(dbname_, &options_, table_cache_size);
|
||||||
|
|
||||||
versions_ = new VersionSet(dbname_, &options_, table_cache_,
|
versions_ = new VersionSet(dbname_, &options_, table_cache_,
|
||||||
@ -1027,7 +1028,8 @@ static void CleanupIteratorState(void* arg1, void* arg2) {
|
|||||||
} // namespace
|
} // namespace
|
||||||
|
|
||||||
Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
|
Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
|
||||||
SequenceNumber* latest_snapshot) {
|
SequenceNumber* latest_snapshot,
|
||||||
|
uint32_t* seed) {
|
||||||
IterState* cleanup = new IterState;
|
IterState* cleanup = new IterState;
|
||||||
mutex_.Lock();
|
mutex_.Lock();
|
||||||
*latest_snapshot = versions_->LastSequence();
|
*latest_snapshot = versions_->LastSequence();
|
||||||
@ -1051,13 +1053,15 @@ Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
|
|||||||
cleanup->version = versions_->current();
|
cleanup->version = versions_->current();
|
||||||
internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, NULL);
|
internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, NULL);
|
||||||
|
|
||||||
|
*seed = ++seed_;
|
||||||
mutex_.Unlock();
|
mutex_.Unlock();
|
||||||
return internal_iter;
|
return internal_iter;
|
||||||
}
|
}
|
||||||
|
|
||||||
Iterator* DBImpl::TEST_NewInternalIterator() {
|
Iterator* DBImpl::TEST_NewInternalIterator() {
|
||||||
SequenceNumber ignored;
|
SequenceNumber ignored;
|
||||||
return NewInternalIterator(ReadOptions(), &ignored);
|
uint32_t ignored_seed;
|
||||||
|
return NewInternalIterator(ReadOptions(), &ignored, &ignored_seed);
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
|
int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() {
|
||||||
@ -1114,12 +1118,21 @@ Status DBImpl::Get(const ReadOptions& options,
|
|||||||
|
|
||||||
Iterator* DBImpl::NewIterator(const ReadOptions& options) {
|
Iterator* DBImpl::NewIterator(const ReadOptions& options) {
|
||||||
SequenceNumber latest_snapshot;
|
SequenceNumber latest_snapshot;
|
||||||
Iterator* internal_iter = NewInternalIterator(options, &latest_snapshot);
|
uint32_t seed;
|
||||||
|
Iterator* iter = NewInternalIterator(options, &latest_snapshot, &seed);
|
||||||
return NewDBIterator(
|
return NewDBIterator(
|
||||||
&dbname_, env_, user_comparator(), internal_iter,
|
this, user_comparator(), iter,
|
||||||
(options.snapshot != NULL
|
(options.snapshot != NULL
|
||||||
? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_
|
? reinterpret_cast<const SnapshotImpl*>(options.snapshot)->number_
|
||||||
: latest_snapshot));
|
: latest_snapshot),
|
||||||
|
seed);
|
||||||
|
}
|
||||||
|
|
||||||
|
void DBImpl::RecordReadSample(Slice key) {
|
||||||
|
MutexLock l(&mutex_);
|
||||||
|
if (versions_->current()->RecordReadSample(key)) {
|
||||||
|
MaybeScheduleCompaction();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const Snapshot* DBImpl::GetSnapshot() {
|
const Snapshot* DBImpl::GetSnapshot() {
|
||||||
|
@ -59,13 +59,19 @@ class DBImpl : public DB {
|
|||||||
// file at a level >= 1.
|
// file at a level >= 1.
|
||||||
int64_t TEST_MaxNextLevelOverlappingBytes();
|
int64_t TEST_MaxNextLevelOverlappingBytes();
|
||||||
|
|
||||||
|
// Record a sample of bytes read at the specified internal key.
|
||||||
|
// Samples are taken approximately once every config::kReadBytesPeriod
|
||||||
|
// bytes.
|
||||||
|
void RecordReadSample(Slice key);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
friend class DB;
|
friend class DB;
|
||||||
struct CompactionState;
|
struct CompactionState;
|
||||||
struct Writer;
|
struct Writer;
|
||||||
|
|
||||||
Iterator* NewInternalIterator(const ReadOptions&,
|
Iterator* NewInternalIterator(const ReadOptions&,
|
||||||
SequenceNumber* latest_snapshot);
|
SequenceNumber* latest_snapshot,
|
||||||
|
uint32_t* seed);
|
||||||
|
|
||||||
Status NewDB();
|
Status NewDB();
|
||||||
|
|
||||||
@ -135,6 +141,7 @@ class DBImpl : public DB {
|
|||||||
WritableFile* logfile_;
|
WritableFile* logfile_;
|
||||||
uint64_t logfile_number_;
|
uint64_t logfile_number_;
|
||||||
log::Writer* log_;
|
log::Writer* log_;
|
||||||
|
uint32_t seed_; // For sampling.
|
||||||
|
|
||||||
// Queue of writers.
|
// Queue of writers.
|
||||||
std::deque<Writer*> writers_;
|
std::deque<Writer*> writers_;
|
||||||
|
@ -5,12 +5,14 @@
|
|||||||
#include "db/db_iter.h"
|
#include "db/db_iter.h"
|
||||||
|
|
||||||
#include "db/filename.h"
|
#include "db/filename.h"
|
||||||
|
#include "db/db_impl.h"
|
||||||
#include "db/dbformat.h"
|
#include "db/dbformat.h"
|
||||||
#include "leveldb/env.h"
|
#include "leveldb/env.h"
|
||||||
#include "leveldb/iterator.h"
|
#include "leveldb/iterator.h"
|
||||||
#include "port/port.h"
|
#include "port/port.h"
|
||||||
#include "util/logging.h"
|
#include "util/logging.h"
|
||||||
#include "util/mutexlock.h"
|
#include "util/mutexlock.h"
|
||||||
|
#include "util/random.h"
|
||||||
|
|
||||||
namespace leveldb {
|
namespace leveldb {
|
||||||
|
|
||||||
@ -46,15 +48,16 @@ class DBIter: public Iterator {
|
|||||||
kReverse
|
kReverse
|
||||||
};
|
};
|
||||||
|
|
||||||
DBIter(const std::string* dbname, Env* env,
|
DBIter(DBImpl* db, const Comparator* cmp, Iterator* iter, SequenceNumber s,
|
||||||
const Comparator* cmp, Iterator* iter, SequenceNumber s)
|
uint32_t seed)
|
||||||
: dbname_(dbname),
|
: db_(db),
|
||||||
env_(env),
|
|
||||||
user_comparator_(cmp),
|
user_comparator_(cmp),
|
||||||
iter_(iter),
|
iter_(iter),
|
||||||
sequence_(s),
|
sequence_(s),
|
||||||
direction_(kForward),
|
direction_(kForward),
|
||||||
valid_(false) {
|
valid_(false),
|
||||||
|
rnd_(seed),
|
||||||
|
bytes_counter_(RandomPeriod()) {
|
||||||
}
|
}
|
||||||
virtual ~DBIter() {
|
virtual ~DBIter() {
|
||||||
delete iter_;
|
delete iter_;
|
||||||
@ -100,8 +103,12 @@ class DBIter: public Iterator {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const std::string* const dbname_;
|
// Pick next gap with average value of config::kReadBytesPeriod.
|
||||||
Env* const env_;
|
ssize_t RandomPeriod() {
|
||||||
|
return rnd_.Uniform(2*config::kReadBytesPeriod);
|
||||||
|
}
|
||||||
|
|
||||||
|
DBImpl* db_;
|
||||||
const Comparator* const user_comparator_;
|
const Comparator* const user_comparator_;
|
||||||
Iterator* const iter_;
|
Iterator* const iter_;
|
||||||
SequenceNumber const sequence_;
|
SequenceNumber const sequence_;
|
||||||
@ -112,13 +119,23 @@ class DBIter: public Iterator {
|
|||||||
Direction direction_;
|
Direction direction_;
|
||||||
bool valid_;
|
bool valid_;
|
||||||
|
|
||||||
|
Random rnd_;
|
||||||
|
ssize_t bytes_counter_;
|
||||||
|
|
||||||
// No copying allowed
|
// No copying allowed
|
||||||
DBIter(const DBIter&);
|
DBIter(const DBIter&);
|
||||||
void operator=(const DBIter&);
|
void operator=(const DBIter&);
|
||||||
};
|
};
|
||||||
|
|
||||||
inline bool DBIter::ParseKey(ParsedInternalKey* ikey) {
|
inline bool DBIter::ParseKey(ParsedInternalKey* ikey) {
|
||||||
if (!ParseInternalKey(iter_->key(), ikey)) {
|
Slice k = iter_->key();
|
||||||
|
ssize_t n = k.size() + iter_->value().size();
|
||||||
|
bytes_counter_ -= n;
|
||||||
|
while (bytes_counter_ < 0) {
|
||||||
|
bytes_counter_ += RandomPeriod();
|
||||||
|
db_->RecordReadSample(k);
|
||||||
|
}
|
||||||
|
if (!ParseInternalKey(k, ikey)) {
|
||||||
status_ = Status::Corruption("corrupted internal key in DBIter");
|
status_ = Status::Corruption("corrupted internal key in DBIter");
|
||||||
return false;
|
return false;
|
||||||
} else {
|
} else {
|
||||||
@ -288,12 +305,12 @@ void DBIter::SeekToLast() {
|
|||||||
} // anonymous namespace
|
} // anonymous namespace
|
||||||
|
|
||||||
Iterator* NewDBIterator(
|
Iterator* NewDBIterator(
|
||||||
const std::string* dbname,
|
DBImpl* db,
|
||||||
Env* env,
|
|
||||||
const Comparator* user_key_comparator,
|
const Comparator* user_key_comparator,
|
||||||
Iterator* internal_iter,
|
Iterator* internal_iter,
|
||||||
const SequenceNumber& sequence) {
|
SequenceNumber sequence,
|
||||||
return new DBIter(dbname, env, user_key_comparator, internal_iter, sequence);
|
uint32_t seed) {
|
||||||
|
return new DBIter(db, user_key_comparator, internal_iter, sequence, seed);
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace leveldb
|
} // namespace leveldb
|
||||||
|
@ -11,15 +11,17 @@
|
|||||||
|
|
||||||
namespace leveldb {
|
namespace leveldb {
|
||||||
|
|
||||||
|
class DBImpl;
|
||||||
|
|
||||||
// Return a new iterator that converts internal keys (yielded by
|
// Return a new iterator that converts internal keys (yielded by
|
||||||
// "*internal_iter") that were live at the specified "sequence" number
|
// "*internal_iter") that were live at the specified "sequence" number
|
||||||
// into appropriate user keys.
|
// into appropriate user keys.
|
||||||
extern Iterator* NewDBIterator(
|
extern Iterator* NewDBIterator(
|
||||||
const std::string* dbname,
|
DBImpl* db,
|
||||||
Env* env,
|
|
||||||
const Comparator* user_key_comparator,
|
const Comparator* user_key_comparator,
|
||||||
Iterator* internal_iter,
|
Iterator* internal_iter,
|
||||||
const SequenceNumber& sequence);
|
SequenceNumber sequence,
|
||||||
|
uint32_t seed);
|
||||||
|
|
||||||
} // namespace leveldb
|
} // namespace leveldb
|
||||||
|
|
||||||
|
@ -38,6 +38,9 @@ static const int kL0_StopWritesTrigger = 12;
|
|||||||
// space if the same key space is being repeatedly overwritten.
|
// space if the same key space is being repeatedly overwritten.
|
||||||
static const int kMaxMemCompactLevel = 2;
|
static const int kMaxMemCompactLevel = 2;
|
||||||
|
|
||||||
|
// Approximate gap in bytes between samples of data read during iteration.
static const int kReadBytesPeriod = 1048576;
|
||||||
|
|
||||||
} // namespace config
|
} // namespace config
|
||||||
|
|
||||||
class InternalKey;
|
class InternalKey;
|
||||||
|
@ -289,6 +289,51 @@ static bool NewestFirst(FileMetaData* a, FileMetaData* b) {
|
|||||||
return a->number > b->number;
|
return a->number > b->number;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Calls (*func)(arg, level, f) for files whose key range may contain
// user_key. Level-0 files whose [smallest, largest] user-key range includes
// user_key are visited first, ordered by NewestFirst; after that at most one
// file per level is visited for levels 1 and up, in increasing level order.
// Iteration stops as soon as an invocation of func returns false.
//
// user_key is compared with the user comparator; internal_key is used for
// the FindFile() search on levels >= 1.
void Version::ForEachOverlapping(Slice user_key, Slice internal_key,
                                 void* arg,
                                 bool (*func)(void*, int, FileMetaData*)) {
  // TODO(sanjay): Change Version::Get() to use this function.
  const Comparator* ucmp = vset_->icmp_.user_comparator();

  // Search level-0 in order from newest to oldest.
  std::vector<FileMetaData*> tmp;
  tmp.reserve(files_[0].size());
  for (size_t i = 0; i < files_[0].size(); i++) {
    FileMetaData* f = files_[0][i];
    if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 &&
        ucmp->Compare(user_key, f->largest.user_key()) <= 0) {
      tmp.push_back(f);
    }
  }
  if (!tmp.empty()) {
    std::sort(tmp.begin(), tmp.end(), NewestFirst);
    for (size_t i = 0; i < tmp.size(); i++) {
      if (!(*func)(arg, 0, tmp[i])) {
        return;
      }
    }
  }

  // Search other levels.
  for (int level = 1; level < config::kNumLevels; level++) {
    const size_t num_files = files_[level].size();
    if (num_files == 0) continue;

    // Binary search to find earliest index whose largest key >= internal_key.
    const uint32_t index = FindFile(vset_->icmp_, files_[level], internal_key);
    if (index >= num_files) continue;

    FileMetaData* f = files_[level][index];
    if (ucmp->Compare(user_key, f->smallest.user_key()) < 0) {
      // All of "f" is past any data for user_key
      continue;
    }
    if (!(*func)(arg, level, f)) {
      return;
    }
  }
}
|
||||||
|
|
||||||
Status Version::Get(const ReadOptions& options,
|
Status Version::Get(const ReadOptions& options,
|
||||||
const LookupKey& k,
|
const LookupKey& k,
|
||||||
std::string* value,
|
std::string* value,
|
||||||
@ -401,6 +446,44 @@ bool Version::UpdateStats(const GetStats& stats) {
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Records one read sample taken at internal_key.
//
// Returns false if internal_key cannot be parsed, or if fewer than two files
// are reported by ForEachOverlapping() for it. Otherwise returns the result
// of UpdateStats() applied to the first file that was reported.
bool Version::RecordReadSample(Slice internal_key) {
  ParsedInternalKey ikey;
  if (!ParseInternalKey(internal_key, &ikey)) {
    return false;
  }

  // Callback state for ForEachOverlapping(): counts matches and remembers
  // the first one.
  struct State {
    GetStats stats;  // Holds first matching file
    int matches;

    static bool Match(void* arg, int level, FileMetaData* f) {
      // arg is the State object passed to ForEachOverlapping() below.
      State* state = static_cast<State*>(arg);
      state->matches++;
      if (state->matches == 1) {
        // Remember first match.
        state->stats.seek_file = f;
        state->stats.seek_file_level = level;
      }
      // We can stop iterating once we have a second match.
      return state->matches < 2;
    }
  };

  State state;
  state.matches = 0;
  ForEachOverlapping(ikey.user_key, internal_key, &state, &State::Match);

  // Must have at least two matches since we want to merge across
  // files. But what if we have a single file that contains many
  // overwrites and deletions?  Should we have another mechanism for
  // finding such files?
  if (state.matches >= 2) {
    // 1MB cost is about 1 seek (see comment in Builder::Apply).
    return UpdateStats(state.stats);
  }
  return false;
}
|
||||||
|
|
||||||
void Version::Ref() {
|
void Version::Ref() {
|
||||||
++refs_;
|
++refs_;
|
||||||
}
|
}
|
||||||
@ -435,10 +518,13 @@ int Version::PickLevelForMemTableOutput(
|
|||||||
if (OverlapInLevel(level + 1, &smallest_user_key, &largest_user_key)) {
|
if (OverlapInLevel(level + 1, &smallest_user_key, &largest_user_key)) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
GetOverlappingInputs(level + 2, &start, &limit, &overlaps);
|
if (level + 2 < config::kNumLevels) {
|
||||||
const int64_t sum = TotalFileSize(overlaps);
|
// Check that file does not overlap too many grandparent bytes.
|
||||||
if (sum > kMaxGrandParentOverlapBytes) {
|
GetOverlappingInputs(level + 2, &start, &limit, &overlaps);
|
||||||
break;
|
const int64_t sum = TotalFileSize(overlaps);
|
||||||
|
if (sum > kMaxGrandParentOverlapBytes) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
level++;
|
level++;
|
||||||
}
|
}
|
||||||
@ -452,6 +538,8 @@ void Version::GetOverlappingInputs(
|
|||||||
const InternalKey* begin,
|
const InternalKey* begin,
|
||||||
const InternalKey* end,
|
const InternalKey* end,
|
||||||
std::vector<FileMetaData*>* inputs) {
|
std::vector<FileMetaData*>* inputs) {
|
||||||
|
assert(level >= 0);
|
||||||
|
assert(level < config::kNumLevels);
|
||||||
inputs->clear();
|
inputs->clear();
|
||||||
Slice user_begin, user_end;
|
Slice user_begin, user_end;
|
||||||
if (begin != NULL) {
|
if (begin != NULL) {
|
||||||
|
@ -78,6 +78,12 @@ class Version {
|
|||||||
// REQUIRES: lock is held
|
// REQUIRES: lock is held
|
||||||
bool UpdateStats(const GetStats& stats);
|
bool UpdateStats(const GetStats& stats);
|
||||||
|
|
||||||
|
// Record a sample of bytes read at the specified internal key.
|
||||||
|
// Samples are taken approximately once every config::kReadBytesPeriod
|
||||||
|
// bytes. Returns true if a new compaction may need to be triggered.
|
||||||
|
// REQUIRES: lock is held
|
||||||
|
bool RecordReadSample(Slice key);
|
||||||
|
|
||||||
// Reference count management (so Versions do not disappear out from
|
// Reference count management (so Versions do not disappear out from
|
||||||
// under live iterators)
|
// under live iterators)
|
||||||
void Ref();
|
void Ref();
|
||||||
@ -114,6 +120,15 @@ class Version {
|
|||||||
class LevelFileNumIterator;
|
class LevelFileNumIterator;
|
||||||
Iterator* NewConcatenatingIterator(const ReadOptions&, int level) const;
|
Iterator* NewConcatenatingIterator(const ReadOptions&, int level) const;
|
||||||
|
|
||||||
|
// Call func(arg, level, f) for every file that overlaps user_key in
|
||||||
|
// order from newest to oldest. If an invocation of func returns
|
||||||
|
// false, makes no more calls.
|
||||||
|
//
|
||||||
|
// REQUIRES: user portion of internal_key == user_key.
|
||||||
|
void ForEachOverlapping(Slice user_key, Slice internal_key,
|
||||||
|
void* arg,
|
||||||
|
bool (*func)(void*, int, FileMetaData*));
|
||||||
|
|
||||||
VersionSet* vset_; // VersionSet to which this Version belongs
|
VersionSet* vset_; // VersionSet to which this Version belongs
|
||||||
Version* next_; // Next version in linked list
|
Version* next_; // Next version in linked list
|
||||||
Version* prev_; // Previous version in linked list
|
Version* prev_; // Previous version in linked list
|
||||||
|
@ -14,7 +14,7 @@ namespace leveldb {
|
|||||||
|
|
||||||
// Update Makefile if you change these
|
// Update Makefile if you change these
|
||||||
static const int kMajorVersion = 1;
|
static const int kMajorVersion = 1;
|
||||||
static const int kMinorVersion = 12;
|
static const int kMinorVersion = 13;
|
||||||
|
|
||||||
struct Options;
|
struct Options;
|
||||||
struct ReadOptions;
|
struct ReadOptions;
|
||||||
|
@ -319,8 +319,39 @@ class PosixMmapFile : public WritableFile {
|
|||||||
return Status::OK();
|
return Status::OK();
|
||||||
}
|
}
|
||||||
|
|
||||||
virtual Status Sync() {
|
Status SyncDirIfManifest() {
|
||||||
|
const char* f = filename_.c_str();
|
||||||
|
const char* sep = strrchr(f, '/');
|
||||||
|
Slice basename;
|
||||||
|
std::string dir;
|
||||||
|
if (sep == NULL) {
|
||||||
|
dir = ".";
|
||||||
|
basename = f;
|
||||||
|
} else {
|
||||||
|
dir = std::string(f, sep - f);
|
||||||
|
basename = sep + 1;
|
||||||
|
}
|
||||||
Status s;
|
Status s;
|
||||||
|
if (basename.starts_with("MANIFEST")) {
|
||||||
|
int fd = open(dir.c_str(), O_RDONLY);
|
||||||
|
if (fd < 0) {
|
||||||
|
s = IOError(dir, errno);
|
||||||
|
} else {
|
||||||
|
if (fsync(fd) < 0) {
|
||||||
|
s = IOError(dir, errno);
|
||||||
|
}
|
||||||
|
close(fd);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
|
||||||
|
virtual Status Sync() {
|
||||||
|
// Ensure new files referred to by the manifest are in the filesystem.
|
||||||
|
Status s = SyncDirIfManifest();
|
||||||
|
if (!s.ok()) {
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
|
||||||
if (pending_sync_) {
|
if (pending_sync_) {
|
||||||
// Some unmapped data was not synced
|
// Some unmapped data was not synced
|
||||||
|
@ -16,7 +16,12 @@ class Random {
|
|||||||
private:
|
private:
|
||||||
uint32_t seed_;
|
uint32_t seed_;
|
||||||
public:
|
public:
|
||||||
explicit Random(uint32_t s) : seed_(s & 0x7fffffffu) { }
|
explicit Random(uint32_t s) : seed_(s & 0x7fffffffu) {
|
||||||
|
// Avoid bad seeds.
|
||||||
|
if (seed_ == 0 || seed_ == 2147483647L) {
|
||||||
|
seed_ = 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
uint32_t Next() {
|
uint32_t Next() {
|
||||||
static const uint32_t M = 2147483647L; // 2^31-1
|
static const uint32_t M = 2147483647L; // 2^31-1
|
||||||
static const uint64_t A = 16807; // bits 14, 8, 7, 5, 2, 1, 0
|
static const uint64_t A = 16807; // bits 14, 8, 7, 5, 2, 1, 0
|
||||||
|
Loading…
Reference in New Issue
Block a user