Compare commits
6 Commits
main
...
reuse-mani
Author | SHA1 | Date | |
---|---|---|---|
|
1a9648e1f5 | ||
|
f79f4180cc | ||
|
36fc955971 | ||
|
bb61e00815 | ||
|
40c17c0b84 | ||
|
251ebf5dc7 |
18
Makefile
18
Makefile
@ -57,6 +57,7 @@ TESTS = \
|
|||||||
issue200_test \
|
issue200_test \
|
||||||
log_test \
|
log_test \
|
||||||
memenv_test \
|
memenv_test \
|
||||||
|
recovery_test \
|
||||||
skiplist_test \
|
skiplist_test \
|
||||||
table_test \
|
table_test \
|
||||||
version_edit_test \
|
version_edit_test \
|
||||||
@ -177,6 +178,9 @@ issue200_test: issues/issue200_test.o $(LIBOBJECTS) $(TESTHARNESS)
|
|||||||
log_test: db/log_test.o $(LIBOBJECTS) $(TESTHARNESS)
|
log_test: db/log_test.o $(LIBOBJECTS) $(TESTHARNESS)
|
||||||
$(CXX) $(LDFLAGS) db/log_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
|
$(CXX) $(LDFLAGS) db/log_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
|
||||||
|
|
||||||
|
recovery_test: db/recovery_test.o $(LIBOBJECTS) $(TESTHARNESS)
|
||||||
|
$(CXX) $(LDFLAGS) db/recovery_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
|
||||||
|
|
||||||
table_test: table/table_test.o $(LIBOBJECTS) $(TESTHARNESS)
|
table_test: table/table_test.o $(LIBOBJECTS) $(TESTHARNESS)
|
||||||
$(CXX) $(LDFLAGS) table/table_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
|
$(CXX) $(LDFLAGS) table/table_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
|
||||||
|
|
||||||
@ -202,24 +206,22 @@ memenv_test : helpers/memenv/memenv_test.o $(MEMENVLIBRARY) $(LIBRARY) $(TESTHAR
|
|||||||
ifeq ($(PLATFORM), IOS)
|
ifeq ($(PLATFORM), IOS)
|
||||||
# For iOS, create universal object files to be used on both the simulator and
|
# For iOS, create universal object files to be used on both the simulator and
|
||||||
# a device.
|
# a device.
|
||||||
PLATFORMSROOT=/Applications/Xcode.app/Contents/Developer/Platforms
|
SIMULATORSDK=$(shell xcrun -sdk iphonesimulator --show-sdk-path)
|
||||||
SIMULATORROOT=$(PLATFORMSROOT)/iPhoneSimulator.platform/Developer
|
DEVICESDK=$(shell xcrun -sdk iphoneos --show-sdk-path)
|
||||||
DEVICEROOT=$(PLATFORMSROOT)/iPhoneOS.platform/Developer
|
|
||||||
IOSVERSION=$(shell defaults read $(PLATFORMSROOT)/iPhoneOS.platform/version CFBundleShortVersionString)
|
|
||||||
IOSARCH=-arch armv6 -arch armv7 -arch armv7s -arch arm64
|
IOSARCH=-arch armv6 -arch armv7 -arch armv7s -arch arm64
|
||||||
|
|
||||||
.cc.o:
|
.cc.o:
|
||||||
mkdir -p ios-x86/$(dir $@)
|
mkdir -p ios-x86/$(dir $@)
|
||||||
xcrun -sdk iphonesimulator $(CXX) $(CXXFLAGS) -isysroot $(SIMULATORROOT)/SDKs/iPhoneSimulator$(IOSVERSION).sdk -arch i686 -arch x86_64 -c $< -o ios-x86/$@
|
xcrun -sdk iphonesimulator $(CXX) $(CXXFLAGS) -isysroot "$(SIMULATORSDK)" -arch i686 -arch x86_64 -c $< -o ios-x86/$@
|
||||||
mkdir -p ios-arm/$(dir $@)
|
mkdir -p ios-arm/$(dir $@)
|
||||||
xcrun -sdk iphoneos $(CXX) $(CXXFLAGS) -isysroot $(DEVICEROOT)/SDKs/iPhoneOS$(IOSVERSION).sdk $(IOSARCH) -c $< -o ios-arm/$@
|
xcrun -sdk iphoneos $(CXX) $(CXXFLAGS) -isysroot "$(DEVICESDK)" $(IOSARCH) -c $< -o ios-arm/$@
|
||||||
xcrun lipo ios-x86/$@ ios-arm/$@ -create -output $@
|
xcrun lipo ios-x86/$@ ios-arm/$@ -create -output $@
|
||||||
|
|
||||||
.c.o:
|
.c.o:
|
||||||
mkdir -p ios-x86/$(dir $@)
|
mkdir -p ios-x86/$(dir $@)
|
||||||
xcrun -sdk iphonesimulator $(CC) $(CFLAGS) -isysroot $(SIMULATORROOT)/SDKs/iPhoneSimulator$(IOSVERSION).sdk -arch i686 -arch x86_64 -c $< -o ios-x86/$@
|
xcrun -sdk iphonesimulator $(CC) $(CFLAGS) -isysroot "$(SIMULATORSDK)" -arch i686 -arch x86_64 -c $< -o ios-x86/$@
|
||||||
mkdir -p ios-arm/$(dir $@)
|
mkdir -p ios-arm/$(dir $@)
|
||||||
xcrun -sdk iphoneos $(CC) $(CFLAGS) -isysroot $(DEVICEROOT)/SDKs/iPhoneOS$(IOSVERSION).sdk $(IOSARCH) -c $< -o ios-arm/$@
|
xcrun -sdk iphoneos $(CC) $(CFLAGS) -isysroot "$(DEVICESDK)" $(IOSARCH) -c $< -o ios-arm/$@
|
||||||
xcrun lipo ios-x86/$@ ios-arm/$@ -create -output $@
|
xcrun lipo ios-x86/$@ ios-arm/$@ -create -output $@
|
||||||
|
|
||||||
else
|
else
|
||||||
|
@ -36,7 +36,7 @@ class CorruptionTest {
|
|||||||
tiny_cache_ = NewLRUCache(100);
|
tiny_cache_ = NewLRUCache(100);
|
||||||
options_.env = &env_;
|
options_.env = &env_;
|
||||||
options_.block_cache = tiny_cache_;
|
options_.block_cache = tiny_cache_;
|
||||||
dbname_ = test::TmpDir() + "/db_test";
|
dbname_ = test::TmpDir() + "/corruption_test";
|
||||||
DestroyDB(dbname_, options_);
|
DestroyDB(dbname_, options_);
|
||||||
|
|
||||||
db_ = NULL;
|
db_ = NULL;
|
||||||
|
@ -100,6 +100,9 @@ static int FLAGS_bloom_bits = -1;
|
|||||||
// benchmark will fail.
|
// benchmark will fail.
|
||||||
static bool FLAGS_use_existing_db = false;
|
static bool FLAGS_use_existing_db = false;
|
||||||
|
|
||||||
|
// If true, reuse existing log/MANIFEST files when re-opening a database.
|
||||||
|
static bool FLAGS_reuse_logs = false;
|
||||||
|
|
||||||
// Use the db with the following name.
|
// Use the db with the following name.
|
||||||
static const char* FLAGS_db = NULL;
|
static const char* FLAGS_db = NULL;
|
||||||
|
|
||||||
@ -139,6 +142,7 @@ class RandomGenerator {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
#if defined(__linux)
|
||||||
static Slice TrimSpace(Slice s) {
|
static Slice TrimSpace(Slice s) {
|
||||||
size_t start = 0;
|
size_t start = 0;
|
||||||
while (start < s.size() && isspace(s[start])) {
|
while (start < s.size() && isspace(s[start])) {
|
||||||
@ -150,6 +154,7 @@ static Slice TrimSpace(Slice s) {
|
|||||||
}
|
}
|
||||||
return Slice(s.data() + start, limit - start);
|
return Slice(s.data() + start, limit - start);
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
static void AppendWithSpace(std::string* str, Slice msg) {
|
static void AppendWithSpace(std::string* str, Slice msg) {
|
||||||
if (msg.empty()) return;
|
if (msg.empty()) return;
|
||||||
@ -700,6 +705,7 @@ class Benchmark {
|
|||||||
options.write_buffer_size = FLAGS_write_buffer_size;
|
options.write_buffer_size = FLAGS_write_buffer_size;
|
||||||
options.max_open_files = FLAGS_open_files;
|
options.max_open_files = FLAGS_open_files;
|
||||||
options.filter_policy = filter_policy_;
|
options.filter_policy = filter_policy_;
|
||||||
|
options.reuse_logs = FLAGS_reuse_logs;
|
||||||
Status s = DB::Open(options, FLAGS_db, &db_);
|
Status s = DB::Open(options, FLAGS_db, &db_);
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
fprintf(stderr, "open error: %s\n", s.ToString().c_str());
|
fprintf(stderr, "open error: %s\n", s.ToString().c_str());
|
||||||
@ -954,6 +960,9 @@ int main(int argc, char** argv) {
|
|||||||
} else if (sscanf(argv[i], "--use_existing_db=%d%c", &n, &junk) == 1 &&
|
} else if (sscanf(argv[i], "--use_existing_db=%d%c", &n, &junk) == 1 &&
|
||||||
(n == 0 || n == 1)) {
|
(n == 0 || n == 1)) {
|
||||||
FLAGS_use_existing_db = n;
|
FLAGS_use_existing_db = n;
|
||||||
|
} else if (sscanf(argv[i], "--reuse_logs=%d%c", &n, &junk) == 1 &&
|
||||||
|
(n == 0 || n == 1)) {
|
||||||
|
FLAGS_reuse_logs = n;
|
||||||
} else if (sscanf(argv[i], "--num=%d%c", &n, &junk) == 1) {
|
} else if (sscanf(argv[i], "--num=%d%c", &n, &junk) == 1) {
|
||||||
FLAGS_num = n;
|
FLAGS_num = n;
|
||||||
} else if (sscanf(argv[i], "--reads=%d%c", &n, &junk) == 1) {
|
} else if (sscanf(argv[i], "--reads=%d%c", &n, &junk) == 1) {
|
||||||
|
191
db/db_impl.cc
191
db/db_impl.cc
@ -125,7 +125,7 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
|
|||||||
db_lock_(NULL),
|
db_lock_(NULL),
|
||||||
shutting_down_(NULL),
|
shutting_down_(NULL),
|
||||||
bg_cv_(&mutex_),
|
bg_cv_(&mutex_),
|
||||||
mem_(new MemTable(internal_comparator_)),
|
mem_(NULL),
|
||||||
imm_(NULL),
|
imm_(NULL),
|
||||||
logfile_(NULL),
|
logfile_(NULL),
|
||||||
logfile_number_(0),
|
logfile_number_(0),
|
||||||
@ -134,7 +134,6 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
|
|||||||
tmp_batch_(new WriteBatch),
|
tmp_batch_(new WriteBatch),
|
||||||
bg_compaction_scheduled_(false),
|
bg_compaction_scheduled_(false),
|
||||||
manual_compaction_(NULL) {
|
manual_compaction_(NULL) {
|
||||||
mem_->Ref();
|
|
||||||
has_imm_.Release_Store(NULL);
|
has_imm_.Release_Store(NULL);
|
||||||
|
|
||||||
// Reserve ten files or so for other uses and give the rest to TableCache.
|
// Reserve ten files or so for other uses and give the rest to TableCache.
|
||||||
@ -271,7 +270,7 @@ void DBImpl::DeleteObsoleteFiles() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Status DBImpl::Recover(VersionEdit* edit) {
|
Status DBImpl::Recover(VersionEdit* edit, bool *save_manifest) {
|
||||||
mutex_.AssertHeld();
|
mutex_.AssertHeld();
|
||||||
|
|
||||||
// Ignore error from CreateDir since the creation of the DB is
|
// Ignore error from CreateDir since the creation of the DB is
|
||||||
@ -301,66 +300,69 @@ Status DBImpl::Recover(VersionEdit* edit) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
s = versions_->Recover();
|
s = versions_->Recover(save_manifest);
|
||||||
if (s.ok()) {
|
if (!s.ok()) {
|
||||||
SequenceNumber max_sequence(0);
|
return s;
|
||||||
|
}
|
||||||
|
SequenceNumber max_sequence(0);
|
||||||
|
|
||||||
// Recover from all newer log files than the ones named in the
|
// Recover from all newer log files than the ones named in the
|
||||||
// descriptor (new log files may have been added by the previous
|
// descriptor (new log files may have been added by the previous
|
||||||
// incarnation without registering them in the descriptor).
|
// incarnation without registering them in the descriptor).
|
||||||
//
|
//
|
||||||
// Note that PrevLogNumber() is no longer used, but we pay
|
// Note that PrevLogNumber() is no longer used, but we pay
|
||||||
// attention to it in case we are recovering a database
|
// attention to it in case we are recovering a database
|
||||||
// produced by an older version of leveldb.
|
// produced by an older version of leveldb.
|
||||||
const uint64_t min_log = versions_->LogNumber();
|
const uint64_t min_log = versions_->LogNumber();
|
||||||
const uint64_t prev_log = versions_->PrevLogNumber();
|
const uint64_t prev_log = versions_->PrevLogNumber();
|
||||||
std::vector<std::string> filenames;
|
std::vector<std::string> filenames;
|
||||||
s = env_->GetChildren(dbname_, &filenames);
|
s = env_->GetChildren(dbname_, &filenames);
|
||||||
|
if (!s.ok()) {
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
std::set<uint64_t> expected;
|
||||||
|
versions_->AddLiveFiles(&expected);
|
||||||
|
uint64_t number;
|
||||||
|
FileType type;
|
||||||
|
std::vector<uint64_t> logs;
|
||||||
|
for (size_t i = 0; i < filenames.size(); i++) {
|
||||||
|
if (ParseFileName(filenames[i], &number, &type)) {
|
||||||
|
expected.erase(number);
|
||||||
|
if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
|
||||||
|
logs.push_back(number);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!expected.empty()) {
|
||||||
|
char buf[50];
|
||||||
|
snprintf(buf, sizeof(buf), "%d missing files; e.g.",
|
||||||
|
static_cast<int>(expected.size()));
|
||||||
|
return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Recover in the order in which the logs were generated
|
||||||
|
std::sort(logs.begin(), logs.end());
|
||||||
|
for (size_t i = 0; i < logs.size(); i++) {
|
||||||
|
s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,
|
||||||
|
&max_sequence);
|
||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
std::set<uint64_t> expected;
|
|
||||||
versions_->AddLiveFiles(&expected);
|
|
||||||
uint64_t number;
|
|
||||||
FileType type;
|
|
||||||
std::vector<uint64_t> logs;
|
|
||||||
for (size_t i = 0; i < filenames.size(); i++) {
|
|
||||||
if (ParseFileName(filenames[i], &number, &type)) {
|
|
||||||
expected.erase(number);
|
|
||||||
if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
|
|
||||||
logs.push_back(number);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (!expected.empty()) {
|
|
||||||
char buf[50];
|
|
||||||
snprintf(buf, sizeof(buf), "%d missing files; e.g.",
|
|
||||||
static_cast<int>(expected.size()));
|
|
||||||
return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Recover in the order in which the logs were generated
|
// The previous incarnation may not have written any MANIFEST
|
||||||
std::sort(logs.begin(), logs.end());
|
// records after allocating this log number. So we manually
|
||||||
for (size_t i = 0; i < logs.size(); i++) {
|
// update the file number allocation counter in VersionSet.
|
||||||
s = RecoverLogFile(logs[i], edit, &max_sequence);
|
versions_->MarkFileNumberUsed(logs[i]);
|
||||||
|
|
||||||
// The previous incarnation may not have written any MANIFEST
|
|
||||||
// records after allocating this log number. So we manually
|
|
||||||
// update the file number allocation counter in VersionSet.
|
|
||||||
versions_->MarkFileNumberUsed(logs[i]);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (s.ok()) {
|
|
||||||
if (versions_->LastSequence() < max_sequence) {
|
|
||||||
versions_->SetLastSequence(max_sequence);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return s;
|
if (versions_->LastSequence() < max_sequence) {
|
||||||
|
versions_->SetLastSequence(max_sequence);
|
||||||
|
}
|
||||||
|
|
||||||
|
return Status::OK();
|
||||||
}
|
}
|
||||||
|
|
||||||
Status DBImpl::RecoverLogFile(uint64_t log_number,
|
Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
|
||||||
VersionEdit* edit,
|
bool* save_manifest, VersionEdit* edit,
|
||||||
SequenceNumber* max_sequence) {
|
SequenceNumber* max_sequence) {
|
||||||
struct LogReporter : public log::Reader::Reporter {
|
struct LogReporter : public log::Reader::Reporter {
|
||||||
Env* env;
|
Env* env;
|
||||||
@ -405,6 +407,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
|
|||||||
std::string scratch;
|
std::string scratch;
|
||||||
Slice record;
|
Slice record;
|
||||||
WriteBatch batch;
|
WriteBatch batch;
|
||||||
|
int compactions = 0;
|
||||||
MemTable* mem = NULL;
|
MemTable* mem = NULL;
|
||||||
while (reader.ReadRecord(&record, &scratch) &&
|
while (reader.ReadRecord(&record, &scratch) &&
|
||||||
status.ok()) {
|
status.ok()) {
|
||||||
@ -432,25 +435,52 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
|
if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
|
||||||
|
compactions++;
|
||||||
|
*save_manifest = true;
|
||||||
status = WriteLevel0Table(mem, edit, NULL);
|
status = WriteLevel0Table(mem, edit, NULL);
|
||||||
|
mem->Unref();
|
||||||
|
mem = NULL;
|
||||||
if (!status.ok()) {
|
if (!status.ok()) {
|
||||||
// Reflect errors immediately so that conditions like full
|
// Reflect errors immediately so that conditions like full
|
||||||
// file-systems cause the DB::Open() to fail.
|
// file-systems cause the DB::Open() to fail.
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
mem->Unref();
|
|
||||||
mem = NULL;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (status.ok() && mem != NULL) {
|
delete file;
|
||||||
status = WriteLevel0Table(mem, edit, NULL);
|
|
||||||
// Reflect errors immediately so that conditions like full
|
// See if we should keep reusing the last log file.
|
||||||
// file-systems cause the DB::Open() to fail.
|
if (status.ok() && options_.reuse_logs && last_log && compactions == 0) {
|
||||||
|
assert(logfile_ == NULL);
|
||||||
|
assert(log_ == NULL);
|
||||||
|
assert(mem_ == NULL);
|
||||||
|
uint64_t lfile_size;
|
||||||
|
if (env_->GetFileSize(fname, &lfile_size).ok() &&
|
||||||
|
env_->NewAppendableFile(fname, &logfile_).ok()) {
|
||||||
|
Log(options_.info_log, "Reusing old log %s \n", fname.c_str());
|
||||||
|
log_ = new log::Writer(logfile_, lfile_size);
|
||||||
|
logfile_number_ = log_number;
|
||||||
|
if (mem != NULL) {
|
||||||
|
mem_ = mem;
|
||||||
|
mem = NULL;
|
||||||
|
} else {
|
||||||
|
// mem can be NULL if lognum exists but was empty.
|
||||||
|
mem_ = new MemTable(internal_comparator_);
|
||||||
|
mem_->Ref();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (mem != NULL) {
|
||||||
|
// mem did not get reused; compact it.
|
||||||
|
if (status.ok()) {
|
||||||
|
*save_manifest = true;
|
||||||
|
status = WriteLevel0Table(mem, edit, NULL);
|
||||||
|
}
|
||||||
|
mem->Unref();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mem != NULL) mem->Unref();
|
|
||||||
delete file;
|
|
||||||
return status;
|
return status;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1395,6 +1425,19 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) {
|
|||||||
} else if (in == "sstables") {
|
} else if (in == "sstables") {
|
||||||
*value = versions_->current()->DebugString();
|
*value = versions_->current()->DebugString();
|
||||||
return true;
|
return true;
|
||||||
|
} else if (in == "approximate-memory-usage") {
|
||||||
|
size_t total_usage = options_.block_cache->TotalCharge();
|
||||||
|
if (mem_) {
|
||||||
|
total_usage += mem_->ApproximateMemoryUsage();
|
||||||
|
}
|
||||||
|
if (imm_) {
|
||||||
|
total_usage += imm_->ApproximateMemoryUsage();
|
||||||
|
}
|
||||||
|
char buf[50];
|
||||||
|
snprintf(buf, sizeof(buf), "%llu",
|
||||||
|
static_cast<unsigned long long>(total_usage));
|
||||||
|
value->append(buf);
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
@ -1449,8 +1492,11 @@ Status DB::Open(const Options& options, const std::string& dbname,
|
|||||||
DBImpl* impl = new DBImpl(options, dbname);
|
DBImpl* impl = new DBImpl(options, dbname);
|
||||||
impl->mutex_.Lock();
|
impl->mutex_.Lock();
|
||||||
VersionEdit edit;
|
VersionEdit edit;
|
||||||
Status s = impl->Recover(&edit); // Handles create_if_missing, error_if_exists
|
// Recover handles create_if_missing, error_if_exists
|
||||||
if (s.ok()) {
|
bool save_manifest = false;
|
||||||
|
Status s = impl->Recover(&edit, &save_manifest);
|
||||||
|
if (s.ok() && impl->mem_ == NULL) {
|
||||||
|
// Create new log and a corresponding memtable.
|
||||||
uint64_t new_log_number = impl->versions_->NewFileNumber();
|
uint64_t new_log_number = impl->versions_->NewFileNumber();
|
||||||
WritableFile* lfile;
|
WritableFile* lfile;
|
||||||
s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
|
s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
|
||||||
@ -1460,15 +1506,22 @@ Status DB::Open(const Options& options, const std::string& dbname,
|
|||||||
impl->logfile_ = lfile;
|
impl->logfile_ = lfile;
|
||||||
impl->logfile_number_ = new_log_number;
|
impl->logfile_number_ = new_log_number;
|
||||||
impl->log_ = new log::Writer(lfile);
|
impl->log_ = new log::Writer(lfile);
|
||||||
s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
|
impl->mem_ = new MemTable(impl->internal_comparator_);
|
||||||
}
|
impl->mem_->Ref();
|
||||||
if (s.ok()) {
|
|
||||||
impl->DeleteObsoleteFiles();
|
|
||||||
impl->MaybeScheduleCompaction();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (s.ok() && save_manifest) {
|
||||||
|
edit.SetPrevLogNumber(0); // No older logs needed after recovery.
|
||||||
|
edit.SetLogNumber(impl->logfile_number_);
|
||||||
|
s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
|
||||||
|
}
|
||||||
|
if (s.ok()) {
|
||||||
|
impl->DeleteObsoleteFiles();
|
||||||
|
impl->MaybeScheduleCompaction();
|
||||||
|
}
|
||||||
impl->mutex_.Unlock();
|
impl->mutex_.Unlock();
|
||||||
if (s.ok()) {
|
if (s.ok()) {
|
||||||
|
assert(impl->mem_ != NULL);
|
||||||
*dbptr = impl;
|
*dbptr = impl;
|
||||||
} else {
|
} else {
|
||||||
delete impl;
|
delete impl;
|
||||||
|
@ -78,7 +78,8 @@ class DBImpl : public DB {
|
|||||||
// Recover the descriptor from persistent storage. May do a significant
|
// Recover the descriptor from persistent storage. May do a significant
|
||||||
// amount of work to recover recently logged updates. Any changes to
|
// amount of work to recover recently logged updates. Any changes to
|
||||||
// be made to the descriptor are added to *edit.
|
// be made to the descriptor are added to *edit.
|
||||||
Status Recover(VersionEdit* edit) EXCLUSIVE_LOCKS_REQUIRED(mutex_);
|
Status Recover(VersionEdit* edit, bool* save_manifest)
|
||||||
|
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
|
||||||
|
|
||||||
void MaybeIgnoreError(Status* s) const;
|
void MaybeIgnoreError(Status* s) const;
|
||||||
|
|
||||||
@ -90,9 +91,8 @@ class DBImpl : public DB {
|
|||||||
// Errors are recorded in bg_error_.
|
// Errors are recorded in bg_error_.
|
||||||
void CompactMemTable() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
|
void CompactMemTable() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
|
||||||
|
|
||||||
Status RecoverLogFile(uint64_t log_number,
|
Status RecoverLogFile(uint64_t log_number, bool last_log, bool* save_manifest,
|
||||||
VersionEdit* edit,
|
VersionEdit* edit, SequenceNumber* max_sequence)
|
||||||
SequenceNumber* max_sequence)
|
|
||||||
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
|
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
|
||||||
|
|
||||||
Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base)
|
Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base)
|
||||||
|
@ -193,6 +193,7 @@ class DBTest {
|
|||||||
// Sequence of option configurations to try
|
// Sequence of option configurations to try
|
||||||
enum OptionConfig {
|
enum OptionConfig {
|
||||||
kDefault,
|
kDefault,
|
||||||
|
kReuse,
|
||||||
kFilter,
|
kFilter,
|
||||||
kUncompressed,
|
kUncompressed,
|
||||||
kEnd
|
kEnd
|
||||||
@ -237,7 +238,11 @@ class DBTest {
|
|||||||
// Return the current option configuration.
|
// Return the current option configuration.
|
||||||
Options CurrentOptions() {
|
Options CurrentOptions() {
|
||||||
Options options;
|
Options options;
|
||||||
|
options.reuse_logs = false;
|
||||||
switch (option_config_) {
|
switch (option_config_) {
|
||||||
|
case kReuse:
|
||||||
|
options.reuse_logs = true;
|
||||||
|
break;
|
||||||
case kFilter:
|
case kFilter:
|
||||||
options.filter_policy = filter_policy_;
|
options.filter_policy = filter_policy_;
|
||||||
break;
|
break;
|
||||||
@ -558,6 +563,17 @@ TEST(DBTest, GetFromVersions) {
|
|||||||
} while (ChangeOptions());
|
} while (ChangeOptions());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST(DBTest, GetMemUsage) {
|
||||||
|
do {
|
||||||
|
ASSERT_OK(Put("foo", "v1"));
|
||||||
|
std::string val;
|
||||||
|
ASSERT_TRUE(db_->GetProperty("leveldb.approximate-memory-usage", &val));
|
||||||
|
int mem_usage = atoi(val.c_str());
|
||||||
|
ASSERT_GT(mem_usage, 0);
|
||||||
|
ASSERT_LT(mem_usage, 5*1024*1024);
|
||||||
|
} while (ChangeOptions());
|
||||||
|
}
|
||||||
|
|
||||||
TEST(DBTest, GetSnapshot) {
|
TEST(DBTest, GetSnapshot) {
|
||||||
do {
|
do {
|
||||||
// Try with both a short key and a long key
|
// Try with both a short key and a long key
|
||||||
@ -1080,6 +1096,14 @@ TEST(DBTest, ApproximateSizes) {
|
|||||||
// 0 because GetApproximateSizes() does not account for memtable space
|
// 0 because GetApproximateSizes() does not account for memtable space
|
||||||
ASSERT_TRUE(Between(Size("", Key(50)), 0, 0));
|
ASSERT_TRUE(Between(Size("", Key(50)), 0, 0));
|
||||||
|
|
||||||
|
if (options.reuse_logs) {
|
||||||
|
// Recovery will reuse memtable, and GetApproximateSizes() does not
|
||||||
|
// account for memtable usage;
|
||||||
|
Reopen(&options);
|
||||||
|
ASSERT_TRUE(Between(Size("", Key(50)), 0, 0));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
// Check sizes across recovery by reopening a few times
|
// Check sizes across recovery by reopening a few times
|
||||||
for (int run = 0; run < 3; run++) {
|
for (int run = 0; run < 3; run++) {
|
||||||
Reopen(&options);
|
Reopen(&options);
|
||||||
@ -1123,6 +1147,11 @@ TEST(DBTest, ApproximateSizes_MixOfSmallAndLarge) {
|
|||||||
ASSERT_OK(Put(Key(6), RandomString(&rnd, 300000)));
|
ASSERT_OK(Put(Key(6), RandomString(&rnd, 300000)));
|
||||||
ASSERT_OK(Put(Key(7), RandomString(&rnd, 10000)));
|
ASSERT_OK(Put(Key(7), RandomString(&rnd, 10000)));
|
||||||
|
|
||||||
|
if (options.reuse_logs) {
|
||||||
|
// Need to force a memtable compaction since recovery does not do so.
|
||||||
|
ASSERT_OK(dbfull()->TEST_CompactMemTable());
|
||||||
|
}
|
||||||
|
|
||||||
// Check sizes across recovery by reopening a few times
|
// Check sizes across recovery by reopening a few times
|
||||||
for (int run = 0; run < 3; run++) {
|
for (int run = 0; run < 3; run++) {
|
||||||
Reopen(&options);
|
Reopen(&options);
|
||||||
@ -2084,7 +2113,8 @@ void BM_LogAndApply(int iters, int num_base_files) {
|
|||||||
InternalKeyComparator cmp(BytewiseComparator());
|
InternalKeyComparator cmp(BytewiseComparator());
|
||||||
Options options;
|
Options options;
|
||||||
VersionSet vset(dbname, &options, NULL, &cmp);
|
VersionSet vset(dbname, &options, NULL, &cmp);
|
||||||
ASSERT_OK(vset.Recover());
|
bool save_manifest;
|
||||||
|
ASSERT_OK(vset.Recover(&save_manifest));
|
||||||
VersionEdit vbase;
|
VersionEdit vbase;
|
||||||
uint64_t fnum = 1;
|
uint64_t fnum = 1;
|
||||||
for (int i = 0; i < num_base_files; i++) {
|
for (int i = 0; i < num_base_files; i++) {
|
||||||
|
@ -106,7 +106,7 @@ struct FileState {
|
|||||||
// is written to or sync'ed.
|
// is written to or sync'ed.
|
||||||
class TestWritableFile : public WritableFile {
|
class TestWritableFile : public WritableFile {
|
||||||
public:
|
public:
|
||||||
TestWritableFile(const std::string& fname,
|
TestWritableFile(const FileState& state,
|
||||||
WritableFile* f,
|
WritableFile* f,
|
||||||
FaultInjectionTestEnv* env);
|
FaultInjectionTestEnv* env);
|
||||||
virtual ~TestWritableFile();
|
virtual ~TestWritableFile();
|
||||||
@ -130,6 +130,8 @@ class FaultInjectionTestEnv : public EnvWrapper {
|
|||||||
virtual ~FaultInjectionTestEnv() { }
|
virtual ~FaultInjectionTestEnv() { }
|
||||||
virtual Status NewWritableFile(const std::string& fname,
|
virtual Status NewWritableFile(const std::string& fname,
|
||||||
WritableFile** result);
|
WritableFile** result);
|
||||||
|
virtual Status NewAppendableFile(const std::string& fname,
|
||||||
|
WritableFile** result);
|
||||||
virtual Status DeleteFile(const std::string& f);
|
virtual Status DeleteFile(const std::string& f);
|
||||||
virtual Status RenameFile(const std::string& s, const std::string& t);
|
virtual Status RenameFile(const std::string& s, const std::string& t);
|
||||||
|
|
||||||
@ -154,15 +156,14 @@ class FaultInjectionTestEnv : public EnvWrapper {
|
|||||||
bool filesystem_active_; // Record flushes, syncs, writes
|
bool filesystem_active_; // Record flushes, syncs, writes
|
||||||
};
|
};
|
||||||
|
|
||||||
TestWritableFile::TestWritableFile(const std::string& fname,
|
TestWritableFile::TestWritableFile(const FileState& state,
|
||||||
WritableFile* f,
|
WritableFile* f,
|
||||||
FaultInjectionTestEnv* env)
|
FaultInjectionTestEnv* env)
|
||||||
: state_(fname),
|
: state_(state),
|
||||||
target_(f),
|
target_(f),
|
||||||
writable_file_opened_(true),
|
writable_file_opened_(true),
|
||||||
env_(env) {
|
env_(env) {
|
||||||
assert(f != NULL);
|
assert(f != NULL);
|
||||||
state_.pos_ = 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
TestWritableFile::~TestWritableFile() {
|
TestWritableFile::~TestWritableFile() {
|
||||||
@ -228,9 +229,12 @@ Status FaultInjectionTestEnv::NewWritableFile(const std::string& fname,
|
|||||||
WritableFile* actual_writable_file;
|
WritableFile* actual_writable_file;
|
||||||
Status s = target()->NewWritableFile(fname, &actual_writable_file);
|
Status s = target()->NewWritableFile(fname, &actual_writable_file);
|
||||||
if (s.ok()) {
|
if (s.ok()) {
|
||||||
*result = new TestWritableFile(fname, actual_writable_file, this);
|
FileState state(fname);
|
||||||
// WritableFile doesn't append to files, so if the same file is opened again
|
state.pos_ = 0;
|
||||||
// then it will be truncated - so forget our saved state.
|
*result = new TestWritableFile(state, actual_writable_file, this);
|
||||||
|
// NewWritableFile doesn't append to files, so if the same file is
|
||||||
|
// opened again then it will be truncated - so forget our saved
|
||||||
|
// state.
|
||||||
UntrackFile(fname);
|
UntrackFile(fname);
|
||||||
MutexLock l(&mutex_);
|
MutexLock l(&mutex_);
|
||||||
new_files_since_last_dir_sync_.insert(fname);
|
new_files_since_last_dir_sync_.insert(fname);
|
||||||
@ -238,6 +242,26 @@ Status FaultInjectionTestEnv::NewWritableFile(const std::string& fname,
|
|||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Status FaultInjectionTestEnv::NewAppendableFile(const std::string& fname,
|
||||||
|
WritableFile** result) {
|
||||||
|
WritableFile* actual_writable_file;
|
||||||
|
Status s = target()->NewAppendableFile(fname, &actual_writable_file);
|
||||||
|
if (s.ok()) {
|
||||||
|
FileState state(fname);
|
||||||
|
state.pos_ = 0;
|
||||||
|
{
|
||||||
|
MutexLock l(&mutex_);
|
||||||
|
if (db_file_state_.count(fname) == 0) {
|
||||||
|
new_files_since_last_dir_sync_.insert(fname);
|
||||||
|
} else {
|
||||||
|
state = db_file_state_[fname];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
*result = new TestWritableFile(state, actual_writable_file, this);
|
||||||
|
}
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
|
||||||
Status FaultInjectionTestEnv::DropUnsyncedFileData() {
|
Status FaultInjectionTestEnv::DropUnsyncedFileData() {
|
||||||
Status s;
|
Status s;
|
||||||
MutexLock l(&mutex_);
|
MutexLock l(&mutex_);
|
||||||
@ -301,9 +325,10 @@ Status FaultInjectionTestEnv::RenameFile(const std::string& s,
|
|||||||
}
|
}
|
||||||
|
|
||||||
void FaultInjectionTestEnv::ResetState() {
|
void FaultInjectionTestEnv::ResetState() {
|
||||||
|
// Since we are not destroying the database, the existing files
|
||||||
|
// should keep their recorded synced/flushed state. Therefore
|
||||||
|
// we do not reset db_file_state_ and new_files_since_last_dir_sync_.
|
||||||
MutexLock l(&mutex_);
|
MutexLock l(&mutex_);
|
||||||
db_file_state_.clear();
|
|
||||||
new_files_since_last_dir_sync_.clear();
|
|
||||||
SetFilesystemActive(true);
|
SetFilesystemActive(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -342,51 +367,28 @@ class FaultInjectionTest {
|
|||||||
Options options_;
|
Options options_;
|
||||||
DB* db_;
|
DB* db_;
|
||||||
|
|
||||||
FaultInjectionTest() : env_(NULL), tiny_cache_(NULL), db_(NULL) { NewDB(); }
|
FaultInjectionTest()
|
||||||
|
: env_(new FaultInjectionTestEnv),
|
||||||
~FaultInjectionTest() { ASSERT_OK(TearDown()); }
|
tiny_cache_(NewLRUCache(100)),
|
||||||
|
db_(NULL) {
|
||||||
Status NewDB() {
|
dbname_ = test::TmpDir() + "/fault_test";
|
||||||
assert(db_ == NULL);
|
DestroyDB(dbname_, Options()); // Destroy any db from earlier run
|
||||||
assert(tiny_cache_ == NULL);
|
options_.reuse_logs = true;
|
||||||
assert(env_ == NULL);
|
|
||||||
|
|
||||||
env_ = new FaultInjectionTestEnv();
|
|
||||||
|
|
||||||
options_ = Options();
|
|
||||||
options_.env = env_;
|
options_.env = env_;
|
||||||
options_.paranoid_checks = true;
|
options_.paranoid_checks = true;
|
||||||
|
|
||||||
tiny_cache_ = NewLRUCache(100);
|
|
||||||
options_.block_cache = tiny_cache_;
|
options_.block_cache = tiny_cache_;
|
||||||
dbname_ = test::TmpDir() + "/fault_test";
|
|
||||||
|
|
||||||
options_.create_if_missing = true;
|
options_.create_if_missing = true;
|
||||||
Status s = OpenDB();
|
|
||||||
options_.create_if_missing = false;
|
|
||||||
return s;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Status SetUp() {
|
~FaultInjectionTest() {
|
||||||
Status s = TearDown();
|
|
||||||
if (s.ok()) {
|
|
||||||
s = NewDB();
|
|
||||||
}
|
|
||||||
return s;
|
|
||||||
}
|
|
||||||
|
|
||||||
Status TearDown() {
|
|
||||||
CloseDB();
|
CloseDB();
|
||||||
|
DestroyDB(dbname_, Options());
|
||||||
Status s = DestroyDB(dbname_, Options());
|
|
||||||
|
|
||||||
delete tiny_cache_;
|
delete tiny_cache_;
|
||||||
tiny_cache_ = NULL;
|
|
||||||
|
|
||||||
delete env_;
|
delete env_;
|
||||||
env_ = NULL;
|
}
|
||||||
|
|
||||||
return s;
|
void ReuseLogs(bool reuse) {
|
||||||
|
options_.reuse_logs = reuse;
|
||||||
}
|
}
|
||||||
|
|
||||||
void Build(int start_idx, int num_vals) {
|
void Build(int start_idx, int num_vals) {
|
||||||
@ -506,33 +508,43 @@ class FaultInjectionTest {
|
|||||||
ResetDBState(reset_method);
|
ResetDBState(reset_method);
|
||||||
ASSERT_OK(OpenDB());
|
ASSERT_OK(OpenDB());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void DoTest() {
|
||||||
|
Random rnd(0);
|
||||||
|
ASSERT_OK(OpenDB());
|
||||||
|
for (size_t idx = 0; idx < kNumIterations; idx++) {
|
||||||
|
int num_pre_sync = rnd.Uniform(kMaxNumValues);
|
||||||
|
int num_post_sync = rnd.Uniform(kMaxNumValues);
|
||||||
|
|
||||||
|
PartialCompactTestPreFault(num_pre_sync, num_post_sync);
|
||||||
|
PartialCompactTestReopenWithFault(RESET_DROP_UNSYNCED_DATA,
|
||||||
|
num_pre_sync,
|
||||||
|
num_post_sync);
|
||||||
|
|
||||||
|
NoWriteTestPreFault();
|
||||||
|
NoWriteTestReopenWithFault(RESET_DROP_UNSYNCED_DATA);
|
||||||
|
|
||||||
|
PartialCompactTestPreFault(num_pre_sync, num_post_sync);
|
||||||
|
// No new files created so we expect all values since no files will be
|
||||||
|
// dropped.
|
||||||
|
PartialCompactTestReopenWithFault(RESET_DELETE_UNSYNCED_FILES,
|
||||||
|
num_pre_sync + num_post_sync,
|
||||||
|
0);
|
||||||
|
|
||||||
|
NoWriteTestPreFault();
|
||||||
|
NoWriteTestReopenWithFault(RESET_DELETE_UNSYNCED_FILES);
|
||||||
|
}
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
TEST(FaultInjectionTest, FaultTest) {
|
TEST(FaultInjectionTest, FaultTestNoLogReuse) {
|
||||||
Random rnd(0);
|
ReuseLogs(false);
|
||||||
ASSERT_OK(SetUp());
|
DoTest();
|
||||||
for (size_t idx = 0; idx < kNumIterations; idx++) {
|
}
|
||||||
int num_pre_sync = rnd.Uniform(kMaxNumValues);
|
|
||||||
int num_post_sync = rnd.Uniform(kMaxNumValues);
|
|
||||||
|
|
||||||
PartialCompactTestPreFault(num_pre_sync, num_post_sync);
|
TEST(FaultInjectionTest, FaultTestWithLogReuse) {
|
||||||
PartialCompactTestReopenWithFault(RESET_DROP_UNSYNCED_DATA,
|
ReuseLogs(true);
|
||||||
num_pre_sync,
|
DoTest();
|
||||||
num_post_sync);
|
|
||||||
|
|
||||||
NoWriteTestPreFault();
|
|
||||||
NoWriteTestReopenWithFault(RESET_DROP_UNSYNCED_DATA);
|
|
||||||
|
|
||||||
PartialCompactTestPreFault(num_pre_sync, num_post_sync);
|
|
||||||
// No new files created so we expect all values since no files will be
|
|
||||||
// dropped.
|
|
||||||
PartialCompactTestReopenWithFault(RESET_DELETE_UNSYNCED_FILES,
|
|
||||||
num_pre_sync + num_post_sync,
|
|
||||||
0);
|
|
||||||
|
|
||||||
NoWriteTestPreFault();
|
|
||||||
NoWriteTestReopenWithFault(RESET_DELETE_UNSYNCED_FILES);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace leveldb
|
} // namespace leveldb
|
||||||
|
@ -104,7 +104,7 @@ class LogTest {
|
|||||||
StringSource source_;
|
StringSource source_;
|
||||||
ReportCollector report_;
|
ReportCollector report_;
|
||||||
bool reading_;
|
bool reading_;
|
||||||
Writer writer_;
|
Writer* writer_;
|
||||||
Reader reader_;
|
Reader reader_;
|
||||||
|
|
||||||
// Record metadata for testing initial offset functionality
|
// Record metadata for testing initial offset functionality
|
||||||
@ -113,14 +113,23 @@ class LogTest {
|
|||||||
|
|
||||||
public:
|
public:
|
||||||
LogTest() : reading_(false),
|
LogTest() : reading_(false),
|
||||||
writer_(&dest_),
|
writer_(new Writer(&dest_)),
|
||||||
reader_(&source_, &report_, true/*checksum*/,
|
reader_(&source_, &report_, true/*checksum*/,
|
||||||
0/*initial_offset*/) {
|
0/*initial_offset*/) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
~LogTest() {
|
||||||
|
delete writer_;
|
||||||
|
}
|
||||||
|
|
||||||
|
void ReopenForAppend() {
|
||||||
|
delete writer_;
|
||||||
|
writer_ = new Writer(&dest_, dest_.contents_.size());
|
||||||
|
}
|
||||||
|
|
||||||
void Write(const std::string& msg) {
|
void Write(const std::string& msg) {
|
||||||
ASSERT_TRUE(!reading_) << "Write() after starting to read";
|
ASSERT_TRUE(!reading_) << "Write() after starting to read";
|
||||||
writer_.AddRecord(Slice(msg));
|
writer_->AddRecord(Slice(msg));
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t WrittenBytes() const {
|
size_t WrittenBytes() const {
|
||||||
@ -318,6 +327,15 @@ TEST(LogTest, AlignedEof) {
|
|||||||
ASSERT_EQ("EOF", Read());
|
ASSERT_EQ("EOF", Read());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST(LogTest, OpenForAppend) {
|
||||||
|
Write("hello");
|
||||||
|
ReopenForAppend();
|
||||||
|
Write("world");
|
||||||
|
ASSERT_EQ("hello", Read());
|
||||||
|
ASSERT_EQ("world", Read());
|
||||||
|
ASSERT_EQ("EOF", Read());
|
||||||
|
}
|
||||||
|
|
||||||
TEST(LogTest, RandomRead) {
|
TEST(LogTest, RandomRead) {
|
||||||
const int N = 500;
|
const int N = 500;
|
||||||
Random write_rnd(301);
|
Random write_rnd(301);
|
||||||
|
@ -12,13 +12,22 @@
|
|||||||
namespace leveldb {
|
namespace leveldb {
|
||||||
namespace log {
|
namespace log {
|
||||||
|
|
||||||
|
static void InitTypeCrc(uint32_t* type_crc) {
|
||||||
|
for (int i = 0; i <= kMaxRecordType; i++) {
|
||||||
|
char t = static_cast<char>(i);
|
||||||
|
type_crc[i] = crc32c::Value(&t, 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Writer::Writer(WritableFile* dest)
|
Writer::Writer(WritableFile* dest)
|
||||||
: dest_(dest),
|
: dest_(dest),
|
||||||
block_offset_(0) {
|
block_offset_(0) {
|
||||||
for (int i = 0; i <= kMaxRecordType; i++) {
|
InitTypeCrc(type_crc_);
|
||||||
char t = static_cast<char>(i);
|
}
|
||||||
type_crc_[i] = crc32c::Value(&t, 1);
|
|
||||||
}
|
Writer::Writer(WritableFile* dest, uint64_t dest_length)
|
||||||
|
: dest_(dest), block_offset_(dest_length % kBlockSize) {
|
||||||
|
InitTypeCrc(type_crc_);
|
||||||
}
|
}
|
||||||
|
|
||||||
Writer::~Writer() {
|
Writer::~Writer() {
|
||||||
|
@ -22,6 +22,12 @@ class Writer {
|
|||||||
// "*dest" must be initially empty.
|
// "*dest" must be initially empty.
|
||||||
// "*dest" must remain live while this Writer is in use.
|
// "*dest" must remain live while this Writer is in use.
|
||||||
explicit Writer(WritableFile* dest);
|
explicit Writer(WritableFile* dest);
|
||||||
|
|
||||||
|
// Create a writer that will append data to "*dest".
|
||||||
|
// "*dest" must have initial length "dest_length".
|
||||||
|
// "*dest" must remain live while this Writer is in use.
|
||||||
|
Writer(WritableFile* dest, uint64_t dest_length);
|
||||||
|
|
||||||
~Writer();
|
~Writer();
|
||||||
|
|
||||||
Status AddRecord(const Slice& slice);
|
Status AddRecord(const Slice& slice);
|
||||||
|
324
db/recovery_test.cc
Normal file
324
db/recovery_test.cc
Normal file
@ -0,0 +1,324 @@
|
|||||||
|
// Copyright (c) 2014 The LevelDB Authors. All rights reserved.
|
||||||
|
// Use of this source code is governed by a BSD-style license that can be
|
||||||
|
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||||
|
|
||||||
|
#include "db/db_impl.h"
|
||||||
|
#include "db/filename.h"
|
||||||
|
#include "db/version_set.h"
|
||||||
|
#include "db/write_batch_internal.h"
|
||||||
|
#include "leveldb/db.h"
|
||||||
|
#include "leveldb/env.h"
|
||||||
|
#include "leveldb/write_batch.h"
|
||||||
|
#include "util/logging.h"
|
||||||
|
#include "util/testharness.h"
|
||||||
|
#include "util/testutil.h"
|
||||||
|
|
||||||
|
namespace leveldb {
|
||||||
|
|
||||||
|
class RecoveryTest {
|
||||||
|
public:
|
||||||
|
RecoveryTest() : env_(Env::Default()), db_(NULL) {
|
||||||
|
dbname_ = test::TmpDir() + "/recovery_test";
|
||||||
|
DestroyDB(dbname_, Options());
|
||||||
|
Open();
|
||||||
|
}
|
||||||
|
|
||||||
|
~RecoveryTest() {
|
||||||
|
Close();
|
||||||
|
DestroyDB(dbname_, Options());
|
||||||
|
}
|
||||||
|
|
||||||
|
DBImpl* dbfull() const { return reinterpret_cast<DBImpl*>(db_); }
|
||||||
|
Env* env() const { return env_; }
|
||||||
|
|
||||||
|
bool CanAppend() {
|
||||||
|
WritableFile* tmp;
|
||||||
|
Status s = env_->NewAppendableFile(CurrentFileName(dbname_), &tmp);
|
||||||
|
delete tmp;
|
||||||
|
if (s.IsNotSupportedError()) {
|
||||||
|
return false;
|
||||||
|
} else {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void Close() {
|
||||||
|
delete db_;
|
||||||
|
db_ = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
void Open(Options* options = NULL) {
|
||||||
|
Close();
|
||||||
|
Options opts;
|
||||||
|
if (options != NULL) {
|
||||||
|
opts = *options;
|
||||||
|
} else {
|
||||||
|
opts.reuse_logs = true; // TODO(sanjay): test both ways
|
||||||
|
opts.create_if_missing = true;
|
||||||
|
}
|
||||||
|
if (opts.env == NULL) {
|
||||||
|
opts.env = env_;
|
||||||
|
}
|
||||||
|
ASSERT_OK(DB::Open(opts, dbname_, &db_));
|
||||||
|
ASSERT_EQ(1, NumLogs());
|
||||||
|
}
|
||||||
|
|
||||||
|
Status Put(const std::string& k, const std::string& v) {
|
||||||
|
return db_->Put(WriteOptions(), k, v);
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string Get(const std::string& k, const Snapshot* snapshot = NULL) {
|
||||||
|
std::string result;
|
||||||
|
Status s = db_->Get(ReadOptions(), k, &result);
|
||||||
|
if (s.IsNotFound()) {
|
||||||
|
result = "NOT_FOUND";
|
||||||
|
} else if (!s.ok()) {
|
||||||
|
result = s.ToString();
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string ManifestFileName() {
|
||||||
|
std::string current;
|
||||||
|
ASSERT_OK(ReadFileToString(env_, CurrentFileName(dbname_), ¤t));
|
||||||
|
size_t len = current.size();
|
||||||
|
if (len > 0 && current[len-1] == '\n') {
|
||||||
|
current.resize(len - 1);
|
||||||
|
}
|
||||||
|
return dbname_ + "/" + current;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string LogName(uint64_t number) {
|
||||||
|
return LogFileName(dbname_, number);
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t DeleteLogFiles() {
|
||||||
|
std::vector<uint64_t> logs = GetFiles(kLogFile);
|
||||||
|
for (size_t i = 0; i < logs.size(); i++) {
|
||||||
|
ASSERT_OK(env_->DeleteFile(LogName(logs[i]))) << LogName(logs[i]);
|
||||||
|
}
|
||||||
|
return logs.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
uint64_t FirstLogFile() {
|
||||||
|
return GetFiles(kLogFile)[0];
|
||||||
|
}
|
||||||
|
|
||||||
|
std::vector<std::uint64_t> GetFiles(FileType t) {
|
||||||
|
std::vector<std::string> filenames;
|
||||||
|
ASSERT_OK(env_->GetChildren(dbname_, &filenames));
|
||||||
|
std::vector<std::uint64_t> result;
|
||||||
|
for (size_t i = 0; i < filenames.size(); i++) {
|
||||||
|
uint64_t number;
|
||||||
|
FileType type;
|
||||||
|
if (ParseFileName(filenames[i], &number, &type) && type == t) {
|
||||||
|
result.push_back(number);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
int NumLogs() {
|
||||||
|
return GetFiles(kLogFile).size();
|
||||||
|
}
|
||||||
|
|
||||||
|
int NumTables() {
|
||||||
|
return GetFiles(kTableFile).size();
|
||||||
|
}
|
||||||
|
|
||||||
|
uint64_t FileSize(const std::string& fname) {
|
||||||
|
uint64_t result;
|
||||||
|
ASSERT_OK(env_->GetFileSize(fname, &result)) << fname;
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
void CompactMemTable() {
|
||||||
|
dbfull()->TEST_CompactMemTable();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Directly construct a log file that sets key to val.
|
||||||
|
void MakeLogFile(uint64_t lognum, SequenceNumber seq, Slice key, Slice val) {
|
||||||
|
std::string fname = LogFileName(dbname_, lognum);
|
||||||
|
WritableFile* file;
|
||||||
|
ASSERT_OK(env_->NewWritableFile(fname, &file));
|
||||||
|
log::Writer writer(file);
|
||||||
|
WriteBatch batch;
|
||||||
|
batch.Put(key, val);
|
||||||
|
WriteBatchInternal::SetSequence(&batch, seq);
|
||||||
|
ASSERT_OK(writer.AddRecord(WriteBatchInternal::Contents(&batch)));
|
||||||
|
ASSERT_OK(file->Flush());
|
||||||
|
delete file;
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
std::string dbname_;
|
||||||
|
Env* env_;
|
||||||
|
DB* db_;
|
||||||
|
};
|
||||||
|
|
||||||
|
TEST(RecoveryTest, ManifestReused) {
|
||||||
|
if (!CanAppend()) {
|
||||||
|
fprintf(stderr, "skipping test because env does not support appending\n");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
ASSERT_OK(Put("foo", "bar"));
|
||||||
|
Close();
|
||||||
|
std::string old_manifest = ManifestFileName();
|
||||||
|
Open();
|
||||||
|
ASSERT_EQ(old_manifest, ManifestFileName());
|
||||||
|
ASSERT_EQ("bar", Get("foo"));
|
||||||
|
Open();
|
||||||
|
ASSERT_EQ(old_manifest, ManifestFileName());
|
||||||
|
ASSERT_EQ("bar", Get("foo"));
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(RecoveryTest, LargeManifestCompacted) {
|
||||||
|
if (!CanAppend()) {
|
||||||
|
fprintf(stderr, "skipping test because env does not support appending\n");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
ASSERT_OK(Put("foo", "bar"));
|
||||||
|
Close();
|
||||||
|
std::string old_manifest = ManifestFileName();
|
||||||
|
|
||||||
|
// Pad with zeroes to make manifest file very big.
|
||||||
|
{
|
||||||
|
uint64_t len = FileSize(old_manifest);
|
||||||
|
WritableFile* file;
|
||||||
|
ASSERT_OK(env()->NewAppendableFile(old_manifest, &file));
|
||||||
|
std::string zeroes(3*1048576 - static_cast<size_t>(len), 0);
|
||||||
|
ASSERT_OK(file->Append(zeroes));
|
||||||
|
ASSERT_OK(file->Flush());
|
||||||
|
delete file;
|
||||||
|
}
|
||||||
|
|
||||||
|
Open();
|
||||||
|
std::string new_manifest = ManifestFileName();
|
||||||
|
ASSERT_NE(old_manifest, new_manifest);
|
||||||
|
ASSERT_GT(10000, FileSize(new_manifest));
|
||||||
|
ASSERT_EQ("bar", Get("foo"));
|
||||||
|
|
||||||
|
Open();
|
||||||
|
ASSERT_EQ(new_manifest, ManifestFileName());
|
||||||
|
ASSERT_EQ("bar", Get("foo"));
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(RecoveryTest, NoLogFiles) {
|
||||||
|
ASSERT_OK(Put("foo", "bar"));
|
||||||
|
ASSERT_EQ(1, DeleteLogFiles());
|
||||||
|
Open();
|
||||||
|
ASSERT_EQ("NOT_FOUND", Get("foo"));
|
||||||
|
Open();
|
||||||
|
ASSERT_EQ("NOT_FOUND", Get("foo"));
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(RecoveryTest, LogFileReuse) {
|
||||||
|
if (!CanAppend()) {
|
||||||
|
fprintf(stderr, "skipping test because env does not support appending\n");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
for (int i = 0; i < 2; i++) {
|
||||||
|
ASSERT_OK(Put("foo", "bar"));
|
||||||
|
if (i == 0) {
|
||||||
|
// Compact to ensure current log is empty
|
||||||
|
CompactMemTable();
|
||||||
|
}
|
||||||
|
Close();
|
||||||
|
ASSERT_EQ(1, NumLogs());
|
||||||
|
uint64_t number = FirstLogFile();
|
||||||
|
if (i == 0) {
|
||||||
|
ASSERT_EQ(0, FileSize(LogName(number)));
|
||||||
|
} else {
|
||||||
|
ASSERT_LT(0, FileSize(LogName(number)));
|
||||||
|
}
|
||||||
|
Open();
|
||||||
|
ASSERT_EQ(1, NumLogs());
|
||||||
|
ASSERT_EQ(number, FirstLogFile()) << "did not reuse log file";
|
||||||
|
ASSERT_EQ("bar", Get("foo"));
|
||||||
|
Open();
|
||||||
|
ASSERT_EQ(1, NumLogs());
|
||||||
|
ASSERT_EQ(number, FirstLogFile()) << "did not reuse log file";
|
||||||
|
ASSERT_EQ("bar", Get("foo"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(RecoveryTest, MultipleMemTables) {
|
||||||
|
// Make a large log.
|
||||||
|
const int kNum = 1000;
|
||||||
|
for (int i = 0; i < kNum; i++) {
|
||||||
|
char buf[100];
|
||||||
|
snprintf(buf, sizeof(buf), "%050d", i);
|
||||||
|
ASSERT_OK(Put(buf, buf));
|
||||||
|
}
|
||||||
|
ASSERT_EQ(0, NumTables());
|
||||||
|
Close();
|
||||||
|
ASSERT_EQ(0, NumTables());
|
||||||
|
ASSERT_EQ(1, NumLogs());
|
||||||
|
uint64_t old_log_file = FirstLogFile();
|
||||||
|
|
||||||
|
// Force creation of multiple memtables by reducing the write buffer size.
|
||||||
|
Options opt;
|
||||||
|
opt.reuse_logs = true;
|
||||||
|
opt.write_buffer_size = (kNum*100) / 2;
|
||||||
|
Open(&opt);
|
||||||
|
ASSERT_LE(2, NumTables());
|
||||||
|
ASSERT_EQ(1, NumLogs());
|
||||||
|
ASSERT_NE(old_log_file, FirstLogFile()) << "must not reuse log";
|
||||||
|
for (int i = 0; i < kNum; i++) {
|
||||||
|
char buf[100];
|
||||||
|
snprintf(buf, sizeof(buf), "%050d", i);
|
||||||
|
ASSERT_EQ(buf, Get(buf));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(RecoveryTest, MultipleLogFiles) {
|
||||||
|
ASSERT_OK(Put("foo", "bar"));
|
||||||
|
Close();
|
||||||
|
ASSERT_EQ(1, NumLogs());
|
||||||
|
|
||||||
|
// Make a bunch of uncompacted log files.
|
||||||
|
uint64_t old_log = FirstLogFile();
|
||||||
|
MakeLogFile(old_log+1, 1000, "hello", "world");
|
||||||
|
MakeLogFile(old_log+2, 1001, "hi", "there");
|
||||||
|
MakeLogFile(old_log+3, 1002, "foo", "bar2");
|
||||||
|
|
||||||
|
// Recover and check that all log files were processed.
|
||||||
|
Open();
|
||||||
|
ASSERT_LE(1, NumTables());
|
||||||
|
ASSERT_EQ(1, NumLogs());
|
||||||
|
uint64_t new_log = FirstLogFile();
|
||||||
|
ASSERT_LE(old_log+3, new_log);
|
||||||
|
ASSERT_EQ("bar2", Get("foo"));
|
||||||
|
ASSERT_EQ("world", Get("hello"));
|
||||||
|
ASSERT_EQ("there", Get("hi"));
|
||||||
|
|
||||||
|
// Test that previous recovery produced recoverable state.
|
||||||
|
Open();
|
||||||
|
ASSERT_LE(1, NumTables());
|
||||||
|
ASSERT_EQ(1, NumLogs());
|
||||||
|
if (CanAppend()) {
|
||||||
|
ASSERT_EQ(new_log, FirstLogFile());
|
||||||
|
}
|
||||||
|
ASSERT_EQ("bar2", Get("foo"));
|
||||||
|
ASSERT_EQ("world", Get("hello"));
|
||||||
|
ASSERT_EQ("there", Get("hi"));
|
||||||
|
|
||||||
|
// Check that introducing an older log file does not cause it to be re-read.
|
||||||
|
Close();
|
||||||
|
MakeLogFile(old_log+1, 2000, "hello", "stale write");
|
||||||
|
Open();
|
||||||
|
ASSERT_LE(1, NumTables());
|
||||||
|
ASSERT_EQ(1, NumLogs());
|
||||||
|
if (CanAppend()) {
|
||||||
|
ASSERT_EQ(new_log, FirstLogFile());
|
||||||
|
}
|
||||||
|
ASSERT_EQ("bar2", Get("foo"));
|
||||||
|
ASSERT_EQ("world", Get("hello"));
|
||||||
|
ASSERT_EQ("there", Get("hi"));
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace leveldb
|
||||||
|
|
||||||
|
int main(int argc, char** argv) {
|
||||||
|
return leveldb::test::RunAllTests();
|
||||||
|
}
|
@ -893,7 +893,7 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
|
|||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
Status VersionSet::Recover() {
|
Status VersionSet::Recover(bool *save_manifest) {
|
||||||
struct LogReporter : public log::Reader::Reporter {
|
struct LogReporter : public log::Reader::Reporter {
|
||||||
Status* status;
|
Status* status;
|
||||||
virtual void Corruption(size_t bytes, const Status& s) {
|
virtual void Corruption(size_t bytes, const Status& s) {
|
||||||
@ -1003,11 +1003,49 @@ Status VersionSet::Recover() {
|
|||||||
last_sequence_ = last_sequence;
|
last_sequence_ = last_sequence;
|
||||||
log_number_ = log_number;
|
log_number_ = log_number;
|
||||||
prev_log_number_ = prev_log_number;
|
prev_log_number_ = prev_log_number;
|
||||||
|
|
||||||
|
// See if we can reuse the existing MANIFEST file.
|
||||||
|
if (ReuseManifest(dscname, current)) {
|
||||||
|
// No need to save new manifest
|
||||||
|
} else {
|
||||||
|
*save_manifest = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool VersionSet::ReuseManifest(const std::string& dscname,
|
||||||
|
const std::string& dscbase) {
|
||||||
|
if (!options_->reuse_logs) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
FileType manifest_type;
|
||||||
|
uint64_t manifest_number;
|
||||||
|
uint64_t manifest_size;
|
||||||
|
if (!ParseFileName(dscbase, &manifest_number, &manifest_type) ||
|
||||||
|
manifest_type != kDescriptorFile ||
|
||||||
|
!env_->GetFileSize(dscname, &manifest_size).ok() ||
|
||||||
|
// Make new compacted MANIFEST if old one is too big
|
||||||
|
manifest_size >= kTargetFileSize) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
assert(descriptor_file_ == NULL);
|
||||||
|
assert(descriptor_log_ == NULL);
|
||||||
|
Status r = env_->NewAppendableFile(dscname, &descriptor_file_);
|
||||||
|
if (!r.ok()) {
|
||||||
|
Log(options_->info_log, "Reuse MANIFEST: %s\n", r.ToString().c_str());
|
||||||
|
assert(descriptor_file_ == NULL);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
Log(options_->info_log, "Reusing MANIFEST %s\n", dscname.c_str());
|
||||||
|
descriptor_log_ = new log::Writer(descriptor_file_, manifest_size);
|
||||||
|
manifest_file_number_ = manifest_number;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
void VersionSet::MarkFileNumberUsed(uint64_t number) {
|
void VersionSet::MarkFileNumberUsed(uint64_t number) {
|
||||||
if (next_file_number_ <= number) {
|
if (next_file_number_ <= number) {
|
||||||
next_file_number_ = number + 1;
|
next_file_number_ = number + 1;
|
||||||
|
@ -179,7 +179,7 @@ class VersionSet {
|
|||||||
EXCLUSIVE_LOCKS_REQUIRED(mu);
|
EXCLUSIVE_LOCKS_REQUIRED(mu);
|
||||||
|
|
||||||
// Recover the last saved descriptor from persistent storage.
|
// Recover the last saved descriptor from persistent storage.
|
||||||
Status Recover();
|
Status Recover(bool *save_manifest);
|
||||||
|
|
||||||
// Return the current version.
|
// Return the current version.
|
||||||
Version* current() const { return current_; }
|
Version* current() const { return current_; }
|
||||||
@ -274,6 +274,8 @@ class VersionSet {
|
|||||||
friend class Compaction;
|
friend class Compaction;
|
||||||
friend class Version;
|
friend class Version;
|
||||||
|
|
||||||
|
bool ReuseManifest(const std::string& dscname, const std::string& dscbase);
|
||||||
|
|
||||||
void Finalize(Version* v);
|
void Finalize(Version* v);
|
||||||
|
|
||||||
void GetRange(const std::vector<FileMetaData*>& inputs,
|
void GetRange(const std::vector<FileMetaData*>& inputs,
|
||||||
|
@ -277,6 +277,19 @@ class InMemoryEnv : public EnvWrapper {
|
|||||||
return Status::OK();
|
return Status::OK();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
virtual Status NewAppendableFile(const std::string& fname,
|
||||||
|
WritableFile** result) {
|
||||||
|
MutexLock lock(&mutex_);
|
||||||
|
FileState** sptr = &file_map_[fname];
|
||||||
|
FileState* file = *sptr;
|
||||||
|
if (file == NULL) {
|
||||||
|
file = new FileState();
|
||||||
|
file->Ref();
|
||||||
|
}
|
||||||
|
*result = new WritableFileImpl(file);
|
||||||
|
return Status::OK();
|
||||||
|
}
|
||||||
|
|
||||||
virtual bool FileExists(const std::string& fname) {
|
virtual bool FileExists(const std::string& fname) {
|
||||||
MutexLock lock(&mutex_);
|
MutexLock lock(&mutex_);
|
||||||
return file_map_.find(fname) != file_map_.end();
|
return file_map_.find(fname) != file_map_.end();
|
||||||
|
@ -40,6 +40,8 @@ TEST(MemEnvTest, Basics) {
|
|||||||
|
|
||||||
// Create a file.
|
// Create a file.
|
||||||
ASSERT_OK(env_->NewWritableFile("/dir/f", &writable_file));
|
ASSERT_OK(env_->NewWritableFile("/dir/f", &writable_file));
|
||||||
|
ASSERT_OK(env_->GetFileSize("/dir/f", &file_size));
|
||||||
|
ASSERT_EQ(0, file_size);
|
||||||
delete writable_file;
|
delete writable_file;
|
||||||
|
|
||||||
// Check that the file exists.
|
// Check that the file exists.
|
||||||
@ -55,9 +57,16 @@ TEST(MemEnvTest, Basics) {
|
|||||||
ASSERT_OK(writable_file->Append("abc"));
|
ASSERT_OK(writable_file->Append("abc"));
|
||||||
delete writable_file;
|
delete writable_file;
|
||||||
|
|
||||||
// Check for expected size.
|
// Check that append works.
|
||||||
|
ASSERT_OK(env_->NewAppendableFile("/dir/f", &writable_file));
|
||||||
ASSERT_OK(env_->GetFileSize("/dir/f", &file_size));
|
ASSERT_OK(env_->GetFileSize("/dir/f", &file_size));
|
||||||
ASSERT_EQ(3, file_size);
|
ASSERT_EQ(3, file_size);
|
||||||
|
ASSERT_OK(writable_file->Append("hello"));
|
||||||
|
delete writable_file;
|
||||||
|
|
||||||
|
// Check for expected size.
|
||||||
|
ASSERT_OK(env_->GetFileSize("/dir/f", &file_size));
|
||||||
|
ASSERT_EQ(8, file_size);
|
||||||
|
|
||||||
// Check that renaming works.
|
// Check that renaming works.
|
||||||
ASSERT_TRUE(!env_->RenameFile("/dir/non_existent", "/dir/g").ok());
|
ASSERT_TRUE(!env_->RenameFile("/dir/non_existent", "/dir/g").ok());
|
||||||
@ -65,7 +74,7 @@ TEST(MemEnvTest, Basics) {
|
|||||||
ASSERT_TRUE(!env_->FileExists("/dir/f"));
|
ASSERT_TRUE(!env_->FileExists("/dir/f"));
|
||||||
ASSERT_TRUE(env_->FileExists("/dir/g"));
|
ASSERT_TRUE(env_->FileExists("/dir/g"));
|
||||||
ASSERT_OK(env_->GetFileSize("/dir/g", &file_size));
|
ASSERT_OK(env_->GetFileSize("/dir/g", &file_size));
|
||||||
ASSERT_EQ(3, file_size);
|
ASSERT_EQ(8, file_size);
|
||||||
|
|
||||||
// Check that opening non-existent file fails.
|
// Check that opening non-existent file fails.
|
||||||
SequentialFile* seq_file;
|
SequentialFile* seq_file;
|
||||||
|
@ -81,6 +81,17 @@ class Cache {
|
|||||||
// its cache keys.
|
// its cache keys.
|
||||||
virtual uint64_t NewId() = 0;
|
virtual uint64_t NewId() = 0;
|
||||||
|
|
||||||
|
// Remove all cache entries that are not actively in use. Memory-constrained
|
||||||
|
// applications may wish to call this method to reduce memory usage.
|
||||||
|
// Default implementation of Prune() does nothing. Subclasses are strongly
|
||||||
|
// encouraged to override the default implementation. A future release of
|
||||||
|
// leveldb may change Prune() to a pure abstract method.
|
||||||
|
virtual void Prune() {}
|
||||||
|
|
||||||
|
// Return an estimate of the combined charges of all elements stored in the
|
||||||
|
// cache.
|
||||||
|
virtual size_t TotalCharge() const = 0;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void LRU_Remove(Handle* e);
|
void LRU_Remove(Handle* e);
|
||||||
void LRU_Append(Handle* e);
|
void LRU_Append(Handle* e);
|
||||||
|
@ -115,6 +115,8 @@ class DB {
|
|||||||
// about the internal operation of the DB.
|
// about the internal operation of the DB.
|
||||||
// "leveldb.sstables" - returns a multi-line string that describes all
|
// "leveldb.sstables" - returns a multi-line string that describes all
|
||||||
// of the sstables that make up the db contents.
|
// of the sstables that make up the db contents.
|
||||||
|
// "leveldb.approximate-memory-usage" - returns the approximate number of
|
||||||
|
// bytes of memory in use by the DB.
|
||||||
virtual bool GetProperty(const Slice& property, std::string* value) = 0;
|
virtual bool GetProperty(const Slice& property, std::string* value) = 0;
|
||||||
|
|
||||||
// For each i in [0,n-1], store in "sizes[i]", the approximate
|
// For each i in [0,n-1], store in "sizes[i]", the approximate
|
||||||
|
@ -69,6 +69,21 @@ class Env {
|
|||||||
virtual Status NewWritableFile(const std::string& fname,
|
virtual Status NewWritableFile(const std::string& fname,
|
||||||
WritableFile** result) = 0;
|
WritableFile** result) = 0;
|
||||||
|
|
||||||
|
// Create an object that either appends to an existing file, or
|
||||||
|
// writes to a new file (if the file does not exist to begin with).
|
||||||
|
// On success, stores a pointer to the new file in *result and
|
||||||
|
// returns OK. On failure stores NULL in *result and returns
|
||||||
|
// non-OK.
|
||||||
|
//
|
||||||
|
// The returned file will only be accessed by one thread at a time.
|
||||||
|
//
|
||||||
|
// May return an IsNotSupportedError error if this Env does
|
||||||
|
// not allow appending to an existing file. Users of Env (including
|
||||||
|
// the leveldb implementation) must be prepared to deal with
|
||||||
|
// an Env that does not support appending.
|
||||||
|
virtual Status NewAppendableFile(const std::string& fname,
|
||||||
|
WritableFile** result);
|
||||||
|
|
||||||
// Returns true iff the named file exists.
|
// Returns true iff the named file exists.
|
||||||
virtual bool FileExists(const std::string& fname) = 0;
|
virtual bool FileExists(const std::string& fname) = 0;
|
||||||
|
|
||||||
@ -289,6 +304,9 @@ class EnvWrapper : public Env {
|
|||||||
Status NewWritableFile(const std::string& f, WritableFile** r) {
|
Status NewWritableFile(const std::string& f, WritableFile** r) {
|
||||||
return target_->NewWritableFile(f, r);
|
return target_->NewWritableFile(f, r);
|
||||||
}
|
}
|
||||||
|
Status NewAppendableFile(const std::string& f, WritableFile** r) {
|
||||||
|
return target_->NewAppendableFile(f, r);
|
||||||
|
}
|
||||||
bool FileExists(const std::string& f) { return target_->FileExists(f); }
|
bool FileExists(const std::string& f) { return target_->FileExists(f); }
|
||||||
Status GetChildren(const std::string& dir, std::vector<std::string>* r) {
|
Status GetChildren(const std::string& dir, std::vector<std::string>* r) {
|
||||||
return target_->GetChildren(dir, r);
|
return target_->GetChildren(dir, r);
|
||||||
|
@ -128,6 +128,12 @@ struct Options {
|
|||||||
// efficiently detect that and will switch to uncompressed mode.
|
// efficiently detect that and will switch to uncompressed mode.
|
||||||
CompressionType compression;
|
CompressionType compression;
|
||||||
|
|
||||||
|
// EXPERIMENTAL: If true, append to existing MANIFEST and log files
|
||||||
|
// when a database is opened. This can significantly speed up open.
|
||||||
|
//
|
||||||
|
// Default: currently false, but may become true later.
|
||||||
|
bool reuse_logs;
|
||||||
|
|
||||||
// If non-NULL, use the specified filter policy to reduce disk reads.
|
// If non-NULL, use the specified filter policy to reduce disk reads.
|
||||||
// Many applications will benefit from passing the result of
|
// Many applications will benefit from passing the result of
|
||||||
// NewBloomFilterPolicy() here.
|
// NewBloomFilterPolicy() here.
|
||||||
|
@ -60,6 +60,9 @@ class Status {
|
|||||||
// Returns true iff the status indicates an IOError.
|
// Returns true iff the status indicates an IOError.
|
||||||
bool IsIOError() const { return code() == kIOError; }
|
bool IsIOError() const { return code() == kIOError; }
|
||||||
|
|
||||||
|
// Returns true iff the status indicates a NotSupportedError.
|
||||||
|
bool IsNotSupportedError() const { return code() == kNotSupported; }
|
||||||
|
|
||||||
// Return a string representation of this status suitable for printing.
|
// Return a string representation of this status suitable for printing.
|
||||||
// Returns the string "OK" for success.
|
// Returns the string "OK" for success.
|
||||||
std::string ToString() const;
|
std::string ToString() const;
|
||||||
|
@ -147,6 +147,11 @@ class LRUCache {
|
|||||||
Cache::Handle* Lookup(const Slice& key, uint32_t hash);
|
Cache::Handle* Lookup(const Slice& key, uint32_t hash);
|
||||||
void Release(Cache::Handle* handle);
|
void Release(Cache::Handle* handle);
|
||||||
void Erase(const Slice& key, uint32_t hash);
|
void Erase(const Slice& key, uint32_t hash);
|
||||||
|
void Prune();
|
||||||
|
size_t TotalCharge() const {
|
||||||
|
MutexLock l(&mutex_);
|
||||||
|
return usage_;
|
||||||
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void LRU_Remove(LRUHandle* e);
|
void LRU_Remove(LRUHandle* e);
|
||||||
@ -157,7 +162,7 @@ class LRUCache {
|
|||||||
size_t capacity_;
|
size_t capacity_;
|
||||||
|
|
||||||
// mutex_ protects the following state.
|
// mutex_ protects the following state.
|
||||||
port::Mutex mutex_;
|
mutable port::Mutex mutex_;
|
||||||
size_t usage_;
|
size_t usage_;
|
||||||
|
|
||||||
// Dummy head of LRU list.
|
// Dummy head of LRU list.
|
||||||
@ -264,6 +269,19 @@ void LRUCache::Erase(const Slice& key, uint32_t hash) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void LRUCache::Prune() {
|
||||||
|
MutexLock l(&mutex_);
|
||||||
|
for (LRUHandle* e = lru_.next; e != &lru_; ) {
|
||||||
|
LRUHandle* next = e->next;
|
||||||
|
if (e->refs == 1) {
|
||||||
|
table_.Remove(e->key(), e->hash);
|
||||||
|
LRU_Remove(e);
|
||||||
|
Unref(e);
|
||||||
|
}
|
||||||
|
e = next;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static const int kNumShardBits = 4;
|
static const int kNumShardBits = 4;
|
||||||
static const int kNumShards = 1 << kNumShardBits;
|
static const int kNumShards = 1 << kNumShardBits;
|
||||||
|
|
||||||
@ -314,6 +332,18 @@ class ShardedLRUCache : public Cache {
|
|||||||
MutexLock l(&id_mutex_);
|
MutexLock l(&id_mutex_);
|
||||||
return ++(last_id_);
|
return ++(last_id_);
|
||||||
}
|
}
|
||||||
|
virtual void Prune() {
|
||||||
|
for (int s = 0; s < kNumShards; s++) {
|
||||||
|
shard_[s].Prune();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
virtual size_t TotalCharge() const {
|
||||||
|
size_t total = 0;
|
||||||
|
for (int s = 0; s < kNumShards; s++) {
|
||||||
|
total += shard_[s].TotalCharge();
|
||||||
|
}
|
||||||
|
return total;
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
} // end anonymous namespace
|
} // end anonymous namespace
|
||||||
|
@ -179,6 +179,19 @@ TEST(CacheTest, NewId) {
|
|||||||
ASSERT_NE(a, b);
|
ASSERT_NE(a, b);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST(CacheTest, Prune) {
|
||||||
|
Insert(1, 100);
|
||||||
|
Insert(2, 200);
|
||||||
|
|
||||||
|
Cache::Handle* handle = cache_->Lookup(EncodeKey(1));
|
||||||
|
ASSERT_TRUE(handle);
|
||||||
|
cache_->Prune();
|
||||||
|
cache_->Release(handle);
|
||||||
|
|
||||||
|
ASSERT_EQ(100, Lookup(1));
|
||||||
|
ASSERT_EQ(-1, Lookup(2));
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace leveldb
|
} // namespace leveldb
|
||||||
|
|
||||||
int main(int argc, char** argv) {
|
int main(int argc, char** argv) {
|
||||||
|
@ -9,6 +9,10 @@ namespace leveldb {
|
|||||||
Env::~Env() {
|
Env::~Env() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Status Env::NewAppendableFile(const std::string& fname, WritableFile** result) {
|
||||||
|
return Status::NotSupported("NewAppendableFile", fname);
|
||||||
|
}
|
||||||
|
|
||||||
SequentialFile::~SequentialFile() {
|
SequentialFile::~SequentialFile() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -350,6 +350,19 @@ class PosixEnv : public Env {
|
|||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
virtual Status NewAppendableFile(const std::string& fname,
|
||||||
|
WritableFile** result) {
|
||||||
|
Status s;
|
||||||
|
FILE* f = fopen(fname.c_str(), "a");
|
||||||
|
if (f == NULL) {
|
||||||
|
*result = NULL;
|
||||||
|
s = IOError(fname, errno);
|
||||||
|
} else {
|
||||||
|
*result = new PosixWritableFile(fname, f);
|
||||||
|
}
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
|
||||||
virtual bool FileExists(const std::string& fname) {
|
virtual bool FileExists(const std::string& fname) {
|
||||||
return access(fname.c_str(), F_OK) == 0;
|
return access(fname.c_str(), F_OK) == 0;
|
||||||
}
|
}
|
||||||
|
@ -22,8 +22,8 @@ Options::Options()
|
|||||||
block_size(4096),
|
block_size(4096),
|
||||||
block_restart_interval(16),
|
block_restart_interval(16),
|
||||||
compression(kSnappyCompression),
|
compression(kSnappyCompression),
|
||||||
|
reuse_logs(false),
|
||||||
filter_policy(NULL) {
|
filter_policy(NULL) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
} // namespace leveldb
|
} // namespace leveldb
|
||||||
|
@ -45,6 +45,16 @@ class ErrorEnv : public EnvWrapper {
|
|||||||
}
|
}
|
||||||
return target()->NewWritableFile(fname, result);
|
return target()->NewWritableFile(fname, result);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
virtual Status NewAppendableFile(const std::string& fname,
|
||||||
|
WritableFile** result) {
|
||||||
|
if (writable_file_error_) {
|
||||||
|
++num_writable_file_errors_;
|
||||||
|
*result = NULL;
|
||||||
|
return Status::IOError(fname, "fake error");
|
||||||
|
}
|
||||||
|
return target()->NewAppendableFile(fname, result);
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace test
|
} // namespace test
|
||||||
|
Loading…
Reference in New Issue
Block a user