leveldb/table/table_builder.cc

281 lines
8.4 KiB
C++
Raw Normal View History

// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#include "leveldb/table_builder.h"
#include <cassert>
#include "leveldb/comparator.h"
#include "leveldb/env.h"
#include "leveldb/filter_policy.h"
#include "leveldb/options.h"
#include "table/block_builder.h"
#include "table/filter_block.h"
#include "table/format.h"
#include "util/coding.h"
#include "util/crc32c.h"
namespace leveldb {
struct TableBuilder::Rep {
Rep(const Options& opt, WritableFile* f)
: options(opt),
index_block_options(opt),
file(f),
offset(0),
data_block(&options),
index_block(&index_block_options),
num_entries(0),
closed(false),
filter_block(opt.filter_policy == nullptr
? nullptr
: new FilterBlockBuilder(opt.filter_policy)),
pending_index_entry(false) {
index_block_options.block_restart_interval = 1;
}
Options options;
Options index_block_options;
WritableFile* file;
uint64_t offset;
Status status;
BlockBuilder data_block;
BlockBuilder index_block;
std::string last_key;
int64_t num_entries;
bool closed; // Either Finish() or Abandon() has been called.
FilterBlockBuilder* filter_block;
// We do not emit the index entry for a block until we have seen the
// first key for the next data block. This allows us to use shorter
// keys in the index block. For example, consider a block boundary
// between the keys "the quick brown fox" and "the who". We can use
// "the r" as the key for the index block entry since it is >= all
// entries in the first block and < all entries in subsequent
// blocks.
//
// Invariant: r->pending_index_entry is true only if data_block is empty.
bool pending_index_entry;
BlockHandle pending_handle; // Handle to add to index block
std::string compressed_output;
};
TableBuilder::TableBuilder(const Options& options, WritableFile* file)
: rep_(new Rep(options, file)) {
if (rep_->filter_block != nullptr) {
rep_->filter_block->StartBlock(0);
}
}
TableBuilder::~TableBuilder() {
assert(rep_->closed); // Catch errors where caller forgot to call Finish()
delete rep_->filter_block;
delete rep_;
}
Status TableBuilder::ChangeOptions(const Options& options) {
// Note: if more fields are added to Options, update
// this function to catch changes that should not be allowed to
// change in the middle of building a Table.
if (options.comparator != rep_->options.comparator) {
return Status::InvalidArgument("changing comparator while building table");
}
// Note that any live BlockBuilders point to rep_->options and therefore
// will automatically pick up the updated options.
rep_->options = options;
rep_->index_block_options = options;
rep_->index_block_options.block_restart_interval = 1;
return Status::OK();
}
void TableBuilder::Add(const Slice& key, const Slice& value) {
Rep* r = rep_;
assert(!r->closed);
if (!ok()) return;
if (r->num_entries > 0) {
assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0);
}
if (r->pending_index_entry) {
assert(r->data_block.empty());
r->options.comparator->FindShortestSeparator(&r->last_key, key);
std::string handle_encoding;
r->pending_handle.EncodeTo(&handle_encoding);
r->index_block.Add(r->last_key, Slice(handle_encoding));
r->pending_index_entry = false;
}
if (r->filter_block != nullptr) {
r->filter_block->AddKey(key);
}
r->last_key.assign(key.data(), key.size());
r->num_entries++;
r->data_block.Add(key, value);
const size_t estimated_block_size = r->data_block.CurrentSizeEstimate();
if (estimated_block_size >= r->options.block_size) {
Flush();
}
}
void TableBuilder::Flush() {
Rep* r = rep_;
assert(!r->closed);
if (!ok()) return;
if (r->data_block.empty()) return;
assert(!r->pending_index_entry);
WriteBlock(&r->data_block, &r->pending_handle);
if (ok()) {
r->pending_index_entry = true;
r->status = r->file->Flush();
}
if (r->filter_block != nullptr) {
r->filter_block->StartBlock(r->offset);
}
}
void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
// File format contains a sequence of blocks where each block has:
// block_data: uint8[n]
// type: uint8
// crc: uint32
assert(ok());
Rep* r = rep_;
Slice raw = block->Finish();
Slice block_contents;
CompressionType type = r->options.compression;
// TODO(postrelease): Support more compression options: zlib?
switch (type) {
case kNoCompression:
block_contents = raw;
break;
case kSnappyCompression: {
std::string* compressed = &r->compressed_output;
if (port::Snappy_Compress(raw.data(), raw.size(), compressed) &&
compressed->size() < raw.size() - (raw.size() / 8u)) {
block_contents = *compressed;
} else {
// Snappy not supported, or compressed less than 12.5%, so just
// store uncompressed form
block_contents = raw;
type = kNoCompression;
}
break;
}
Add support for Zstd-based compression in LevelDB. This change implements support for Zstd-based compression in LevelDB. Building up from the Snappy compression (which has been supported since inception), this change adds Zstd as an alternate compression algorithm. We are implementing this to provide alternative options for users who might have different performance and efficiency requirements. For instance, the Zstandard website (https://facebook.github.io/zstd/) claims that the Zstd algorithm can achieve around 30% higher compression ratios than Snappy, with relatively smaller (~10%) slowdowns in de/compression speeds. Benchmarking results: $ blaze-bin/third_party/leveldb/db_bench LevelDB: version 1.23 Date: Thu Feb 2 18:50:06 2023 CPU: 56 * Intel(R) Xeon(R) CPU E5-2690 v4 @ 2.60GHz CPUCache: 35840 KB Keys: 16 bytes each Values: 100 bytes each (50 bytes after compression) Entries: 1000000 RawSize: 110.6 MB (estimated) FileSize: 62.9 MB (estimated) ------------------------------------------------ fillseq : 2.613 micros/op; 42.3 MB/s fillsync : 3924.432 micros/op; 0.0 MB/s (1000 ops) fillrandom : 3.609 micros/op; 30.7 MB/s overwrite : 4.508 micros/op; 24.5 MB/s readrandom : 6.136 micros/op; (864322 of 1000000 found) readrandom : 5.446 micros/op; (864083 of 1000000 found) readseq : 0.180 micros/op; 613.3 MB/s readreverse : 0.321 micros/op; 344.7 MB/s compact : 827043.000 micros/op; readrandom : 4.603 micros/op; (864105 of 1000000 found) readseq : 0.169 micros/op; 656.3 MB/s readreverse : 0.315 micros/op; 350.8 MB/s fill100K : 854.009 micros/op; 111.7 MB/s (1000 ops) crc32c : 1.227 micros/op; 3184.0 MB/s (4K per op) snappycomp : 3.610 micros/op; 1081.9 MB/s (output: 55.2%) snappyuncomp : 0.691 micros/op; 5656.3 MB/s zstdcomp : 15.731 micros/op; 248.3 MB/s (output: 44.1%) zstduncomp : 4.218 micros/op; 926.2 MB/s PiperOrigin-RevId: 509957778
2023-02-16 08:04:43 +08:00
case kZstdCompression: {
std::string* compressed = &r->compressed_output;
if (port::Zstd_Compress(r->options.zstd_compression_level, raw.data(),
raw.size(), compressed) &&
Add support for Zstd-based compression in LevelDB. This change implements support for Zstd-based compression in LevelDB. Building up from the Snappy compression (which has been supported since inception), this change adds Zstd as an alternate compression algorithm. We are implementing this to provide alternative options for users who might have different performance and efficiency requirements. For instance, the Zstandard website (https://facebook.github.io/zstd/) claims that the Zstd algorithm can achieve around 30% higher compression ratios than Snappy, with relatively smaller (~10%) slowdowns in de/compression speeds. Benchmarking results: $ blaze-bin/third_party/leveldb/db_bench LevelDB: version 1.23 Date: Thu Feb 2 18:50:06 2023 CPU: 56 * Intel(R) Xeon(R) CPU E5-2690 v4 @ 2.60GHz CPUCache: 35840 KB Keys: 16 bytes each Values: 100 bytes each (50 bytes after compression) Entries: 1000000 RawSize: 110.6 MB (estimated) FileSize: 62.9 MB (estimated) ------------------------------------------------ fillseq : 2.613 micros/op; 42.3 MB/s fillsync : 3924.432 micros/op; 0.0 MB/s (1000 ops) fillrandom : 3.609 micros/op; 30.7 MB/s overwrite : 4.508 micros/op; 24.5 MB/s readrandom : 6.136 micros/op; (864322 of 1000000 found) readrandom : 5.446 micros/op; (864083 of 1000000 found) readseq : 0.180 micros/op; 613.3 MB/s readreverse : 0.321 micros/op; 344.7 MB/s compact : 827043.000 micros/op; readrandom : 4.603 micros/op; (864105 of 1000000 found) readseq : 0.169 micros/op; 656.3 MB/s readreverse : 0.315 micros/op; 350.8 MB/s fill100K : 854.009 micros/op; 111.7 MB/s (1000 ops) crc32c : 1.227 micros/op; 3184.0 MB/s (4K per op) snappycomp : 3.610 micros/op; 1081.9 MB/s (output: 55.2%) snappyuncomp : 0.691 micros/op; 5656.3 MB/s zstdcomp : 15.731 micros/op; 248.3 MB/s (output: 44.1%) zstduncomp : 4.218 micros/op; 926.2 MB/s PiperOrigin-RevId: 509957778
2023-02-16 08:04:43 +08:00
compressed->size() < raw.size() - (raw.size() / 8u)) {
block_contents = *compressed;
} else {
// Zstd not supported, or compressed less than 12.5%, so just
// store uncompressed form
block_contents = raw;
type = kNoCompression;
}
break;
}
}
WriteRawBlock(block_contents, type, handle);
r->compressed_output.clear();
block->Reset();
}
void TableBuilder::WriteRawBlock(const Slice& block_contents,
CompressionType type, BlockHandle* handle) {
Rep* r = rep_;
handle->set_offset(r->offset);
handle->set_size(block_contents.size());
r->status = r->file->Append(block_contents);
if (r->status.ok()) {
char trailer[kBlockTrailerSize];
trailer[0] = type;
uint32_t crc = crc32c::Value(block_contents.data(), block_contents.size());
crc = crc32c::Extend(crc, trailer, 1); // Extend crc to cover block type
EncodeFixed32(trailer + 1, crc32c::Mask(crc));
r->status = r->file->Append(Slice(trailer, kBlockTrailerSize));
if (r->status.ok()) {
r->offset += block_contents.size() + kBlockTrailerSize;
}
}
}
Status TableBuilder::status() const { return rep_->status; }
Status TableBuilder::Finish() {
Rep* r = rep_;
Flush();
assert(!r->closed);
r->closed = true;
BlockHandle filter_block_handle, metaindex_block_handle, index_block_handle;
// Write filter block
if (ok() && r->filter_block != nullptr) {
WriteRawBlock(r->filter_block->Finish(), kNoCompression,
&filter_block_handle);
}
// Write metaindex block
if (ok()) {
BlockBuilder meta_index_block(&r->options);
if (r->filter_block != nullptr) {
// Add mapping from "filter.Name" to location of filter data
std::string key = "filter.";
key.append(r->options.filter_policy->Name());
std::string handle_encoding;
filter_block_handle.EncodeTo(&handle_encoding);
meta_index_block.Add(key, handle_encoding);
}
// TODO(postrelease): Add stats and other meta blocks
WriteBlock(&meta_index_block, &metaindex_block_handle);
}
// Write index block
if (ok()) {
if (r->pending_index_entry) {
r->options.comparator->FindShortSuccessor(&r->last_key);
std::string handle_encoding;
r->pending_handle.EncodeTo(&handle_encoding);
r->index_block.Add(r->last_key, Slice(handle_encoding));
r->pending_index_entry = false;
}
WriteBlock(&r->index_block, &index_block_handle);
}
// Write footer
if (ok()) {
Footer footer;
footer.set_metaindex_handle(metaindex_block_handle);
footer.set_index_handle(index_block_handle);
std::string footer_encoding;
footer.EncodeTo(&footer_encoding);
r->status = r->file->Append(footer_encoding);
if (r->status.ok()) {
r->offset += footer_encoding.size();
}
}
return r->status;
}
void TableBuilder::Abandon() {
Rep* r = rep_;
assert(!r->closed);
r->closed = true;
}
uint64_t TableBuilder::NumEntries() const { return rep_->num_entries; }
uint64_t TableBuilder::FileSize() const { return rep_->offset; }
} // namespace leveldb