Stop future writes if a log file Close() fails.

See https://github.com/google/leveldb/issues/1081

PiperOrigin-RevId: 499519182
This commit is contained in:
Sanjay Ghemawat 2023-01-04 18:22:18 +00:00 committed by Victor Costan
parent aa5479bbf4
commit fb644cb445
2 changed files with 77 additions and 6 deletions

View File

@ -1368,8 +1368,22 @@ Status DBImpl::MakeRoomForWrite(bool force) {
versions_->ReuseFileNumber(new_log_number);
break;
}
delete log_;
s = logfile_->Close();
if (!s.ok()) {
// We may have lost some data written to the previous log file.
// Switch to the new log file anyway, but record as a background
// error so we do not attempt any more writes.
//
// We could perhaps attempt to save the memtable corresponding
// to log file and suppress the error if that works, but that
// would add more complexity in a critical code path.
RecordBackgroundError(s);
}
delete logfile_;
logfile_ = lfile;
logfile_number_ = new_log_number;
log_ = new log::Writer(lfile);

View File

@ -65,6 +65,19 @@ class AtomicCounter {
void DelayMilliseconds(int millis) {
Env::Default()->SleepForMicroseconds(millis * 1000);
}
bool IsLdbFile(const std::string& f) {
return strstr(f.c_str(), ".ldb") != nullptr;
}
bool IsLogFile(const std::string& f) {
return strstr(f.c_str(), ".log") != nullptr;
}
bool IsManifestFile(const std::string& f) {
return strstr(f.c_str(), "MANIFEST") != nullptr;
}
} // namespace
// Test Env to override default Env behavior for testing.
@ -100,6 +113,10 @@ class TestEnv : public EnvWrapper {
// Special Env used to delay background operations.
class SpecialEnv : public EnvWrapper {
public:
// For historical reasons, the std::atomic<> fields below are currently
// accessed via acquired loads and release stores. We should switch
// to plain load(), store() calls that provide sequential consistency.
// sstable/log Sync() calls are blocked while this pointer is non-null.
std::atomic<bool> delay_data_sync_;
@ -118,6 +135,9 @@ class SpecialEnv : public EnvWrapper {
// Force write to manifest files to fail while this pointer is non-null.
std::atomic<bool> manifest_write_error_;
// Force log file close to fail while this bool is true.
std::atomic<bool> log_file_close_;
bool count_random_reads_;
AtomicCounter random_read_counter_;
@ -129,6 +149,7 @@ class SpecialEnv : public EnvWrapper {
non_writable_(false),
manifest_sync_error_(false),
manifest_write_error_(false),
log_file_close_(false),
count_random_reads_(false) {}
Status NewWritableFile(const std::string& f, WritableFile** r) {
@ -136,9 +157,12 @@ class SpecialEnv : public EnvWrapper {
private:
SpecialEnv* const env_;
WritableFile* const base_;
const std::string fname_;
public:
DataFile(SpecialEnv* env, WritableFile* base) : env_(env), base_(base) {}
DataFile(SpecialEnv* env, WritableFile* base, const std::string& fname)
: env_(env), base_(base), fname_(fname) {}
~DataFile() { delete base_; }
Status Append(const Slice& data) {
if (env_->no_space_.load(std::memory_order_acquire)) {
@ -148,7 +172,14 @@ class SpecialEnv : public EnvWrapper {
return base_->Append(data);
}
}
Status Close() { return base_->Close(); }
Status Close() {
Status s = base_->Close();
if (s.ok() && IsLogFile(fname_) &&
env_->log_file_close_.load(std::memory_order_acquire)) {
s = Status::IOError("simulated log file Close error");
}
return s;
}
Status Flush() { return base_->Flush(); }
Status Sync() {
if (env_->data_sync_error_.load(std::memory_order_acquire)) {
@ -192,10 +223,9 @@ class SpecialEnv : public EnvWrapper {
Status s = target()->NewWritableFile(f, r);
if (s.ok()) {
if (strstr(f.c_str(), ".ldb") != nullptr ||
strstr(f.c_str(), ".log") != nullptr) {
*r = new DataFile(this, *r);
} else if (strstr(f.c_str(), "MANIFEST") != nullptr) {
if (IsLdbFile(f) || IsLogFile(f)) {
*r = new DataFile(this, *r, f);
} else if (IsManifestFile(f)) {
*r = new ManifestFile(this, *r);
}
}
@ -1941,6 +1971,33 @@ TEST_F(DBTest, BloomFilter) {
delete options.filter_policy;
}
TEST_F(DBTest, LogCloseError) {
// Regression test for bug where we could ignore log file
// Close() error when switching to a new log file.
const int kValueSize = 20000;
const int kWriteCount = 10;
const int kWriteBufferSize = (kValueSize * kWriteCount) / 2;
Options options = CurrentOptions();
options.env = env_;
options.write_buffer_size = kWriteBufferSize; // Small write buffer
Reopen(&options);
env_->log_file_close_.store(true, std::memory_order_release);
std::string value(kValueSize, 'x');
Status s;
for (int i = 0; i < kWriteCount && s.ok(); i++) {
s = Put(Key(i), value);
}
ASSERT_TRUE(!s.ok()) << "succeeded even after log file Close failure";
// Future writes should also fail after an earlier error.
s = Put("hello", "world");
ASSERT_TRUE(!s.ok()) << "write succeeded after log file Close failure";
env_->log_file_close_.store(false, std::memory_order_release);
}
// Multi-threaded test:
namespace {