diff --git a/db/db_test.cc b/db/db_test.cc index a0b08bc..edc3916 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -25,6 +25,13 @@ static std::string RandomString(Random* rnd, int len) { return r; } +static std::string RandomKey(Random* rnd) { + int len = (rnd->OneIn(3) + ? 1 // Short sometimes to encourage collisions + : (rnd->OneIn(100) ? rnd->Skewed(10) : rnd->Uniform(10))); + return test::RandomKey(rnd, len); +} + namespace { class AtomicCounter { private: @@ -1394,6 +1401,15 @@ TEST(DBTest, L0_CompactionBug_Issue44_b) { ASSERT_EQ("(->)(c->cv)", Contents()); } +TEST(DBTest, Fflush_Issue474) { + static const int kNum = 100000; + Random rnd(test::RandomSeed()); + for (int i = 0; i < kNum; i++) { + fflush(NULL); + ASSERT_OK(Put(RandomKey(&rnd), RandomString(&rnd, 100))); + } +} + TEST(DBTest, ComparatorCheck) { class NewComparator : public Comparator { public: @@ -1959,13 +1975,6 @@ class ModelDB: public DB { KVMap map_; }; -static std::string RandomKey(Random* rnd) { - int len = (rnd->OneIn(3) - ? 1 // Short sometimes to encourage collisions - : (rnd->OneIn(100) ? rnd->Skewed(10) : rnd->Uniform(10))); - return test::RandomKey(rnd, len); -} - static bool CompareIterators(int step, DB* model, DB* db, diff --git a/util/env_posix.cc b/util/env_posix.cc index b21f515..1a88802 100644 --- a/util/env_posix.cc +++ b/util/env_posix.cc @@ -34,6 +34,8 @@ namespace { static int open_read_only_file_limit = -1; static int mmap_limit = -1; +static const size_t kBufSize = 65536; + static Status PosixError(const std::string& context, int err_number) { if (err_number == ENOENT) { return Status::NotFound(context, strerror(err_number)); @@ -96,30 +98,32 @@ class Limiter { class PosixSequentialFile: public SequentialFile { private: std::string filename_; - FILE* file_; + int fd_; public: - PosixSequentialFile(const std::string& fname, FILE* f) - : filename_(fname), file_(f) { } - virtual ~PosixSequentialFile() { fclose(file_); } + PosixSequentialFile(const std::string& fname, int fd) + : filename_(fname), fd_(fd) {} + virtual ~PosixSequentialFile() { close(fd_); } virtual Status Read(size_t n, Slice* result, char* scratch) { Status s; - size_t r = fread_unlocked(scratch, 1, n, file_); - *result = Slice(scratch, r); - if (r < n) { - if (feof(file_)) { - // We leave status as ok if we hit the end of the file - } else { - // A partial read with an error: return a non-ok status + while (true) { + ssize_t r = read(fd_, scratch, n); + if (r < 0) { + if (errno == EINTR) { + continue; // Retry + } s = PosixError(filename_, errno); + break; } + *result = Slice(scratch, r); + break; } return s; } virtual Status Skip(uint64_t n) { - if (fseek(file_, n, SEEK_CUR)) { + if (lseek(fd_, n, SEEK_CUR) == static_cast(-1)) { return PosixError(filename_, errno); } return Status::OK(); @@ -213,42 +217,64 @@ class PosixMmapReadableFile: public RandomAccessFile { class PosixWritableFile : public WritableFile { private: + // buf_[0, pos_-1] contains data to be written to fd_. std::string filename_; - FILE* file_; + int fd_; + char buf_[kBufSize]; + size_t pos_; public: - PosixWritableFile(const std::string& fname, FILE* f) - : filename_(fname), file_(f) { } + PosixWritableFile(const std::string& fname, int fd) + : filename_(fname), fd_(fd), pos_(0) { } ~PosixWritableFile() { - if (file_ != NULL) { + if (fd_ >= 0) { // Ignoring any potential errors - fclose(file_); + FlushBuffered(); } } virtual Status Append(const Slice& data) { - size_t r = fwrite_unlocked(data.data(), 1, data.size(), file_); - if (r != data.size()) { - return PosixError(filename_, errno); + size_t n = data.size(); + const char* p = data.data(); + + // Fit as much as possible into buffer. + size_t copy = std::min(n, kBufSize - pos_); + memcpy(buf_ + pos_, p, copy); + p += copy; + n -= copy; + pos_ += copy; + if (n == 0) { + return Status::OK(); } - return Status::OK(); + + // Can't fit in buffer, so need to do at least one write. + Status s = FlushBuffered(); + if (!s.ok()) { + return s; + } + + // Small writes go to buffer, large writes are written directly. + if (n < kBufSize) { + memcpy(buf_, p, n); + pos_ = n; + return Status::OK(); + } + return WriteRaw(p, n); } virtual Status Close() { - Status result; - if (fclose(file_) != 0) { + Status result = FlushBuffered(); + const int r = close(fd_); + if (r < 0 && result.ok()) { result = PosixError(filename_, errno); } - file_ = NULL; + fd_ = -1; return result; } virtual Status Flush() { - if (fflush_unlocked(file_) != 0) { - return PosixError(filename_, errno); - } - return Status::OK(); + return FlushBuffered(); } Status SyncDirIfManifest() { @@ -284,12 +310,36 @@ class PosixWritableFile : public WritableFile { if (!s.ok()) { return s; } - if (fflush_unlocked(file_) != 0 || - fdatasync(fileno(file_)) != 0) { - s = Status::IOError(filename_, strerror(errno)); + s = FlushBuffered(); + if (s.ok()) { + if (fdatasync(fd_) != 0) { + s = PosixError(filename_, errno); + } } return s; } + + private: + Status FlushBuffered() { + Status s = WriteRaw(buf_, pos_); + pos_ = 0; + return s; + } + + Status WriteRaw(const char* p, size_t n) { + while (n > 0) { + ssize_t r = write(fd_, p, n); + if (r < 0) { + if (errno == EINTR) { + continue; // Retry + } + return PosixError(filename_, errno); + } + p += r; + n -= r; + } + return Status::OK(); + } }; static int LockOrUnlock(int fd, bool lock) { @@ -338,12 +388,12 @@ class PosixEnv : public Env { virtual Status NewSequentialFile(const std::string& fname, SequentialFile** result) { - FILE* f = fopen(fname.c_str(), "r"); - if (f == NULL) { + int fd = open(fname.c_str(), O_RDONLY); + if (fd < 0) { *result = NULL; return PosixError(fname, errno); } else { - *result = new PosixSequentialFile(fname, f); + *result = new PosixSequentialFile(fname, fd); return Status::OK(); } } @@ -379,12 +429,12 @@ class PosixEnv : public Env { virtual Status NewWritableFile(const std::string& fname, WritableFile** result) { Status s; - FILE* f = fopen(fname.c_str(), "w"); - if (f == NULL) { + int fd = open(fname.c_str(), O_RDWR | O_CREAT, 0644); + if (fd < 0) { *result = NULL; s = PosixError(fname, errno); } else { - *result = new PosixWritableFile(fname, f); + *result = new PosixWritableFile(fname, fd); } return s; } @@ -392,12 +442,12 @@ class PosixEnv : public Env { virtual Status NewAppendableFile(const std::string& fname, WritableFile** result) { Status s; - FILE* f = fopen(fname.c_str(), "a"); - if (f == NULL) { + int fd = open(fname.c_str(), O_APPEND | O_RDWR | O_CREAT, 0644); + if (fd < 0) { *result = NULL; s = PosixError(fname, errno); } else { - *result = new PosixWritableFile(fname, f); + *result = new PosixWritableFile(fname, fd); } return s; } diff --git a/util/env_test.cc b/util/env_test.cc index 4d8ecc4..53ec923 100644 --- a/util/env_test.cc +++ b/util/env_test.cc @@ -6,6 +6,7 @@ #include "port/port.h" #include "util/testharness.h" +#include "util/testutil.h" namespace leveldb { @@ -27,6 +28,55 @@ static void SetBool(void* ptr) { reinterpret_cast(ptr)->NoBarrier_Store(ptr); } + +TEST(EnvTest, ReadWrite) { + Random rnd(test::RandomSeed()); + + // Get file to use for testing. + std::string test_dir; + ASSERT_OK(env_->GetTestDirectory(&test_dir)); + std::string test_file_name = test_dir + "/open_on_read.txt"; + WritableFile* wfile_tmp; + ASSERT_OK(env_->NewWritableFile(test_file_name, &wfile_tmp)); + std::unique_ptr wfile(wfile_tmp); + + // Fill a file with data generated via a sequence of randomly sized writes. + static const size_t kDataSize = 10 * 1048576; + std::string data; + while (data.size() < kDataSize) { + int len = rnd.Skewed(18); // Up to 2^18 - 1, but typically much smaller + std::string r; + test::RandomString(&rnd, len, &r); + ASSERT_OK(wfile->Append(r)); + data += r; + if (rnd.OneIn(10)) { + ASSERT_OK(wfile->Flush()); + } + } + ASSERT_OK(wfile->Sync()); + ASSERT_OK(wfile->Close()); + wfile.reset(); + + // Read all data using a sequence of randomly sized reads. + SequentialFile* rfile_tmp; + ASSERT_OK(env_->NewSequentialFile(test_file_name, &rfile_tmp)); + std::unique_ptr rfile(rfile_tmp); + std::string read_result; + std::string scratch; + while (read_result.size() < data.size()) { + int len = std::min(rnd.Skewed(18), data.size() - read_result.size()); + scratch.resize(std::max(len, 1)); // at least 1 so &scratch[0] is legal + Slice read; + ASSERT_OK(rfile->Read(len, &read, &scratch[0])); + if (len > 0) { + ASSERT_GT(read.size(), 0); + } + ASSERT_LE(read.size(), len); + read_result.append(read.data(), read.size()); + } + ASSERT_EQ(read_result, data); +} + TEST(EnvTest, RunImmediately) { port::AtomicPointer called (NULL); env_->Schedule(&SetBool, &called);