LevelDB now attempts to reuse the preceding MANIFEST and log file when re-opened.
(Based on a suggestion by cmumford.) "open" benchmark on my workstation speeds up significantly since we can now avoid three fdatasync calls and a compaction per open: Before: ~80000 microseconds After: ~130 microseconds Details: (1) Added Options::reuse_logs (currently defaults to false) to control new behavior. The intention is to change the default to true after some baking. (2) Added Env::NewAppendableFile() whose default implementation returns a not-supported error. (3) VersionSet::Recovery attempts to reuse the MANIFEST from which it is recovering. (4) DBImpl recovery attempts to reuse the last log file and memtable. (5) db_test.cc now tests a new configuration that sets reuse_logs to true. (6) fault_injection_test also tests a reuse_logs==true config. (7) Added a new recovery_test.
This commit is contained in:
parent
77948e7eec
commit
251ebf5dc7
4
Makefile
4
Makefile
@ -57,6 +57,7 @@ TESTS = \
|
||||
issue200_test \
|
||||
log_test \
|
||||
memenv_test \
|
||||
recovery_test \
|
||||
skiplist_test \
|
||||
table_test \
|
||||
version_edit_test \
|
||||
@ -177,6 +178,9 @@ issue200_test: issues/issue200_test.o $(LIBOBJECTS) $(TESTHARNESS)
|
||||
log_test: db/log_test.o $(LIBOBJECTS) $(TESTHARNESS)
|
||||
$(CXX) $(LDFLAGS) db/log_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
|
||||
|
||||
recovery_test: db/recovery_test.o $(LIBOBJECTS) $(TESTHARNESS)
|
||||
$(CXX) $(LDFLAGS) db/recovery_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
|
||||
|
||||
table_test: table/table_test.o $(LIBOBJECTS) $(TESTHARNESS)
|
||||
$(CXX) $(LDFLAGS) table/table_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LIBS)
|
||||
|
||||
|
@ -36,7 +36,7 @@ class CorruptionTest {
|
||||
tiny_cache_ = NewLRUCache(100);
|
||||
options_.env = &env_;
|
||||
options_.block_cache = tiny_cache_;
|
||||
dbname_ = test::TmpDir() + "/db_test";
|
||||
dbname_ = test::TmpDir() + "/corruption_test";
|
||||
DestroyDB(dbname_, options_);
|
||||
|
||||
db_ = NULL;
|
||||
|
@ -100,6 +100,9 @@ static int FLAGS_bloom_bits = -1;
|
||||
// benchmark will fail.
|
||||
static bool FLAGS_use_existing_db = false;
|
||||
|
||||
// If true, reuse existing log/MANIFEST files when re-opening a database.
|
||||
static bool FLAGS_reuse_logs = false;
|
||||
|
||||
// Use the db with the following name.
|
||||
static const char* FLAGS_db = NULL;
|
||||
|
||||
@ -700,6 +703,7 @@ class Benchmark {
|
||||
options.write_buffer_size = FLAGS_write_buffer_size;
|
||||
options.max_open_files = FLAGS_open_files;
|
||||
options.filter_policy = filter_policy_;
|
||||
options.reuse_logs = FLAGS_reuse_logs;
|
||||
Status s = DB::Open(options, FLAGS_db, &db_);
|
||||
if (!s.ok()) {
|
||||
fprintf(stderr, "open error: %s\n", s.ToString().c_str());
|
||||
@ -954,6 +958,9 @@ int main(int argc, char** argv) {
|
||||
} else if (sscanf(argv[i], "--use_existing_db=%d%c", &n, &junk) == 1 &&
|
||||
(n == 0 || n == 1)) {
|
||||
FLAGS_use_existing_db = n;
|
||||
} else if (sscanf(argv[i], "--reuse_logs=%d%c", &n, &junk) == 1 &&
|
||||
(n == 0 || n == 1)) {
|
||||
FLAGS_reuse_logs = n;
|
||||
} else if (sscanf(argv[i], "--num=%d%c", &n, &junk) == 1) {
|
||||
FLAGS_num = n;
|
||||
} else if (sscanf(argv[i], "--reads=%d%c", &n, &junk) == 1) {
|
||||
|
178
db/db_impl.cc
178
db/db_impl.cc
@ -125,7 +125,7 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
|
||||
db_lock_(NULL),
|
||||
shutting_down_(NULL),
|
||||
bg_cv_(&mutex_),
|
||||
mem_(new MemTable(internal_comparator_)),
|
||||
mem_(NULL),
|
||||
imm_(NULL),
|
||||
logfile_(NULL),
|
||||
logfile_number_(0),
|
||||
@ -134,7 +134,6 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
|
||||
tmp_batch_(new WriteBatch),
|
||||
bg_compaction_scheduled_(false),
|
||||
manual_compaction_(NULL) {
|
||||
mem_->Ref();
|
||||
has_imm_.Release_Store(NULL);
|
||||
|
||||
// Reserve ten files or so for other uses and give the rest to TableCache.
|
||||
@ -271,7 +270,7 @@ void DBImpl::DeleteObsoleteFiles() {
|
||||
}
|
||||
}
|
||||
|
||||
Status DBImpl::Recover(VersionEdit* edit) {
|
||||
Status DBImpl::Recover(VersionEdit* edit, bool *save_manifest) {
|
||||
mutex_.AssertHeld();
|
||||
|
||||
// Ignore error from CreateDir since the creation of the DB is
|
||||
@ -301,66 +300,69 @@ Status DBImpl::Recover(VersionEdit* edit) {
|
||||
}
|
||||
}
|
||||
|
||||
s = versions_->Recover();
|
||||
if (s.ok()) {
|
||||
SequenceNumber max_sequence(0);
|
||||
s = versions_->Recover(save_manifest);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
SequenceNumber max_sequence(0);
|
||||
|
||||
// Recover from all newer log files than the ones named in the
|
||||
// descriptor (new log files may have been added by the previous
|
||||
// incarnation without registering them in the descriptor).
|
||||
//
|
||||
// Note that PrevLogNumber() is no longer used, but we pay
|
||||
// attention to it in case we are recovering a database
|
||||
// produced by an older version of leveldb.
|
||||
const uint64_t min_log = versions_->LogNumber();
|
||||
const uint64_t prev_log = versions_->PrevLogNumber();
|
||||
std::vector<std::string> filenames;
|
||||
s = env_->GetChildren(dbname_, &filenames);
|
||||
// Recover from all newer log files than the ones named in the
|
||||
// descriptor (new log files may have been added by the previous
|
||||
// incarnation without registering them in the descriptor).
|
||||
//
|
||||
// Note that PrevLogNumber() is no longer used, but we pay
|
||||
// attention to it in case we are recovering a database
|
||||
// produced by an older version of leveldb.
|
||||
const uint64_t min_log = versions_->LogNumber();
|
||||
const uint64_t prev_log = versions_->PrevLogNumber();
|
||||
std::vector<std::string> filenames;
|
||||
s = env_->GetChildren(dbname_, &filenames);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
std::set<uint64_t> expected;
|
||||
versions_->AddLiveFiles(&expected);
|
||||
uint64_t number;
|
||||
FileType type;
|
||||
std::vector<uint64_t> logs;
|
||||
for (size_t i = 0; i < filenames.size(); i++) {
|
||||
if (ParseFileName(filenames[i], &number, &type)) {
|
||||
expected.erase(number);
|
||||
if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
|
||||
logs.push_back(number);
|
||||
}
|
||||
}
|
||||
if (!expected.empty()) {
|
||||
char buf[50];
|
||||
snprintf(buf, sizeof(buf), "%d missing files; e.g.",
|
||||
static_cast<int>(expected.size()));
|
||||
return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
|
||||
}
|
||||
|
||||
// Recover in the order in which the logs were generated
|
||||
std::sort(logs.begin(), logs.end());
|
||||
for (size_t i = 0; i < logs.size(); i++) {
|
||||
s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,
|
||||
&max_sequence);
|
||||
if (!s.ok()) {
|
||||
return s;
|
||||
}
|
||||
std::set<uint64_t> expected;
|
||||
versions_->AddLiveFiles(&expected);
|
||||
uint64_t number;
|
||||
FileType type;
|
||||
std::vector<uint64_t> logs;
|
||||
for (size_t i = 0; i < filenames.size(); i++) {
|
||||
if (ParseFileName(filenames[i], &number, &type)) {
|
||||
expected.erase(number);
|
||||
if (type == kLogFile && ((number >= min_log) || (number == prev_log)))
|
||||
logs.push_back(number);
|
||||
}
|
||||
}
|
||||
if (!expected.empty()) {
|
||||
char buf[50];
|
||||
snprintf(buf, sizeof(buf), "%d missing files; e.g.",
|
||||
static_cast<int>(expected.size()));
|
||||
return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin())));
|
||||
}
|
||||
|
||||
// Recover in the order in which the logs were generated
|
||||
std::sort(logs.begin(), logs.end());
|
||||
for (size_t i = 0; i < logs.size(); i++) {
|
||||
s = RecoverLogFile(logs[i], edit, &max_sequence);
|
||||
|
||||
// The previous incarnation may not have written any MANIFEST
|
||||
// records after allocating this log number. So we manually
|
||||
// update the file number allocation counter in VersionSet.
|
||||
versions_->MarkFileNumberUsed(logs[i]);
|
||||
}
|
||||
|
||||
if (s.ok()) {
|
||||
if (versions_->LastSequence() < max_sequence) {
|
||||
versions_->SetLastSequence(max_sequence);
|
||||
}
|
||||
}
|
||||
// The previous incarnation may not have written any MANIFEST
|
||||
// records after allocating this log number. So we manually
|
||||
// update the file number allocation counter in VersionSet.
|
||||
versions_->MarkFileNumberUsed(logs[i]);
|
||||
}
|
||||
|
||||
return s;
|
||||
if (versions_->LastSequence() < max_sequence) {
|
||||
versions_->SetLastSequence(max_sequence);
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status DBImpl::RecoverLogFile(uint64_t log_number,
|
||||
VersionEdit* edit,
|
||||
Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
|
||||
bool* save_manifest, VersionEdit* edit,
|
||||
SequenceNumber* max_sequence) {
|
||||
struct LogReporter : public log::Reader::Reporter {
|
||||
Env* env;
|
||||
@ -405,6 +407,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
|
||||
std::string scratch;
|
||||
Slice record;
|
||||
WriteBatch batch;
|
||||
int compactions = 0;
|
||||
MemTable* mem = NULL;
|
||||
while (reader.ReadRecord(&record, &scratch) &&
|
||||
status.ok()) {
|
||||
@ -432,25 +435,52 @@ Status DBImpl::RecoverLogFile(uint64_t log_number,
|
||||
}
|
||||
|
||||
if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
|
||||
compactions++;
|
||||
*save_manifest = true;
|
||||
status = WriteLevel0Table(mem, edit, NULL);
|
||||
mem->Unref();
|
||||
mem = NULL;
|
||||
if (!status.ok()) {
|
||||
// Reflect errors immediately so that conditions like full
|
||||
// file-systems cause the DB::Open() to fail.
|
||||
break;
|
||||
}
|
||||
mem->Unref();
|
||||
mem = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
if (status.ok() && mem != NULL) {
|
||||
status = WriteLevel0Table(mem, edit, NULL);
|
||||
// Reflect errors immediately so that conditions like full
|
||||
// file-systems cause the DB::Open() to fail.
|
||||
delete file;
|
||||
|
||||
// See if we should keep reusing the last log file.
|
||||
if (status.ok() && options_.reuse_logs && last_log && compactions == 0) {
|
||||
assert(logfile_ == NULL);
|
||||
assert(log_ == NULL);
|
||||
assert(mem_ == NULL);
|
||||
uint64_t lfile_size;
|
||||
if (env_->GetFileSize(fname, &lfile_size).ok() &&
|
||||
env_->NewAppendableFile(fname, &logfile_).ok()) {
|
||||
Log(options_.info_log, "Reusing old log %s \n", fname.c_str());
|
||||
log_ = new log::Writer(logfile_, lfile_size);
|
||||
logfile_number_ = log_number;
|
||||
if (mem != NULL) {
|
||||
mem_ = mem;
|
||||
mem = NULL;
|
||||
} else {
|
||||
// mem can be NULL if lognum exists but was empty.
|
||||
mem_ = new MemTable(internal_comparator_);
|
||||
mem_->Ref();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (mem != NULL) {
|
||||
// mem did not get reused; compact it.
|
||||
if (status.ok()) {
|
||||
*save_manifest = true;
|
||||
status = WriteLevel0Table(mem, edit, NULL);
|
||||
}
|
||||
mem->Unref();
|
||||
}
|
||||
|
||||
if (mem != NULL) mem->Unref();
|
||||
delete file;
|
||||
return status;
|
||||
}
|
||||
|
||||
@ -1449,8 +1479,11 @@ Status DB::Open(const Options& options, const std::string& dbname,
|
||||
DBImpl* impl = new DBImpl(options, dbname);
|
||||
impl->mutex_.Lock();
|
||||
VersionEdit edit;
|
||||
Status s = impl->Recover(&edit); // Handles create_if_missing, error_if_exists
|
||||
if (s.ok()) {
|
||||
// Recover handles create_if_missing, error_if_exists
|
||||
bool save_manifest = false;
|
||||
Status s = impl->Recover(&edit, &save_manifest);
|
||||
if (s.ok() && impl->mem_ == NULL) {
|
||||
// Create new log and a corresponding memtable.
|
||||
uint64_t new_log_number = impl->versions_->NewFileNumber();
|
||||
WritableFile* lfile;
|
||||
s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
|
||||
@ -1460,15 +1493,22 @@ Status DB::Open(const Options& options, const std::string& dbname,
|
||||
impl->logfile_ = lfile;
|
||||
impl->logfile_number_ = new_log_number;
|
||||
impl->log_ = new log::Writer(lfile);
|
||||
s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
|
||||
}
|
||||
if (s.ok()) {
|
||||
impl->DeleteObsoleteFiles();
|
||||
impl->MaybeScheduleCompaction();
|
||||
impl->mem_ = new MemTable(impl->internal_comparator_);
|
||||
impl->mem_->Ref();
|
||||
}
|
||||
}
|
||||
if (s.ok() && save_manifest) {
|
||||
edit.SetPrevLogNumber(0); // No older logs needed after recovery.
|
||||
edit.SetLogNumber(impl->logfile_number_);
|
||||
s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
|
||||
}
|
||||
if (s.ok()) {
|
||||
impl->DeleteObsoleteFiles();
|
||||
impl->MaybeScheduleCompaction();
|
||||
}
|
||||
impl->mutex_.Unlock();
|
||||
if (s.ok()) {
|
||||
assert(impl->mem_ != NULL);
|
||||
*dbptr = impl;
|
||||
} else {
|
||||
delete impl;
|
||||
|
@ -78,7 +78,8 @@ class DBImpl : public DB {
|
||||
// Recover the descriptor from persistent storage. May do a significant
|
||||
// amount of work to recover recently logged updates. Any changes to
|
||||
// be made to the descriptor are added to *edit.
|
||||
Status Recover(VersionEdit* edit) EXCLUSIVE_LOCKS_REQUIRED(mutex_);
|
||||
Status Recover(VersionEdit* edit, bool* save_manifest)
|
||||
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
|
||||
|
||||
void MaybeIgnoreError(Status* s) const;
|
||||
|
||||
@ -90,9 +91,8 @@ class DBImpl : public DB {
|
||||
// Errors are recorded in bg_error_.
|
||||
void CompactMemTable() EXCLUSIVE_LOCKS_REQUIRED(mutex_);
|
||||
|
||||
Status RecoverLogFile(uint64_t log_number,
|
||||
VersionEdit* edit,
|
||||
SequenceNumber* max_sequence)
|
||||
Status RecoverLogFile(uint64_t log_number, bool last_log, bool* save_manifest,
|
||||
VersionEdit* edit, SequenceNumber* max_sequence)
|
||||
EXCLUSIVE_LOCKS_REQUIRED(mutex_);
|
||||
|
||||
Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base)
|
||||
|
@ -193,6 +193,7 @@ class DBTest {
|
||||
// Sequence of option configurations to try
|
||||
enum OptionConfig {
|
||||
kDefault,
|
||||
kReuse,
|
||||
kFilter,
|
||||
kUncompressed,
|
||||
kEnd
|
||||
@ -237,7 +238,11 @@ class DBTest {
|
||||
// Return the current option configuration.
|
||||
Options CurrentOptions() {
|
||||
Options options;
|
||||
options.reuse_logs = false;
|
||||
switch (option_config_) {
|
||||
case kReuse:
|
||||
options.reuse_logs = true;
|
||||
break;
|
||||
case kFilter:
|
||||
options.filter_policy = filter_policy_;
|
||||
break;
|
||||
@ -1080,6 +1085,14 @@ TEST(DBTest, ApproximateSizes) {
|
||||
// 0 because GetApproximateSizes() does not account for memtable space
|
||||
ASSERT_TRUE(Between(Size("", Key(50)), 0, 0));
|
||||
|
||||
if (options.reuse_logs) {
|
||||
// Recovery will reuse memtable, and GetApproximateSizes() does not
|
||||
// account for memtable usage;
|
||||
Reopen(&options);
|
||||
ASSERT_TRUE(Between(Size("", Key(50)), 0, 0));
|
||||
continue;
|
||||
}
|
||||
|
||||
// Check sizes across recovery by reopening a few times
|
||||
for (int run = 0; run < 3; run++) {
|
||||
Reopen(&options);
|
||||
@ -1123,6 +1136,11 @@ TEST(DBTest, ApproximateSizes_MixOfSmallAndLarge) {
|
||||
ASSERT_OK(Put(Key(6), RandomString(&rnd, 300000)));
|
||||
ASSERT_OK(Put(Key(7), RandomString(&rnd, 10000)));
|
||||
|
||||
if (options.reuse_logs) {
|
||||
// Need to force a memtable compaction since recovery does not do so.
|
||||
ASSERT_OK(dbfull()->TEST_CompactMemTable());
|
||||
}
|
||||
|
||||
// Check sizes across recovery by reopening a few times
|
||||
for (int run = 0; run < 3; run++) {
|
||||
Reopen(&options);
|
||||
@ -2084,7 +2102,8 @@ void BM_LogAndApply(int iters, int num_base_files) {
|
||||
InternalKeyComparator cmp(BytewiseComparator());
|
||||
Options options;
|
||||
VersionSet vset(dbname, &options, NULL, &cmp);
|
||||
ASSERT_OK(vset.Recover());
|
||||
bool save_manifest;
|
||||
ASSERT_OK(vset.Recover(&save_manifest));
|
||||
VersionEdit vbase;
|
||||
uint64_t fnum = 1;
|
||||
for (int i = 0; i < num_base_files; i++) {
|
||||
|
@ -106,7 +106,7 @@ struct FileState {
|
||||
// is written to or sync'ed.
|
||||
class TestWritableFile : public WritableFile {
|
||||
public:
|
||||
TestWritableFile(const std::string& fname,
|
||||
TestWritableFile(const FileState& state,
|
||||
WritableFile* f,
|
||||
FaultInjectionTestEnv* env);
|
||||
virtual ~TestWritableFile();
|
||||
@ -130,6 +130,8 @@ class FaultInjectionTestEnv : public EnvWrapper {
|
||||
virtual ~FaultInjectionTestEnv() { }
|
||||
virtual Status NewWritableFile(const std::string& fname,
|
||||
WritableFile** result);
|
||||
virtual Status NewAppendableFile(const std::string& fname,
|
||||
WritableFile** result);
|
||||
virtual Status DeleteFile(const std::string& f);
|
||||
virtual Status RenameFile(const std::string& s, const std::string& t);
|
||||
|
||||
@ -154,15 +156,14 @@ class FaultInjectionTestEnv : public EnvWrapper {
|
||||
bool filesystem_active_; // Record flushes, syncs, writes
|
||||
};
|
||||
|
||||
TestWritableFile::TestWritableFile(const std::string& fname,
|
||||
TestWritableFile::TestWritableFile(const FileState& state,
|
||||
WritableFile* f,
|
||||
FaultInjectionTestEnv* env)
|
||||
: state_(fname),
|
||||
: state_(state),
|
||||
target_(f),
|
||||
writable_file_opened_(true),
|
||||
env_(env) {
|
||||
assert(f != NULL);
|
||||
state_.pos_ = 0;
|
||||
}
|
||||
|
||||
TestWritableFile::~TestWritableFile() {
|
||||
@ -228,9 +229,12 @@ Status FaultInjectionTestEnv::NewWritableFile(const std::string& fname,
|
||||
WritableFile* actual_writable_file;
|
||||
Status s = target()->NewWritableFile(fname, &actual_writable_file);
|
||||
if (s.ok()) {
|
||||
*result = new TestWritableFile(fname, actual_writable_file, this);
|
||||
// WritableFile doesn't append to files, so if the same file is opened again
|
||||
// then it will be truncated - so forget our saved state.
|
||||
FileState state(fname);
|
||||
state.pos_ = 0;
|
||||
*result = new TestWritableFile(state, actual_writable_file, this);
|
||||
// NewWritableFile doesn't append to files, so if the same file is
|
||||
// opened again then it will be truncated - so forget our saved
|
||||
// state.
|
||||
UntrackFile(fname);
|
||||
MutexLock l(&mutex_);
|
||||
new_files_since_last_dir_sync_.insert(fname);
|
||||
@ -238,6 +242,26 @@ Status FaultInjectionTestEnv::NewWritableFile(const std::string& fname,
|
||||
return s;
|
||||
}
|
||||
|
||||
Status FaultInjectionTestEnv::NewAppendableFile(const std::string& fname,
|
||||
WritableFile** result) {
|
||||
WritableFile* actual_writable_file;
|
||||
Status s = target()->NewAppendableFile(fname, &actual_writable_file);
|
||||
if (s.ok()) {
|
||||
FileState state(fname);
|
||||
state.pos_ = 0;
|
||||
{
|
||||
MutexLock l(&mutex_);
|
||||
if (db_file_state_.count(fname) == 0) {
|
||||
new_files_since_last_dir_sync_.insert(fname);
|
||||
} else {
|
||||
state = db_file_state_[fname];
|
||||
}
|
||||
}
|
||||
*result = new TestWritableFile(state, actual_writable_file, this);
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
||||
Status FaultInjectionTestEnv::DropUnsyncedFileData() {
|
||||
Status s;
|
||||
MutexLock l(&mutex_);
|
||||
@ -301,9 +325,10 @@ Status FaultInjectionTestEnv::RenameFile(const std::string& s,
|
||||
}
|
||||
|
||||
void FaultInjectionTestEnv::ResetState() {
|
||||
// Since we are not destroying the database, the existing files
|
||||
// should keep their recorded synced/flushed state. Therefore
|
||||
// we do not reset db_file_state_ and new_files_since_last_dir_sync_.
|
||||
MutexLock l(&mutex_);
|
||||
db_file_state_.clear();
|
||||
new_files_since_last_dir_sync_.clear();
|
||||
SetFilesystemActive(true);
|
||||
}
|
||||
|
||||
@ -342,51 +367,28 @@ class FaultInjectionTest {
|
||||
Options options_;
|
||||
DB* db_;
|
||||
|
||||
FaultInjectionTest() : env_(NULL), tiny_cache_(NULL), db_(NULL) { NewDB(); }
|
||||
|
||||
~FaultInjectionTest() { ASSERT_OK(TearDown()); }
|
||||
|
||||
Status NewDB() {
|
||||
assert(db_ == NULL);
|
||||
assert(tiny_cache_ == NULL);
|
||||
assert(env_ == NULL);
|
||||
|
||||
env_ = new FaultInjectionTestEnv();
|
||||
|
||||
options_ = Options();
|
||||
FaultInjectionTest()
|
||||
: env_(new FaultInjectionTestEnv),
|
||||
tiny_cache_(NewLRUCache(100)),
|
||||
db_(NULL) {
|
||||
dbname_ = test::TmpDir() + "/fault_test";
|
||||
DestroyDB(dbname_, Options()); // Destroy any db from earlier run
|
||||
options_.reuse_logs = true;
|
||||
options_.env = env_;
|
||||
options_.paranoid_checks = true;
|
||||
|
||||
tiny_cache_ = NewLRUCache(100);
|
||||
options_.block_cache = tiny_cache_;
|
||||
dbname_ = test::TmpDir() + "/fault_test";
|
||||
|
||||
options_.create_if_missing = true;
|
||||
Status s = OpenDB();
|
||||
options_.create_if_missing = false;
|
||||
return s;
|
||||
}
|
||||
|
||||
Status SetUp() {
|
||||
Status s = TearDown();
|
||||
if (s.ok()) {
|
||||
s = NewDB();
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
||||
Status TearDown() {
|
||||
~FaultInjectionTest() {
|
||||
CloseDB();
|
||||
|
||||
Status s = DestroyDB(dbname_, Options());
|
||||
|
||||
DestroyDB(dbname_, Options());
|
||||
delete tiny_cache_;
|
||||
tiny_cache_ = NULL;
|
||||
|
||||
delete env_;
|
||||
env_ = NULL;
|
||||
}
|
||||
|
||||
return s;
|
||||
void ReuseLogs(bool reuse) {
|
||||
options_.reuse_logs = reuse;
|
||||
}
|
||||
|
||||
void Build(int start_idx, int num_vals) {
|
||||
@ -506,33 +508,43 @@ class FaultInjectionTest {
|
||||
ResetDBState(reset_method);
|
||||
ASSERT_OK(OpenDB());
|
||||
}
|
||||
|
||||
void DoTest() {
|
||||
Random rnd(0);
|
||||
ASSERT_OK(OpenDB());
|
||||
for (size_t idx = 0; idx < kNumIterations; idx++) {
|
||||
int num_pre_sync = rnd.Uniform(kMaxNumValues);
|
||||
int num_post_sync = rnd.Uniform(kMaxNumValues);
|
||||
|
||||
PartialCompactTestPreFault(num_pre_sync, num_post_sync);
|
||||
PartialCompactTestReopenWithFault(RESET_DROP_UNSYNCED_DATA,
|
||||
num_pre_sync,
|
||||
num_post_sync);
|
||||
|
||||
NoWriteTestPreFault();
|
||||
NoWriteTestReopenWithFault(RESET_DROP_UNSYNCED_DATA);
|
||||
|
||||
PartialCompactTestPreFault(num_pre_sync, num_post_sync);
|
||||
// No new files created so we expect all values since no files will be
|
||||
// dropped.
|
||||
PartialCompactTestReopenWithFault(RESET_DELETE_UNSYNCED_FILES,
|
||||
num_pre_sync + num_post_sync,
|
||||
0);
|
||||
|
||||
NoWriteTestPreFault();
|
||||
NoWriteTestReopenWithFault(RESET_DELETE_UNSYNCED_FILES);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
TEST(FaultInjectionTest, FaultTest) {
|
||||
Random rnd(0);
|
||||
ASSERT_OK(SetUp());
|
||||
for (size_t idx = 0; idx < kNumIterations; idx++) {
|
||||
int num_pre_sync = rnd.Uniform(kMaxNumValues);
|
||||
int num_post_sync = rnd.Uniform(kMaxNumValues);
|
||||
TEST(FaultInjectionTest, FaultTestNoLogReuse) {
|
||||
ReuseLogs(false);
|
||||
DoTest();
|
||||
}
|
||||
|
||||
PartialCompactTestPreFault(num_pre_sync, num_post_sync);
|
||||
PartialCompactTestReopenWithFault(RESET_DROP_UNSYNCED_DATA,
|
||||
num_pre_sync,
|
||||
num_post_sync);
|
||||
|
||||
NoWriteTestPreFault();
|
||||
NoWriteTestReopenWithFault(RESET_DROP_UNSYNCED_DATA);
|
||||
|
||||
PartialCompactTestPreFault(num_pre_sync, num_post_sync);
|
||||
// No new files created so we expect all values since no files will be
|
||||
// dropped.
|
||||
PartialCompactTestReopenWithFault(RESET_DELETE_UNSYNCED_FILES,
|
||||
num_pre_sync + num_post_sync,
|
||||
0);
|
||||
|
||||
NoWriteTestPreFault();
|
||||
NoWriteTestReopenWithFault(RESET_DELETE_UNSYNCED_FILES);
|
||||
}
|
||||
TEST(FaultInjectionTest, FaultTestWithLogReuse) {
|
||||
ReuseLogs(true);
|
||||
DoTest();
|
||||
}
|
||||
|
||||
} // namespace leveldb
|
||||
|
@ -104,7 +104,7 @@ class LogTest {
|
||||
StringSource source_;
|
||||
ReportCollector report_;
|
||||
bool reading_;
|
||||
Writer writer_;
|
||||
Writer* writer_;
|
||||
Reader reader_;
|
||||
|
||||
// Record metadata for testing initial offset functionality
|
||||
@ -113,14 +113,23 @@ class LogTest {
|
||||
|
||||
public:
|
||||
LogTest() : reading_(false),
|
||||
writer_(&dest_),
|
||||
writer_(new Writer(&dest_)),
|
||||
reader_(&source_, &report_, true/*checksum*/,
|
||||
0/*initial_offset*/) {
|
||||
}
|
||||
|
||||
~LogTest() {
|
||||
delete writer_;
|
||||
}
|
||||
|
||||
void ReopenForAppend() {
|
||||
delete writer_;
|
||||
writer_ = new Writer(&dest_, dest_.contents_.size());
|
||||
}
|
||||
|
||||
void Write(const std::string& msg) {
|
||||
ASSERT_TRUE(!reading_) << "Write() after starting to read";
|
||||
writer_.AddRecord(Slice(msg));
|
||||
writer_->AddRecord(Slice(msg));
|
||||
}
|
||||
|
||||
size_t WrittenBytes() const {
|
||||
@ -318,6 +327,15 @@ TEST(LogTest, AlignedEof) {
|
||||
ASSERT_EQ("EOF", Read());
|
||||
}
|
||||
|
||||
TEST(LogTest, OpenForAppend) {
|
||||
Write("hello");
|
||||
ReopenForAppend();
|
||||
Write("world");
|
||||
ASSERT_EQ("hello", Read());
|
||||
ASSERT_EQ("world", Read());
|
||||
ASSERT_EQ("EOF", Read());
|
||||
}
|
||||
|
||||
TEST(LogTest, RandomRead) {
|
||||
const int N = 500;
|
||||
Random write_rnd(301);
|
||||
|
@ -12,13 +12,22 @@
|
||||
namespace leveldb {
|
||||
namespace log {
|
||||
|
||||
static void InitTypeCrc(uint32_t* type_crc) {
|
||||
for (int i = 0; i <= kMaxRecordType; i++) {
|
||||
char t = static_cast<char>(i);
|
||||
type_crc[i] = crc32c::Value(&t, 1);
|
||||
}
|
||||
}
|
||||
|
||||
Writer::Writer(WritableFile* dest)
|
||||
: dest_(dest),
|
||||
block_offset_(0) {
|
||||
for (int i = 0; i <= kMaxRecordType; i++) {
|
||||
char t = static_cast<char>(i);
|
||||
type_crc_[i] = crc32c::Value(&t, 1);
|
||||
}
|
||||
InitTypeCrc(type_crc_);
|
||||
}
|
||||
|
||||
Writer::Writer(WritableFile* dest, uint64_t dest_length)
|
||||
: dest_(dest), block_offset_(dest_length % kBlockSize) {
|
||||
InitTypeCrc(type_crc_);
|
||||
}
|
||||
|
||||
Writer::~Writer() {
|
||||
|
@ -22,6 +22,12 @@ class Writer {
|
||||
// "*dest" must be initially empty.
|
||||
// "*dest" must remain live while this Writer is in use.
|
||||
explicit Writer(WritableFile* dest);
|
||||
|
||||
// Create a writer that will append data to "*dest".
|
||||
// "*dest" must have initial length "dest_length".
|
||||
// "*dest" must remain live while this Writer is in use.
|
||||
Writer(WritableFile* dest, uint64_t dest_length);
|
||||
|
||||
~Writer();
|
||||
|
||||
Status AddRecord(const Slice& slice);
|
||||
|
324
db/recovery_test.cc
Normal file
324
db/recovery_test.cc
Normal file
@ -0,0 +1,324 @@
|
||||
// Copyright (c) 2014 The LevelDB Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style license that can be
|
||||
// found in the LICENSE file. See the AUTHORS file for names of contributors.
|
||||
|
||||
#include "db/db_impl.h"
|
||||
#include "db/filename.h"
|
||||
#include "db/version_set.h"
|
||||
#include "db/write_batch_internal.h"
|
||||
#include "leveldb/db.h"
|
||||
#include "leveldb/env.h"
|
||||
#include "leveldb/write_batch.h"
|
||||
#include "util/logging.h"
|
||||
#include "util/testharness.h"
|
||||
#include "util/testutil.h"
|
||||
|
||||
namespace leveldb {
|
||||
|
||||
class RecoveryTest {
|
||||
public:
|
||||
RecoveryTest() : env_(Env::Default()), db_(NULL) {
|
||||
dbname_ = test::TmpDir() + "/recovery_test";
|
||||
DestroyDB(dbname_, Options());
|
||||
Open();
|
||||
}
|
||||
|
||||
~RecoveryTest() {
|
||||
Close();
|
||||
DestroyDB(dbname_, Options());
|
||||
}
|
||||
|
||||
DBImpl* dbfull() const { return reinterpret_cast<DBImpl*>(db_); }
|
||||
Env* env() const { return env_; }
|
||||
|
||||
bool CanAppend() {
|
||||
WritableFile* tmp;
|
||||
Status s = env_->NewAppendableFile(CurrentFileName(dbname_), &tmp);
|
||||
delete tmp;
|
||||
if (s.IsNotSupportedError()) {
|
||||
return false;
|
||||
} else {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
void Close() {
|
||||
delete db_;
|
||||
db_ = NULL;
|
||||
}
|
||||
|
||||
void Open(Options* options = NULL) {
|
||||
Close();
|
||||
Options opts;
|
||||
if (options != NULL) {
|
||||
opts = *options;
|
||||
} else {
|
||||
opts.reuse_logs = true; // TODO(sanjay): test both ways
|
||||
opts.create_if_missing = true;
|
||||
}
|
||||
if (opts.env == NULL) {
|
||||
opts.env = env_;
|
||||
}
|
||||
ASSERT_OK(DB::Open(opts, dbname_, &db_));
|
||||
ASSERT_EQ(1, NumLogs());
|
||||
}
|
||||
|
||||
Status Put(const std::string& k, const std::string& v) {
|
||||
return db_->Put(WriteOptions(), k, v);
|
||||
}
|
||||
|
||||
std::string Get(const std::string& k, const Snapshot* snapshot = NULL) {
|
||||
std::string result;
|
||||
Status s = db_->Get(ReadOptions(), k, &result);
|
||||
if (s.IsNotFound()) {
|
||||
result = "NOT_FOUND";
|
||||
} else if (!s.ok()) {
|
||||
result = s.ToString();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
std::string ManifestFileName() {
|
||||
std::string current;
|
||||
ASSERT_OK(ReadFileToString(env_, CurrentFileName(dbname_), ¤t));
|
||||
size_t len = current.size();
|
||||
if (len > 0 && current[len-1] == '\n') {
|
||||
current.resize(len - 1);
|
||||
}
|
||||
return dbname_ + "/" + current;
|
||||
}
|
||||
|
||||
std::string LogName(uint64_t number) {
|
||||
return LogFileName(dbname_, number);
|
||||
}
|
||||
|
||||
size_t DeleteLogFiles() {
|
||||
std::vector<uint64_t> logs = GetFiles(kLogFile);
|
||||
for (size_t i = 0; i < logs.size(); i++) {
|
||||
ASSERT_OK(env_->DeleteFile(LogName(logs[i]))) << LogName(logs[i]);
|
||||
}
|
||||
return logs.size();
|
||||
}
|
||||
|
||||
uint64_t FirstLogFile() {
|
||||
return GetFiles(kLogFile)[0];
|
||||
}
|
||||
|
||||
std::vector<std::uint64_t> GetFiles(FileType t) {
|
||||
std::vector<std::string> filenames;
|
||||
ASSERT_OK(env_->GetChildren(dbname_, &filenames));
|
||||
std::vector<std::uint64_t> result;
|
||||
for (size_t i = 0; i < filenames.size(); i++) {
|
||||
uint64_t number;
|
||||
FileType type;
|
||||
if (ParseFileName(filenames[i], &number, &type) && type == t) {
|
||||
result.push_back(number);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
int NumLogs() {
|
||||
return GetFiles(kLogFile).size();
|
||||
}
|
||||
|
||||
int NumTables() {
|
||||
return GetFiles(kTableFile).size();
|
||||
}
|
||||
|
||||
uint64_t FileSize(const std::string& fname) {
|
||||
uint64_t result;
|
||||
ASSERT_OK(env_->GetFileSize(fname, &result)) << fname;
|
||||
return result;
|
||||
}
|
||||
|
||||
void CompactMemTable() {
|
||||
dbfull()->TEST_CompactMemTable();
|
||||
}
|
||||
|
||||
// Directly construct a log file that sets key to val.
|
||||
void MakeLogFile(uint64_t lognum, SequenceNumber seq, Slice key, Slice val) {
|
||||
std::string fname = LogFileName(dbname_, lognum);
|
||||
WritableFile* file;
|
||||
ASSERT_OK(env_->NewWritableFile(fname, &file));
|
||||
log::Writer writer(file);
|
||||
WriteBatch batch;
|
||||
batch.Put(key, val);
|
||||
WriteBatchInternal::SetSequence(&batch, seq);
|
||||
ASSERT_OK(writer.AddRecord(WriteBatchInternal::Contents(&batch)));
|
||||
ASSERT_OK(file->Flush());
|
||||
delete file;
|
||||
}
|
||||
|
||||
private:
|
||||
std::string dbname_;
|
||||
Env* env_;
|
||||
DB* db_;
|
||||
};
|
||||
|
||||
TEST(RecoveryTest, ManifestReused) {
|
||||
if (!CanAppend()) {
|
||||
fprintf(stderr, "skipping test because env does not support appending\n");
|
||||
return;
|
||||
}
|
||||
ASSERT_OK(Put("foo", "bar"));
|
||||
Close();
|
||||
std::string old_manifest = ManifestFileName();
|
||||
Open();
|
||||
ASSERT_EQ(old_manifest, ManifestFileName());
|
||||
ASSERT_EQ("bar", Get("foo"));
|
||||
Open();
|
||||
ASSERT_EQ(old_manifest, ManifestFileName());
|
||||
ASSERT_EQ("bar", Get("foo"));
|
||||
}
|
||||
|
||||
TEST(RecoveryTest, LargeManifestCompacted) {
|
||||
if (!CanAppend()) {
|
||||
fprintf(stderr, "skipping test because env does not support appending\n");
|
||||
return;
|
||||
}
|
||||
ASSERT_OK(Put("foo", "bar"));
|
||||
Close();
|
||||
std::string old_manifest = ManifestFileName();
|
||||
|
||||
// Pad with zeroes to make manifest file very big.
|
||||
{
|
||||
uint64_t len = FileSize(old_manifest);
|
||||
WritableFile* file;
|
||||
ASSERT_OK(env()->NewAppendableFile(old_manifest, &file));
|
||||
std::string zeroes(3*1048576 - static_cast<size_t>(len), 0);
|
||||
ASSERT_OK(file->Append(zeroes));
|
||||
ASSERT_OK(file->Flush());
|
||||
delete file;
|
||||
}
|
||||
|
||||
Open();
|
||||
std::string new_manifest = ManifestFileName();
|
||||
ASSERT_NE(old_manifest, new_manifest);
|
||||
ASSERT_GT(10000, FileSize(new_manifest));
|
||||
ASSERT_EQ("bar", Get("foo"));
|
||||
|
||||
Open();
|
||||
ASSERT_EQ(new_manifest, ManifestFileName());
|
||||
ASSERT_EQ("bar", Get("foo"));
|
||||
}
|
||||
|
||||
TEST(RecoveryTest, NoLogFiles) {
|
||||
ASSERT_OK(Put("foo", "bar"));
|
||||
ASSERT_EQ(1, DeleteLogFiles());
|
||||
Open();
|
||||
ASSERT_EQ("NOT_FOUND", Get("foo"));
|
||||
Open();
|
||||
ASSERT_EQ("NOT_FOUND", Get("foo"));
|
||||
}
|
||||
|
||||
TEST(RecoveryTest, LogFileReuse) {
|
||||
if (!CanAppend()) {
|
||||
fprintf(stderr, "skipping test because env does not support appending\n");
|
||||
return;
|
||||
}
|
||||
for (int i = 0; i < 2; i++) {
|
||||
ASSERT_OK(Put("foo", "bar"));
|
||||
if (i == 0) {
|
||||
// Compact to ensure current log is empty
|
||||
CompactMemTable();
|
||||
}
|
||||
Close();
|
||||
ASSERT_EQ(1, NumLogs());
|
||||
uint64_t number = FirstLogFile();
|
||||
if (i == 0) {
|
||||
ASSERT_EQ(0, FileSize(LogName(number)));
|
||||
} else {
|
||||
ASSERT_LT(0, FileSize(LogName(number)));
|
||||
}
|
||||
Open();
|
||||
ASSERT_EQ(1, NumLogs());
|
||||
ASSERT_EQ(number, FirstLogFile()) << "did not reuse log file";
|
||||
ASSERT_EQ("bar", Get("foo"));
|
||||
Open();
|
||||
ASSERT_EQ(1, NumLogs());
|
||||
ASSERT_EQ(number, FirstLogFile()) << "did not reuse log file";
|
||||
ASSERT_EQ("bar", Get("foo"));
|
||||
}
|
||||
}
|
||||
|
||||
TEST(RecoveryTest, MultipleMemTables) {
|
||||
// Make a large log.
|
||||
const int kNum = 1000;
|
||||
for (int i = 0; i < kNum; i++) {
|
||||
char buf[100];
|
||||
snprintf(buf, sizeof(buf), "%050d", i);
|
||||
ASSERT_OK(Put(buf, buf));
|
||||
}
|
||||
ASSERT_EQ(0, NumTables());
|
||||
Close();
|
||||
ASSERT_EQ(0, NumTables());
|
||||
ASSERT_EQ(1, NumLogs());
|
||||
uint64_t old_log_file = FirstLogFile();
|
||||
|
||||
// Force creation of multiple memtables by reducing the write buffer size.
|
||||
Options opt;
|
||||
opt.reuse_logs = true;
|
||||
opt.write_buffer_size = (kNum*100) / 2;
|
||||
Open(&opt);
|
||||
ASSERT_LE(2, NumTables());
|
||||
ASSERT_EQ(1, NumLogs());
|
||||
ASSERT_NE(old_log_file, FirstLogFile()) << "must not reuse log";
|
||||
for (int i = 0; i < kNum; i++) {
|
||||
char buf[100];
|
||||
snprintf(buf, sizeof(buf), "%050d", i);
|
||||
ASSERT_EQ(buf, Get(buf));
|
||||
}
|
||||
}
|
||||
|
||||
TEST(RecoveryTest, MultipleLogFiles) {
|
||||
ASSERT_OK(Put("foo", "bar"));
|
||||
Close();
|
||||
ASSERT_EQ(1, NumLogs());
|
||||
|
||||
// Make a bunch of uncompacted log files.
|
||||
uint64_t old_log = FirstLogFile();
|
||||
MakeLogFile(old_log+1, 1000, "hello", "world");
|
||||
MakeLogFile(old_log+2, 1001, "hi", "there");
|
||||
MakeLogFile(old_log+3, 1002, "foo", "bar2");
|
||||
|
||||
// Recover and check that all log files were processed.
|
||||
Open();
|
||||
ASSERT_LE(1, NumTables());
|
||||
ASSERT_EQ(1, NumLogs());
|
||||
uint64_t new_log = FirstLogFile();
|
||||
ASSERT_LE(old_log+3, new_log);
|
||||
ASSERT_EQ("bar2", Get("foo"));
|
||||
ASSERT_EQ("world", Get("hello"));
|
||||
ASSERT_EQ("there", Get("hi"));
|
||||
|
||||
// Test that previous recovery produced recoverable state.
|
||||
Open();
|
||||
ASSERT_LE(1, NumTables());
|
||||
ASSERT_EQ(1, NumLogs());
|
||||
if (CanAppend()) {
|
||||
ASSERT_EQ(new_log, FirstLogFile());
|
||||
}
|
||||
ASSERT_EQ("bar2", Get("foo"));
|
||||
ASSERT_EQ("world", Get("hello"));
|
||||
ASSERT_EQ("there", Get("hi"));
|
||||
|
||||
// Check that introducing an older log file does not cause it to be re-read.
|
||||
Close();
|
||||
MakeLogFile(old_log+1, 2000, "hello", "stale write");
|
||||
Open();
|
||||
ASSERT_LE(1, NumTables());
|
||||
ASSERT_EQ(1, NumLogs());
|
||||
if (CanAppend()) {
|
||||
ASSERT_EQ(new_log, FirstLogFile());
|
||||
}
|
||||
ASSERT_EQ("bar2", Get("foo"));
|
||||
ASSERT_EQ("world", Get("hello"));
|
||||
ASSERT_EQ("there", Get("hi"));
|
||||
}
|
||||
|
||||
} // namespace leveldb
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
return leveldb::test::RunAllTests();
|
||||
}
|
@ -893,7 +893,7 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
|
||||
return s;
|
||||
}
|
||||
|
||||
Status VersionSet::Recover() {
|
||||
Status VersionSet::Recover(bool *save_manifest) {
|
||||
struct LogReporter : public log::Reader::Reporter {
|
||||
Status* status;
|
||||
virtual void Corruption(size_t bytes, const Status& s) {
|
||||
@ -1003,11 +1003,46 @@ Status VersionSet::Recover() {
|
||||
last_sequence_ = last_sequence;
|
||||
log_number_ = log_number;
|
||||
prev_log_number_ = prev_log_number;
|
||||
|
||||
// See if we can reuse the existing MANIFEST file.
|
||||
if (ReuseManifest(dscname, current)) {
|
||||
// No need to save new manifest
|
||||
} else {
|
||||
*save_manifest = true;
|
||||
}
|
||||
}
|
||||
|
||||
return s;
|
||||
}
|
||||
|
||||
bool VersionSet::ReuseManifest(const std::string& dscname,
|
||||
const std::string& dscbase) {
|
||||
FileType manifest_type;
|
||||
uint64_t manifest_number;
|
||||
uint64_t manifest_size;
|
||||
if (!ParseFileName(dscbase, &manifest_number, &manifest_type) ||
|
||||
manifest_type != kDescriptorFile ||
|
||||
!env_->GetFileSize(dscname, &manifest_size).ok() ||
|
||||
// Make new compacted MANIFEST if old one is too big
|
||||
manifest_size >= kTargetFileSize) {
|
||||
return false;
|
||||
}
|
||||
|
||||
assert(descriptor_file_ == NULL);
|
||||
assert(descriptor_log_ == NULL);
|
||||
Status r = env_->NewAppendableFile(dscname, &descriptor_file_);
|
||||
if (!r.ok()) {
|
||||
Log(options_->info_log, "Reuse MANIFEST: %s\n", r.ToString().c_str());
|
||||
assert(descriptor_file_ == NULL);
|
||||
return false;
|
||||
}
|
||||
|
||||
Log(options_->info_log, "Reusing MANIFEST %s\n", dscname.c_str());
|
||||
descriptor_log_ = new log::Writer(descriptor_file_, manifest_size);
|
||||
manifest_file_number_ = manifest_number;
|
||||
return true;
|
||||
}
|
||||
|
||||
void VersionSet::MarkFileNumberUsed(uint64_t number) {
|
||||
if (next_file_number_ <= number) {
|
||||
next_file_number_ = number + 1;
|
||||
|
@ -179,7 +179,7 @@ class VersionSet {
|
||||
EXCLUSIVE_LOCKS_REQUIRED(mu);
|
||||
|
||||
// Recover the last saved descriptor from persistent storage.
|
||||
Status Recover();
|
||||
Status Recover(bool *save_manifest);
|
||||
|
||||
// Return the current version.
|
||||
Version* current() const { return current_; }
|
||||
@ -274,6 +274,8 @@ class VersionSet {
|
||||
friend class Compaction;
|
||||
friend class Version;
|
||||
|
||||
bool ReuseManifest(const std::string& dscname, const std::string& dscbase);
|
||||
|
||||
void Finalize(Version* v);
|
||||
|
||||
void GetRange(const std::vector<FileMetaData*>& inputs,
|
||||
|
@ -277,6 +277,19 @@ class InMemoryEnv : public EnvWrapper {
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
virtual Status NewAppendableFile(const std::string& fname,
|
||||
WritableFile** result) {
|
||||
MutexLock lock(&mutex_);
|
||||
FileState** sptr = &file_map_[fname];
|
||||
FileState* file = *sptr;
|
||||
if (file == NULL) {
|
||||
file = new FileState();
|
||||
file->Ref();
|
||||
}
|
||||
*result = new WritableFileImpl(file);
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
virtual bool FileExists(const std::string& fname) {
|
||||
MutexLock lock(&mutex_);
|
||||
return file_map_.find(fname) != file_map_.end();
|
||||
|
@ -40,6 +40,8 @@ TEST(MemEnvTest, Basics) {
|
||||
|
||||
// Create a file.
|
||||
ASSERT_OK(env_->NewWritableFile("/dir/f", &writable_file));
|
||||
ASSERT_OK(env_->GetFileSize("/dir/f", &file_size));
|
||||
ASSERT_EQ(0, file_size);
|
||||
delete writable_file;
|
||||
|
||||
// Check that the file exists.
|
||||
@ -55,9 +57,16 @@ TEST(MemEnvTest, Basics) {
|
||||
ASSERT_OK(writable_file->Append("abc"));
|
||||
delete writable_file;
|
||||
|
||||
// Check for expected size.
|
||||
// Check that append works.
|
||||
ASSERT_OK(env_->NewAppendableFile("/dir/f", &writable_file));
|
||||
ASSERT_OK(env_->GetFileSize("/dir/f", &file_size));
|
||||
ASSERT_EQ(3, file_size);
|
||||
ASSERT_OK(writable_file->Append("hello"));
|
||||
delete writable_file;
|
||||
|
||||
// Check for expected size.
|
||||
ASSERT_OK(env_->GetFileSize("/dir/f", &file_size));
|
||||
ASSERT_EQ(8, file_size);
|
||||
|
||||
// Check that renaming works.
|
||||
ASSERT_TRUE(!env_->RenameFile("/dir/non_existent", "/dir/g").ok());
|
||||
@ -65,7 +74,7 @@ TEST(MemEnvTest, Basics) {
|
||||
ASSERT_TRUE(!env_->FileExists("/dir/f"));
|
||||
ASSERT_TRUE(env_->FileExists("/dir/g"));
|
||||
ASSERT_OK(env_->GetFileSize("/dir/g", &file_size));
|
||||
ASSERT_EQ(3, file_size);
|
||||
ASSERT_EQ(8, file_size);
|
||||
|
||||
// Check that opening non-existent file fails.
|
||||
SequentialFile* seq_file;
|
||||
|
@ -69,6 +69,21 @@ class Env {
|
||||
virtual Status NewWritableFile(const std::string& fname,
|
||||
WritableFile** result) = 0;
|
||||
|
||||
// Create an object that either appends to an existing file, or
|
||||
// writes to a new file (if the file does not exist to begin with).
|
||||
// On success, stores a pointer to the new file in *result and
|
||||
// returns OK. On failure stores NULL in *result and returns
|
||||
// non-OK.
|
||||
//
|
||||
// The returned file will only be accessed by one thread at a time.
|
||||
//
|
||||
// May return an IsNotSupportedError error if this Env does
|
||||
// not allow appending to an existing file. Users of Env (including
|
||||
// the leveldb implementation) must be prepared to deal with
|
||||
// an Env that does not support appending.
|
||||
virtual Status NewAppendableFile(const std::string& fname,
|
||||
WritableFile** result);
|
||||
|
||||
// Returns true iff the named file exists.
|
||||
virtual bool FileExists(const std::string& fname) = 0;
|
||||
|
||||
@ -289,6 +304,9 @@ class EnvWrapper : public Env {
|
||||
Status NewWritableFile(const std::string& f, WritableFile** r) {
|
||||
return target_->NewWritableFile(f, r);
|
||||
}
|
||||
Status NewAppendableFile(const std::string& f, WritableFile** r) {
|
||||
return target_->NewAppendableFile(f, r);
|
||||
}
|
||||
bool FileExists(const std::string& f) { return target_->FileExists(f); }
|
||||
Status GetChildren(const std::string& dir, std::vector<std::string>* r) {
|
||||
return target_->GetChildren(dir, r);
|
||||
|
@ -128,6 +128,12 @@ struct Options {
|
||||
// efficiently detect that and will switch to uncompressed mode.
|
||||
CompressionType compression;
|
||||
|
||||
// EXPERIMENTAL: If true, append to existing MANIFEST and log files
|
||||
// when a database is opened. This can significantly speed up open.
|
||||
//
|
||||
// Default: currently false, but may become true later.
|
||||
bool reuse_logs;
|
||||
|
||||
// If non-NULL, use the specified filter policy to reduce disk reads.
|
||||
// Many applications will benefit from passing the result of
|
||||
// NewBloomFilterPolicy() here.
|
||||
|
@ -60,6 +60,9 @@ class Status {
|
||||
// Returns true iff the status indicates an IOError.
|
||||
bool IsIOError() const { return code() == kIOError; }
|
||||
|
||||
// Returns true iff the status indicates a NotSupportedError.
|
||||
bool IsNotSupportedError() const { return code() == kNotSupported; }
|
||||
|
||||
// Return a string representation of this status suitable for printing.
|
||||
// Returns the string "OK" for success.
|
||||
std::string ToString() const;
|
||||
|
@ -9,6 +9,10 @@ namespace leveldb {
|
||||
Env::~Env() {
|
||||
}
|
||||
|
||||
Status Env::NewAppendableFile(const std::string& fname, WritableFile** result) {
|
||||
return Status::NotSupported("NewAppendableFile", fname);
|
||||
}
|
||||
|
||||
SequentialFile::~SequentialFile() {
|
||||
}
|
||||
|
||||
|
@ -350,6 +350,19 @@ class PosixEnv : public Env {
|
||||
return s;
|
||||
}
|
||||
|
||||
virtual Status NewAppendableFile(const std::string& fname,
|
||||
WritableFile** result) {
|
||||
Status s;
|
||||
FILE* f = fopen(fname.c_str(), "a");
|
||||
if (f == NULL) {
|
||||
*result = NULL;
|
||||
s = IOError(fname, errno);
|
||||
} else {
|
||||
*result = new PosixWritableFile(fname, f);
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
||||
virtual bool FileExists(const std::string& fname) {
|
||||
return access(fname.c_str(), F_OK) == 0;
|
||||
}
|
||||
|
@ -22,8 +22,8 @@ Options::Options()
|
||||
block_size(4096),
|
||||
block_restart_interval(16),
|
||||
compression(kSnappyCompression),
|
||||
reuse_logs(false),
|
||||
filter_policy(NULL) {
|
||||
}
|
||||
|
||||
|
||||
} // namespace leveldb
|
||||
|
@ -45,6 +45,16 @@ class ErrorEnv : public EnvWrapper {
|
||||
}
|
||||
return target()->NewWritableFile(fname, result);
|
||||
}
|
||||
|
||||
virtual Status NewAppendableFile(const std::string& fname,
|
||||
WritableFile** result) {
|
||||
if (writable_file_error_) {
|
||||
++num_writable_file_errors_;
|
||||
*result = NULL;
|
||||
return Status::IOError(fname, "fake error");
|
||||
}
|
||||
return target()->NewAppendableFile(fname, result);
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace test
|
||||
|
Loading…
Reference in New Issue
Block a user