Compare commits

...

6 Commits

Author SHA1 Message Date
cmumford
1a9648e1f5 Only compiling TrimSpace on linux.
Incorporated change by zmodem at https://github.com/google/leveldb/pull/310
to fix issue #310.

This change will only build TrimSace on linux to avoid unused function
warning/error.
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=105323419
2015-10-13 14:06:08 -07:00
ndmatthews
f79f4180cc Let LevelDB use xcrun to determine Xcode.app path instead of using a hardcoded path.
This allows build agents to select from multiple Xcode installations.
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=104859097
2015-10-13 14:05:43 -07:00
ssid
36fc955971 Add "approximate-memory-usage" property to leveldb::DB::GetProperty
The approximate RAM usage of the database is calculated from the memory
allocated for write buffers and the block cache. This is to give an
estimate of memory usage to leveldb clients.
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=104222307
2015-10-13 14:05:30 -07:00
tzik
bb61e00815 Add leveldb::Cache::Prune
Prune() drops on-memory read cache of the database, so that the client can
relief its memory shortage.
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=101335710
2015-10-13 14:05:21 -07:00
Chris Mumford
40c17c0b84 Will not reuse manifest if reuse_logs options is false.
Prior implementation would always try to reuse the manifest, even if reuse_logs
was false (the default). This was missed because the stock
Env::NewAppendableFile implementation returns false forcing the creation of a
new log.
2015-06-17 11:20:27 -07:00
Sanjay Ghemawat
251ebf5dc7 LevelDB now attempts to reuse the preceding MANIFEST and log file when re-opened.
(Based on a suggestion by cmumford.)

"open" benchmark on my workstation speeds up significantly since we
can now avoid three fdatasync calls and a compaction per open:

  Before: ~80000 microseconds
  After:    ~130 microseconds

Details:

(1) Added Options::reuse_logs (currently defaults to false) to control
new behavior.  The intention is to change the default to true after some
baking.

(2) Added Env::NewAppendableFile() whose default implementation returns
a not-supported error.

(3) VersionSet::Recovery attempts to reuse the MANIFEST from which
it is recovering.

(4) DBImpl recovery attempts to reuse the last log file and memtable.

(5) db_test.cc now tests a new configuration that sets reuse_logs to true.

(6) fault_injection_test also tests a reuse_logs==true config.

(7) Added a new recovery_test.
2014-12-11 08:13:18 -08:00
26 changed files with 799 additions and 164 deletions

View File

