Fix issue 474: a race between the f*_unlocked() STDIO calls in
env_posix.cc and concurrent application calls to fflush(NULL). The fix is to avoid using stdio in env_posix.cc but add our own buffering where we need it. Added a test to reproduce the bug. Added a test for Env reads/writes. ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=170738066
This commit is contained in:
parent
bcd9a8ea4a
commit
7e12c00ecf
@ -25,6 +25,13 @@ static std::string RandomString(Random* rnd, int len) {
|
|||||||
return r;
|
return r;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static std::string RandomKey(Random* rnd) {
|
||||||
|
int len = (rnd->OneIn(3)
|
||||||
|
? 1 // Short sometimes to encourage collisions
|
||||||
|
: (rnd->OneIn(100) ? rnd->Skewed(10) : rnd->Uniform(10)));
|
||||||
|
return test::RandomKey(rnd, len);
|
||||||
|
}
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
class AtomicCounter {
|
class AtomicCounter {
|
||||||
private:
|
private:
|
||||||
@ -1394,6 +1401,15 @@ TEST(DBTest, L0_CompactionBug_Issue44_b) {
|
|||||||
ASSERT_EQ("(->)(c->cv)", Contents());
|
ASSERT_EQ("(->)(c->cv)", Contents());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST(DBTest, Fflush_Issue474) {
|
||||||
|
static const int kNum = 100000;
|
||||||
|
Random rnd(test::RandomSeed());
|
||||||
|
for (int i = 0; i < kNum; i++) {
|
||||||
|
fflush(NULL);
|
||||||
|
ASSERT_OK(Put(RandomKey(&rnd), RandomString(&rnd, 100)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
TEST(DBTest, ComparatorCheck) {
|
TEST(DBTest, ComparatorCheck) {
|
||||||
class NewComparator : public Comparator {
|
class NewComparator : public Comparator {
|
||||||
public:
|
public:
|
||||||
@ -1959,13 +1975,6 @@ class ModelDB: public DB {
|
|||||||
KVMap map_;
|
KVMap map_;
|
||||||
};
|
};
|
||||||
|
|
||||||
static std::string RandomKey(Random* rnd) {
|
|
||||||
int len = (rnd->OneIn(3)
|
|
||||||
? 1 // Short sometimes to encourage collisions
|
|
||||||
: (rnd->OneIn(100) ? rnd->Skewed(10) : rnd->Uniform(10)));
|
|
||||||
return test::RandomKey(rnd, len);
|
|
||||||
}
|
|
||||||
|
|
||||||
static bool CompareIterators(int step,
|
static bool CompareIterators(int step,
|
||||||
DB* model,
|
DB* model,
|
||||||
DB* db,
|
DB* db,
|
||||||
|
@ -34,6 +34,8 @@ namespace {
|
|||||||
static int open_read_only_file_limit = -1;
|
static int open_read_only_file_limit = -1;
|
||||||
static int mmap_limit = -1;
|
static int mmap_limit = -1;
|
||||||
|
|
||||||
|
static const size_t kBufSize = 65536;
|
||||||
|
|
||||||
static Status PosixError(const std::string& context, int err_number) {
|
static Status PosixError(const std::string& context, int err_number) {
|
||||||
if (err_number == ENOENT) {
|
if (err_number == ENOENT) {
|
||||||
return Status::NotFound(context, strerror(err_number));
|
return Status::NotFound(context, strerror(err_number));
|
||||||
@ -96,30 +98,32 @@ class Limiter {
|
|||||||
class PosixSequentialFile: public SequentialFile {
|
class PosixSequentialFile: public SequentialFile {
|
||||||
private:
|
private:
|
||||||
std::string filename_;
|
std::string filename_;
|
||||||
FILE* file_;
|
int fd_;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
PosixSequentialFile(const std::string& fname, FILE* f)
|
PosixSequentialFile(const std::string& fname, int fd)
|
||||||
: filename_(fname), file_(f) { }
|
: filename_(fname), fd_(fd) {}
|
||||||
virtual ~PosixSequentialFile() { fclose(file_); }
|
virtual ~PosixSequentialFile() { close(fd_); }
|
||||||
|
|
||||||
virtual Status Read(size_t n, Slice* result, char* scratch) {
|
virtual Status Read(size_t n, Slice* result, char* scratch) {
|
||||||
Status s;
|
Status s;
|
||||||
size_t r = fread_unlocked(scratch, 1, n, file_);
|
while (true) {
|
||||||
*result = Slice(scratch, r);
|
ssize_t r = read(fd_, scratch, n);
|
||||||
if (r < n) {
|
if (r < 0) {
|
||||||
if (feof(file_)) {
|
if (errno == EINTR) {
|
||||||
// We leave status as ok if we hit the end of the file
|
continue; // Retry
|
||||||
} else {
|
}
|
||||||
// A partial read with an error: return a non-ok status
|
|
||||||
s = PosixError(filename_, errno);
|
s = PosixError(filename_, errno);
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
*result = Slice(scratch, r);
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
virtual Status Skip(uint64_t n) {
|
virtual Status Skip(uint64_t n) {
|
||||||
if (fseek(file_, n, SEEK_CUR)) {
|
if (lseek(fd_, n, SEEK_CUR) == static_cast<off_t>(-1)) {
|
||||||
return PosixError(filename_, errno);
|
return PosixError(filename_, errno);
|
||||||
}
|
}
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
@ -213,42 +217,64 @@ class PosixMmapReadableFile: public RandomAccessFile {
|
|||||||
|
|
||||||
class PosixWritableFile : public WritableFile {
|
class PosixWritableFile : public WritableFile {
|
||||||
private:
|
private:
|
||||||
|
// buf_[0, pos_-1] contains data to be written to fd_.
|
||||||
std::string filename_;
|
std::string filename_;
|
||||||
FILE* file_;
|
int fd_;
|
||||||
|
char buf_[kBufSize];
|
||||||
|
size_t pos_;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
PosixWritableFile(const std::string& fname, FILE* f)
|
PosixWritableFile(const std::string& fname, int fd)
|
||||||
: filename_(fname), file_(f) { }
|
: filename_(fname), fd_(fd), pos_(0) { }
|
||||||
|
|
||||||
~PosixWritableFile() {
|
~PosixWritableFile() {
|
||||||
if (file_ != NULL) {
|
if (fd_ >= 0) {
|
||||||
// Ignoring any potential errors
|
// Ignoring any potential errors
|
||||||
fclose(file_);
|
FlushBuffered();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
virtual Status Append(const Slice& data) {
|
virtual Status Append(const Slice& data) {
|
||||||
size_t r = fwrite_unlocked(data.data(), 1, data.size(), file_);
|
size_t n = data.size();
|
||||||
if (r != data.size()) {
|
const char* p = data.data();
|
||||||
return PosixError(filename_, errno);
|
|
||||||
|
// Fit as much as possible into buffer.
|
||||||
|
size_t copy = std::min(n, kBufSize - pos_);
|
||||||
|
memcpy(buf_ + pos_, p, copy);
|
||||||
|
p += copy;
|
||||||
|
n -= copy;
|
||||||
|
pos_ += copy;
|
||||||
|
if (n == 0) {
|
||||||
|
return Status::OK();
|
||||||
}
|
}
|
||||||
return Status::OK();
|
|
||||||
|
// Can't fit in buffer, so need to do at least one write.
|
||||||
|
Status s = FlushBuffered();
|
||||||
|
if (!s.ok()) {
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Small writes go to buffer, large writes are written directly.
|
||||||
|
if (n < kBufSize) {
|
||||||
|
memcpy(buf_, p, n);
|
||||||
|
pos_ = n;
|
||||||
|
return Status::OK();
|
||||||
|
}
|
||||||
|
return WriteRaw(p, n);
|
||||||
}
|
}
|
||||||
|
|
||||||
virtual Status Close() {
|
virtual Status Close() {
|
||||||
Status result;
|
Status result = FlushBuffered();
|
||||||
if (fclose(file_) != 0) {
|
const int r = close(fd_);
|
||||||
|
if (r < 0 && result.ok()) {
|
||||||
result = PosixError(filename_, errno);
|
result = PosixError(filename_, errno);
|
||||||
}
|
}
|
||||||
file_ = NULL;
|
fd_ = -1;
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
virtual Status Flush() {
|
virtual Status Flush() {
|
||||||
if (fflush_unlocked(file_) != 0) {
|
return FlushBuffered();
|
||||||
return PosixError(filename_, errno);
|
|
||||||
}
|
|
||||||
return Status::OK();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Status SyncDirIfManifest() {
|
Status SyncDirIfManifest() {
|
||||||
@ -284,12 +310,36 @@ class PosixWritableFile : public WritableFile {
|
|||||||
if (!s.ok()) {
|
if (!s.ok()) {
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
if (fflush_unlocked(file_) != 0 ||
|
s = FlushBuffered();
|
||||||
fdatasync(fileno(file_)) != 0) {
|
if (s.ok()) {
|
||||||
s = Status::IOError(filename_, strerror(errno));
|
if (fdatasync(fd_) != 0) {
|
||||||
|
s = PosixError(filename_, errno);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
Status FlushBuffered() {
|
||||||
|
Status s = WriteRaw(buf_, pos_);
|
||||||
|
pos_ = 0;
|
||||||
|
return s;
|
||||||
|
}
|
||||||
|
|
||||||
|
Status WriteRaw(const char* p, size_t n) {
|
||||||
|
while (n > 0) {
|
||||||
|
ssize_t r = write(fd_, p, n);
|
||||||
|
if (r < 0) {
|
||||||
|
if (errno == EINTR) {
|
||||||
|
continue; // Retry
|
||||||
|
}
|
||||||
|
return PosixError(filename_, errno);
|
||||||
|
}
|
||||||
|
p += r;
|
||||||
|
n -= r;
|
||||||
|
}
|
||||||
|
return Status::OK();
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
static int LockOrUnlock(int fd, bool lock) {
|
static int LockOrUnlock(int fd, bool lock) {
|
||||||
@ -338,12 +388,12 @@ class PosixEnv : public Env {
|
|||||||
|
|
||||||
virtual Status NewSequentialFile(const std::string& fname,
|
virtual Status NewSequentialFile(const std::string& fname,
|
||||||
SequentialFile** result) {
|
SequentialFile** result) {
|
||||||
FILE* f = fopen(fname.c_str(), "r");
|
int fd = open(fname.c_str(), O_RDONLY);
|
||||||
if (f == NULL) {
|
if (fd < 0) {
|
||||||
*result = NULL;
|
*result = NULL;
|
||||||
return PosixError(fname, errno);
|
return PosixError(fname, errno);
|
||||||
} else {
|
} else {
|
||||||
*result = new PosixSequentialFile(fname, f);
|
*result = new PosixSequentialFile(fname, fd);
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -379,12 +429,12 @@ class PosixEnv : public Env {
|
|||||||
virtual Status NewWritableFile(const std::string& fname,
|
virtual Status NewWritableFile(const std::string& fname,
|
||||||
WritableFile** result) {
|
WritableFile** result) {
|
||||||
Status s;
|
Status s;
|
||||||
FILE* f = fopen(fname.c_str(), "w");
|
int fd = open(fname.c_str(), O_RDWR | O_CREAT, 0644);
|
||||||
if (f == NULL) {
|
if (fd < 0) {
|
||||||
*result = NULL;
|
*result = NULL;
|
||||||
s = PosixError(fname, errno);
|
s = PosixError(fname, errno);
|
||||||
} else {
|
} else {
|
||||||
*result = new PosixWritableFile(fname, f);
|
*result = new PosixWritableFile(fname, fd);
|
||||||
}
|
}
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
@ -392,12 +442,12 @@ class PosixEnv : public Env {
|
|||||||
virtual Status NewAppendableFile(const std::string& fname,
|
virtual Status NewAppendableFile(const std::string& fname,
|
||||||
WritableFile** result) {
|
WritableFile** result) {
|
||||||
Status s;
|
Status s;
|
||||||
FILE* f = fopen(fname.c_str(), "a");
|
int fd = open(fname.c_str(), O_APPEND | O_RDWR | O_CREAT, 0644);
|
||||||
if (f == NULL) {
|
if (fd < 0) {
|
||||||
*result = NULL;
|
*result = NULL;
|
||||||
s = PosixError(fname, errno);
|
s = PosixError(fname, errno);
|
||||||
} else {
|
} else {
|
||||||
*result = new PosixWritableFile(fname, f);
|
*result = new PosixWritableFile(fname, fd);
|
||||||
}
|
}
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
@ -6,6 +6,7 @@
|
|||||||
|
|
||||||
#include "port/port.h"
|
#include "port/port.h"
|
||||||
#include "util/testharness.h"
|
#include "util/testharness.h"
|
||||||
|
#include "util/testutil.h"
|
||||||
|
|
||||||
namespace leveldb {
|
namespace leveldb {
|
||||||
|
|
||||||
@ -27,6 +28,55 @@ static void SetBool(void* ptr) {
|
|||||||
reinterpret_cast<port::AtomicPointer*>(ptr)->NoBarrier_Store(ptr);
|
reinterpret_cast<port::AtomicPointer*>(ptr)->NoBarrier_Store(ptr);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
TEST(EnvTest, ReadWrite) {
|
||||||
|
Random rnd(test::RandomSeed());
|
||||||
|
|
||||||
|
// Get file to use for testing.
|
||||||
|
std::string test_dir;
|
||||||
|
ASSERT_OK(env_->GetTestDirectory(&test_dir));
|
||||||
|
std::string test_file_name = test_dir + "/open_on_read.txt";
|
||||||
|
WritableFile* wfile_tmp;
|
||||||
|
ASSERT_OK(env_->NewWritableFile(test_file_name, &wfile_tmp));
|
||||||
|
std::unique_ptr<WritableFile> wfile(wfile_tmp);
|
||||||
|
|
||||||
|
// Fill a file with data generated via a sequence of randomly sized writes.
|
||||||
|
static const size_t kDataSize = 10 * 1048576;
|
||||||
|
std::string data;
|
||||||
|
while (data.size() < kDataSize) {
|
||||||
|
int len = rnd.Skewed(18); // Up to 2^18 - 1, but typically much smaller
|
||||||
|
std::string r;
|
||||||
|
test::RandomString(&rnd, len, &r);
|
||||||
|
ASSERT_OK(wfile->Append(r));
|
||||||
|
data += r;
|
||||||
|
if (rnd.OneIn(10)) {
|
||||||
|
ASSERT_OK(wfile->Flush());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ASSERT_OK(wfile->Sync());
|
||||||
|
ASSERT_OK(wfile->Close());
|
||||||
|
wfile.reset();
|
||||||
|
|
||||||
|
// Read all data using a sequence of randomly sized reads.
|
||||||
|
SequentialFile* rfile_tmp;
|
||||||
|
ASSERT_OK(env_->NewSequentialFile(test_file_name, &rfile_tmp));
|
||||||
|
std::unique_ptr<SequentialFile> rfile(rfile_tmp);
|
||||||
|
std::string read_result;
|
||||||
|
std::string scratch;
|
||||||
|
while (read_result.size() < data.size()) {
|
||||||
|
int len = std::min<int>(rnd.Skewed(18), data.size() - read_result.size());
|
||||||
|
scratch.resize(std::max(len, 1)); // at least 1 so &scratch[0] is legal
|
||||||
|
Slice read;
|
||||||
|
ASSERT_OK(rfile->Read(len, &read, &scratch[0]));
|
||||||
|
if (len > 0) {
|
||||||
|
ASSERT_GT(read.size(), 0);
|
||||||
|
}
|
||||||
|
ASSERT_LE(read.size(), len);
|
||||||
|
read_result.append(read.data(), read.size());
|
||||||
|
}
|
||||||
|
ASSERT_EQ(read_result, data);
|
||||||
|
}
|
||||||
|
|
||||||
TEST(EnvTest, RunImmediately) {
|
TEST(EnvTest, RunImmediately) {
|
||||||
port::AtomicPointer called (NULL);
|
port::AtomicPointer called (NULL);
|
||||||
env_->Schedule(&SetBool, &called);
|
env_->Schedule(&SetBool, &called);
|
||||||
|
Loading…
Reference in New Issue
Block a user