Fix corruption bug found and analyzed by dhruba@gmail.com
https://groups.google.com/d/msg/leveldb/Kc9JxuIUu5A/9P0N9RL4ar8J
This commit is contained in:
parent
ea2e9195fc
commit
d84c825a70
2
Makefile
2
Makefile
@ -69,7 +69,7 @@ SHARED = $(SHARED1)
|
||||
else
|
||||
# Update db.h if you change these.
|
||||
SHARED_MAJOR = 1
|
||||
SHARED_MINOR = 8
|
||||
SHARED_MINOR = 9
|
||||
SHARED1 = libleveldb.$(PLATFORM_SHARED_EXT)
|
||||
SHARED2 = $(SHARED1).$(SHARED_MAJOR)
|
||||
SHARED3 = $(SHARED1).$(SHARED_MAJOR).$(SHARED_MINOR)
|
||||
|
@ -59,6 +59,12 @@ class SpecialEnv : public EnvWrapper {
|
||||
// Simulate non-writable file system while this pointer is non-NULL
|
||||
port::AtomicPointer non_writable_;
|
||||
|
||||
// Force sync of manifest files to fail while this pointer is non-NULL
|
||||
port::AtomicPointer manifest_sync_error_;
|
||||
|
||||
// Force write to manifest files to fail while this pointer is non-NULL
|
||||
port::AtomicPointer manifest_write_error_;
|
||||
|
||||
bool count_random_reads_;
|
||||
AtomicCounter random_read_counter_;
|
||||
|
||||
@ -69,6 +75,8 @@ class SpecialEnv : public EnvWrapper {
|
||||
no_space_.Release_Store(NULL);
|
||||
non_writable_.Release_Store(NULL);
|
||||
count_random_reads_ = false;
|
||||
manifest_sync_error_.Release_Store(NULL);
|
||||
manifest_write_error_.Release_Store(NULL);
|
||||
}
|
||||
|
||||
Status NewWritableFile(const std::string& f, WritableFile** r) {
|
||||
@ -100,6 +108,30 @@ class SpecialEnv : public EnvWrapper {
|
||||
return base_->Sync();
|
||||
}
|
||||
};
|
||||
class ManifestFile : public WritableFile {
|
||||
private:
|
||||
SpecialEnv* env_;
|
||||
WritableFile* base_;
|
||||
public:
|
||||
ManifestFile(SpecialEnv* env, WritableFile* b) : env_(env), base_(b) { }
|
||||
~ManifestFile() { delete base_; }
|
||||
Status Append(const Slice& data) {
|
||||
if (env_->manifest_write_error_.Acquire_Load() != NULL) {
|
||||
return Status::IOError("simulated writer error");
|
||||
} else {
|
||||
return base_->Append(data);
|
||||
}
|
||||
}
|
||||
Status Close() { return base_->Close(); }
|
||||
Status Flush() { return base_->Flush(); }
|
||||
Status Sync() {
|
||||
if (env_->manifest_sync_error_.Acquire_Load() != NULL) {
|
||||
return Status::IOError("simulated sync error");
|
||||
} else {
|
||||
return base_->Sync();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if (non_writable_.Acquire_Load() != NULL) {
|
||||
return Status::IOError("simulated write error");
|
||||
@ -109,6 +141,8 @@ class SpecialEnv : public EnvWrapper {
|
||||
if (s.ok()) {
|
||||
if (strstr(f.c_str(), ".sst") != NULL) {
|
||||
*r = new SSTableFile(this, *r);
|
||||
} else if (strstr(f.c_str(), "MANIFEST") != NULL) {
|
||||
*r = new ManifestFile(this, *r);
|
||||
}
|
||||
}
|
||||
return s;
|
||||
@ -1492,6 +1526,47 @@ TEST(DBTest, NonWritableFileSystem) {
|
||||
env_->non_writable_.Release_Store(NULL);
|
||||
}
|
||||
|
||||
TEST(DBTest, ManifestWriteError) {
|
||||
// Test for the following problem:
|
||||
// (a) Compaction produces file F
|
||||
// (b) Log record containing F is written to MANIFEST file, but Sync() fails
|
||||
// (c) GC deletes F
|
||||
// (d) After reopening DB, reads fail since deleted F is named in log record
|
||||
|
||||
// We iterate twice. In the second iteration, everything is the
|
||||
// same except the log record never makes it to the MANIFEST file.
|
||||
for (int iter = 0; iter < 2; iter++) {
|
||||
port::AtomicPointer* error_type = (iter == 0)
|
||||
? &env_->manifest_sync_error_
|
||||
: &env_->manifest_write_error_;
|
||||
|
||||
// Insert foo=>bar mapping
|
||||
Options options = CurrentOptions();
|
||||
options.env = env_;
|
||||
options.create_if_missing = true;
|
||||
options.error_if_exists = false;
|
||||
DestroyAndReopen(&options);
|
||||
ASSERT_OK(Put("foo", "bar"));
|
||||
ASSERT_EQ("bar", Get("foo"));
|
||||
|
||||
// Memtable compaction (will succeed)
|
||||
dbfull()->TEST_CompactMemTable();
|
||||
ASSERT_EQ("bar", Get("foo"));
|
||||
const int last = config::kMaxMemCompactLevel;
|
||||
ASSERT_EQ(NumTableFilesAtLevel(last), 1); // foo=>bar is now in last level
|
||||
|
||||
// Merging compaction (will fail)
|
||||
error_type->Release_Store(env_);
|
||||
dbfull()->TEST_CompactRange(last, NULL, NULL); // Should fail
|
||||
ASSERT_EQ("bar", Get("foo"));
|
||||
|
||||
// Recovery: should not lose data
|
||||
error_type->Release_Store(NULL);
|
||||
Reopen(&options);
|
||||
ASSERT_EQ("bar", Get("foo"));
|
||||
}
|
||||
}
|
||||
|
||||
TEST(DBTest, FilesDeletedAfterCompaction) {
|
||||
ASSERT_OK(Put("foo", "v2"));
|
||||
Compact("a", "z");
|
||||
|
@ -786,12 +786,23 @@ Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
|
||||
if (s.ok()) {
|
||||
s = descriptor_file_->Sync();
|
||||
}
|
||||
if (!s.ok()) {
|
||||
Log(options_->info_log, "MANIFEST write: %s\n", s.ToString().c_str());
|
||||
if (ManifestContains(record)) {
|
||||
Log(options_->info_log,
|
||||
"MANIFEST contains log record despite error; advancing to new "
|
||||
"version to prevent mismatch between in-memory and logged state");
|
||||
s = Status::OK();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If we just created a new descriptor file, install it by writing a
|
||||
// new CURRENT file that points to it.
|
||||
if (s.ok() && !new_manifest_file.empty()) {
|
||||
s = SetCurrentFile(env_, dbname_, manifest_file_number_);
|
||||
// No need to double-check MANIFEST in case of error since it
|
||||
// will be discarded below.
|
||||
}
|
||||
|
||||
mu->Lock();
|
||||
@ -1025,6 +1036,31 @@ const char* VersionSet::LevelSummary(LevelSummaryStorage* scratch) const {
|
||||
return scratch->buffer;
|
||||
}
|
||||
|
||||
// Return true iff the manifest contains the specified record.
|
||||
bool VersionSet::ManifestContains(const std::string& record) const {
|
||||
std::string fname = DescriptorFileName(dbname_, manifest_file_number_);
|
||||
Log(options_->info_log, "ManifestContains: checking %s\n", fname.c_str());
|
||||
SequentialFile* file = NULL;
|
||||
Status s = env_->NewSequentialFile(fname, &file);
|
||||
if (!s.ok()) {
|
||||
Log(options_->info_log, "ManifestContains: %s\n", s.ToString().c_str());
|
||||
return false;
|
||||
}
|
||||
log::Reader reader(file, NULL, true/*checksum*/, 0);
|
||||
Slice r;
|
||||
std::string scratch;
|
||||
bool result = false;
|
||||
while (reader.ReadRecord(&r, &scratch)) {
|
||||
if (r == Slice(record)) {
|
||||
result = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
delete file;
|
||||
Log(options_->info_log, "ManifestContains: result = %d\n", result ? 1 : 0);
|
||||
return result;
|
||||
}
|
||||
|
||||
uint64_t VersionSet::ApproximateOffsetOf(Version* v, const InternalKey& ikey) {
|
||||
uint64_t result = 0;
|
||||
for (int level = 0; level < config::kNumLevels; level++) {
|
||||
|
@ -277,6 +277,8 @@ class VersionSet {
|
||||
|
||||
void AppendVersion(Version* v);
|
||||
|
||||
bool ManifestContains(const std::string& record) const;
|
||||
|
||||
Env* const env_;
|
||||
const std::string dbname_;
|
||||
const Options* const options_;
|
||||
|
@ -14,7 +14,7 @@ namespace leveldb {
|
||||
|
||||
// Update Makefile if you change these
|
||||
static const int kMajorVersion = 1;
|
||||
static const int kMinorVersion = 8;
|
||||
static const int kMinorVersion = 9;
|
||||
|
||||
struct Options;
|
||||
struct ReadOptions;
|
||||
|
Loading…
Reference in New Issue
Block a user