@ -57,6 +57,7 @@ TESTS = \
issue200_test \ issue200_test \
log_test \ log_test \
memenv_test \ memenv_test \
recovery_test \
skiplist_test \ skiplist_test \
table_test \ table_test \
version_edit_test \ version_edit_test \
@ -177,6 +178,9 @@ issue200_test: issues/issue200_test.o $(LIBOBJECTS) $(TESTHARNESS)
log_test: db/log_test.o $(LIBOBJECTS) $(TESTHARNESS) log_test: db/log_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) db/log_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS) $(CXX) $(LDFLAGS) db/log_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
recovery_test: db/recovery_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) db/recovery_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
table_test: table/table_test.o $(LIBOBJECTS) $(TESTHARNESS) table_test: table/table_test.o $(LIBOBJECTS) $(TESTHARNESS)
$(CXX) $(LDFLAGS) table/table_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS) $(CXX) $(LDFLAGS) table/table_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
@ -202,24 +206,22 @@ memenv_test : helpers/memenv/memenv_test.o $(MEMENVLIBRARY) $(LIBRARY) $(TESTHAR
ifeq ($(PLATFORM), IOS) ifeq ($(PLATFORM), IOS)
# For iOS, create universal object files to be used on both the simulator and # For iOS, create universal object files to be used on both the simulator and
# a device. # a device.
PLATFORMSROOT=/Applications/Xcode.app/Contents/Developer/Platforms SIMULATORSDK=$(shell xcrun -sdk iphonesimulator --show-sdk-path)
SIMULATORROOT=$(PLATFORMSROOT)/iPhoneSimulator.platform/Developer DEVICESDK=$(shell xcrun -sdk iphoneos --show-sdk-path)
DEVICEROOT=$(PLATFORMSROOT)/iPhoneOS.platform/Developer
IOSVERSION=$(shell defaults read $(PLATFORMSROOT)/iPhoneOS.platform/version CFBundleShortVersionString)
IOSARCH=-arch armv6 -arch armv7 -arch armv7s -arch arm64 IOSARCH=-arch armv6 -arch armv7 -arch armv7s -arch arm64
.cc.o: .cc.o:
mkdir -p ios-x86/$(dir $@) mkdir -p ios-x86/$(dir $@)
xcrun -sdk iphonesimulator $(CXX) $(CXXFLAGS) -isysroot $(SIMULATORROOT)/SDKs/iPhoneSimulator$(IOSVERSION).sdk -arch i686 -arch x86_64 -c $< -o ios-x86/$@ xcrun -sdk iphonesimulator $(CXX) $(CXXFLAGS) -isysroot "$(SIMULATORSDK)" -arch i686 -arch x86_64 -c $< -o ios-x86/$@
mkdir -p ios-arm/$(dir $@) mkdir -p ios-arm/$(dir $@)
xcrun -sdk iphoneos $(CXX) $(CXXFLAGS) -isysroot $(DEVICEROOT)/SDKs/iPhoneOS$(IOSVERSION).sdk $(IOSARCH) -c $< -o ios-arm/$@ xcrun -sdk iphoneos $(CXX) $(CXXFLAGS) -isysroot "$(DEVICESDK)" $(IOSARCH) -c $< -o ios-arm/$@
xcrun lipo ios-x86/$@ ios-arm/$@ -create -output $@ xcrun lipo ios-x86/$@ ios-arm/$@ -create -output $@
.c.o: .c.o:
mkdir -p ios-x86/$(dir $@) mkdir -p ios-x86/$(dir $@)
xcrun -sdk iphonesimulator $(CC) $(CFLAGS) -isysroot $(SIMULATORROOT)/SDKs/iPhoneSimulator$(IOSVERSION).sdk -arch i686 -arch x86_64 -c $< -o ios-x86/$@ xcrun -sdk iphonesimulator $(CC) $(CFLAGS) -isysroot "$(SIMULATORSDK)" -arch i686 -arch x86_64 -c $< -o ios-x86/$@
mkdir -p ios-arm/$(dir $@) mkdir -p ios-arm/$(dir $@)
xcrun -sdk iphoneos $(CC) $(CFLAGS) -isysroot $(DEVICEROOT)/SDKs/iPhoneOS$(IOSVERSION).sdk $(IOSARCH) -c $< -o ios-arm/$@ xcrun -sdk iphoneos $(CC) $(CFLAGS) -isysroot "$(DEVICESDK)" $(IOSARCH) -c $< -o ios-arm/$@
xcrun lipo ios-x86/$@ ios-arm/$@ -create -output $@ xcrun lipo ios-x86/$@ ios-arm/$@ -create -output $@
else else

View File

@ -36,7 +36,7 @@ class CorruptionTest {
tiny_cache_ = NewLRUCache(100); tiny_cache_ = NewLRUCache(100);
options_.env = &env_; options_.env = &env_;
options_.block_cache = tiny_cache_; options_.block_cache = tiny_cache_;
dbname_ = test::TmpDir() + "/db_test"; dbname_ = test::TmpDir() + "/corruption_test";
DestroyDB(dbname_, options_); DestroyDB(dbname_, options_);
db_ = NULL; db_ = NULL;

View File

@ -100,6 +100,9 @@ static int FLAGS_bloom_bits = -1;
// benchmark will fail. // benchmark will fail.
static bool FLAGS_use_existing_db = false; static bool FLAGS_use_existing_db = false;
// If true, reuse existing log/MANIFEST files when re-opening a database.
static bool FLAGS_reuse_logs = false;
// Use the db with the following name. // Use the db with the following name.
static const char* FLAGS_db = NULL; static const char* FLAGS_db = NULL;
@ -139,6 +142,7 @@ class RandomGenerator {
} }
}; };
#if defined(__linux)
static Slice TrimSpace(Slice s) { static Slice TrimSpace(Slice s) {
size_t start = 0; size_t start = 0;
while (start < s.size() && isspace(s[start])) { while (start < s.size() && isspace(s[start])) {
@ -150,6 +154,7 @@ static Slice TrimSpace(Slice s) {
} }
return Slice(s.data() + start, limit - start); return Slice(s.data() + start, limit - start);
} }
#endif
static void AppendWithSpace(std::string* str, Slice msg) { static void AppendWithSpace(std::string* str, Slice msg) {
if (msg.empty()) return; if (msg.empty()) return;
@ -700,6 +705,7 @@ class Benchmark {
options.write_buffer_size = FLAGS_write_buffer_size; options.write_buffer_size = FLAGS_write_buffer_size;
options.max_open_files = FLAGS_open_files; options.max_open_files = FLAGS_open_files;
options.filter_policy = filter_policy_; options.filter_policy = filter_policy_;
options.reuse_logs = FLAGS_reuse_logs;
Status s = DB::Open(options, FLAGS_db, &db_); Status s = DB::Open(options, FLAGS_db, &db_);
if (!s.ok()) { if (!s.ok()) {
fprintf(stderr, "open error: %s\n", s.ToString().c_str()); fprintf(stderr, "open error: %s\n", s.ToString().c_str());
@ -954,6 +960,9 @@ int main(int argc, char** argv) {
} else if (sscanf(argv[i], "--use_existing_db=%d%c", &n, &junk) == 1 && } else if (sscanf(argv[i], "--use_existing_db=%d%c", &n, &junk) == 1 &&
(n == 0 || n == 1)) { (n == 0 || n == 1)) {
FLAGS_use_existing_db = n; FLAGS_use_existing_db = n;
} else if (sscanf(argv[i], "--reuse_logs=%d%c", &n, &junk) == 1 &&
(n == 0 || n == 1)) {
FLAGS_reuse_logs = n;
} else if (sscanf(argv[i], "--num=%d%c", &n, &junk) == 1) { } else if (sscanf(argv[i], "--num=%d%c", &n, &junk) == 1) {
FLAGS_num = n; FLAGS_num = n;
} else if (sscanf(argv[i], "--reads=%d%c", &n, &junk) == 1) { } else if (sscanf(argv[i], "--reads=%d%c", &n, &junk) == 1) {

View File

@ -125,7 +125,7 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
db_lock_(NULL), db_lock_(NULL),
shutting_down_(NULL), shutting_down_(NULL),
bg_cv_(&mutex_), bg_cv_(&mutex_),
mem_(new MemTable(internal_comparator_)), mem_(NULL),
imm_(NULL), imm_(NULL),
logfile_(NULL), logfile_(NULL),
logfile_number_(0), logfile_number_(0),
@ -134,7 +134,6 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
tmp_batch_(new WriteBatch), tmp_batch_(new WriteBatch),
bg_compaction_scheduled_(false), bg_compaction_scheduled_(false),
manual_compaction_(NULL) { manual_compaction_(NULL) {
mem_->Ref();
has_imm_.Release_Store(NULL); has_imm_.Release_Store(NULL);
// Reserve ten files or so for other uses and give the rest to TableCache. // Reserve ten files or so for other uses and give the rest to TableCache.
@ -271,7 +270,7 @@ void DBImpl::DeleteObsoleteFiles() {
} }
} }
Status DBImpl::Recover(VersionEdit* edit) { Status DBImpl::Recover(VersionEdit* edit, bool *save_manifest) {
mutex_.AssertHeld(); mutex_.AssertHeld();
// Ignore error from CreateDir since the creation of the DB is // Ignore error from CreateDir since the creation of the DB is
@ -301,66 +300,69 @@ Status DBImpl::Recover(VersionEdit* edit) {
} }
} }
s = versions_->Recover(); s = versions_->Recover(save_manifest);
if (s.ok()) { if (!s.ok()) {
SequenceNumber max_sequence(0); return s;
}
SequenceNumber max_sequence(0);
// Recover from all newer log files than the ones named in the // Recover from all newer log files than the ones named in the
// descriptor (new log files may have been added by the previous // descriptor (new log files may have been added by the previous
// incarnation without registering them in the descriptor). // incarnation without registering them in the descriptor).
// //
// Note that PrevLogNumber() is no longer used, but we pay // Note that PrevLogNumber() is no longer used, but we pay
// attention to it in case we are recovering a database // attention to it in case we are recovering a database
// produced by an older version of leveldb. // produced by an older version of leveldb.
const uint64_t min_log = versions_->LogNumber(); const uint64_t min_log = versions_->LogNumber();
const uint64_t prev_log = versions_->PrevLogNumber(); const uint64_t prev_log = versions_->PrevLogNumber();
std::vector<std::string> filenames; std::vector<std::string> filenames;
s = env_->GetChildren(dbname_, &filenames); s = env_->GetChildren(dbname_, &filenames);
if (!s.ok()) {
return s;
}
std::set<uint64_t> expected;
versions_->AddLiveFiles(&expected);
uint64_t number;
FileType type;
std::vector<uint64_t> logs;
for (size_t i = 0; i < filenames.size(); i++) {
if (ParseFileName(filenames[i], &number, &type)) {
expected.erase(number);
if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
logs.push_back(number);
}
}
if (!expected.empty()) {
char buf[50];
snprintf(buf, sizeof(buf), "%d missing files; e.g.",
static_cast<int>(expected.size()));
return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
}
// Recover in the order in which the logs were generated
std::sort(logs.begin(), logs.end());
for (size_t i = 0; i < logs.size(); i++) {
s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,
&max_sequence);
if (!s.ok()) { if (!s.ok()) {
return s; return s;
} }
std::set<uint64_t> expected;
versions_->AddLiveFiles(&expected);
uint64_t number;
FileType type;
std::vector<uint64_t> logs;
for (size_t i = 0; i < filenames.size(); i++) {
if (ParseFileName(filenames[i], &number, &type)) {
expected.erase(number);
if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
logs.push_back(number);
}
}
if (!expected.empty()) {
char buf[50];
snprintf(buf, sizeof(buf), "%d missing files; e.g.",
static_cast<int>(expected.size()));
return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
}
// Recover in the order in which the logs were generated // The previous incarnation may not have written any MANIFEST
std::sort(logs.begin(), logs.end()); // records after allocating this log number. So we manually
for (size_t i = 0; i < logs.size(); i++) { // update the file number allocation counter in VersionSet.
s = RecoverLogFile(logs[i], edit, &max_sequence); versions_->MarkFileNumberUsed(logs[i]);
// The previous incarnation may not have written any MANIFEST
// records after allocating this log number. So we manually
// update the file number allocation counter in VersionSet.
versions_->MarkFileNumberUsed(logs[i]);
}
if (s.ok()) {
if (versions_->LastSequence() < max_sequence) {
versions_->SetLastSequence(max_sequence);
}
}
} }
return s; if (versions_->LastSequence() < max_sequence) {
versions_->SetLastSequence(max_sequence);
}
return Status::OK();
} }
Status DBImpl::RecoverLogFile(uint64_t log_number, Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
VersionEdit* edit, bool* save_manifest, VersionEdit* edit,
SequenceNumber* max_sequence) { SequenceNumber* max_sequence) {
struct LogReporter : public log::Reader::Reporter { struct LogReporter : public log::Reader::Reporter {
Env* env; Env* env;
@ -405,6 +407,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
std::string scratch; std::string scratch;
Slice record; Slice record;
WriteBatch batch; WriteBatch batch;
int compactions = 0;
MemTable* mem = NULL; MemTable* mem = NULL;
while (reader.ReadRecord(&record, &scratch) && while (reader.ReadRecord(&record, &scratch) &&
status.ok()) { status.ok()) {
@ -432,25 +435,52 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
} }
if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) { if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
compactions++;
*save_manifest = true;
status = WriteLevel0Table(mem, edit, NULL); status = WriteLevel0Table(mem, edit, NULL);
mem->Unref();
mem = NULL;
if (!status.ok()) { if (!status.ok()) {
// Reflect errors immediately so that conditions like full // Reflect errors immediately so that conditions like full
// file-systems cause the DB::Open() to fail. // file-systems cause the DB::Open() to fail.
break; break;
} }
mem->Unref();
mem = NULL;
} }
} }
if (status.ok() && mem != NULL) { delete file;
status = WriteLevel0Table(mem, edit, NULL);
// Reflect errors immediately so that conditions like full // See if we should keep reusing the last log file.
// file-systems cause the DB::Open() to fail. if (status.ok() && options_.reuse_logs && last_log && compactions == 0) {
assert(logfile_ == NULL);
assert(log_ == NULL);
assert(mem_ == NULL);
uint64_t lfile_size;
if (env_->GetFileSize(fname, &lfile_size).ok() &&
env_->NewAppendableFile(fname, &logfile_).ok()) {
Log(options_.info_log, "Reusing old log %s \n", fname.c_str());
log_ = new log::Writer(logfile_, lfile_size);
logfile_number_ = log_number;
if (mem != NULL) {
mem_ = mem;
mem = NULL;
} else {
// mem can be NULL if lognum exists but was empty.
mem_ = new MemTable(internal_comparator_);
mem_->Ref();
}
}
}
if (mem != NULL) {
// mem did not get reused; compact it.
if (status.ok()) {
*save_manifest = true;
status = WriteLevel0Table(mem, edit, NULL);
}
mem->Unref();
} }
if (mem != NULL) mem->Unref();
delete file;
return status; return status;
} }
@ -1395,6 +1425,19 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) {
} else if (in == "sstables") { } else if (in == "sstables") {
*value = versions_->current()->DebugString(); *value = versions_->current()->DebugString();
return true; return true;
} else if (in == "approximate-memory-usage") {
size_t total_usage = options_.block_cache->TotalCharge();
if (mem_) {
total_usage += mem_->ApproximateMemoryUsage();
}
if (imm_) {
total_usage += imm_->ApproximateMemoryUsage();
}
char buf[50];
snprintf(buf, sizeof(buf), "%llu",
static_cast<unsigned long long>(total_usage));
value->append(buf);
return true;
} }
return false; return false;
@ -1449,8 +1492,11 @@ Status DB::Open(const Options& options, const std::string& dbname,
DBImpl* impl = new DBImpl(options, dbname); DBImpl* impl = new DBImpl(options, dbname);
impl->mutex_.Lock(); impl->mutex_.Lock();
VersionEdit edit; VersionEdit edit;
Status s = impl->Recover(&edit); // Handles create_if_missing, error_if_exists // Recover handles create_if_missing, error_if_exists
if (s.ok()) { bool save_manifest = false;
Status s = impl->Recover(&edit, &save_manifest);
if (s.ok() && impl->mem_ == NULL) {
// Create new log and a corresponding memtable.
uint64_t new_log_number = impl->versions_->NewFileNumber(); uint64_t new_log_number = impl->versions_->NewFileNumber();
WritableFile* lfile; WritableFile* lfile;
s = options.env->NewWritableFile(LogFileName(dbname, new_log_number), s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
@ -1460,15 +1506,22 @@ Status DB::Open(const Options& options, const std::string& dbname,
impl->logfile_ = lfile; impl->logfile_ = lfile;
impl->logfile_number_ = new_log_number; impl->logfile_number_ = new_log_number;
impl->log_ = new log::Writer(lfile); impl->log_ = new log::Writer(lfile);
s = impl->versions_->LogAndApply(&edit, &impl->mutex_); impl->mem_ = new MemTable(impl->internal_comparator_);
} impl->mem_->Ref();
if (s.ok()) {
impl->DeleteObsoleteFiles();
impl->MaybeScheduleCompaction();
} }
} }
if (s.ok() && save_manifest) {
edit.SetPrevLogNumber(0); // No older logs needed after recovery.
edit.SetLogNumber(impl->logfile_number_);
s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
}
if (s.ok()) {
impl->DeleteObsoleteFiles();
impl->MaybeScheduleCompaction();
}
impl->mutex_.Unlock(); impl->mutex_.Unlock();
if (s.ok()) { if (s.ok()) {
assert(impl->mem_ != NULL);
*dbptr = impl; *dbptr = impl;
} else { } else {
delete impl; delete impl;

View File

@ -78,7 +78,8 @@ class DBImpl : public DB {
// Recover the descriptor from persistent storage. May do a significant // Recover the descriptor from persistent storage. May do a significant
// amount of work to recover recently logged updates. Any changes to // amount of work to recover recently logged updates. Any changes to
// be made to the descriptor are added to *edit. // be made to the descriptor are added to *edit.
Status Recover(VersionEdit* edit) EXCLUSIVE_LOCKS_REQUIRED(mutex_); Status Recover(VersionEdit* edit, bool* save_manifest)
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
void MaybeIgnoreError(Status* s) const; void MaybeIgnoreError(Status* s) const;
@ -90,9 +91,8 @@ class DBImpl : public DB {
// Errors are recorded in bg_error_. // Errors are recorded in bg_error_.
void CompactMemTable() EXCLUSIVE_LOCKS_REQUIRED(mutex_); void CompactMemTable() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
Status RecoverLogFile(uint64_t log_number, Status RecoverLogFile(uint64_t log_number, bool last_log, bool* save_manifest,
VersionEdit* edit, VersionEdit* edit, SequenceNumber* max_sequence)
SequenceNumber* max_sequence)
EXCLUSIVE_LOCKS_REQUIRED(mutex_); EXCLUSIVE_LOCKS_REQUIRED(mutex_);
Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base) Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base)

View File

@ -193,6 +193,7 @@ class DBTest {
// Sequence of option configurations to try // Sequence of option configurations to try
enum OptionConfig { enum OptionConfig {
kDefault, kDefault,
kReuse,
kFilter, kFilter,
kUncompressed, kUncompressed,
kEnd kEnd
@ -237,7 +238,11 @@ class DBTest {
// Return the current option configuration. // Return the current option configuration.
Options CurrentOptions() { Options CurrentOptions() {
Options options; Options options;
options.reuse_logs = false;
switch (option_config_) { switch (option_config_) {
case kReuse:
options.reuse_logs = true;
break;
case kFilter: case kFilter:
options.filter_policy = filter_policy_; options.filter_policy = filter_policy_;
break; break;
@ -558,6 +563,17 @@ TEST(DBTest, GetFromVersions) {
} while (ChangeOptions()); } while (ChangeOptions());
} }
TEST(DBTest, GetMemUsage) {
do {
ASSERT_OK(Put("foo", "v1"));
std::string val;
ASSERT_TRUE(db_->GetProperty("leveldb.approximate-memory-usage", &val));
int mem_usage = atoi(val.c_str());
ASSERT_GT(mem_usage, 0);
ASSERT_LT(mem_usage, 5*1024*1024);
} while (ChangeOptions());
}
TEST(DBTest, GetSnapshot) { TEST(DBTest, GetSnapshot) {
do { do {
// Try with both a short key and a long key // Try with both a short key and a long key
@ -1080,6 +1096,14 @@ TEST(DBTest, ApproximateSizes) {
// 0 because GetApproximateSizes() does not account for memtable space // 0 because GetApproximateSizes() does not account for memtable space
ASSERT_TRUE(Between(Size("", Key(50)), 0, 0)); ASSERT_TRUE(Between(Size("", Key(50)), 0, 0));
if (options.reuse_logs) {
// Recovery will reuse memtable, and GetApproximateSizes() does not
// account for memtable usage;
Reopen(&options);
ASSERT_TRUE(Between(Size("", Key(50)), 0, 0));
continue;
}
// Check sizes across recovery by reopening a few times // Check sizes across recovery by reopening a few times
for (int run = 0; run < 3; run++) { for (int run = 0; run < 3; run++) {
Reopen(&options); Reopen(&options);
@ -1123,6 +1147,11 @@ TEST(DBTest, ApproximateSizes_MixOfSmallAndLarge) {
ASSERT_OK(Put(Key(6), RandomString(&rnd, 300000))); ASSERT_OK(Put(Key(6), RandomString(&rnd, 300000)));
ASSERT_OK(Put(Key(7), RandomString(&rnd, 10000))); ASSERT_OK(Put(Key(7), RandomString(&rnd, 10000)));
if (options.reuse_logs) {
// Need to force a memtable compaction since recovery does not do so.
ASSERT_OK(dbfull()->TEST_CompactMemTable());
}
// Check sizes across recovery by reopening a few times // Check sizes across recovery by reopening a few times
for (int run = 0; run < 3; run++) { for (int run = 0; run < 3; run++) {
Reopen(&options); Reopen(&options);
@ -2084,7 +2113,8 @@ void BM_LogAndApply(int iters, int num_base_files) {
InternalKeyComparator cmp(BytewiseComparator()); InternalKeyComparator cmp(BytewiseComparator());
Options options; Options options;
VersionSet vset(dbname, &options, NULL, &cmp); VersionSet vset(dbname, &options, NULL, &cmp);
ASSERT_OK(vset.Recover()); bool save_manifest;
ASSERT_OK(vset.Recover(&save_manifest));
VersionEdit vbase; VersionEdit vbase;
uint64_t fnum = 1; uint64_t fnum = 1;
for (int i = 0; i < num_base_files; i++) { for (int i = 0; i < num_base_files; i++) {

View File

@ -106,7 +106,7 @@ struct FileState {
// is written to or sync'ed. // is written to or sync'ed.
class TestWritableFile : public WritableFile { class TestWritableFile : public WritableFile {
public: public:
TestWritableFile(const std::string& fname, TestWritableFile(const FileState& state,
WritableFile* f, WritableFile* f,
FaultInjectionTestEnv* env); FaultInjectionTestEnv* env);
virtual ~TestWritableFile(); virtual ~TestWritableFile();
@ -130,6 +130,8 @@ class FaultInjectionTestEnv : public EnvWrapper {
virtual ~FaultInjectionTestEnv() { } virtual ~FaultInjectionTestEnv() { }
virtual Status NewWritableFile(const std::string& fname, virtual Status NewWritableFile(const std::string& fname,
WritableFile** result); WritableFile** result);
virtual Status NewAppendableFile(const std::string& fname,
WritableFile** result);
virtual Status DeleteFile(const std::string& f); virtual Status DeleteFile(const std::string& f);
virtual Status RenameFile(const std::string& s, const std::string& t); virtual Status RenameFile(const std::string& s, const std::string& t);
@ -154,15 +156,14 @@ class FaultInjectionTestEnv : public EnvWrapper {
bool filesystem_active_; // Record flushes, syncs, writes bool filesystem_active_; // Record flushes, syncs, writes
}; };
TestWritableFile::TestWritableFile(const std::string& fname, TestWritableFile::TestWritableFile(const FileState& state,
WritableFile* f, WritableFile* f,
FaultInjectionTestEnv* env) FaultInjectionTestEnv* env)
: state_(fname), : state_(state),
target_(f), target_(f),
writable_file_opened_(true), writable_file_opened_(true),
env_(env) { env_(env) {
assert(f != NULL); assert(f != NULL);
state_.pos_ = 0;
} }
TestWritableFile::~TestWritableFile() { TestWritableFile::~TestWritableFile() {
@ -228,9 +229,12 @@ Status FaultInjectionTestEnv::NewWritableFile(const std::string& fname,
WritableFile* actual_writable_file; WritableFile* actual_writable_file;
Status s = target()->NewWritableFile(fname, &actual_writable_file); Status s = target()->NewWritableFile(fname, &actual_writable_file);
if (s.ok()) { if (s.ok()) {
*result = new TestWritableFile(fname, actual_writable_file, this); FileState state(fname);
// WritableFile doesn't append to files, so if the same file is opened again state.pos_ = 0;
// then it will be truncated - so forget our saved state. *result = new TestWritableFile(state, actual_writable_file, this);
// NewWritableFile doesn't append to files, so if the same file is
// opened again then it will be truncated - so forget our saved
// state.
UntrackFile(fname); UntrackFile(fname);
MutexLock l(&mutex_); MutexLock l(&mutex_);
new_files_since_last_dir_sync_.insert(fname); new_files_since_last_dir_sync_.insert(fname);
@ -238,6 +242,26 @@ Status FaultInjectionTestEnv::NewWritableFile(const std::string& fname,
return s; return s;
} }
Status FaultInjectionTestEnv::NewAppendableFile(const std::string& fname,
WritableFile** result) {
WritableFile* actual_writable_file;
Status s = target()->NewAppendableFile(fname, &actual_writable_file);
if (s.ok()) {
FileState state(fname);
state.pos_ = 0;
{
MutexLock l(&mutex_);
if (db_file_state_.count(fname) == 0) {
new_files_since_last_dir_sync_.insert(fname);
} else {
state = db_file_state_[fname];
}
}
*result = new TestWritableFile(state, actual_writable_file, this);
}
return s;
}
Status FaultInjectionTestEnv::DropUnsyncedFileData() { Status FaultInjectionTestEnv::DropUnsyncedFileData() {
Status s; Status s;
MutexLock l(&mutex_); MutexLock l(&mutex_);
@ -301,9 +325,10 @@ Status FaultInjectionTestEnv::RenameFile(const std::string& s,
} }
void FaultInjectionTestEnv::ResetState() { void FaultInjectionTestEnv::ResetState() {
// Since we are not destroying the database, the existing files
// should keep their recorded synced/flushed state. Therefore
// we do not reset db_file_state_ and new_files_since_last_dir_sync_.
MutexLock l(&mutex_); MutexLock l(&mutex_);
db_file_state_.clear();
new_files_since_last_dir_sync_.clear();
SetFilesystemActive(true); SetFilesystemActive(true);
} }
@ -342,51 +367,28 @@ class FaultInjectionTest {
Options options_; Options options_;
DB* db_; DB* db_;
FaultInjectionTest() : env_(NULL), tiny_cache_(NULL), db_(NULL) { NewDB(); } FaultInjectionTest()
: env_(new FaultInjectionTestEnv),
~FaultInjectionTest() { ASSERT_OK(TearDown()); } tiny_cache_(NewLRUCache(100)),
db_(NULL) {
Status NewDB() { dbname_ = test::TmpDir() + "/fault_test";
assert(db_ == NULL); DestroyDB(dbname_, Options()); // Destroy any db from earlier run
assert(tiny_cache_ == NULL); options_.reuse_logs = true;
assert(env_ == NULL);
env_ = new FaultInjectionTestEnv();
options_ = Options();
options_.env = env_; options_.env = env_;
options_.paranoid_checks = true; options_.paranoid_checks = true;
tiny_cache_ = NewLRUCache(100);
options_.block_cache = tiny_cache_; options_.block_cache = tiny_cache_;
dbname_ = test::TmpDir() + "/fault_test";
options_.create_if_missing = true; options_.create_if_missing = true;
Status s = OpenDB();
options_.create_if_missing = false;
return s;
} }
Status SetUp() { ~FaultInjectionTest() {
Status s = TearDown();
if (s.ok()) {
s = NewDB();
}
return s;
}
Status TearDown() {
CloseDB(); CloseDB();
DestroyDB(dbname_, Options());
Status s = DestroyDB(dbname_, Options());
delete tiny_cache_; delete tiny_cache_;
tiny_cache_ = NULL;
delete env_; delete env_;
env_ = NULL; }
return s; void ReuseLogs(bool reuse) {
options_.reuse_logs = reuse;
} }
void Build(int start_idx, int num_vals) { void Build(int start_idx, int num_vals) {
@ -506,33 +508,43 @@ class FaultInjectionTest {
ResetDBState(reset_method); ResetDBState(reset_method);
ASSERT_OK(OpenDB()); ASSERT_OK(OpenDB());
} }
void DoTest() {
Random rnd(0);
ASSERT_OK(OpenDB());
for (size_t idx = 0; idx < kNumIterations; idx++) {
int num_pre_sync = rnd.Uniform(kMaxNumValues);
int num_post_sync = rnd.Uniform(kMaxNumValues);
PartialCompactTestPreFault(num_pre_sync, num_post_sync);
PartialCompactTestReopenWithFault(RESET_DROP_UNSYNCED_DATA,
num_pre_sync,
num_post_sync);
NoWriteTestPreFault();
NoWriteTestReopenWithFault(RESET_DROP_UNSYNCED_DATA);
PartialCompactTestPreFault(num_pre_sync, num_post_sync);
// No new files created so we expect all values since no files will be
// dropped.
PartialCompactTestReopenWithFault(RESET_DELETE_UNSYNCED_FILES,
num_pre_sync + num_post_sync,
0);
NoWriteTestPreFault();
NoWriteTestReopenWithFault(RESET_DELETE_UNSYNCED_FILES);
}
}
}; };
TEST(FaultInjectionTest, FaultTest) { TEST(FaultInjectionTest, FaultTestNoLogReuse) {
Random rnd(0); ReuseLogs(false);
ASSERT_OK(SetUp()); DoTest();
for (size_t idx = 0; idx < kNumIterations; idx++) { }
int num_pre_sync = rnd.Uniform(kMaxNumValues);
int num_post_sync = rnd.Uniform(kMaxNumValues);
PartialCompactTestPreFault(num_pre_sync, num_post_sync); TEST(FaultInjectionTest, FaultTestWithLogReuse) {
PartialCompactTestReopenWithFault(RESET_DROP_UNSYNCED_DATA, ReuseLogs(true);
num_pre_sync, DoTest();
num_post_sync);
NoWriteTestPreFault();
NoWriteTestReopenWithFault(RESET_DROP_UNSYNCED_DATA);
PartialCompactTestPreFault(num_pre_sync, num_post_sync);
// No new files created so we expect all values since no files will be
// dropped.
PartialCompactTestReopenWithFault(RESET_DELETE_UNSYNCED_FILES,
num_pre_sync + num_post_sync,
0);
NoWriteTestPreFault();
NoWriteTestReopenWithFault(RESET_DELETE_UNSYNCED_FILES);
}
} }
} // namespace leveldb } // namespace leveldb

View File

@ -104,7 +104,7 @@ class LogTest {
StringSource source_; StringSource source_;
ReportCollector report_; ReportCollector report_;
bool reading_; bool reading_;
Writer writer_; Writer* writer_;
Reader reader_; Reader reader_;
// Record metadata for testing initial offset functionality // Record metadata for testing initial offset functionality
@ -113,14 +113,23 @@ class LogTest {
public: public:
LogTest() : reading_(false), LogTest() : reading_(false),
writer_(&dest_), writer_(new Writer(&dest_)),
reader_(&source_, &report_, true/*checksum*/, reader_(&source_, &report_, true/*checksum*/,
0/*initial_offset*/) { 0/*initial_offset*/) {
} }
~LogTest() {
delete writer_;
}
void ReopenForAppend() {
delete writer_;
writer_ = new Writer(&dest_, dest_.contents_.size());
}
void Write(const std::string& msg) { void Write(const std::string& msg) {
ASSERT_TRUE(!reading_) << "Write() after starting to read"; ASSERT_TRUE(!reading_) << "Write() after starting to read";
writer_.AddRecord(Slice(msg)); writer_->AddRecord(Slice(msg));
} }
size_t WrittenBytes() const { size_t WrittenBytes() const {
@ -318,6 +327,15 @@ TEST(LogTest, AlignedEof) {
ASSERT_EQ("EOF", Read()); ASSERT_EQ("EOF", Read());
} }
TEST(LogTest, OpenForAppend) {
Write("hello");
ReopenForAppend();
Write("world");
ASSERT_EQ("hello", Read());
ASSERT_EQ("world", Read());
ASSERT_EQ("EOF", Read());
}
TEST(LogTest, RandomRead) { TEST(LogTest, RandomRead) {
const int N = 500; const int N = 500;
Random write_rnd(301); Random write_rnd(301);

View File

@ -12,13 +12,22 @@
namespace leveldb { namespace leveldb {
namespace log { namespace log {
static void InitTypeCrc(uint32_t* type_crc) {
for (int i = 0; i <= kMaxRecordType; i++) {
char t = static_cast<char>(i);
type_crc[i] = crc32c::Value(&t, 1);
}
}
Writer::Writer(WritableFile* dest) Writer::Writer(WritableFile* dest)
: dest_(dest), : dest_(dest),
block_offset_(0) { block_offset_(0) {
for (int i = 0; i <= kMaxRecordType; i++) { InitTypeCrc(type_crc_);
char t = static_cast<char>(i); }
type_crc_[i] = crc32c::Value(&t, 1);
} Writer::Writer(WritableFile* dest, uint64_t dest_length)
: dest_(dest), block_offset_(dest_length % kBlockSize) {
InitTypeCrc(type_crc_);
} }
Writer::~Writer() { Writer::~Writer() {

View File

@ -22,6 +22,12 @@ class Writer {
// "*dest" must be initially empty. // "*dest" must be initially empty.
// "*dest" must remain live while this Writer is in use. // "*dest" must remain live while this Writer is in use.
explicit Writer(WritableFile* dest); explicit Writer(WritableFile* dest);
// Create a writer that will append data to "*dest".
// "*dest" must have initial length "dest_length".
// "*dest" must remain live while this Writer is in use.
Writer(WritableFile* dest, uint64_t dest_length);
~Writer(); ~Writer();
Status AddRecord(const Slice& slice); Status AddRecord(const Slice& slice);

324
db/recovery_test.cc Normal file
View File

@ -0,0 +1,324 @@
// Copyright (c) 2014 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "db/db_impl.h"
#include "db/filename.h"
#include "db/version_set.h"
#include "db/write_batch_internal.h"
#include "leveldb/db.h"
#include "leveldb/env.h"
#include "leveldb/write_batch.h"
#include "util/logging.h"
#include "util/testharness.h"
#include "util/testutil.h"
namespace leveldb {
class RecoveryTest {
public:
RecoveryTest() : env_(Env::Default()), db_(NULL) {
dbname_ = test::TmpDir() + "/recovery_test";
DestroyDB(dbname_, Options());
Open();
}
~RecoveryTest() {
Close();
DestroyDB(dbname_, Options());
}
DBImpl* dbfull() const { return reinterpret_cast<DBImpl*>(db_); }
Env* env() const { return env_; }
bool CanAppend() {
WritableFile* tmp;
Status s = env_->NewAppendableFile(CurrentFileName(dbname_), &tmp);
delete tmp;
if (s.IsNotSupportedError()) {
return false;
} else {
return true;
}
}
void Close() {
delete db_;
db_ = NULL;
}
void Open(Options* options = NULL) {
Close();
Options opts;
if (options != NULL) {
opts = *options;
} else {
opts.reuse_logs = true; // TODO(sanjay): test both ways
opts.create_if_missing = true;
}
if (opts.env == NULL) {
opts.env = env_;
}
ASSERT_OK(DB::Open(opts, dbname_, &db_));
ASSERT_EQ(1, NumLogs());
}
Status Put(const std::string& k, const std::string& v) {
return db_->Put(WriteOptions(), k, v);
}
std::string Get(const std::string& k, const Snapshot* snapshot = NULL) {
std::string result;
Status s = db_->Get(ReadOptions(), k, &result);
if (s.IsNotFound()) {
result = "NOT_FOUND";
} else if (!s.ok()) {
result = s.ToString();
}
return result;
}
std::string ManifestFileName() {
std::string current;
ASSERT_OK(ReadFileToString(env_, CurrentFileName(dbname_), &current));
size_t len = current.size();
if (len > 0 && current[len-1] == '\n') {
current.resize(len - 1);
}
return dbname_ + "/" + current;
}
std::string LogName(uint64_t number) {
return LogFileName(dbname_, number);
}
size_t DeleteLogFiles() {
std::vector<uint64_t> logs = GetFiles(kLogFile);
for (size_t i = 0; i < logs.size(); i++) {
ASSERT_OK(env_->DeleteFile(LogName(logs[i]))) << LogName(logs[i]);
}
return logs.size();
}
uint64_t FirstLogFile() {
return GetFiles(kLogFile)[0];
}
std::vector<std::uint64_t> GetFiles(FileType t) {
std::vector<std::string> filenames;
ASSERT_OK(env_->GetChildren(dbname_, &filenames));
std::vector<std::uint64_t> result;
for (size_t i = 0; i < filenames.size(); i++) {
uint64_t number;
FileType type;
if (ParseFileName(filenames[i], &number, &type) && type == t) {
result.push_back(number);
}
}
return result;
}
int NumLogs() {
return GetFiles(kLogFile).size();
}
int NumTables() {
return GetFiles(kTableFile).size();
}
uint64_t FileSize(const std::string& fname) {
uint64_t result;
ASSERT_OK(env_->GetFileSize(fname, &result)) << fname;
return result;
}
void CompactMemTable() {
dbfull()->TEST_CompactMemTable();
}
// Directly construct a log file that sets key to val.
void MakeLogFile(uint64_t lognum, SequenceNumber seq, Slice key, Slice val) {
std::string fname = LogFileName(dbname_, lognum);
WritableFile* file;
ASSERT_OK(env_->NewWritableFile(fname, &file));
log::Writer writer(file);
WriteBatch batch;
batch.Put(key, val);
WriteBatchInternal::SetSequence(&batch, seq);
ASSERT_OK(writer.AddRecord(WriteBatchInternal::Contents(&batch)));
ASSERT_OK(file->Flush());
delete file;
}
private:
std::string dbname_;
Env* env_;
DB* db_;
};
TEST(RecoveryTest, ManifestReused) {
if (!CanAppend()) {
fprintf(stderr, "skipping test because env does not support appending\n");
return;
}
ASSERT_OK(Put("foo", "bar"));
Close();
std::string old_manifest = ManifestFileName();
Open();
ASSERT_EQ(old_manifest, ManifestFileName());
ASSERT_EQ("bar", Get("foo"));
Open();
ASSERT_EQ(old_manifest, ManifestFileName());
ASSERT_EQ("bar", Get("foo"));
}
TEST(RecoveryTest, LargeManifestCompacted) {
if (!CanAppend()) {
fprintf(stderr, "skipping test because env does not support appending\n");
return;
}
ASSERT_OK(Put("foo", "bar"));
Close();
std::string old_manifest = ManifestFileName();
// Pad with zeroes to make manifest file very big.
{
uint64_t len = FileSize(old_manifest);
WritableFile* file;
ASSERT_OK(env()->NewAppendableFile(old_manifest, &file));
std::string zeroes(3*1048576 - static_cast<size_t>(len), 0);
ASSERT_OK(file->Append(zeroes));
ASSERT_OK(file->Flush());
delete file;
}
Open();
std::string new_manifest = ManifestFileName();
ASSERT_NE(old_manifest, new_manifest);
ASSERT_GT(10000, FileSize(new_manifest));
ASSERT_EQ("bar", Get("foo"));
Open();
ASSERT_EQ(new_manifest, ManifestFileName());
ASSERT_EQ("bar", Get("foo"));
}
TEST(RecoveryTest, NoLogFiles) {
ASSERT_OK(Put("foo", "bar"));
ASSERT_EQ(1, DeleteLogFiles());
Open();
ASSERT_EQ("NOT_FOUND", Get("foo"));
Open();
ASSERT_EQ("NOT_FOUND", Get("foo"));
}
TEST(RecoveryTest, LogFileReuse) {
if (!CanAppend()) {
fprintf(stderr, "skipping test because env does not support appending\n");
return;
}
for (int i = 0; i < 2; i++) {
ASSERT_OK(Put("foo", "bar"));
if (i == 0) {
// Compact to ensure current log is empty
CompactMemTable();
}
Close();
ASSERT_EQ(1, NumLogs());
uint64_t number = FirstLogFile();
if (i == 0) {
ASSERT_EQ(0, FileSize(LogName(number)));
} else {
ASSERT_LT(0, FileSize(LogName(number)));
}
Open();
ASSERT_EQ(1, NumLogs());
ASSERT_EQ(number, FirstLogFile()) << "did not reuse log file";
ASSERT_EQ("bar", Get("foo"));
Open();
ASSERT_EQ(1, NumLogs());
ASSERT_EQ(number, FirstLogFile()) << "did not reuse log file";
ASSERT_EQ("bar", Get("foo"));
}
}
TEST(RecoveryTest, MultipleMemTables) {
// Make a large log.
const int kNum = 1000;
for (int i = 0; i < kNum; i++) {
char buf[100];
snprintf(buf, sizeof(buf), "%050d", i);
ASSERT_OK(Put(buf, buf));
}
ASSERT_EQ(0, NumTables());
Close();
ASSERT_EQ(0, NumTables());
ASSERT_EQ(1, NumLogs());
uint64_t old_log_file = FirstLogFile();
// Force creation of multiple memtables by reducing the write buffer size.
Options opt;
opt.reuse_logs = true;
opt.write_buffer_size = (kNum*100) / 2;
Open(&opt);
ASSERT_LE(2, NumTables());
ASSERT_EQ(1, NumLogs());
ASSERT_NE(old_log_file, FirstLogFile()) << "must not reuse log";
for (int i = 0; i < kNum; i++) {
char buf[100];
snprintf(buf, sizeof(buf), "%050d", i);
ASSERT_EQ(buf, Get(buf));
}
}
TEST(RecoveryTest, MultipleLogFiles) {
ASSERT_OK(Put("foo", "bar"));
Close();
ASSERT_EQ(1, NumLogs());
// Make a bunch of uncompacted log files.
uint64_t old_log = FirstLogFile();
MakeLogFile(old_log+1, 1000, "hello", "world");
MakeLogFile(old_log+2, 1001, "hi", "there");
MakeLogFile(old_log+3, 1002, "foo", "bar2");
// Recover and check that all log files were processed.
Open();
ASSERT_LE(1, NumTables());
ASSERT_EQ(1, NumLogs());
uint64_t new_log = FirstLogFile();
ASSERT_LE(old_log+3, new_log);
ASSERT_EQ("bar2", Get("foo"));
ASSERT_EQ("world", Get("hello"));
ASSERT_EQ("there", Get("hi"));
// Test that previous recovery produced recoverable state.
Open();
ASSERT_LE(1, NumTables());
ASSERT_EQ(1, NumLogs());
if (CanAppend()) {
ASSERT_EQ(new_log, FirstLogFile());
}
ASSERT_EQ("bar2", Get("foo"));
ASSERT_EQ("world", Get("hello"));
ASSERT_EQ("there", Get("hi"));
// Check that introducing an older log file does not cause it to be re-read.
Close();
MakeLogFile(old_log+1, 2000, "hello", "stale write");
Open();
ASSERT_LE(1, NumTables());
ASSERT_EQ(1, NumLogs());
if (CanAppend()) {
ASSERT_EQ(new_log, FirstLogFile());
}
ASSERT_EQ("bar2", Get("foo"));
ASSERT_EQ("world", Get("hello"));
ASSERT_EQ("there", Get("hi"));
}
} // namespace leveldb
int main(int argc, char** argv) {
return leveldb::test::RunAllTests();
}

View File

@ -893,7 +893,7 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
return s; return s;
} }
Status VersionSet::Recover() { Status VersionSet::Recover(bool *save_manifest) {
struct LogReporter : public log::Reader::Reporter { struct LogReporter : public log::Reader::Reporter {
Status* status; Status* status;
virtual void Corruption(size_t bytes, const Status& s) { virtual void Corruption(size_t bytes, const Status& s) {
@ -1003,11 +1003,49 @@ Status VersionSet::Recover() {
last_sequence_ = last_sequence; last_sequence_ = last_sequence;
log_number_ = log_number; log_number_ = log_number;
prev_log_number_ = prev_log_number; prev_log_number_ = prev_log_number;
// See if we can reuse the existing MANIFEST file.
if (ReuseManifest(dscname, current)) {
// No need to save new manifest
} else {
*save_manifest = true;
}
} }
return s; return s;
} }
bool VersionSet::ReuseManifest(const std::string& dscname,
const std::string& dscbase) {
if (!options_->reuse_logs) {
return false;
}
FileType manifest_type;
uint64_t manifest_number;
uint64_t manifest_size;
if (!ParseFileName(dscbase, &manifest_number, &manifest_type) ||
manifest_type != kDescriptorFile ||
!env_->GetFileSize(dscname, &manifest_size).ok() ||
// Make new compacted MANIFEST if old one is too big
manifest_size >= kTargetFileSize) {
return false;
}
assert(descriptor_file_ == NULL);
assert(descriptor_log_ == NULL);
Status r = env_->NewAppendableFile(dscname, &descriptor_file_);
if (!r.ok()) {
Log(options_->info_log, "Reuse MANIFEST: %s\n", r.ToString().c_str());
assert(descriptor_file_ == NULL);
return false;
}
Log(options_->info_log, "Reusing MANIFEST %s\n", dscname.c_str());
descriptor_log_ = new log::Writer(descriptor_file_, manifest_size);
manifest_file_number_ = manifest_number;
return true;
}
void VersionSet::MarkFileNumberUsed(uint64_t number) { void VersionSet::MarkFileNumberUsed(uint64_t number) {
if (next_file_number_ <= number) { if (next_file_number_ <= number) {
next_file_number_ = number + 1; next_file_number_ = number + 1;

View File

@ -179,7 +179,7 @@ class VersionSet {
EXCLUSIVE_LOCKS_REQUIRED(mu); EXCLUSIVE_LOCKS_REQUIRED(mu);
// Recover the last saved descriptor from persistent storage. // Recover the last saved descriptor from persistent storage.
Status Recover(); Status Recover(bool *save_manifest);
// Return the current version. // Return the current version.
Version* current() const { return current_; } Version* current() const { return current_; }
@ -274,6 +274,8 @@ class VersionSet {
friend class Compaction; friend class Compaction;
friend class Version; friend class Version;
bool ReuseManifest(const std::string& dscname, const std::string& dscbase);
void Finalize(Version* v); void Finalize(Version* v);
void GetRange(const std::vector<FileMetaData*>& inputs, void GetRange(const std::vector<FileMetaData*>& inputs,

View File

@ -277,6 +277,19 @@ class InMemoryEnv : public EnvWrapper {
return Status::OK(); return Status::OK();
} }
virtual Status NewAppendableFile(const std::string& fname,
WritableFile** result) {
MutexLock lock(&mutex_);
FileState** sptr = &file_map_[fname];
FileState* file = *sptr;
if (file == NULL) {
file = new FileState();
file->Ref();
}
*result = new WritableFileImpl(file);
return Status::OK();
}
virtual bool FileExists(const std::string& fname) { virtual bool FileExists(const std::string& fname) {
MutexLock lock(&mutex_); MutexLock lock(&mutex_);
return file_map_.find(fname) != file_map_.end(); return file_map_.find(fname) != file_map_.end();

View File

@ -40,6 +40,8 @@ TEST(MemEnvTest, Basics) {
// Create a file. // Create a file.
ASSERT_OK(env_->NewWritableFile("/dir/f", &writable_file)); ASSERT_OK(env_->NewWritableFile("/dir/f", &writable_file));
ASSERT_OK(env_->GetFileSize("/dir/f", &file_size));
ASSERT_EQ(0, file_size);
delete writable_file; delete writable_file;
// Check that the file exists. // Check that the file exists.
@ -55,9 +57,16 @@ TEST(MemEnvTest, Basics) {
ASSERT_OK(writable_file->Append("abc")); ASSERT_OK(writable_file->Append("abc"));
delete writable_file; delete writable_file;
// Check for expected size. // Check that append works.
ASSERT_OK(env_->NewAppendableFile("/dir/f", &writable_file));
ASSERT_OK(env_->GetFileSize("/dir/f", &file_size)); ASSERT_OK(env_->GetFileSize("/dir/f", &file_size));
ASSERT_EQ(3, file_size); ASSERT_EQ(3, file_size);
ASSERT_OK(writable_file->Append("hello"));
delete writable_file;
// Check for expected size.
ASSERT_OK(env_->GetFileSize("/dir/f", &file_size));
ASSERT_EQ(8, file_size);
// Check that renaming works. // Check that renaming works.
ASSERT_TRUE(!env_->RenameFile("/dir/non_existent", "/dir/g").ok()); ASSERT_TRUE(!env_->RenameFile("/dir/non_existent", "/dir/g").ok());
@ -65,7 +74,7 @@ TEST(MemEnvTest, Basics) {
ASSERT_TRUE(!env_->FileExists("/dir/f")); ASSERT_TRUE(!env_->FileExists("/dir/f"));
ASSERT_TRUE(env_->FileExists("/dir/g")); ASSERT_TRUE(env_->FileExists("/dir/g"));
ASSERT_OK(env_->GetFileSize("/dir/g", &file_size)); ASSERT_OK(env_->GetFileSize("/dir/g", &file_size));
ASSERT_EQ(3, file_size); ASSERT_EQ(8, file_size);
// Check that opening non-existent file fails. // Check that opening non-existent file fails.
SequentialFile* seq_file; SequentialFile* seq_file;

View File

@ -81,6 +81,17 @@ class Cache {
// its cache keys. // its cache keys.
virtual uint64_t NewId() = 0; virtual uint64_t NewId() = 0;
// Remove all cache entries that are not actively in use. Memory-constrained
// applications may wish to call this method to reduce memory usage.
// Default implementation of Prune() does nothing. Subclasses are strongly
// encouraged to override the default implementation. A future release of
// leveldb may change Prune() to a pure abstract method.
virtual void Prune() {}
// Return an estimate of the combined charges of all elements stored in the
// cache.
virtual size_t TotalCharge() const = 0;
private: private:
void LRU_Remove(Handle* e); void LRU_Remove(Handle* e);
void LRU_Append(Handle* e); void LRU_Append(Handle* e);

View File

@ -115,6 +115,8 @@ class DB {
// about the internal operation of the DB. // about the internal operation of the DB.
// "leveldb.sstables" - returns a multi-line string that describes all // "leveldb.sstables" - returns a multi-line string that describes all
// of the sstables that make up the db contents. // of the sstables that make up the db contents.
// "leveldb.approximate-memory-usage" - returns the approximate number of
// bytes of memory in use by the DB.
virtual bool GetProperty(const Slice& property, std::string* value) = 0; virtual bool GetProperty(const Slice& property, std::string* value) = 0;
// For each i in [0,n-1], store in "sizes[i]", the approximate // For each i in [0,n-1], store in "sizes[i]", the approximate

View File

@ -69,6 +69,21 @@ class Env {
virtual Status NewWritableFile(const std::string& fname, virtual Status NewWritableFile(const std::string& fname,
WritableFile** result) = 0; WritableFile** result) = 0;
// Create an object that either appends to an existing file, or
// writes to a new file (if the file does not exist to begin with).
// On success, stores a pointer to the new file in *result and
// returns OK. On failure stores NULL in *result and returns
// non-OK.
//
// The returned file will only be accessed by one thread at a time.
//
// May return an IsNotSupportedError error if this Env does
// not allow appending to an existing file. Users of Env (including
// the leveldb implementation) must be prepared to deal with
// an Env that does not support appending.
virtual Status NewAppendableFile(const std::string& fname,
WritableFile** result);
// Returns true iff the named file exists. // Returns true iff the named file exists.
virtual bool FileExists(const std::string& fname) = 0; virtual bool FileExists(const std::string& fname) = 0;
@ -289,6 +304,9 @@ class EnvWrapper : public Env {
Status NewWritableFile(const std::string& f, WritableFile** r) { Status NewWritableFile(const std::string& f, WritableFile** r) {
return target_->NewWritableFile(f, r); return target_->NewWritableFile(f, r);
} }
Status NewAppendableFile(const std::string& f, WritableFile** r) {
return target_->NewAppendableFile(f, r);
}
bool FileExists(const std::string& f) { return target_->FileExists(f); } bool FileExists(const std::string& f) { return target_->FileExists(f); }
Status GetChildren(const std::string& dir, std::vector<std::string>* r) { Status GetChildren(const std::string& dir, std::vector<std::string>* r) {
return target_->GetChildren(dir, r); return target_->GetChildren(dir, r);

View File

@ -128,6 +128,12 @@ struct Options {
// efficiently detect that and will switch to uncompressed mode. // efficiently detect that and will switch to uncompressed mode.
CompressionType compression; CompressionType compression;
// EXPERIMENTAL: If true, append to existing MANIFEST and log files
// when a database is opened. This can significantly speed up open.
//
// Default: currently false, but may become true later.
bool reuse_logs;
// If non-NULL, use the specified filter policy to reduce disk reads. // If non-NULL, use the specified filter policy to reduce disk reads.
// Many applications will benefit from passing the result of // Many applications will benefit from passing the result of
// NewBloomFilterPolicy() here. // NewBloomFilterPolicy() here.

View File

@ -60,6 +60,9 @@ class Status {
// Returns true iff the status indicates an IOError. // Returns true iff the status indicates an IOError.
bool IsIOError() const { return code() == kIOError; } bool IsIOError() const { return code() == kIOError; }
// Returns true iff the status indicates a NotSupportedError.
bool IsNotSupportedError() const { return code() == kNotSupported; }
// Return a string representation of this status suitable for printing. // Return a string representation of this status suitable for printing.
// Returns the string "OK" for success. // Returns the string "OK" for success.
std::string ToString() const; std::string ToString() const;

View File

@ -147,6 +147,11 @@ class LRUCache {
Cache::Handle* Lookup(const Slice& key, uint32_t hash); Cache::Handle* Lookup(const Slice& key, uint32_t hash);
void Release(Cache::Handle* handle); void Release(Cache::Handle* handle);
void Erase(const Slice& key, uint32_t hash); void Erase(const Slice& key, uint32_t hash);
void Prune();
size_t TotalCharge() const {
MutexLock l(&mutex_);
return usage_;
}
private: private:
void LRU_Remove(LRUHandle* e); void LRU_Remove(LRUHandle* e);
@ -157,7 +162,7 @@ class LRUCache {
size_t capacity_; size_t capacity_;
// mutex_ protects the following state. // mutex_ protects the following state.
port::Mutex mutex_; mutable port::Mutex mutex_;
size_t usage_; size_t usage_;
// Dummy head of LRU list. // Dummy head of LRU list.
@ -264,6 +269,19 @@ void LRUCache::Erase(const Slice& key, uint32_t hash) {
} }
} }
void LRUCache::Prune() {
MutexLock l(&mutex_);
for (LRUHandle* e = lru_.next; e != &lru_; ) {
LRUHandle* next = e->next;
if (e->refs == 1) {
table_.Remove(e->key(), e->hash);
LRU_Remove(e);
Unref(e);
}
e = next;
}
}
static const int kNumShardBits = 4; static const int kNumShardBits = 4;
static const int kNumShards = 1 << kNumShardBits; static const int kNumShards = 1 << kNumShardBits;
@ -314,6 +332,18 @@ class ShardedLRUCache : public Cache {
MutexLock l(&id_mutex_); MutexLock l(&id_mutex_);
return ++(last_id_); return ++(last_id_);
} }
virtual void Prune() {
for (int s = 0; s < kNumShards; s++) {
shard_[s].Prune();
}
}
virtual size_t TotalCharge() const {
size_t total = 0;
for (int s = 0; s < kNumShards; s++) {
total += shard_[s].TotalCharge();
}
return total;
}
}; };
} // end anonymous namespace } // end anonymous namespace

View File

@ -179,6 +179,19 @@ TEST(CacheTest, NewId) {
ASSERT_NE(a, b); ASSERT_NE(a, b);
} }
TEST(CacheTest, Prune) {
Insert(1, 100);
Insert(2, 200);
Cache::Handle* handle = cache_->Lookup(EncodeKey(1));
ASSERT_TRUE(handle);
cache_->Prune();
cache_->Release(handle);
ASSERT_EQ(100, Lookup(1));
ASSERT_EQ(-1, Lookup(2));
}
} // namespace leveldb } // namespace leveldb
int main(int argc, char** argv) { int main(int argc, char** argv) {

View File

@ -9,6 +9,10 @@ namespace leveldb {
Env::~Env() { Env::~Env() {
} }
Status Env::NewAppendableFile(const std::string& fname, WritableFile** result) {
return Status::NotSupported("NewAppendableFile", fname);
}
SequentialFile::~SequentialFile() { SequentialFile::~SequentialFile() {
} }

View File

@ -350,6 +350,19 @@ class PosixEnv : public Env {
return s; return s;
} }
virtual Status NewAppendableFile(const std::string& fname,
WritableFile** result) {
Status s;
FILE* f = fopen(fname.c_str(), "a");
if (f == NULL) {
*result = NULL;
s = IOError(fname, errno);
} else {
*result = new PosixWritableFile(fname, f);
}
return s;
}
virtual bool FileExists(const std::string& fname) { virtual bool FileExists(const std::string& fname) {
return access(fname.c_str(), F_OK) == 0; return access(fname.c_str(), F_OK) == 0;
} }

View File

@ -22,8 +22,8 @@ Options::Options()
block_size(4096), block_size(4096),
block_restart_interval(16), block_restart_interval(16),
compression(kSnappyCompression), compression(kSnappyCompression),
reuse_logs(false),
filter_policy(NULL) { filter_policy(NULL) {
} }
} // namespace leveldb } // namespace leveldb

View File

@ -45,6 +45,16 @@ class ErrorEnv : public EnvWrapper {
} }
return target()->NewWritableFile(fname, result); return target()->NewWritableFile(fname, result);
} }
virtual Status NewAppendableFile(const std::string& fname,
WritableFile** result) {
if (writable_file_error_) {
++num_writable_file_errors_;
*result = NULL;
return Status::IOError(fname, "fake error");
}
return target()->NewAppendableFile(fname, result);
}
}; };
} // namespace test } // namespace test