This CL fixes a bug encountered when reading records from leveldb files that have been split, as in a [] input task split.
Detailed description: Suppose an input split is generated between two leveldb record blocks and the preceding block ends with null padding. A reader that previously read at least 1 record within the first block (before encountering the padding) upon trying to read the next record, will successfully and correctly read the next logical record from the subsequent block, but will return a last record offset pointing to the padding in the first block. When this happened in a [], it resulted in duplicate records being handled at what appeared to be different offsets that were separated by only a few bytes. This behavior is only observed when at least 1 record was read from the first block before encountering the padding. If the initial offset for a reader was within the padding, the correct record offset would be reported, namely the offset within the second block. The tests failed to catch this scenario/bug, because each read test only read a single record with an initial offset. This CL adds an explicit test case for this scenario, and modifies the test structure to read all remaining records in the test case after an initial offset is specified. Thus an initial offset that jumps to record #3, with 5 total records in the test file, will result in reading 2 records, and validating the offset of each of them in order to pass successfully. ------------- Created by MOE: https://github.com/google/moe MOE_MIGRATED_REVID=115338487
This commit is contained in:
parent
3211343909
commit
e84b5bdb5a
@ -73,8 +73,14 @@ bool Reader::ReadRecord(Slice* record, std::string* scratch) {
|
|||||||
|
|
||||||
Slice fragment;
|
Slice fragment;
|
||||||
while (true) {
|
while (true) {
|
||||||
uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size();
|
|
||||||
const unsigned int record_type = ReadPhysicalRecord(&fragment);
|
const unsigned int record_type = ReadPhysicalRecord(&fragment);
|
||||||
|
|
||||||
|
// ReadPhysicalRecord may have only had an empty trailer remaining in its
|
||||||
|
// internal buffer. Calculate the offset of the next physical record now
|
||||||
|
// that it has returned, properly accounting for its header size.
|
||||||
|
uint64_t physical_record_offset =
|
||||||
|
end_of_buffer_offset_ - buffer_.size() - kHeaderSize - fragment.size();
|
||||||
|
|
||||||
if (resyncing_) {
|
if (resyncing_) {
|
||||||
if (record_type == kMiddleType) {
|
if (record_type == kMiddleType) {
|
||||||
continue;
|
continue;
|
||||||
|
@ -110,6 +110,7 @@ class LogTest {
|
|||||||
// Record metadata for testing initial offset functionality
|
// Record metadata for testing initial offset functionality
|
||||||
static size_t initial_offset_record_sizes_[];
|
static size_t initial_offset_record_sizes_[];
|
||||||
static uint64_t initial_offset_last_record_offsets_[];
|
static uint64_t initial_offset_last_record_offsets_[];
|
||||||
|
static int num_initial_offset_records_;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
LogTest() : reading_(false),
|
LogTest() : reading_(false),
|
||||||
@ -192,7 +193,7 @@ class LogTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void WriteInitialOffsetLog() {
|
void WriteInitialOffsetLog() {
|
||||||
for (int i = 0; i < 4; i++) {
|
for (int i = 0; i < num_initial_offset_records_; i++) {
|
||||||
std::string record(initial_offset_record_sizes_[i],
|
std::string record(initial_offset_record_sizes_[i],
|
||||||
static_cast<char>('a' + i));
|
static_cast<char>('a' + i));
|
||||||
Write(record);
|
Write(record);
|
||||||
@ -223,6 +224,11 @@ class LogTest {
|
|||||||
source_.contents_ = Slice(dest_.contents_);
|
source_.contents_ = Slice(dest_.contents_);
|
||||||
Reader* offset_reader = new Reader(&source_, &report_, true/*checksum*/,
|
Reader* offset_reader = new Reader(&source_, &report_, true/*checksum*/,
|
||||||
initial_offset);
|
initial_offset);
|
||||||
|
|
||||||
|
// Read all records from expected_record_offset through the last one.
|
||||||
|
ASSERT_LT(expected_record_offset, num_initial_offset_records_);
|
||||||
|
for (; expected_record_offset < num_initial_offset_records_;
|
||||||
|
++expected_record_offset) {
|
||||||
Slice record;
|
Slice record;
|
||||||
std::string scratch;
|
std::string scratch;
|
||||||
ASSERT_TRUE(offset_reader->ReadRecord(&record, &scratch));
|
ASSERT_TRUE(offset_reader->ReadRecord(&record, &scratch));
|
||||||
@ -231,6 +237,7 @@ class LogTest {
|
|||||||
ASSERT_EQ(initial_offset_last_record_offsets_[expected_record_offset],
|
ASSERT_EQ(initial_offset_last_record_offsets_[expected_record_offset],
|
||||||
offset_reader->LastRecordOffset());
|
offset_reader->LastRecordOffset());
|
||||||
ASSERT_EQ((char)('a' + expected_record_offset), record.data()[0]);
|
ASSERT_EQ((char)('a' + expected_record_offset), record.data()[0]);
|
||||||
|
}
|
||||||
delete offset_reader;
|
delete offset_reader;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -239,15 +246,26 @@ size_t LogTest::initial_offset_record_sizes_[] =
|
|||||||
{10000, // Two sizable records in first block
|
{10000, // Two sizable records in first block
|
||||||
10000,
|
10000,
|
||||||
2 * log::kBlockSize - 1000, // Span three blocks
|
2 * log::kBlockSize - 1000, // Span three blocks
|
||||||
1};
|
1,
|
||||||
|
13716, // Consume all but two bytes of block 3.
|
||||||
|
log::kBlockSize - kHeaderSize, // Consume the entirety of block 4.
|
||||||
|
};
|
||||||
|
|
||||||
uint64_t LogTest::initial_offset_last_record_offsets_[] =
|
uint64_t LogTest::initial_offset_last_record_offsets_[] =
|
||||||
{0,
|
{0,
|
||||||
kHeaderSize + 10000,
|
kHeaderSize + 10000,
|
||||||
2 * (kHeaderSize + 10000),
|
2 * (kHeaderSize + 10000),
|
||||||
2 * (kHeaderSize + 10000) +
|
2 * (kHeaderSize + 10000) +
|
||||||
(2 * log::kBlockSize - 1000) + 3 * kHeaderSize};
|
(2 * log::kBlockSize - 1000) + 3 * kHeaderSize,
|
||||||
|
2 * (kHeaderSize + 10000) +
|
||||||
|
(2 * log::kBlockSize - 1000) + 3 * kHeaderSize
|
||||||
|
+ kHeaderSize + 1,
|
||||||
|
3 * log::kBlockSize,
|
||||||
|
};
|
||||||
|
|
||||||
|
// LogTest::initial_offset_last_record_offsets_ must be defined before this.
|
||||||
|
int LogTest::num_initial_offset_records_ =
|
||||||
|
sizeof(LogTest::initial_offset_last_record_offsets_)/sizeof(uint64_t);
|
||||||
|
|
||||||
TEST(LogTest, Empty) {
|
TEST(LogTest, Empty) {
|
||||||
ASSERT_EQ("EOF", Read());
|
ASSERT_EQ("EOF", Read());
|
||||||
@ -553,6 +571,10 @@ TEST(LogTest, ReadFourthStart) {
|
|||||||
3);
|
3);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST(LogTest, ReadInitialOffsetIntoBlockPadding) {
|
||||||
|
CheckInitialOffsetRecord(3 * log::kBlockSize - 3, 5);
|
||||||
|
}
|
||||||
|
|
||||||
TEST(LogTest, ReadEnd) {
|
TEST(LogTest, ReadEnd) {
|
||||||
CheckOffsetPastEndReturnsNoRecords(0);
|
CheckOffsetPastEndReturnsNoRecords(0);
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user