From 85584d497e7b354853b72f450683d59fcf6b9c5c Mon Sep 17 00:00:00 2001 From: Sanjay Ghemawat Date: Tue, 17 Apr 2012 08:36:46 -0700 Subject: [PATCH] Added bloom filter support. In particular, we add a new FilterPolicy class. An instance of this class can be supplied in Options when opening a database. If supplied, the instance is used to generate summaries of keys (e.g., a bloom filter) which are placed in sstables. These summaries are consulted by DB::Get() so we can avoid reading sstable blocks that are guaranteed to not contain the key we are looking for. This change provides one implementation of FilterPolicy based on bloom filters. Other changes: - Updated version number to 1.4. - Some build tweaks. - C binding for CompactRange. - A few more benchmarks: deleteseq, deleterandom, readmissing, seekrandom. - Minor .gitignore update. --- .gitignore | 3 + Makefile | 14 +- build_detect_platform | 38 +- db/c.cc | 110 ++++ db/c_test.c | 77 +++ db/db_bench.cc | 99 ++- db/db_impl.cc | 12 +- db/db_impl.h | 2 + db/db_test.cc | 1034 ++++++++++++++++++------------- db/dbformat.cc | 20 + db/dbformat.h | 12 + db/repair.cc | 4 +- db/table_cache.cc | 58 +- db/table_cache.h | 11 + db/version_set.cc | 83 +-- doc/index.html | 63 ++ doc/table_format.txt | 41 ++ include/leveldb/c.h | 29 + include/leveldb/db.h | 2 +- include/leveldb/filter_policy.h | 70 +++ include/leveldb/options.h | 8 + include/leveldb/table.h | 15 + include/leveldb/table_builder.h | 1 + port/port_android.h | 3 + table/block.cc | 9 +- table/block.h | 5 +- table/filter_block.cc | 111 ++++ table/filter_block.h | 68 ++ table/filter_block_test.cc | 128 ++++ table/format.cc | 23 +- table/format.h | 16 +- table/table.cc | 116 +++- table/table_builder.cc | 55 +- table/table_test.cc | 32 +- util/bloom.cc | 95 +++ util/bloom_test.cc | 159 +++++ util/filter_policy.cc | 11 + util/options.cc | 3 +- 38 files changed, 2070 insertions(+), 570 deletions(-) mode change 100644 => 100755 build_detect_platform create mode 100644 include/leveldb/filter_policy.h create mode 100644 table/filter_block.cc create mode 100644 table/filter_block.h create mode 100644 table/filter_block_test.cc create mode 100644 util/bloom.cc create mode 100644 util/bloom_test.cc create mode 100644 util/filter_policy.cc diff --git a/.gitignore b/.gitignore index 46769e0..f030430 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,8 @@ build_config.mk *.a *.o +*.dylib* +*.so +*.so.* *_test db_bench diff --git a/Makefile b/Makefile index 354654d..b961ba1 100644 --- a/Makefile +++ b/Makefile @@ -17,8 +17,8 @@ OPT ?= -O2 -DNDEBUG # (A) Production use (optimized mode) #----------------------------------------------- # detect what platform we're building on -$(shell sh ./build_detect_platform) -# this file is generated by build_detect_platform to set build flags and sources +$(shell ./build_detect_platform build_config.mk) +# this file is generated by the previous line to set build flags and sources include build_config.mk CFLAGS += -I. -I./include $(PLATFORM_CCFLAGS) $(OPT) @@ -34,6 +34,7 @@ TESTHARNESS = ./util/testharness.o $(TESTUTIL) TESTS = \ arena_test \ + bloom_test \ c_test \ cache_test \ coding_test \ @@ -43,6 +44,7 @@ TESTS = \ dbformat_test \ env_test \ filename_test \ + filter_block_test \ log_test \ memenv_test \ skiplist_test \ @@ -63,7 +65,7 @@ default: all ifneq ($(PLATFORM_SHARED_EXT),) # Update db.h if you change these. 
SHARED_MAJOR = 1
-SHARED_MINOR = 3
+SHARED_MINOR = 4
 SHARED1 = libleveldb.$(PLATFORM_SHARED_EXT)
 SHARED2 = $(SHARED1).$(SHARED_MAJOR)
 SHARED3 = $(SHARED1).$(SHARED_MAJOR).$(SHARED_MINOR)
@@ -101,6 +103,9 @@ db_bench_tree_db: doc/bench/db_bench_tree_db.o $(LIBOBJECTS) $(TESTUTIL)
 arena_test: util/arena_test.o $(LIBOBJECTS) $(TESTHARNESS)
 	$(CXX) util/arena_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LDFLAGS)
 
+bloom_test: util/bloom_test.o $(LIBOBJECTS) $(TESTHARNESS)
+	$(CXX) util/bloom_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LDFLAGS)
+
 c_test: db/c_test.o $(LIBOBJECTS) $(TESTHARNESS)
 	$(CXX) db/c_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LDFLAGS)
 
@@ -128,6 +133,9 @@ env_test: util/env_test.o $(LIBOBJECTS) $(TESTHARNESS)
 filename_test: db/filename_test.o $(LIBOBJECTS) $(TESTHARNESS)
 	$(CXX) db/filename_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LDFLAGS)
 
+filter_block_test: table/filter_block_test.o $(LIBOBJECTS) $(TESTHARNESS)
+	$(CXX) table/filter_block_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LDFLAGS)
+
 log_test: db/log_test.o $(LIBOBJECTS) $(TESTHARNESS)
 	$(CXX) db/log_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ $(LDFLAGS)
 
diff --git a/build_detect_platform b/build_detect_platform
old mode 100644
new mode 100755
index 64fcaef..b71bf02
--- a/build_detect_platform
+++ b/build_detect_platform
@@ -1,9 +1,9 @@
 #!/bin/sh
 #
-# Detects OS we're compiling on and generates build_config.mk,
-# which in turn gets read while processing Makefile.
+# Detects OS we're compiling on and outputs a file specified by the first
+# argument, which in turn gets read while processing Makefile.
 #
-# build_config.mk will set the following variables:
+# The output will set the following variables:
 #   PLATFORM_LDFLAGS            Linker flags
 #   PLATFORM_SHARED_EXT         Extension for shared libraries
 #   PLATFORM_SHARED_LDFLAGS     Flags for building shared library
@@ -13,11 +13,15 @@
 #       -DLEVELDB_PLATFORM_POSIX if cstdatomic is present
 #       -DLEVELDB_PLATFORM_NOATOMIC if it is not
 
-SCRIPT_DIR=`dirname $0`
+OUTPUT=$1
+if test -z "$OUTPUT"; then
+  echo "usage: $0 <output-filename>"
+  exit 1
+fi
 
-# Delete existing build_config.mk
-rm -f build_config.mk
-touch build_config.mk
+# Delete existing output, if it exists
+rm -f $OUTPUT
+touch $OUTPUT
 
 if test -z "$CXX"; then
     CXX=g++
@@ -96,7 +100,7 @@ esac
 # except for the test and benchmark files. By default, find will output a list
 # of all files matching either rule, so we need to append -print to make the
 # prune take effect.
-DIRS="$SCRIPT_DIR/util $SCRIPT_DIR/db $SCRIPT_DIR/table"
+DIRS="util db table"
 set -f # temporarily disable globbing so that our patterns aren't expanded
 PRUNE_TEST="-name *test*.cc -prune"
 PRUNE_BENCH="-name *_bench.cc -prune"
@@ -105,8 +109,8 @@ set +f # re-enable globbing
 
 # The sources consist of the portable files, plus the platform-specific port
 # file.
-echo "SOURCES=$PORTABLE_FILES $PORT_FILE" >> build_config.mk
-echo "MEMENV_SOURCES=helpers/memenv/memenv.cc" >> build_config.mk
+echo "SOURCES=$PORTABLE_FILES $PORT_FILE" >> $OUTPUT
+echo "MEMENV_SOURCES=helpers/memenv/memenv.cc" >> $OUTPUT
 
 if [ "$PLATFORM" = "OS_ANDROID_CROSSCOMPILE" ]; then
     # Cross-compiling; do not try any compilation tests.
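Note: the converted script can also be run by hand, which is handy when debugging platform detection. A minimal sketch, assuming a POSIX shell at the source root (the output name my_config.mk is arbitrary; the Makefile itself passes build_config.mk, as shown above):

    CXX=g++ ./build_detect_platform my_config.mk   # CXX override is optional
    cat my_config.mk                               # inspect detected flags
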
@@ -147,10 +151,10 @@ fi
 PLATFORM_CCFLAGS="$PLATFORM_CCFLAGS $COMMON_FLAGS"
 PLATFORM_CXXFLAGS="$PLATFORM_CXXFLAGS $COMMON_FLAGS"
 
-echo "PLATFORM=$PLATFORM" >> build_config.mk
-echo "PLATFORM_LDFLAGS=$PLATFORM_LDFLAGS" >> build_config.mk
-echo "PLATFORM_CCFLAGS=$PLATFORM_CCFLAGS" >> build_config.mk
-echo "PLATFORM_CXXFLAGS=$PLATFORM_CXXFLAGS" >> build_config.mk
-echo "PLATFORM_SHARED_CFLAGS=$PLATFORM_SHARED_CFLAGS" >> build_config.mk
-echo "PLATFORM_SHARED_EXT=$PLATFORM_SHARED_EXT" >> build_config.mk
-echo "PLATFORM_SHARED_LDFLAGS=$PLATFORM_SHARED_LDFLAGS" >> build_config.mk
+echo "PLATFORM=$PLATFORM" >> $OUTPUT
+echo "PLATFORM_LDFLAGS=$PLATFORM_LDFLAGS" >> $OUTPUT
+echo "PLATFORM_CCFLAGS=$PLATFORM_CCFLAGS" >> $OUTPUT
+echo "PLATFORM_CXXFLAGS=$PLATFORM_CXXFLAGS" >> $OUTPUT
+echo "PLATFORM_SHARED_CFLAGS=$PLATFORM_SHARED_CFLAGS" >> $OUTPUT
+echo "PLATFORM_SHARED_EXT=$PLATFORM_SHARED_EXT" >> $OUTPUT
+echo "PLATFORM_SHARED_LDFLAGS=$PLATFORM_SHARED_LDFLAGS" >> $OUTPUT
diff --git a/db/c.cc b/db/c.cc
index 038e5c0..2dde400 100644
--- a/db/c.cc
+++ b/db/c.cc
@@ -10,6 +10,7 @@
 #include "leveldb/comparator.h"
 #include "leveldb/db.h"
 #include "leveldb/env.h"
+#include "leveldb/filter_policy.h"
 #include "leveldb/iterator.h"
 #include "leveldb/options.h"
 #include "leveldb/status.h"
@@ -21,8 +22,10 @@ using leveldb::CompressionType;
 using leveldb::DB;
 using leveldb::Env;
 using leveldb::FileLock;
+using leveldb::FilterPolicy;
 using leveldb::Iterator;
 using leveldb::Logger;
+using leveldb::NewBloomFilterPolicy;
 using leveldb::NewLRUCache;
 using leveldb::Options;
 using leveldb::RandomAccessFile;
@@ -78,6 +81,47 @@ struct leveldb_comparator_t : public Comparator {
   virtual void FindShortSuccessor(std::string* key) const { }
 };
 
+struct leveldb_filterpolicy_t : public FilterPolicy {
+  void* state_;
+  void (*destructor_)(void*);
+  const char* (*name_)(void*);
+  char* (*create_)(
+      void*,
+      const char* const* key_array, const size_t* key_length_array,
+      int num_keys,
+      size_t* filter_length);
+  unsigned char (*key_match_)(
+      void*,
+      const char* key, size_t length,
+      const char* filter, size_t filter_length);
+
+  virtual ~leveldb_filterpolicy_t() {
+    (*destructor_)(state_);
+  }
+
+  virtual const char* Name() const {
+    return (*name_)(state_);
+  }
+
+  virtual void CreateFilter(const Slice* keys, int n, std::string* dst) const {
+    std::vector<const char*> key_pointers(n);
+    std::vector<size_t> key_sizes(n);
+    for (int i = 0; i < n; i++) {
+      key_pointers[i] = keys[i].data();
+      key_sizes[i] = keys[i].size();
+    }
+    size_t len;
+    char* filter = (*create_)(state_, &key_pointers[0], &key_sizes[0], n, &len);
+    dst->append(filter, len);
+    free(filter);
+  }
+
+  virtual bool KeyMayMatch(const Slice& key, const Slice& filter) const {
+    return (*key_match_)(state_, key.data(), key.size(),
+                         filter.data(), filter.size());
+  }
+};
+
 struct leveldb_env_t {
   Env* rep;
   bool is_default;
@@ -218,6 +262,17 @@ void leveldb_approximate_sizes(
   delete[] ranges;
 }
 
+void leveldb_compact_range(
+    leveldb_t* db,
+    const char* start_key, size_t start_key_len,
+    const char* limit_key, size_t limit_key_len) {
+  Slice a, b;
+  db->rep->CompactRange(
+      // Pass NULL Slice if corresponding "const char*" is NULL
+      (start_key ? (a = Slice(start_key, start_key_len), &a) : NULL),
+      (limit_key ?
(b = Slice(limit_key, limit_key_len), &b) : NULL)); +} + void leveldb_destroy_db( const leveldb_options_t* options, const char* name, @@ -340,6 +395,12 @@ void leveldb_options_set_comparator( opt->rep.comparator = cmp; } +void leveldb_options_set_filter_policy( + leveldb_options_t* opt, + leveldb_filterpolicy_t* policy) { + opt->rep.filter_policy = policy; +} + void leveldb_options_set_create_if_missing( leveldb_options_t* opt, unsigned char v) { opt->rep.create_if_missing = v; @@ -407,6 +468,55 @@ void leveldb_comparator_destroy(leveldb_comparator_t* cmp) { delete cmp; } +leveldb_filterpolicy_t* leveldb_filterpolicy_create( + void* state, + void (*destructor)(void*), + char* (*create_filter)( + void*, + const char* const* key_array, const size_t* key_length_array, + int num_keys, + size_t* filter_length), + unsigned char (*key_may_match)( + void*, + const char* key, size_t length, + const char* filter, size_t filter_length), + const char* (*name)(void*)) { + leveldb_filterpolicy_t* result = new leveldb_filterpolicy_t; + result->state_ = state; + result->destructor_ = destructor; + result->create_ = create_filter; + result->key_match_ = key_may_match; + result->name_ = name; + return result; +} + +void leveldb_filterpolicy_destroy(leveldb_filterpolicy_t* filter) { + delete filter; +} + +leveldb_filterpolicy_t* leveldb_filterpolicy_create_bloom(int bits_per_key) { + // Make a leveldb_filterpolicy_t, but override all of its methods so + // they delegate to a NewBloomFilterPolicy() instead of user + // supplied C functions. + struct Wrapper : public leveldb_filterpolicy_t { + const FilterPolicy* rep_; + ~Wrapper() { delete rep_; } + const char* Name() const { return rep_->Name(); } + void CreateFilter(const Slice* keys, int n, std::string* dst) const { + return rep_->CreateFilter(keys, n, dst); + } + bool KeyMayMatch(const Slice& key, const Slice& filter) const { + return rep_->KeyMayMatch(key, filter); + } + static void DoNothing(void*) { } + }; + Wrapper* wrapper = new Wrapper; + wrapper->rep_ = NewBloomFilterPolicy(bits_per_key); + wrapper->state_ = NULL; + wrapper->destructor_ = &Wrapper::DoNothing; + return wrapper; +} + leveldb_readoptions_t* leveldb_readoptions_create() { return new leveldb_readoptions_t; } diff --git a/db/c_test.c b/db/c_test.c index 9fef325..12b4424 100644 --- a/db/c_test.c +++ b/db/c_test.c @@ -122,6 +122,31 @@ static const char* CmpName(void* arg) { return "foo"; } +// Custom filter policy +static unsigned char fake_filter_result = 1; +static void FilterDestroy(void* arg) { } +static const char* FilterName(void* arg) { + return "TestFilter"; +} +static char* FilterCreate( + void* arg, + const char* const* key_array, const size_t* key_length_array, + int num_keys, + size_t* filter_length) { + *filter_length = 4; + char* result = malloc(4); + memcpy(result, "fake", 4); + return result; +} +unsigned char FilterKeyMatch( + void* arg, + const char* key, size_t length, + const char* filter, size_t filter_length) { + CheckCondition(filter_length == 4); + CheckCondition(memcmp(filter, "fake", 4) == 0); + return fake_filter_result; +} + int main(int argc, char** argv) { leveldb_t* db; leveldb_comparator_t* cmp; @@ -131,6 +156,7 @@ int main(int argc, char** argv) { leveldb_readoptions_t* roptions; leveldb_writeoptions_t* woptions; char* err = NULL; + int run = -1; snprintf(dbname, sizeof(dbname), "/tmp/leveldb_c_test-%d", ((int) geteuid())); @@ -180,6 +206,14 @@ int main(int argc, char** argv) { CheckNoError(err); CheckGet(db, roptions, "foo", "hello"); + 
StartPhase("compactall"); + leveldb_compact_range(db, NULL, 0, NULL, 0); + CheckGet(db, roptions, "foo", "hello"); + + StartPhase("compactrange"); + leveldb_compact_range(db, "a", 1, "z", 1); + CheckGet(db, roptions, "foo", "hello"); + StartPhase("writebatch"); { leveldb_writebatch_t* wb = leveldb_writebatch_create(); @@ -279,6 +313,49 @@ int main(int argc, char** argv) { CheckGet(db, roptions, "foo", NULL); CheckGet(db, roptions, "bar", NULL); CheckGet(db, roptions, "box", "c"); + leveldb_options_set_create_if_missing(options, 1); + leveldb_options_set_error_if_exists(options, 1); + } + + StartPhase("filter"); + for (run = 0; run < 2; run++) { + // First run uses custom filter, second run uses bloom filter + CheckNoError(err); + leveldb_filterpolicy_t* policy; + if (run == 0) { + policy = leveldb_filterpolicy_create( + NULL, FilterDestroy, FilterCreate, FilterKeyMatch, FilterName); + } else { + policy = leveldb_filterpolicy_create_bloom(10); + } + + // Create new database + leveldb_close(db); + leveldb_destroy_db(options, dbname, &err); + leveldb_options_set_filter_policy(options, policy); + db = leveldb_open(options, dbname, &err); + CheckNoError(err); + leveldb_put(db, woptions, "foo", 3, "foovalue", 8, &err); + CheckNoError(err); + leveldb_put(db, woptions, "bar", 3, "barvalue", 8, &err); + CheckNoError(err); + leveldb_compact_range(db, NULL, 0, NULL, 0); + + fake_filter_result = 1; + CheckGet(db, roptions, "foo", "foovalue"); + CheckGet(db, roptions, "bar", "barvalue"); + if (phase == 0) { + // Must not find value when custom filter returns false + fake_filter_result = 0; + CheckGet(db, roptions, "foo", NULL); + CheckGet(db, roptions, "bar", NULL); + fake_filter_result = 1; + + CheckGet(db, roptions, "foo", "foovalue"); + CheckGet(db, roptions, "bar", "barvalue"); + } + leveldb_options_set_filter_policy(options, NULL); + leveldb_filterpolicy_destroy(policy); } StartPhase("cleanup"); diff --git a/db/db_bench.cc b/db/db_bench.cc index bbfd618..b0c3995 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -25,15 +25,20 @@ // overwrite -- overwrite N values in random key order in async mode // fillsync -- write N/100 values in random key order in sync mode // fill100K -- write N/1000 100K values in random order in async mode +// deleteseq -- delete N keys in sequential order +// deleterandom -- delete N keys in random order // readseq -- read N times sequentially // readreverse -- read N times in reverse order // readrandom -- read N times in random order +// readmissing -- read N missing keys in random order // readhot -- read N times in random order from 1% section of DB +// seekrandom -- N random seeks // crc32c -- repeated crc32c of 4K of data // acquireload -- load N*1000 times // Meta operations: // compact -- Compact the entire DB // stats -- Print DB stats +// sstables -- Print sstable info // heapprofile -- Dump a heap profile (if supported by this port) static const char* FLAGS_benchmarks = "fillseq," @@ -85,6 +90,10 @@ static int FLAGS_cache_size = -1; // Maximum number of files to keep open at the same time (use default if == 0) static int FLAGS_open_files = 0; +// Bloom filter bits per key. +// Negative means use default settings. +static int FLAGS_bloom_bits = -1; + // If true, do not destroy the existing database. If you set this // flag and also specify a benchmark that wants a fresh database, that // benchmark will fail. 
@@ -293,6 +302,7 @@ struct ThreadState { class Benchmark { private: Cache* cache_; + const FilterPolicy* filter_policy_; DB* db_; int num_; int value_size_; @@ -378,6 +388,9 @@ class Benchmark { public: Benchmark() : cache_(FLAGS_cache_size >= 0 ? NewLRUCache(FLAGS_cache_size) : NULL), + filter_policy_(FLAGS_bloom_bits >= 0 + ? NewBloomFilterPolicy(FLAGS_bloom_bits) + : NULL), db_(NULL), num_(FLAGS_num), value_size_(FLAGS_value_size), @@ -399,6 +412,7 @@ class Benchmark { ~Benchmark() { delete db_; delete cache_; + delete filter_policy_; } void Run() { @@ -457,11 +471,19 @@ class Benchmark { method = &Benchmark::ReadReverse; } else if (name == Slice("readrandom")) { method = &Benchmark::ReadRandom; + } else if (name == Slice("readmissing")) { + method = &Benchmark::ReadMissing; + } else if (name == Slice("seekrandom")) { + method = &Benchmark::SeekRandom; } else if (name == Slice("readhot")) { method = &Benchmark::ReadHot; } else if (name == Slice("readrandomsmall")) { reads_ /= 1000; method = &Benchmark::ReadRandom; + } else if (name == Slice("deleteseq")) { + method = &Benchmark::DeleteSeq; + } else if (name == Slice("deleterandom")) { + method = &Benchmark::DeleteRandom; } else if (name == Slice("readwhilewriting")) { num_threads++; // Add extra thread for writing method = &Benchmark::ReadWhileWriting; @@ -478,7 +500,9 @@ class Benchmark { } else if (name == Slice("heapprofile")) { HeapProfile(); } else if (name == Slice("stats")) { - PrintStats(); + PrintStats("leveldb.stats"); + } else if (name == Slice("sstables")) { + PrintStats("leveldb.sstables"); } else { if (name != Slice()) { // No error message for empty name fprintf(stderr, "unknown benchmark '%s'\n", name.ToString().c_str()); @@ -669,6 +693,7 @@ class Benchmark { options.create_if_missing = !FLAGS_use_existing_db; options.block_cache = cache_; options.write_buffer_size = FLAGS_write_buffer_size; + options.filter_policy = filter_policy_; Status s = DB::Open(options, FLAGS_db, &db_); if (!s.ok()) { fprintf(stderr, "open error: %s\n", s.ToString().c_str()); @@ -743,10 +768,28 @@ class Benchmark { void ReadRandom(ThreadState* thread) { ReadOptions options; std::string value; + int found = 0; for (int i = 0; i < reads_; i++) { char key[100]; const int k = thread->rand.Next() % FLAGS_num; snprintf(key, sizeof(key), "%016d", k); + if (db_->Get(options, key, &value).ok()) { + found++; + } + thread->stats.FinishedSingleOp(); + } + char msg[100]; + snprintf(msg, sizeof(msg), "(%d of %d found)", found, num_); + thread->stats.AddMessage(msg); + } + + void ReadMissing(ThreadState* thread) { + ReadOptions options; + std::string value; + for (int i = 0; i < reads_; i++) { + char key[100]; + const int k = thread->rand.Next() % FLAGS_num; + snprintf(key, sizeof(key), "%016d.", k); db_->Get(options, key, &value); thread->stats.FinishedSingleOp(); } @@ -765,6 +808,54 @@ class Benchmark { } } + void SeekRandom(ThreadState* thread) { + ReadOptions options; + std::string value; + int found = 0; + for (int i = 0; i < reads_; i++) { + Iterator* iter = db_->NewIterator(options); + char key[100]; + const int k = thread->rand.Next() % FLAGS_num; + snprintf(key, sizeof(key), "%016d", k); + iter->Seek(key); + if (iter->Valid() && iter->key() == key) found++; + delete iter; + thread->stats.FinishedSingleOp(); + } + char msg[100]; + snprintf(msg, sizeof(msg), "(%d of %d found)", found, num_); + thread->stats.AddMessage(msg); + } + + void DoDelete(ThreadState* thread, bool seq) { + RandomGenerator gen; + WriteBatch batch; + Status s; + for (int i = 0; i < 
num_; i += entries_per_batch_) { + batch.Clear(); + for (int j = 0; j < entries_per_batch_; j++) { + const int k = seq ? i+j : (thread->rand.Next() % FLAGS_num); + char key[100]; + snprintf(key, sizeof(key), "%016d", k); + batch.Delete(key); + thread->stats.FinishedSingleOp(); + } + s = db_->Write(write_options_, &batch); + if (!s.ok()) { + fprintf(stderr, "del error: %s\n", s.ToString().c_str()); + exit(1); + } + } + } + + void DeleteSeq(ThreadState* thread) { + DoDelete(thread, true); + } + + void DeleteRandom(ThreadState* thread) { + DoDelete(thread, false); + } + void ReadWhileWriting(ThreadState* thread) { if (thread->tid > 0) { ReadRandom(thread); @@ -799,9 +890,9 @@ class Benchmark { db_->CompactRange(NULL, NULL); } - void PrintStats() { + void PrintStats(const char* key) { std::string stats; - if (!db_->GetProperty("leveldb.stats", &stats)) { + if (!db_->GetProperty(key, &stats)) { stats = "(failed)"; } fprintf(stdout, "\n%s\n", stats.c_str()); @@ -861,6 +952,8 @@ int main(int argc, char** argv) { FLAGS_write_buffer_size = n; } else if (sscanf(argv[i], "--cache_size=%d%c", &n, &junk) == 1) { FLAGS_cache_size = n; + } else if (sscanf(argv[i], "--bloom_bits=%d%c", &n, &junk) == 1) { + FLAGS_bloom_bits = n; } else if (sscanf(argv[i], "--open_files=%d%c", &n, &junk) == 1) { FLAGS_open_files = n; } else if (strncmp(argv[i], "--db=", 5) == 0) { diff --git a/db/db_impl.cc b/db/db_impl.cc index 88d17e7..c9c9023 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -87,12 +87,14 @@ static void ClipToRange(T* ptr, V minvalue, V maxvalue) { } Options SanitizeOptions(const std::string& dbname, const InternalKeyComparator* icmp, + const InternalFilterPolicy* ipolicy, const Options& src) { Options result = src; result.comparator = icmp; - ClipToRange(&result.max_open_files, 20, 50000); - ClipToRange(&result.write_buffer_size, 64<<10, 1<<30); - ClipToRange(&result.block_size, 1<<10, 4<<20); + result.filter_policy = (src.filter_policy != NULL) ? ipolicy : NULL; + ClipToRange(&result.max_open_files, 20, 50000); + ClipToRange(&result.write_buffer_size, 64<<10, 1<<30); + ClipToRange(&result.block_size, 1<<10, 4<<20); if (result.info_log == NULL) { // Open a log file in the same directory as the db src.env->CreateDir(dbname); // In case it does not exist @@ -112,7 +114,9 @@ Options SanitizeOptions(const std::string& dbname, DBImpl::DBImpl(const Options& options, const std::string& dbname) : env_(options.env), internal_comparator_(options.comparator), - options_(SanitizeOptions(dbname, &internal_comparator_, options)), + internal_filter_policy_(options.filter_policy), + options_(SanitizeOptions( + dbname, &internal_comparator_, &internal_filter_policy_, options)), owns_info_log_(options_.info_log != options.info_log), owns_cache_(options_.block_cache != options.block_cache), dbname_(dbname), diff --git a/db/db_impl.h b/db/db_impl.h index e665c0e..2f8b523 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -105,6 +105,7 @@ class DBImpl : public DB { // Constant after construction Env* const env_; const InternalKeyComparator internal_comparator_; + const InternalFilterPolicy internal_filter_policy_; const Options options_; // options_.comparator == &internal_comparator_ bool owns_info_log_; bool owns_cache_; @@ -185,6 +186,7 @@ class DBImpl : public DB { // it is not equal to src.info_log. 
extern Options SanitizeOptions(const std::string& db, const InternalKeyComparator* icmp, + const InternalFilterPolicy* ipolicy, const Options& src); } // namespace leveldb diff --git a/db/db_test.cc b/db/db_test.cc index 8318885..ee10807 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -3,12 +3,15 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "leveldb/db.h" +#include "leveldb/filter_policy.h" #include "db/db_impl.h" #include "db/filename.h" #include "db/version_set.h" #include "db/write_batch_internal.h" +#include "leveldb/cache.h" #include "leveldb/env.h" #include "leveldb/table.h" +#include "util/hash.h" #include "util/logging.h" #include "util/mutexlock.h" #include "util/testharness.h" @@ -22,6 +25,28 @@ static std::string RandomString(Random* rnd, int len) { return r; } +namespace { +class AtomicCounter { + private: + port::Mutex mu_; + int count_; + public: + AtomicCounter() : count_(0) { } + void Increment() { + MutexLock l(&mu_); + count_++; + } + int Read() { + MutexLock l(&mu_); + return count_; + } + void Reset() { + MutexLock l(&mu_); + count_ = 0; + } +}; +} + // Special Env used to delay background operations class SpecialEnv : public EnvWrapper { public: @@ -31,9 +56,13 @@ class SpecialEnv : public EnvWrapper { // Simulate no-space errors while this pointer is non-NULL. port::AtomicPointer no_space_; + bool count_random_reads_; + AtomicCounter random_read_counter_; + explicit SpecialEnv(Env* base) : EnvWrapper(base) { delay_sstable_sync_.Release_Store(NULL); no_space_.Release_Store(NULL); + count_random_reads_ = false; } Status NewWritableFile(const std::string& f, WritableFile** r) { @@ -74,9 +103,44 @@ class SpecialEnv : public EnvWrapper { } return s; } + + Status NewRandomAccessFile(const std::string& f, RandomAccessFile** r) { + class CountingFile : public RandomAccessFile { + private: + RandomAccessFile* target_; + AtomicCounter* counter_; + public: + CountingFile(RandomAccessFile* target, AtomicCounter* counter) + : target_(target), counter_(counter) { + } + virtual ~CountingFile() { delete target_; } + virtual Status Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const { + counter_->Increment(); + return target_->Read(offset, n, result, scratch); + } + }; + + Status s = target()->NewRandomAccessFile(f, r); + if (s.ok() && count_random_reads_) { + *r = new CountingFile(*r, &random_read_counter_); + } + return s; + } }; class DBTest { + private: + const FilterPolicy* filter_policy_; + + // Sequence of option configurations to try + enum OptionConfig { + kDefault, + kFilter, + kEnd + }; + int option_config_; + public: std::string dbname_; SpecialEnv* env_; @@ -84,7 +148,9 @@ class DBTest { Options last_options_; - DBTest() : env_(new SpecialEnv(Env::Default())) { + DBTest() : option_config_(kDefault), + env_(new SpecialEnv(Env::Default())) { + filter_policy_ = NewBloomFilterPolicy(10); dbname_ = test::TmpDir() + "/db_test"; DestroyDB(dbname_, Options()); db_ = NULL; @@ -95,6 +161,32 @@ class DBTest { delete db_; DestroyDB(dbname_, Options()); delete env_; + delete filter_policy_; + } + + // Switch to a fresh database with the next option configuration to + // test. Return false if there are no more configurations to test. + bool ChangeOptions() { + if (option_config_ == kEnd) { + return false; + } else { + option_config_++; + DestroyAndReopen(); + return true; + } + } + + // Return the current option configuration. 
+ Options CurrentOptions() { + Options options; + switch (option_config_) { + case kFilter: + options.filter_policy = filter_policy_; + break; + default: + break; + } + return options; } DBImpl* dbfull() { @@ -105,6 +197,11 @@ class DBTest { ASSERT_OK(TryReopen(options)); } + void Close() { + delete db_; + db_ = NULL; + } + void DestroyAndReopen(Options* options = NULL) { delete db_; db_ = NULL; @@ -119,6 +216,7 @@ class DBTest { if (options != NULL) { opts = *options; } else { + opts = CurrentOptions(); opts.create_if_missing = true; } last_options_ = opts; @@ -189,8 +287,7 @@ class DBTest { if (!ParseInternalKey(iter->key(), &ikey)) { result += "CORRUPTED"; } else { - if (last_options_.comparator->Compare( - ikey.user_key, user_key) != 0) { + if (last_options_.comparator->Compare(ikey.user_key, user_key) != 0) { break; } if (!first) { @@ -314,135 +411,155 @@ class DBTest { }; TEST(DBTest, Empty) { - ASSERT_TRUE(db_ != NULL); - ASSERT_EQ("NOT_FOUND", Get("foo")); + do { + ASSERT_TRUE(db_ != NULL); + ASSERT_EQ("NOT_FOUND", Get("foo")); + } while (ChangeOptions()); } TEST(DBTest, ReadWrite) { - ASSERT_OK(Put("foo", "v1")); - ASSERT_EQ("v1", Get("foo")); - ASSERT_OK(Put("bar", "v2")); - ASSERT_OK(Put("foo", "v3")); - ASSERT_EQ("v3", Get("foo")); - ASSERT_EQ("v2", Get("bar")); + do { + ASSERT_OK(Put("foo", "v1")); + ASSERT_EQ("v1", Get("foo")); + ASSERT_OK(Put("bar", "v2")); + ASSERT_OK(Put("foo", "v3")); + ASSERT_EQ("v3", Get("foo")); + ASSERT_EQ("v2", Get("bar")); + } while (ChangeOptions()); } TEST(DBTest, PutDeleteGet) { - ASSERT_OK(db_->Put(WriteOptions(), "foo", "v1")); - ASSERT_EQ("v1", Get("foo")); - ASSERT_OK(db_->Put(WriteOptions(), "foo", "v2")); - ASSERT_EQ("v2", Get("foo")); - ASSERT_OK(db_->Delete(WriteOptions(), "foo")); - ASSERT_EQ("NOT_FOUND", Get("foo")); + do { + ASSERT_OK(db_->Put(WriteOptions(), "foo", "v1")); + ASSERT_EQ("v1", Get("foo")); + ASSERT_OK(db_->Put(WriteOptions(), "foo", "v2")); + ASSERT_EQ("v2", Get("foo")); + ASSERT_OK(db_->Delete(WriteOptions(), "foo")); + ASSERT_EQ("NOT_FOUND", Get("foo")); + } while (ChangeOptions()); } TEST(DBTest, GetFromImmutableLayer) { - Options options; - options.env = env_; - options.write_buffer_size = 100000; // Small write buffer - Reopen(&options); + do { + Options options = CurrentOptions(); + options.env = env_; + options.write_buffer_size = 100000; // Small write buffer + Reopen(&options); - ASSERT_OK(Put("foo", "v1")); - ASSERT_EQ("v1", Get("foo")); + ASSERT_OK(Put("foo", "v1")); + ASSERT_EQ("v1", Get("foo")); - env_->delay_sstable_sync_.Release_Store(env_); // Block sync calls - Put("k1", std::string(100000, 'x')); // Fill memtable - Put("k2", std::string(100000, 'y')); // Trigger compaction - ASSERT_EQ("v1", Get("foo")); - env_->delay_sstable_sync_.Release_Store(NULL); // Release sync calls + env_->delay_sstable_sync_.Release_Store(env_); // Block sync calls + Put("k1", std::string(100000, 'x')); // Fill memtable + Put("k2", std::string(100000, 'y')); // Trigger compaction + ASSERT_EQ("v1", Get("foo")); + env_->delay_sstable_sync_.Release_Store(NULL); // Release sync calls + } while (ChangeOptions()); } TEST(DBTest, GetFromVersions) { - ASSERT_OK(Put("foo", "v1")); - dbfull()->TEST_CompactMemTable(); - ASSERT_EQ("v1", Get("foo")); + do { + ASSERT_OK(Put("foo", "v1")); + dbfull()->TEST_CompactMemTable(); + ASSERT_EQ("v1", Get("foo")); + } while (ChangeOptions()); } TEST(DBTest, GetSnapshot) { - // Try with both a short key and a long key - for (int i = 0; i < 2; i++) { - std::string key = (i == 0) ? 
std::string("foo") : std::string(200, 'x'); - ASSERT_OK(Put(key, "v1")); - const Snapshot* s1 = db_->GetSnapshot(); - ASSERT_OK(Put(key, "v2")); - ASSERT_EQ("v2", Get(key)); - ASSERT_EQ("v1", Get(key, s1)); - dbfull()->TEST_CompactMemTable(); - ASSERT_EQ("v2", Get(key)); - ASSERT_EQ("v1", Get(key, s1)); - db_->ReleaseSnapshot(s1); - } + do { + // Try with both a short key and a long key + for (int i = 0; i < 2; i++) { + std::string key = (i == 0) ? std::string("foo") : std::string(200, 'x'); + ASSERT_OK(Put(key, "v1")); + const Snapshot* s1 = db_->GetSnapshot(); + ASSERT_OK(Put(key, "v2")); + ASSERT_EQ("v2", Get(key)); + ASSERT_EQ("v1", Get(key, s1)); + dbfull()->TEST_CompactMemTable(); + ASSERT_EQ("v2", Get(key)); + ASSERT_EQ("v1", Get(key, s1)); + db_->ReleaseSnapshot(s1); + } + } while (ChangeOptions()); } TEST(DBTest, GetLevel0Ordering) { - // Check that we process level-0 files in correct order. The code - // below generates two level-0 files where the earlier one comes - // before the later one in the level-0 file list since the earlier - // one has a smaller "smallest" key. - ASSERT_OK(Put("bar", "b")); - ASSERT_OK(Put("foo", "v1")); - dbfull()->TEST_CompactMemTable(); - ASSERT_OK(Put("foo", "v2")); - dbfull()->TEST_CompactMemTable(); - ASSERT_EQ("v2", Get("foo")); + do { + // Check that we process level-0 files in correct order. The code + // below generates two level-0 files where the earlier one comes + // before the later one in the level-0 file list since the earlier + // one has a smaller "smallest" key. + ASSERT_OK(Put("bar", "b")); + ASSERT_OK(Put("foo", "v1")); + dbfull()->TEST_CompactMemTable(); + ASSERT_OK(Put("foo", "v2")); + dbfull()->TEST_CompactMemTable(); + ASSERT_EQ("v2", Get("foo")); + } while (ChangeOptions()); } TEST(DBTest, GetOrderedByLevels) { - ASSERT_OK(Put("foo", "v1")); - Compact("a", "z"); - ASSERT_EQ("v1", Get("foo")); - ASSERT_OK(Put("foo", "v2")); - ASSERT_EQ("v2", Get("foo")); - dbfull()->TEST_CompactMemTable(); - ASSERT_EQ("v2", Get("foo")); + do { + ASSERT_OK(Put("foo", "v1")); + Compact("a", "z"); + ASSERT_EQ("v1", Get("foo")); + ASSERT_OK(Put("foo", "v2")); + ASSERT_EQ("v2", Get("foo")); + dbfull()->TEST_CompactMemTable(); + ASSERT_EQ("v2", Get("foo")); + } while (ChangeOptions()); } TEST(DBTest, GetPicksCorrectFile) { - // Arrange to have multiple files in a non-level-0 level. - ASSERT_OK(Put("a", "va")); - Compact("a", "b"); - ASSERT_OK(Put("x", "vx")); - Compact("x", "y"); - ASSERT_OK(Put("f", "vf")); - Compact("f", "g"); - ASSERT_EQ("va", Get("a")); - ASSERT_EQ("vf", Get("f")); - ASSERT_EQ("vx", Get("x")); + do { + // Arrange to have multiple files in a non-level-0 level. + ASSERT_OK(Put("a", "va")); + Compact("a", "b"); + ASSERT_OK(Put("x", "vx")); + Compact("x", "y"); + ASSERT_OK(Put("f", "vf")); + Compact("f", "g"); + ASSERT_EQ("va", Get("a")); + ASSERT_EQ("vf", Get("f")); + ASSERT_EQ("vx", Get("x")); + } while (ChangeOptions()); } TEST(DBTest, GetEncountersEmptyLevel) { - // Arrange for the following to happen: - // * sstable A in level 0 - // * nothing in level 1 - // * sstable B in level 2 - // Then do enough Get() calls to arrange for an automatic compaction - // of sstable A. A bug would cause the compaction to be marked as - // occuring at level 1 (instead of the correct level 0). + do { + // Arrange for the following to happen: + // * sstable A in level 0 + // * nothing in level 1 + // * sstable B in level 2 + // Then do enough Get() calls to arrange for an automatic compaction + // of sstable A. 
A bug would cause the compaction to be marked as
+    // occurring at level 1 (instead of the correct level 0).
 
-  // Step 1: First place sstables in levels 0 and 2
-  int compaction_count = 0;
-  while (NumTableFilesAtLevel(0) == 0 ||
-         NumTableFilesAtLevel(2) == 0) {
-    ASSERT_LE(compaction_count, 100) << "could not fill levels 0 and 2";
-    compaction_count++;
-    Put("a", "begin");
-    Put("z", "end");
-    dbfull()->TEST_CompactMemTable();
-  }
+    // Step 1: First place sstables in levels 0 and 2
+    int compaction_count = 0;
+    while (NumTableFilesAtLevel(0) == 0 ||
+           NumTableFilesAtLevel(2) == 0) {
+      ASSERT_LE(compaction_count, 100) << "could not fill levels 0 and 2";
+      compaction_count++;
+      Put("a", "begin");
+      Put("z", "end");
+      dbfull()->TEST_CompactMemTable();
+    }
 
-  // Step 2: clear level 1 if necessary.
-  dbfull()->TEST_CompactRange(1, NULL, NULL);
-  ASSERT_EQ(NumTableFilesAtLevel(0), 1);
-  ASSERT_EQ(NumTableFilesAtLevel(1), 0);
-  ASSERT_EQ(NumTableFilesAtLevel(2), 1);
+    // Step 2: clear level 1 if necessary.
+    dbfull()->TEST_CompactRange(1, NULL, NULL);
+    ASSERT_EQ(NumTableFilesAtLevel(0), 1);
+    ASSERT_EQ(NumTableFilesAtLevel(1), 0);
+    ASSERT_EQ(NumTableFilesAtLevel(2), 1);
 
-  // Step 3: read until level 0 compaction disappears.
-  int read_count = 0;
-  while (NumTableFilesAtLevel(0) > 0) {
-    ASSERT_LE(read_count, 10000) << "did not trigger level 0 compaction";
-    read_count++;
-    ASSERT_EQ("NOT_FOUND", Get("missing"));
-  }
+    // Step 3: read until level 0 compaction disappears.
+    int read_count = 0;
+    while (NumTableFilesAtLevel(0) > 0) {
+      ASSERT_LE(read_count, 10000) << "did not trigger level 0 compaction";
+      read_count++;
+      ASSERT_EQ("NOT_FOUND", Get("missing"));
+    }
+  } while (ChangeOptions());
 }
 
 TEST(DBTest, IterEmpty) {
@@ -620,69 +737,77 @@ TEST(DBTest, IterSmallAndLargeMix) {
 }
 
 TEST(DBTest, IterMultiWithDelete) {
-  ASSERT_OK(Put("a", "va"));
-  ASSERT_OK(Put("b", "vb"));
-  ASSERT_OK(Put("c", "vc"));
-  ASSERT_OK(Delete("b"));
-  ASSERT_EQ("NOT_FOUND", Get("b"));
+  do {
+    ASSERT_OK(Put("a", "va"));
+    ASSERT_OK(Put("b", "vb"));
+    ASSERT_OK(Put("c", "vc"));
+    ASSERT_OK(Delete("b"));
+    ASSERT_EQ("NOT_FOUND", Get("b"));
 
-  Iterator* iter = db_->NewIterator(ReadOptions());
-  iter->Seek("c");
-  ASSERT_EQ(IterStatus(iter), "c->vc");
-  iter->Prev();
-  ASSERT_EQ(IterStatus(iter), "a->va");
-  delete iter;
+    Iterator* iter = db_->NewIterator(ReadOptions());
+    iter->Seek("c");
+    ASSERT_EQ(IterStatus(iter), "c->vc");
+    iter->Prev();
+    ASSERT_EQ(IterStatus(iter), "a->va");
+    delete iter;
+  } while (ChangeOptions());
 }
 
 TEST(DBTest, Recover) {
-  ASSERT_OK(Put("foo", "v1"));
-  ASSERT_OK(Put("baz", "v5"));
+  do {
+    ASSERT_OK(Put("foo", "v1"));
+    ASSERT_OK(Put("baz", "v5"));
 
-  Reopen();
-  ASSERT_EQ("v1", Get("foo"));
+    Reopen();
+    ASSERT_EQ("v1", Get("foo"));
 
-  ASSERT_EQ("v1", Get("foo"));
-  ASSERT_EQ("v5", Get("baz"));
-  ASSERT_OK(Put("bar", "v2"));
-  ASSERT_OK(Put("foo", "v3"));
+    ASSERT_EQ("v1", Get("foo"));
+    ASSERT_EQ("v5", Get("baz"));
+    ASSERT_OK(Put("bar", "v2"));
+    ASSERT_OK(Put("foo", "v3"));
 
-  Reopen();
-  ASSERT_EQ("v3", Get("foo"));
-  ASSERT_OK(Put("foo", "v4"));
-  ASSERT_EQ("v4", Get("foo"));
-  ASSERT_EQ("v2", Get("bar"));
-  ASSERT_EQ("v5", Get("baz"));
+    Reopen();
+    ASSERT_EQ("v3", Get("foo"));
+    ASSERT_OK(Put("foo", "v4"));
+    ASSERT_EQ("v4", Get("foo"));
+    ASSERT_EQ("v2", Get("bar"));
+    ASSERT_EQ("v5", Get("baz"));
+  } while (ChangeOptions());
 }
 
 TEST(DBTest, RecoveryWithEmptyLog) {
-  ASSERT_OK(Put("foo", "v1"));
-  ASSERT_OK(Put("foo", "v2"));
-  Reopen();
-  Reopen();
-  ASSERT_OK(Put("foo", "v3"));
-
Reopen(); - ASSERT_EQ("v3", Get("foo")); + do { + ASSERT_OK(Put("foo", "v1")); + ASSERT_OK(Put("foo", "v2")); + Reopen(); + Reopen(); + ASSERT_OK(Put("foo", "v3")); + Reopen(); + ASSERT_EQ("v3", Get("foo")); + } while (ChangeOptions()); } // Check that writes done during a memtable compaction are recovered // if the database is shutdown during the memtable compaction. TEST(DBTest, RecoverDuringMemtableCompaction) { - Options options; - options.env = env_; - options.write_buffer_size = 1000000; - Reopen(&options); + do { + Options options = CurrentOptions(); + options.env = env_; + options.write_buffer_size = 1000000; + Reopen(&options); - // Trigger a long memtable compaction and reopen the database during it - ASSERT_OK(Put("foo", "v1")); // Goes to 1st log file - ASSERT_OK(Put("big1", std::string(10000000, 'x'))); // Fills memtable - ASSERT_OK(Put("big2", std::string(1000, 'y'))); // Triggers compaction - ASSERT_OK(Put("bar", "v2")); // Goes to new log file + // Trigger a long memtable compaction and reopen the database during it + ASSERT_OK(Put("foo", "v1")); // Goes to 1st log file + ASSERT_OK(Put("big1", std::string(10000000, 'x'))); // Fills memtable + ASSERT_OK(Put("big2", std::string(1000, 'y'))); // Triggers compaction + ASSERT_OK(Put("bar", "v2")); // Goes to new log file - Reopen(&options); - ASSERT_EQ("v1", Get("foo")); - ASSERT_EQ("v2", Get("bar")); - ASSERT_EQ(std::string(10000000, 'x'), Get("big1")); - ASSERT_EQ(std::string(1000, 'y'), Get("big2")); + Reopen(&options); + ASSERT_EQ("v1", Get("foo")); + ASSERT_EQ("v2", Get("bar")); + ASSERT_EQ(std::string(10000000, 'x'), Get("big1")); + ASSERT_EQ(std::string(1000, 'y'), Get("big2")); + } while (ChangeOptions()); } static std::string Key(int i) { @@ -692,7 +817,7 @@ static std::string Key(int i) { } TEST(DBTest, MinorCompactionsHappen) { - Options options; + Options options = CurrentOptions(); options.write_buffer_size = 10000; Reopen(&options); @@ -718,7 +843,7 @@ TEST(DBTest, MinorCompactionsHappen) { TEST(DBTest, RecoverWithLargeLog) { { - Options options; + Options options = CurrentOptions(); Reopen(&options); ASSERT_OK(Put("big1", std::string(200000, '1'))); ASSERT_OK(Put("big2", std::string(200000, '2'))); @@ -729,7 +854,7 @@ TEST(DBTest, RecoverWithLargeLog) { // Make sure that if we re-open with a small write buffer size that // we flush table files in the middle of a large log file. 
- Options options; + Options options = CurrentOptions(); options.write_buffer_size = 100000; Reopen(&options); ASSERT_EQ(NumTableFilesAtLevel(0), 3); @@ -741,7 +866,7 @@ TEST(DBTest, RecoverWithLargeLog) { } TEST(DBTest, CompactionsGenerateMultipleFiles) { - Options options; + Options options = CurrentOptions(); options.write_buffer_size = 100000000; // Large write buffer Reopen(&options); @@ -767,7 +892,7 @@ TEST(DBTest, CompactionsGenerateMultipleFiles) { } TEST(DBTest, RepeatedWritesToSameKey) { - Options options; + Options options = CurrentOptions(); options.env = env_; options.write_buffer_size = 100000; // Small write buffer Reopen(&options); @@ -786,7 +911,7 @@ TEST(DBTest, RepeatedWritesToSameKey) { } TEST(DBTest, SparseMerge) { - Options options; + Options options = CurrentOptions(); options.compression = kNoCompression; Reopen(&options); @@ -837,87 +962,91 @@ static bool Between(uint64_t val, uint64_t low, uint64_t high) { } TEST(DBTest, ApproximateSizes) { - Options options; - options.write_buffer_size = 100000000; // Large write buffer - options.compression = kNoCompression; - DestroyAndReopen(); + do { + Options options = CurrentOptions(); + options.write_buffer_size = 100000000; // Large write buffer + options.compression = kNoCompression; + DestroyAndReopen(); - ASSERT_TRUE(Between(Size("", "xyz"), 0, 0)); - Reopen(&options); - ASSERT_TRUE(Between(Size("", "xyz"), 0, 0)); - - // Write 8MB (80 values, each 100K) - ASSERT_EQ(NumTableFilesAtLevel(0), 0); - const int N = 80; - Random rnd(301); - for (int i = 0; i < N; i++) { - ASSERT_OK(Put(Key(i), RandomString(&rnd, 100000))); - } - - // 0 because GetApproximateSizes() does not account for memtable space - ASSERT_TRUE(Between(Size("", Key(50)), 0, 0)); - - // Check sizes across recovery by reopening a few times - for (int run = 0; run < 3; run++) { + ASSERT_TRUE(Between(Size("", "xyz"), 0, 0)); Reopen(&options); + ASSERT_TRUE(Between(Size("", "xyz"), 0, 0)); - for (int compact_start = 0; compact_start < N; compact_start += 10) { - for (int i = 0; i < N; i += 10) { - ASSERT_TRUE(Between(Size("", Key(i)), 100000*i, 100000*i + 10000)); - ASSERT_TRUE(Between(Size("", Key(i)+".suffix"), - 100000 * (i+1), 100000 * (i+1) + 10000)); - ASSERT_TRUE(Between(Size(Key(i), Key(i+10)), - 100000 * 10, 100000 * 10 + 10000)); - } - ASSERT_TRUE(Between(Size("", Key(50)), 5000000, 5010000)); - ASSERT_TRUE(Between(Size("", Key(50)+".suffix"), 5100000, 5110000)); - - std::string cstart_str = Key(compact_start); - std::string cend_str = Key(compact_start + 9); - Slice cstart = cstart_str; - Slice cend = cend_str; - dbfull()->TEST_CompactRange(0, &cstart, &cend); + // Write 8MB (80 values, each 100K) + ASSERT_EQ(NumTableFilesAtLevel(0), 0); + const int N = 80; + static const int S1 = 100000; + static const int S2 = 105000; // Allow some expansion from metadata + Random rnd(301); + for (int i = 0; i < N; i++) { + ASSERT_OK(Put(Key(i), RandomString(&rnd, S1))); } - ASSERT_EQ(NumTableFilesAtLevel(0), 0); - ASSERT_GT(NumTableFilesAtLevel(1), 0); - } + // 0 because GetApproximateSizes() does not account for memtable space + ASSERT_TRUE(Between(Size("", Key(50)), 0, 0)); + + // Check sizes across recovery by reopening a few times + for (int run = 0; run < 3; run++) { + Reopen(&options); + + for (int compact_start = 0; compact_start < N; compact_start += 10) { + for (int i = 0; i < N; i += 10) { + ASSERT_TRUE(Between(Size("", Key(i)), S1*i, S2*i)); + ASSERT_TRUE(Between(Size("", Key(i)+".suffix"), S1*(i+1), S2*(i+1))); + ASSERT_TRUE(Between(Size(Key(i), 
Key(i+10)), S1*10, S2*10)); + } + ASSERT_TRUE(Between(Size("", Key(50)), S1*50, S2*50)); + ASSERT_TRUE(Between(Size("", Key(50)+".suffix"), S1*50, S2*50)); + + std::string cstart_str = Key(compact_start); + std::string cend_str = Key(compact_start + 9); + Slice cstart = cstart_str; + Slice cend = cend_str; + dbfull()->TEST_CompactRange(0, &cstart, &cend); + } + + ASSERT_EQ(NumTableFilesAtLevel(0), 0); + ASSERT_GT(NumTableFilesAtLevel(1), 0); + } + } while (ChangeOptions()); } TEST(DBTest, ApproximateSizes_MixOfSmallAndLarge) { - Options options; - options.compression = kNoCompression; - Reopen(); + do { + Options options = CurrentOptions(); + options.compression = kNoCompression; + Reopen(); - Random rnd(301); - std::string big1 = RandomString(&rnd, 100000); - ASSERT_OK(Put(Key(0), RandomString(&rnd, 10000))); - ASSERT_OK(Put(Key(1), RandomString(&rnd, 10000))); - ASSERT_OK(Put(Key(2), big1)); - ASSERT_OK(Put(Key(3), RandomString(&rnd, 10000))); - ASSERT_OK(Put(Key(4), big1)); - ASSERT_OK(Put(Key(5), RandomString(&rnd, 10000))); - ASSERT_OK(Put(Key(6), RandomString(&rnd, 300000))); - ASSERT_OK(Put(Key(7), RandomString(&rnd, 10000))); + Random rnd(301); + std::string big1 = RandomString(&rnd, 100000); + ASSERT_OK(Put(Key(0), RandomString(&rnd, 10000))); + ASSERT_OK(Put(Key(1), RandomString(&rnd, 10000))); + ASSERT_OK(Put(Key(2), big1)); + ASSERT_OK(Put(Key(3), RandomString(&rnd, 10000))); + ASSERT_OK(Put(Key(4), big1)); + ASSERT_OK(Put(Key(5), RandomString(&rnd, 10000))); + ASSERT_OK(Put(Key(6), RandomString(&rnd, 300000))); + ASSERT_OK(Put(Key(7), RandomString(&rnd, 10000))); - // Check sizes across recovery by reopening a few times - for (int run = 0; run < 3; run++) { - Reopen(&options); + // Check sizes across recovery by reopening a few times + for (int run = 0; run < 3; run++) { + Reopen(&options); - ASSERT_TRUE(Between(Size("", Key(0)), 0, 0)); - ASSERT_TRUE(Between(Size("", Key(1)), 10000, 11000)); - ASSERT_TRUE(Between(Size("", Key(2)), 20000, 21000)); - ASSERT_TRUE(Between(Size("", Key(3)), 120000, 121000)); - ASSERT_TRUE(Between(Size("", Key(4)), 130000, 131000)); - ASSERT_TRUE(Between(Size("", Key(5)), 230000, 231000)); - ASSERT_TRUE(Between(Size("", Key(6)), 240000, 241000)); - ASSERT_TRUE(Between(Size("", Key(7)), 540000, 541000)); - ASSERT_TRUE(Between(Size("", Key(8)), 550000, 551000)); + ASSERT_TRUE(Between(Size("", Key(0)), 0, 0)); + ASSERT_TRUE(Between(Size("", Key(1)), 10000, 11000)); + ASSERT_TRUE(Between(Size("", Key(2)), 20000, 21000)); + ASSERT_TRUE(Between(Size("", Key(3)), 120000, 121000)); + ASSERT_TRUE(Between(Size("", Key(4)), 130000, 131000)); + ASSERT_TRUE(Between(Size("", Key(5)), 230000, 231000)); + ASSERT_TRUE(Between(Size("", Key(6)), 240000, 241000)); + ASSERT_TRUE(Between(Size("", Key(7)), 540000, 541000)); + ASSERT_TRUE(Between(Size("", Key(8)), 550000, 560000)); - ASSERT_TRUE(Between(Size(Key(3), Key(5)), 110000, 111000)); + ASSERT_TRUE(Between(Size(Key(3), Key(5)), 110000, 111000)); - dbfull()->TEST_CompactRange(0, NULL, NULL); - } + dbfull()->TEST_CompactRange(0, NULL, NULL); + } + } while (ChangeOptions()); } TEST(DBTest, IteratorPinsRef) { @@ -943,59 +1072,63 @@ TEST(DBTest, IteratorPinsRef) { } TEST(DBTest, Snapshot) { - Put("foo", "v1"); - const Snapshot* s1 = db_->GetSnapshot(); - Put("foo", "v2"); - const Snapshot* s2 = db_->GetSnapshot(); - Put("foo", "v3"); - const Snapshot* s3 = db_->GetSnapshot(); + do { + Put("foo", "v1"); + const Snapshot* s1 = db_->GetSnapshot(); + Put("foo", "v2"); + const Snapshot* s2 = db_->GetSnapshot(); + Put("foo", 
"v3"); + const Snapshot* s3 = db_->GetSnapshot(); - Put("foo", "v4"); - ASSERT_EQ("v1", Get("foo", s1)); - ASSERT_EQ("v2", Get("foo", s2)); - ASSERT_EQ("v3", Get("foo", s3)); - ASSERT_EQ("v4", Get("foo")); + Put("foo", "v4"); + ASSERT_EQ("v1", Get("foo", s1)); + ASSERT_EQ("v2", Get("foo", s2)); + ASSERT_EQ("v3", Get("foo", s3)); + ASSERT_EQ("v4", Get("foo")); - db_->ReleaseSnapshot(s3); - ASSERT_EQ("v1", Get("foo", s1)); - ASSERT_EQ("v2", Get("foo", s2)); - ASSERT_EQ("v4", Get("foo")); + db_->ReleaseSnapshot(s3); + ASSERT_EQ("v1", Get("foo", s1)); + ASSERT_EQ("v2", Get("foo", s2)); + ASSERT_EQ("v4", Get("foo")); - db_->ReleaseSnapshot(s1); - ASSERT_EQ("v2", Get("foo", s2)); - ASSERT_EQ("v4", Get("foo")); + db_->ReleaseSnapshot(s1); + ASSERT_EQ("v2", Get("foo", s2)); + ASSERT_EQ("v4", Get("foo")); - db_->ReleaseSnapshot(s2); - ASSERT_EQ("v4", Get("foo")); + db_->ReleaseSnapshot(s2); + ASSERT_EQ("v4", Get("foo")); + } while (ChangeOptions()); } TEST(DBTest, HiddenValuesAreRemoved) { - Random rnd(301); - FillLevels("a", "z"); + do { + Random rnd(301); + FillLevels("a", "z"); - std::string big = RandomString(&rnd, 50000); - Put("foo", big); - Put("pastfoo", "v"); - const Snapshot* snapshot = db_->GetSnapshot(); - Put("foo", "tiny"); - Put("pastfoo2", "v2"); // Advance sequence number one more + std::string big = RandomString(&rnd, 50000); + Put("foo", big); + Put("pastfoo", "v"); + const Snapshot* snapshot = db_->GetSnapshot(); + Put("foo", "tiny"); + Put("pastfoo2", "v2"); // Advance sequence number one more - ASSERT_OK(dbfull()->TEST_CompactMemTable()); - ASSERT_GT(NumTableFilesAtLevel(0), 0); + ASSERT_OK(dbfull()->TEST_CompactMemTable()); + ASSERT_GT(NumTableFilesAtLevel(0), 0); - ASSERT_EQ(big, Get("foo", snapshot)); - ASSERT_TRUE(Between(Size("", "pastfoo"), 50000, 60000)); - db_->ReleaseSnapshot(snapshot); - ASSERT_EQ(AllEntriesFor("foo"), "[ tiny, " + big + " ]"); - Slice x("x"); - dbfull()->TEST_CompactRange(0, NULL, &x); - ASSERT_EQ(AllEntriesFor("foo"), "[ tiny ]"); - ASSERT_EQ(NumTableFilesAtLevel(0), 0); - ASSERT_GE(NumTableFilesAtLevel(1), 1); - dbfull()->TEST_CompactRange(1, NULL, &x); - ASSERT_EQ(AllEntriesFor("foo"), "[ tiny ]"); + ASSERT_EQ(big, Get("foo", snapshot)); + ASSERT_TRUE(Between(Size("", "pastfoo"), 50000, 60000)); + db_->ReleaseSnapshot(snapshot); + ASSERT_EQ(AllEntriesFor("foo"), "[ tiny, " + big + " ]"); + Slice x("x"); + dbfull()->TEST_CompactRange(0, NULL, &x); + ASSERT_EQ(AllEntriesFor("foo"), "[ tiny ]"); + ASSERT_EQ(NumTableFilesAtLevel(0), 0); + ASSERT_GE(NumTableFilesAtLevel(1), 1); + dbfull()->TEST_CompactRange(1, NULL, &x); + ASSERT_EQ(AllEntriesFor("foo"), "[ tiny ]"); - ASSERT_TRUE(Between(Size("", "pastfoo"), 0, 1000)); + ASSERT_TRUE(Between(Size("", "pastfoo"), 0, 1000)); + } while (ChangeOptions()); } TEST(DBTest, DeletionMarkers1) { @@ -1054,85 +1187,87 @@ TEST(DBTest, DeletionMarkers2) { } TEST(DBTest, OverlapInLevel0) { - ASSERT_EQ(config::kMaxMemCompactLevel, 2) << "Fix test to match config"; + do { + ASSERT_EQ(config::kMaxMemCompactLevel, 2) << "Fix test to match config"; - // Fill levels 1 and 2 to disable the pushing of new memtables to levels > 0. - ASSERT_OK(Put("100", "v100")); - ASSERT_OK(Put("999", "v999")); - dbfull()->TEST_CompactMemTable(); - ASSERT_OK(Delete("100")); - ASSERT_OK(Delete("999")); - dbfull()->TEST_CompactMemTable(); - ASSERT_EQ("0,1,1", FilesPerLevel()); + // Fill levels 1 and 2 to disable the pushing of new memtables to levels > 0. 
+ ASSERT_OK(Put("100", "v100")); + ASSERT_OK(Put("999", "v999")); + dbfull()->TEST_CompactMemTable(); + ASSERT_OK(Delete("100")); + ASSERT_OK(Delete("999")); + dbfull()->TEST_CompactMemTable(); + ASSERT_EQ("0,1,1", FilesPerLevel()); - // Make files spanning the following ranges in level-0: - // files[0] 200 .. 900 - // files[1] 300 .. 500 - // Note that files are sorted by smallest key. - ASSERT_OK(Put("300", "v300")); - ASSERT_OK(Put("500", "v500")); - dbfull()->TEST_CompactMemTable(); - ASSERT_OK(Put("200", "v200")); - ASSERT_OK(Put("600", "v600")); - ASSERT_OK(Put("900", "v900")); - dbfull()->TEST_CompactMemTable(); - ASSERT_EQ("2,1,1", FilesPerLevel()); + // Make files spanning the following ranges in level-0: + // files[0] 200 .. 900 + // files[1] 300 .. 500 + // Note that files are sorted by smallest key. + ASSERT_OK(Put("300", "v300")); + ASSERT_OK(Put("500", "v500")); + dbfull()->TEST_CompactMemTable(); + ASSERT_OK(Put("200", "v200")); + ASSERT_OK(Put("600", "v600")); + ASSERT_OK(Put("900", "v900")); + dbfull()->TEST_CompactMemTable(); + ASSERT_EQ("2,1,1", FilesPerLevel()); - // Compact away the placeholder files we created initially - dbfull()->TEST_CompactRange(1, NULL, NULL); - dbfull()->TEST_CompactRange(2, NULL, NULL); - ASSERT_EQ("2", FilesPerLevel()); + // Compact away the placeholder files we created initially + dbfull()->TEST_CompactRange(1, NULL, NULL); + dbfull()->TEST_CompactRange(2, NULL, NULL); + ASSERT_EQ("2", FilesPerLevel()); - // Do a memtable compaction. Before bug-fix, the compaction would - // not detect the overlap with level-0 files and would incorrectly place - // the deletion in a deeper level. - ASSERT_OK(Delete("600")); - dbfull()->TEST_CompactMemTable(); - ASSERT_EQ("3", FilesPerLevel()); - ASSERT_EQ("NOT_FOUND", Get("600")); + // Do a memtable compaction. Before bug-fix, the compaction would + // not detect the overlap with level-0 files and would incorrectly place + // the deletion in a deeper level. 
+ ASSERT_OK(Delete("600")); + dbfull()->TEST_CompactMemTable(); + ASSERT_EQ("3", FilesPerLevel()); + ASSERT_EQ("NOT_FOUND", Get("600")); + } while (ChangeOptions()); } TEST(DBTest, L0_CompactionBug_Issue44_a) { - Reopen(); - ASSERT_OK(Put("b", "v")); - Reopen(); - ASSERT_OK(Delete("b")); - ASSERT_OK(Delete("a")); - Reopen(); - ASSERT_OK(Delete("a")); - Reopen(); - ASSERT_OK(Put("a", "v")); - Reopen(); - Reopen(); - ASSERT_EQ("(a->v)", Contents()); - env_->SleepForMicroseconds(1000000); // Wait for compaction to finish - ASSERT_EQ("(a->v)", Contents()); + Reopen(); + ASSERT_OK(Put("b", "v")); + Reopen(); + ASSERT_OK(Delete("b")); + ASSERT_OK(Delete("a")); + Reopen(); + ASSERT_OK(Delete("a")); + Reopen(); + ASSERT_OK(Put("a", "v")); + Reopen(); + Reopen(); + ASSERT_EQ("(a->v)", Contents()); + env_->SleepForMicroseconds(1000000); // Wait for compaction to finish + ASSERT_EQ("(a->v)", Contents()); } TEST(DBTest, L0_CompactionBug_Issue44_b) { - Reopen(); - Put("",""); - Reopen(); - Delete("e"); - Put("",""); - Reopen(); - Put("c", "cv"); - Reopen(); - Put("",""); - Reopen(); - Put("",""); - env_->SleepForMicroseconds(1000000); // Wait for compaction to finish - Reopen(); - Put("d","dv"); - Reopen(); - Put("",""); - Reopen(); - Delete("d"); - Delete("b"); - Reopen(); - ASSERT_EQ("(->)(c->cv)", Contents()); - env_->SleepForMicroseconds(1000000); // Wait for compaction to finish - ASSERT_EQ("(->)(c->cv)", Contents()); + Reopen(); + Put("",""); + Reopen(); + Delete("e"); + Put("",""); + Reopen(); + Put("c", "cv"); + Reopen(); + Put("",""); + Reopen(); + Put("",""); + env_->SleepForMicroseconds(1000000); // Wait for compaction to finish + Reopen(); + Put("d","dv"); + Reopen(); + Put("",""); + Reopen(); + Delete("d"); + Delete("b"); + Reopen(); + ASSERT_EQ("(->)(c->cv)", Contents()); + env_->SleepForMicroseconds(1000000); // Wait for compaction to finish + ASSERT_EQ("(->)(c->cv)", Contents()); } TEST(DBTest, ComparatorCheck) { @@ -1150,7 +1285,7 @@ TEST(DBTest, ComparatorCheck) { } }; NewComparator cmp; - Options new_options; + Options new_options = CurrentOptions(); new_options.comparator = &cmp; Status s = TryReopen(&new_options); ASSERT_TRUE(!s.ok()); @@ -1185,9 +1320,10 @@ TEST(DBTest, CustomComparator) { } }; NumberComparator cmp; - Options new_options; + Options new_options = CurrentOptions(); new_options.create_if_missing = true; new_options.comparator = &cmp; + new_options.filter_policy = NULL; // Cannot use bloom filters new_options.write_buffer_size = 1000; // Compact more often DestroyAndReopen(&new_options); ASSERT_OK(Put("[10]", "ten")); @@ -1197,6 +1333,8 @@ TEST(DBTest, CustomComparator) { ASSERT_EQ("ten", Get("[0xa]")); ASSERT_EQ("twenty", Get("[20]")); ASSERT_EQ("twenty", Get("[0x14]")); + ASSERT_EQ("NOT_FOUND", Get("[15]")); + ASSERT_EQ("NOT_FOUND", Get("[0xf]")); Compact("[0]", "[9999]"); } @@ -1285,7 +1423,7 @@ TEST(DBTest, DBOpen_Options) { // Check that number of files does not grow when we are out of space TEST(DBTest, NoSpace) { - Options options; + Options options = CurrentOptions(); options.env = env_; Reopen(&options); @@ -1314,6 +1452,53 @@ TEST(DBTest, FilesDeletedAfterCompaction) { ASSERT_EQ(CountFiles(), num_files); } +TEST(DBTest, BloomFilter) { + env_->count_random_reads_ = true; + Options options = CurrentOptions(); + options.env = env_; + options.block_cache = NewLRUCache(0); // Prevent cache hits + options.filter_policy = NewBloomFilterPolicy(10); + Reopen(&options); + + // Populate multiple layers + const int N = 10000; + for (int i = 0; i < N; i++) { + 
ASSERT_OK(Put(Key(i), Key(i)));
+  }
+  Compact("a", "z");
+  for (int i = 0; i < N; i += 100) {
+    ASSERT_OK(Put(Key(i), Key(i)));
+  }
+  dbfull()->TEST_CompactMemTable();
+
+  // Prevent auto compactions triggered by seeks
+  env_->delay_sstable_sync_.Release_Store(env_);
+
+  // Lookup present keys.  Should rarely read from small sstable.
+  env_->random_read_counter_.Reset();
+  for (int i = 0; i < N; i++) {
+    ASSERT_EQ(Key(i), Get(Key(i)));
+  }
+  int reads = env_->random_read_counter_.Read();
+  fprintf(stderr, "%d present => %d reads\n", N, reads);
+  ASSERT_GE(reads, N);
+  ASSERT_LE(reads, N + 2*N/100);
+
+  // Lookup missing keys.  Should rarely read from either sstable.
+  env_->random_read_counter_.Reset();
+  for (int i = 0; i < N; i++) {
+    ASSERT_EQ("NOT_FOUND", Get(Key(i) + ".missing"));
+  }
+  reads = env_->random_read_counter_.Read();
+  fprintf(stderr, "%d missing => %d reads\n", N, reads);
+  ASSERT_LE(reads, 3*N/100);
+
+  env_->delay_sstable_sync_.Release_Store(NULL);
+  Close();
+  delete options.block_cache;
+  delete options.filter_policy;
+}
+
 // Multi-threaded test:
 namespace {
@@ -1381,33 +1566,35 @@ static void MTThreadBody(void* arg) {
 }  // namespace
 
 TEST(DBTest, MultiThreaded) {
-  // Initialize state
-  MTState mt;
-  mt.test = this;
-  mt.stop.Release_Store(0);
-  for (int id = 0; id < kNumThreads; id++) {
-    mt.counter[id].Release_Store(0);
-    mt.thread_done[id].Release_Store(0);
-  }
-
-  // Start threads
-  MTThread thread[kNumThreads];
-  for (int id = 0; id < kNumThreads; id++) {
-    thread[id].state = &mt;
-    thread[id].id = id;
-    env_->StartThread(MTThreadBody, &thread[id]);
-  }
-
-  // Let them run for a while
-  env_->SleepForMicroseconds(kTestSeconds * 1000000);
-
-  // Stop the threads and wait for them to finish
-  mt.stop.Release_Store(&mt);
-  for (int id = 0; id < kNumThreads; id++) {
-    while (mt.thread_done[id].Acquire_Load() == NULL) {
-      env_->SleepForMicroseconds(100000);
+  do {
+    // Initialize state
+    MTState mt;
+    mt.test = this;
+    mt.stop.Release_Store(0);
+    for (int id = 0; id < kNumThreads; id++) {
+      mt.counter[id].Release_Store(0);
+      mt.thread_done[id].Release_Store(0);
     }
-  }
+
+    // Start threads
+    MTThread thread[kNumThreads];
+    for (int id = 0; id < kNumThreads; id++) {
+      thread[id].state = &mt;
+      thread[id].id = id;
+      env_->StartThread(MTThreadBody, &thread[id]);
+    }
+
+    // Let them run for a while
+    env_->SleepForMicroseconds(kTestSeconds * 1000000);
+
+    // Stop the threads and wait for them to finish
+    mt.stop.Release_Store(&mt);
+    for (int id = 0; id < kNumThreads; id++) {
+      while (mt.thread_done[id].Acquire_Load() == NULL) {
+        env_->SleepForMicroseconds(100000);
+      }
+    }
+  } while (ChangeOptions());
 }
 
 namespace {
@@ -1573,70 +1760,73 @@ static bool CompareIterators(int step,
 TEST(DBTest, Randomized) {
   Random rnd(test::RandomSeed());
-  ModelDB model(last_options_);
-  const int N = 10000;
-  const Snapshot* model_snap = NULL;
-  const Snapshot* db_snap = NULL;
-  std::string k, v;
-  for (int step = 0; step < N; step++) {
-    if (step % 100 == 0) {
-      fprintf(stderr, "Step %d of %d\n", step, N);
-    }
-    int p = rnd.Uniform(100);
-    if (p < 45) {  // Put
-      k = RandomKey(&rnd);
-      v = RandomString(&rnd,
-                       rnd.OneIn(20)
-                       ?
100 + rnd.Uniform(100) - : rnd.Uniform(8)); - ASSERT_OK(model.Put(WriteOptions(), k, v)); - ASSERT_OK(db_->Put(WriteOptions(), k, v)); - - } else if (p < 90) { // Delete - k = RandomKey(&rnd); - ASSERT_OK(model.Delete(WriteOptions(), k)); - ASSERT_OK(db_->Delete(WriteOptions(), k)); - - - } else { // Multi-element batch - WriteBatch b; - const int num = rnd.Uniform(8); - for (int i = 0; i < num; i++) { - if (i == 0 || !rnd.OneIn(10)) { - k = RandomKey(&rnd); - } else { - // Periodically re-use the same key from the previous iter, so - // we have multiple entries in the write batch for the same key - } - if (rnd.OneIn(2)) { - v = RandomString(&rnd, rnd.Uniform(10)); - b.Put(k, v); - } else { - b.Delete(k); - } + do { + ModelDB model(CurrentOptions()); + const int N = 10000; + const Snapshot* model_snap = NULL; + const Snapshot* db_snap = NULL; + std::string k, v; + for (int step = 0; step < N; step++) { + if (step % 100 == 0) { + fprintf(stderr, "Step %d of %d\n", step, N); + } + // TODO(sanjay): Test Get() works + int p = rnd.Uniform(100); + if (p < 45) { // Put + k = RandomKey(&rnd); + v = RandomString(&rnd, + rnd.OneIn(20) + ? 100 + rnd.Uniform(100) + : rnd.Uniform(8)); + ASSERT_OK(model.Put(WriteOptions(), k, v)); + ASSERT_OK(db_->Put(WriteOptions(), k, v)); + + } else if (p < 90) { // Delete + k = RandomKey(&rnd); + ASSERT_OK(model.Delete(WriteOptions(), k)); + ASSERT_OK(db_->Delete(WriteOptions(), k)); + + + } else { // Multi-element batch + WriteBatch b; + const int num = rnd.Uniform(8); + for (int i = 0; i < num; i++) { + if (i == 0 || !rnd.OneIn(10)) { + k = RandomKey(&rnd); + } else { + // Periodically re-use the same key from the previous iter, so + // we have multiple entries in the write batch for the same key + } + if (rnd.OneIn(2)) { + v = RandomString(&rnd, rnd.Uniform(10)); + b.Put(k, v); + } else { + b.Delete(k); + } + } + ASSERT_OK(model.Write(WriteOptions(), &b)); + ASSERT_OK(db_->Write(WriteOptions(), &b)); + } + + if ((step % 100) == 0) { + ASSERT_TRUE(CompareIterators(step, &model, db_, NULL, NULL)); + ASSERT_TRUE(CompareIterators(step, &model, db_, model_snap, db_snap)); + // Save a snapshot from each DB this time that we'll use next + // time we compare things, to make sure the current state is + // preserved with the snapshot + if (model_snap != NULL) model.ReleaseSnapshot(model_snap); + if (db_snap != NULL) db_->ReleaseSnapshot(db_snap); + + Reopen(); + ASSERT_TRUE(CompareIterators(step, &model, db_, NULL, NULL)); + + model_snap = model.GetSnapshot(); + db_snap = db_->GetSnapshot(); } - ASSERT_OK(model.Write(WriteOptions(), &b)); - ASSERT_OK(db_->Write(WriteOptions(), &b)); } - - if ((step % 100) == 0) { - ASSERT_TRUE(CompareIterators(step, &model, db_, NULL, NULL)); - ASSERT_TRUE(CompareIterators(step, &model, db_, model_snap, db_snap)); - // Save a snapshot from each DB this time that we'll use next - // time we compare things, to make sure the current state is - // preserved with the snapshot - if (model_snap != NULL) model.ReleaseSnapshot(model_snap); - if (db_snap != NULL) db_->ReleaseSnapshot(db_snap); - - Reopen(); - ASSERT_TRUE(CompareIterators(step, &model, db_, NULL, NULL)); - - model_snap = model.GetSnapshot(); - db_snap = db_->GetSnapshot(); - } - } - if (model_snap != NULL) model.ReleaseSnapshot(model_snap); - if (db_snap != NULL) db_->ReleaseSnapshot(db_snap); + if (model_snap != NULL) model.ReleaseSnapshot(model_snap); + if (db_snap != NULL) db_->ReleaseSnapshot(db_snap); + } while (ChangeOptions()); } std::string MakeKey(unsigned int num) { diff 
--git a/db/dbformat.cc b/db/dbformat.cc
index 9168f99..28e11b3 100644
--- a/db/dbformat.cc
+++ b/db/dbformat.cc
@@ -98,6 +98,26 @@ void InternalKeyComparator::FindShortSuccessor(std::string* key) const {
   }
 }
 
+const char* InternalFilterPolicy::Name() const {
+  return user_policy_->Name();
+}
+
+void InternalFilterPolicy::CreateFilter(const Slice* keys, int n,
+                                        std::string* dst) const {
+  // We rely on the fact that the code in table.cc does not mind us
+  // adjusting keys[].
+  Slice* mkey = const_cast<Slice*>(keys);
+  for (int i = 0; i < n; i++) {
+    mkey[i] = ExtractUserKey(keys[i]);
+    // TODO(sanjay): Suppress dups?
+  }
+  user_policy_->CreateFilter(keys, n, dst);
+}
+
+bool InternalFilterPolicy::KeyMayMatch(const Slice& key, const Slice& f) const {
+  return user_policy_->KeyMayMatch(ExtractUserKey(key), f);
+}
+
 LookupKey::LookupKey(const Slice& user_key, SequenceNumber s) {
   size_t usize = user_key.size();
   size_t needed = usize + 13;  // A conservative estimate
diff --git a/db/dbformat.h b/db/dbformat.h
index 044717d..f7f64da 100644
--- a/db/dbformat.h
+++ b/db/dbformat.h
@@ -8,6 +8,7 @@
 #include <stdio.h>
 #include "leveldb/comparator.h"
 #include "leveldb/db.h"
+#include "leveldb/filter_policy.h"
 #include "leveldb/slice.h"
 #include "leveldb/table_builder.h"
 #include "util/coding.h"
@@ -123,6 +124,17 @@ class InternalKeyComparator : public Comparator {
   int Compare(const InternalKey& a, const InternalKey& b) const;
 };
 
+// Filter policy wrapper that converts from internal keys to user keys
+class InternalFilterPolicy : public FilterPolicy {
+ private:
+  const FilterPolicy* const user_policy_;
+ public:
+  explicit InternalFilterPolicy(const FilterPolicy* p) : user_policy_(p) { }
+  virtual const char* Name() const;
+  virtual void CreateFilter(const Slice* keys, int n, std::string* dst) const;
+  virtual bool KeyMayMatch(const Slice& key, const Slice& filter) const;
+};
+
 // Modules in this directory should keep internal keys wrapped inside
 // the following class instead of plain strings so that we do not
 // incorrectly use string comparisons instead of an InternalKeyComparator.
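For reference, the internal keys handled above are the user key plus an
8-byte trailer that packs the sequence number and value type, so
stripping the trailer recovers the user key.  ExtractUserKey() is
already defined in dbformat.h along these lines:

    inline Slice ExtractUserKey(const Slice& internal_key) {
      assert(internal_key.size() >= 8);
      return Slice(internal_key.data(), internal_key.size() - 8);
    }

This is what lets InternalFilterPolicy rewrite keys[] in place before
delegating to the user-supplied policy.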
diff --git a/db/repair.cc b/db/repair.cc index 511c66b..022d52f 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -48,7 +48,8 @@ class Repairer { : dbname_(dbname), env_(options.env), icmp_(options.comparator), - options_(SanitizeOptions(dbname, &icmp_, options)), + ipolicy_(options.filter_policy), + options_(SanitizeOptions(dbname, &icmp_, &ipolicy_, options)), owns_info_log_(options_.info_log != options.info_log), owns_cache_(options_.block_cache != options.block_cache), next_file_number_(1) { @@ -99,6 +100,7 @@ class Repairer { std::string const dbname_; Env* const env_; InternalKeyComparator const icmp_; + InternalFilterPolicy const ipolicy_; Options const options_; bool owns_info_log_; bool owns_cache_; diff --git a/db/table_cache.cc b/db/table_cache.cc index cae79bd..497db27 100644 --- a/db/table_cache.cc +++ b/db/table_cache.cc @@ -42,23 +42,18 @@ TableCache::~TableCache() { delete cache_; } -Iterator* TableCache::NewIterator(const ReadOptions& options, - uint64_t file_number, - uint64_t file_size, - Table** tableptr) { - if (tableptr != NULL) { - *tableptr = NULL; - } - +Status TableCache::FindTable(uint64_t file_number, uint64_t file_size, + Cache::Handle** handle) { + Status s; char buf[sizeof(file_number)]; EncodeFixed64(buf, file_number); Slice key(buf, sizeof(buf)); - Cache::Handle* handle = cache_->Lookup(key); - if (handle == NULL) { + *handle = cache_->Lookup(key); + if (*handle == NULL) { std::string fname = TableFileName(dbname_, file_number); RandomAccessFile* file = NULL; Table* table = NULL; - Status s = env_->NewRandomAccessFile(fname, &file); + s = env_->NewRandomAccessFile(fname, &file); if (s.ok()) { s = Table::Open(*options_, file, file_size, &table); } @@ -68,13 +63,28 @@ Iterator* TableCache::NewIterator(const ReadOptions& options, delete file; // We do not cache error results so that if the error is transient, // or somebody repairs the file, we recover automatically. 
- return NewErrorIterator(s); + } else { + TableAndFile* tf = new TableAndFile; + tf->file = file; + tf->table = table; + *handle = cache_->Insert(key, tf, 1, &DeleteEntry); } + } + return s; +} - TableAndFile* tf = new TableAndFile; - tf->file = file; - tf->table = table; - handle = cache_->Insert(key, tf, 1, &DeleteEntry); +Iterator* TableCache::NewIterator(const ReadOptions& options, + uint64_t file_number, + uint64_t file_size, + Table** tableptr) { + if (tableptr != NULL) { + *tableptr = NULL; + } + + Cache::Handle* handle = NULL; + Status s = FindTable(file_number, file_size, &handle); + if (!s.ok()) { + return NewErrorIterator(s); } Table* table = reinterpret_cast(cache_->Value(handle))->table; @@ -86,6 +96,22 @@ Iterator* TableCache::NewIterator(const ReadOptions& options, return result; } +Status TableCache::Get(const ReadOptions& options, + uint64_t file_number, + uint64_t file_size, + const Slice& k, + void* arg, + void (*saver)(void*, const Slice&, const Slice&)) { + Cache::Handle* handle = NULL; + Status s = FindTable(file_number, file_size, &handle); + if (s.ok()) { + Table* t = reinterpret_cast(cache_->Value(handle))->table; + s = t->InternalGet(options, k, arg, saver); + cache_->Release(handle); + } + return s; +} + void TableCache::Evict(uint64_t file_number) { char buf[sizeof(file_number)]; EncodeFixed64(buf, file_number); diff --git a/db/table_cache.h b/db/table_cache.h index 0f3c73b..8cf4aaf 100644 --- a/db/table_cache.h +++ b/db/table_cache.h @@ -35,6 +35,15 @@ class TableCache { uint64_t file_size, Table** tableptr = NULL); + // If a seek to internal key "k" in specified file finds an entry, + // call (*handle_result)(arg, found_key, found_value). + Status Get(const ReadOptions& options, + uint64_t file_number, + uint64_t file_size, + const Slice& k, + void* arg, + void (*handle_result)(void*, const Slice&, const Slice&)); + // Evict any entry for the specified file number void Evict(uint64_t file_number); @@ -43,6 +52,8 @@ class TableCache { const std::string dbname_; const Options* options_; Cache* cache_; + + Status FindTable(uint64_t file_number, uint64_t file_size, Cache::Handle**); }; } // namespace leveldb diff --git a/db/version_set.cc b/db/version_set.cc index 1310aeb..1f48419 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -255,35 +255,34 @@ void Version::AddIterators(const ReadOptions& options, } } -// If "*iter" points at a value or deletion for user_key, store -// either the value, or a NotFound error and return true. -// Else return false. 
-static bool GetValue(const Comparator* cmp, - Iterator* iter, const Slice& user_key, - std::string* value, - Status* s) { - if (!iter->Valid()) { - return false; - } +// Callback from TableCache::Get() +namespace { +enum SaverState { + kNotFound, + kFound, + kDeleted, + kCorrupt, +}; +struct Saver { + SaverState state; + const Comparator* ucmp; + Slice user_key; + std::string* value; +}; +} +static void SaveValue(void* arg, const Slice& ikey, const Slice& v) { + Saver* s = reinterpret_cast(arg); ParsedInternalKey parsed_key; - if (!ParseInternalKey(iter->key(), &parsed_key)) { - *s = Status::Corruption("corrupted key for ", user_key); - return true; - } - if (cmp->Compare(parsed_key.user_key, user_key) != 0) { - return false; - } - switch (parsed_key.type) { - case kTypeDeletion: - *s = Status::NotFound(Slice()); // Use an empty error message for speed - break; - case kTypeValue: { - Slice v = iter->value(); - value->assign(v.data(), v.size()); - break; + if (!ParseInternalKey(ikey, &parsed_key)) { + s->state = kCorrupt; + } else { + if (s->ucmp->Compare(parsed_key.user_key, s->user_key) == 0) { + s->state = (parsed_key.type == kTypeValue) ? kFound : kDeleted; + if (s->state == kFound) { + s->value->assign(v.data(), v.size()); + } } } - return true; } static bool NewestFirst(FileMetaData* a, FileMetaData* b) { @@ -361,21 +360,27 @@ Status Version::Get(const ReadOptions& options, last_file_read = f; last_file_read_level = level; - Iterator* iter = vset_->table_cache_->NewIterator( - options, - f->number, - f->file_size); - iter->Seek(ikey); - const bool done = GetValue(ucmp, iter, user_key, value, &s); - if (!iter->status().ok()) { - s = iter->status(); - delete iter; + Saver saver; + saver.state = kNotFound; + saver.ucmp = ucmp; + saver.user_key = user_key; + saver.value = value; + s = vset_->table_cache_->Get(options, f->number, f->file_size, + ikey, &saver, SaveValue); + if (!s.ok()) { return s; - } else { - delete iter; - if (done) { + } + switch (saver.state) { + case kNotFound: + break; // Keep searching in other files + case kFound: + return s; + case kDeleted: + s = Status::NotFound(Slice()); // Use empty error message for speed + return s; + case kCorrupt: + s = Status::Corruption("corrupted key for ", user_key); return s; - } } } } diff --git a/doc/index.html b/doc/index.html index 472f7cd..521d2ba 100644 --- a/doc/index.html +++ b/doc/index.html @@ -400,6 +400,69 @@ We might want to prefix filename keys with one letter (say '/') and over just the metadata do not force us to fetch and cache bulky file contents.
 </p>
+
+<h1>Filters</h1>
+<p>
+Because of the way leveldb data is organized on disk,
+a single Get() call may involve multiple reads from disk.
+The optional FilterPolicy mechanism can be used to reduce
+the number of disk reads substantially.
+<pre>
+   leveldb::Options options;
+   options.filter_policy = NewBloomFilterPolicy(10);
+   leveldb::DB* db;
+   leveldb::DB::Open(options, "/tmp/testdb", &db);
+   ... use the database ...
+   delete db;
+   delete options.filter_policy;
+
+The preceding code associates a +Bloom filter +based filtering policy with the database. Bloom filter based +filtering relies on keeping some number of bits of data in memory per +key (in this case 10 bits per key since that is the argument we passed +to NewBloomFilter). This filter will reduce the number of unnecessary +disk reads needed for Get() calls by a factor of +approximately a 100. Increasing the bits per key will lead to a +larger reduction at the cost of more memory usage. We recommend that +applications whose working set does not fit in memory and that do a +lot of random reads set a filter policy. +
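+For example, at 10 bits per key a database holding one million
+distinct keys devotes about ten million bits, i.e. roughly 1.2MB, to
+filter data.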
+</p>
+<p>
+If you are using a custom comparator, you should ensure that the filter
+policy you are using is compatible with your comparator.  For example,
+consider a comparator that ignores trailing spaces when comparing keys.
+NewBloomFilterPolicy must not be used with such a comparator.
+Instead, the application should provide a custom filter policy that
+also ignores trailing spaces.  For example:
+<pre>
+  class CustomFilterPolicy : public leveldb::FilterPolicy {
+   private:
+    const FilterPolicy* builtin_policy_;
+   public:
+    CustomFilterPolicy() : builtin_policy_(NewBloomFilterPolicy(10)) { }
+    ~CustomFilterPolicy() { delete builtin_policy_; }
+
+    const char* Name() const { return "IgnoreTrailingSpacesFilter"; }
+
+    void CreateFilter(const Slice* keys, int n, std::string* dst) const {
+      // Use builtin bloom filter code after removing trailing spaces
+      std::vector<Slice> trimmed(n);
+      for (int i = 0; i < n; i++) {
+        trimmed[i] = RemoveTrailingSpaces(keys[i]);
+      }
+      builtin_policy_->CreateFilter(&trimmed[0], n, dst);
+    }
+
+    bool KeyMayMatch(const Slice& key, const Slice& filter) const {
+      // Use builtin bloom filter code after removing trailing spaces
+      return builtin_policy_->KeyMayMatch(RemoveTrailingSpaces(key), filter);
+    }
+  };
+</pre>
+</p>
+
+<p>
+Advanced applications may provide a filter policy that does not use
+a bloom filter but uses some other mechanism for summarizing a set
+of keys.  See leveldb/filter_policy.h for details.
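+As a concrete sketch of such a policy (modeled on the TestHashFilter
+helper in table/filter_block_test.cc; it is not part of the library),
+the following stores one 32-bit hash per key and reports a match only
+when the queried key's hash appears in the stored array:
+<pre>
+  class HashPerKeyPolicy : public leveldb::FilterPolicy {
+   public:
+    const char* Name() const { return "HashPerKeyPolicy"; }
+
+    void CreateFilter(const Slice* keys, int n, std::string* dst) const {
+      // Append one fixed-width (4 byte) hash value per key.
+      for (int i = 0; i < n; i++) {
+        PutFixed32(dst, Hash(keys[i].data(), keys[i].size(), 1));
+      }
+    }
+
+    bool KeyMayMatch(const Slice& key, const Slice& filter) const {
+      // Scan the stored hashes; a hash collision can still produce a
+      // (harmless) false positive.
+      uint32_t h = Hash(key.data(), key.size(), 1);
+      for (size_t i = 0; i + 4 <= filter.size(); i += 4) {
+        if (DecodeFixed32(filter.data() + i) == h) {
+          return true;
+        }
+      }
+      return false;
+    }
+  };
+</pre>
+(Hash, PutFixed32, and DecodeFixed32 are the existing helpers from
+util/hash.h and util/coding.h.)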
+</p>
+
 <h1>Checksums</h1>
 <p>
leveldb associates checksums with all data it stores in the file system. diff --git a/doc/table_format.txt b/doc/table_format.txt index ad5aa4b..d0f3065 100644 --- a/doc/table_format.txt +++ b/doc/table_format.txt @@ -47,6 +47,47 @@ the BlockHandle of the metaindex and index blocks as well as a magic number. // (40==2*BlockHandle::kMaxEncodedLength) magic: fixed64; // == 0xdb4775248b80fb57 +"filter" Meta Block +------------------- + +If a "FilterPolicy" was specified when the database was opened, a +filter block is stored in each table. The "metaindex" block contains +an entry that maps from "filter." to the BlockHandle for the filter +block where "" is the string returned by the filter policy's +"Name()" method. + +The filter block stores a sequence of filters, where filter i contains +the output of FilterPolicy::CreateFilter() on all keys that are stored +in a block whose file offset falls within the range + + [ i*base ... (i+1)*base-1 ] + +Currently, "base" is 2KB. So for example, if blocks X and Y start in +the range [ 0KB .. 2KB-1 ], all of the keys in X and Y will be +converted to a filter by calling FilterPolicy::CreateFilter(), and the +resulting filter will be stored as the first filter in the filter +block. + +The filter block is formatted as follows: + + [filter 0] + [filter 1] + [filter 2] + ... + [filter N-1] + + [offset of filter 0] : 4 bytes + [offset of filter 1] : 4 bytes + [offset of filter 2] : 4 bytes + ... + [offset of filter N-1] : 4 bytes + + [offset of beginning of offset array] : 4 bytes + lg(base) : 1 byte + +The offset array at the end of the filter block allows efficient +mapping from a data block offset to the corresponding filter. + "stats" Meta Block ------------------ diff --git a/include/leveldb/c.h b/include/leveldb/c.h index 0be993d..70e3cc6 100644 --- a/include/leveldb/c.h +++ b/include/leveldb/c.h @@ -55,6 +55,7 @@ typedef struct leveldb_cache_t leveldb_cache_t; typedef struct leveldb_comparator_t leveldb_comparator_t; typedef struct leveldb_env_t leveldb_env_t; typedef struct leveldb_filelock_t leveldb_filelock_t; +typedef struct leveldb_filterpolicy_t leveldb_filterpolicy_t; typedef struct leveldb_iterator_t leveldb_iterator_t; typedef struct leveldb_logger_t leveldb_logger_t; typedef struct leveldb_options_t leveldb_options_t; @@ -127,6 +128,11 @@ extern void leveldb_approximate_sizes( const char* const* range_limit_key, const size_t* range_limit_key_len, uint64_t* sizes); +extern void leveldb_compact_range( + leveldb_t* db, + const char* start_key, size_t start_key_len, + const char* limit_key, size_t limit_key_len); + /* Management operations */ extern void leveldb_destroy_db( @@ -177,6 +183,9 @@ extern void leveldb_options_destroy(leveldb_options_t*); extern void leveldb_options_set_comparator( leveldb_options_t*, leveldb_comparator_t*); +extern void leveldb_options_set_filter_policy( + leveldb_options_t*, + leveldb_filterpolicy_t*); extern void leveldb_options_set_create_if_missing( leveldb_options_t*, unsigned char); extern void leveldb_options_set_error_if_exists( @@ -209,6 +218,26 @@ extern leveldb_comparator_t* leveldb_comparator_create( const char* (*name)(void*)); extern void leveldb_comparator_destroy(leveldb_comparator_t*); +/* Filter policy */ + +extern leveldb_filterpolicy_t* leveldb_filterpolicy_create( + void* state, + void (*destructor)(void*), + char* (*create_filter)( + void*, + const char* const* key_array, const size_t* key_length_array, + int num_keys, + size_t* filter_length), + unsigned char (*key_may_match)( + void*, + const 
char* key, size_t length, + const char* filter, size_t filter_length), + const char* (*name)(void*)); +extern void leveldb_filterpolicy_destroy(leveldb_filterpolicy_t*); + +extern leveldb_filterpolicy_t* leveldb_filterpolicy_create_bloom( + int bits_per_key); + /* Read options */ extern leveldb_readoptions_t* leveldb_readoptions_create(); diff --git a/include/leveldb/db.h b/include/leveldb/db.h index c7d5167..481aad6 100644 --- a/include/leveldb/db.h +++ b/include/leveldb/db.h @@ -14,7 +14,7 @@ namespace leveldb { // Update Makefile if you change these static const int kMajorVersion = 1; -static const int kMinorVersion = 3; +static const int kMinorVersion = 4; struct Options; struct ReadOptions; diff --git a/include/leveldb/filter_policy.h b/include/leveldb/filter_policy.h new file mode 100644 index 0000000..1fba080 --- /dev/null +++ b/include/leveldb/filter_policy.h @@ -0,0 +1,70 @@ +// Copyright (c) 2012 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +// +// A database can be configured with a custom FilterPolicy object. +// This object is responsible for creating a small filter from a set +// of keys. These filters are stored in leveldb and are consulted +// automatically by leveldb to decide whether or not to read some +// information from disk. In many cases, a filter can cut down the +// number of disk seeks form a handful to a single disk seek per +// DB::Get() call. +// +// Most people will want to use the builtin bloom filter support (see +// NewBloomFilterPolicy() below). + +#ifndef STORAGE_LEVELDB_INCLUDE_FILTER_POLICY_H_ +#define STORAGE_LEVELDB_INCLUDE_FILTER_POLICY_H_ + +#include + +namespace leveldb { + +class Slice; + +class FilterPolicy { + public: + virtual ~FilterPolicy(); + + // Return the name of this policy. Note that if the filter encoding + // changes in an incompatible way, the name returned by this method + // must be changed. Otherwise, old incompatible filters may be + // passed to methods of this type. + virtual const char* Name() const = 0; + + // keys[0,n-1] contains a list of keys (potentially with duplicates) + // that are ordered according to the user supplied comparator. + // Append a filter that summarizes keys[0,n-1] to *dst. + // + // Warning: do not change the initial contents of *dst. Instead, + // append the newly constructed filter to *dst. + virtual void CreateFilter(const Slice* keys, int n, std::string* dst) + const = 0; + + // "filter" contains the data appended by a preceding call to + // CreateFilter() on this class. This method must return true if + // the key was in the list of keys passed to CreateFilter(). + // This method may return true or false if the key was not on the + // list, but it should aim to return false with a high probability. + virtual bool KeyMayMatch(const Slice& key, const Slice& filter) const = 0; +}; + +// Return a new filter policy that uses a bloom filter with approximately +// the specified number of bits per key. A good value for bits_per_key +// is 10, which yields a filter with ~ 1% false positive rate. +// +// Callers must delete the result after any database that is using the +// result has been closed. +// +// Note: if you are using a custom comparator that ignores some parts +// of the keys being compared, you must not use NewBloomFilterPolicy() +// and must provide your own FilterPolicy that also ignores the +// corresponding parts of the keys. 
For example, if the comparator +// ignores trailing spaces, it would be incorrect to use a +// FilterPolicy (like NewBloomFilterPolicy) that does not ignore +// trailing spaces in keys. +extern const FilterPolicy* NewBloomFilterPolicy(int bits_per_key); + +} + +#endif // STORAGE_LEVELDB_INCLUDE_FILTER_POLICY_H_ diff --git a/include/leveldb/options.h b/include/leveldb/options.h index 79111a0..fdda718 100644 --- a/include/leveldb/options.h +++ b/include/leveldb/options.h @@ -12,6 +12,7 @@ namespace leveldb { class Cache; class Comparator; class Env; +class FilterPolicy; class Logger; class Snapshot; @@ -127,6 +128,13 @@ struct Options { // efficiently detect that and will switch to uncompressed mode. CompressionType compression; + // If non-NULL, use the specified filter policy to reduce disk reads. + // Many applications will benefit from passing the result of + // NewBloomFilterPolicy() here. + // + // Default: NULL + const FilterPolicy* filter_policy; + // Create an Options object with default values for all fields. Options(); }; diff --git a/include/leveldb/table.h b/include/leveldb/table.h index 0cbdd40..a9746c3 100644 --- a/include/leveldb/table.h +++ b/include/leveldb/table.h @@ -12,9 +12,11 @@ namespace leveldb { class Block; class BlockHandle; +class Footer; struct Options; class RandomAccessFile; struct ReadOptions; +class TableCache; // A Table is a sorted map from strings to strings. Tables are // immutable and persistent. A Table may be safely accessed from @@ -60,6 +62,19 @@ class Table { explicit Table(Rep* rep) { rep_ = rep; } static Iterator* BlockReader(void*, const ReadOptions&, const Slice&); + // Calls (*handle_result)(arg, ...) with the entry found after a call + // to Seek(key). May not make such a call if filter policy says + // that key is not present. 
+ friend class TableCache; + Status InternalGet( + const ReadOptions&, const Slice& key, + void* arg, + void (*handle_result)(void* arg, const Slice& k, const Slice& v)); + + + void ReadMeta(const Footer& footer); + void ReadFilter(const Slice& filter_handle_value); + // No copying allowed Table(const Table&); void operator=(const Table&); diff --git a/include/leveldb/table_builder.h b/include/leveldb/table_builder.h index 9ac0868..5fd1dc7 100644 --- a/include/leveldb/table_builder.h +++ b/include/leveldb/table_builder.h @@ -77,6 +77,7 @@ class TableBuilder { private: bool ok() const { return status().ok(); } void WriteBlock(BlockBuilder* block, BlockHandle* handle); + void WriteRawBlock(const Slice& data, CompressionType, BlockHandle* handle); struct Rep; Rep* rep_; diff --git a/port/port_android.h b/port/port_android.h index 92f0090..b733388 100644 --- a/port/port_android.h +++ b/port/port_android.h @@ -78,6 +78,9 @@ class CondVar { // On ARM chipsets #include #include "leveldb/comparator.h" +#include "table/format.h" #include "util/coding.h" #include "util/logging.h" @@ -19,10 +20,10 @@ inline uint32_t Block::NumRestarts() const { return DecodeFixed32(data_ + size_ - sizeof(uint32_t)); } -Block::Block(const char* data, size_t size, bool take_ownership) - : data_(data), - size_(size), - owned_(take_ownership) { +Block::Block(const BlockContents& contents) + : data_(contents.data.data()), + size_(contents.data.size()), + owned_(contents.heap_allocated) { if (size_ < sizeof(uint32_t)) { size_ = 0; // Error marker } else { diff --git a/table/block.h b/table/block.h index 76088a4..2493eb9 100644 --- a/table/block.h +++ b/table/block.h @@ -11,14 +11,13 @@ namespace leveldb { +struct BlockContents; class Comparator; class Block { public: // Initialize the block with the specified contents. - // Takes ownership of data[] and will delete[] it when done iff - // "take_ownership is true. - Block(const char* data, size_t size, bool take_ownership); + explicit Block(const BlockContents& contents); ~Block(); diff --git a/table/filter_block.cc b/table/filter_block.cc new file mode 100644 index 0000000..203e15c --- /dev/null +++ b/table/filter_block.cc @@ -0,0 +1,111 @@ +// Copyright (c) 2012 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "table/filter_block.h" + +#include "leveldb/filter_policy.h" +#include "util/coding.h" + +namespace leveldb { + +// See doc/table_format.txt for an explanation of the filter block format. 
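+//
+// For example, with the 2KB base defined below, a data block starting
+// at file offset 5000 is covered by filter number 5000 >> 11 == 2,
+// i.e. by the third filter in the filter block.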
+ +// Generate new filter every 2KB of data +static const size_t kFilterBaseLg = 11; +static const size_t kFilterBase = 1 << kFilterBaseLg; + +FilterBlockBuilder::FilterBlockBuilder(const FilterPolicy* policy) + : policy_(policy) { +} + +void FilterBlockBuilder::StartBlock(uint64_t block_offset) { + uint64_t filter_index = (block_offset / kFilterBase); + assert(filter_index >= filter_offsets_.size()); + while (filter_index > filter_offsets_.size()) { + GenerateFilter(); + } +} + +void FilterBlockBuilder::AddKey(const Slice& key) { + Slice k = key; + start_.push_back(keys_.size()); + keys_.append(k.data(), k.size()); +} + +Slice FilterBlockBuilder::Finish() { + if (!start_.empty()) { + GenerateFilter(); + } + + // Append array of per-filter offsets + const uint32_t array_offset = result_.size(); + for (size_t i = 0; i < filter_offsets_.size(); i++) { + PutFixed32(&result_, filter_offsets_[i]); + } + + PutFixed32(&result_, array_offset); + result_.push_back(kFilterBaseLg); // Save encoding parameter in result + return Slice(result_); +} + +void FilterBlockBuilder::GenerateFilter() { + const size_t num_keys = start_.size(); + if (num_keys == 0) { + // Fast path if there are no keys for this filter + filter_offsets_.push_back(result_.size()); + return; + } + + // Make list of keys from flattened key structure + start_.push_back(keys_.size()); // Simplify length computation + tmp_keys_.resize(num_keys); + for (size_t i = 0; i < num_keys; i++) { + const char* base = keys_.data() + start_[i]; + size_t length = start_[i+1] - start_[i]; + tmp_keys_[i] = Slice(base, length); + } + + // Generate filter for current set of keys and append to result_. + filter_offsets_.push_back(result_.size()); + policy_->CreateFilter(&tmp_keys_[0], num_keys, &result_); + + tmp_keys_.clear(); + keys_.clear(); + start_.clear(); +} + +FilterBlockReader::FilterBlockReader(const FilterPolicy* policy, + const Slice& contents) + : policy_(policy), + data_(NULL), + offset_(NULL), + num_(0), + base_lg_(0) { + size_t n = contents.size(); + if (n < 5) return; // 1 byte for base_lg_ and 4 for start of offset array + base_lg_ = contents[n-1]; + uint32_t last_word = DecodeFixed32(contents.data() + n - 5); + if (last_word > n - 5) return; + data_ = contents.data(); + offset_ = data_ + last_word; + num_ = (n - 5 - last_word) / 4; +} + +bool FilterBlockReader::KeyMayMatch(uint64_t block_offset, const Slice& key) { + uint64_t index = block_offset >> base_lg_; + if (index < num_) { + uint32_t start = DecodeFixed32(offset_ + index*4); + uint32_t limit = DecodeFixed32(offset_ + index*4 + 4); + if (start <= limit && limit <= (offset_ - data_)) { + Slice filter = Slice(data_ + start, limit - start); + return policy_->KeyMayMatch(key, filter); + } else if (start == limit) { + // Empty filters do not match any keys + return false; + } + } + return true; // Errors are treated as potential matches +} + +} diff --git a/table/filter_block.h b/table/filter_block.h new file mode 100644 index 0000000..c67d010 --- /dev/null +++ b/table/filter_block.h @@ -0,0 +1,68 @@ +// Copyright (c) 2012 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +// +// A filter block is stored near the end of a Table file. It contains +// filters (e.g., bloom filters) for all data blocks in the table combined +// into a single filter block. 
+ +#ifndef STORAGE_LEVELDB_TABLE_FILTER_BLOCK_H_ +#define STORAGE_LEVELDB_TABLE_FILTER_BLOCK_H_ + +#include +#include +#include +#include +#include "leveldb/slice.h" +#include "util/hash.h" + +namespace leveldb { + +class FilterPolicy; + +// A FilterBlockBuilder is used to construct all of the filters for a +// particular Table. It generates a single string which is stored as +// a special block in the Table. +// +// The sequence of calls to FilterBlockBuilder must match the regexp: +// (StartBlock AddKey*)* Finish +class FilterBlockBuilder { + public: + explicit FilterBlockBuilder(const FilterPolicy*); + + void StartBlock(uint64_t block_offset); + void AddKey(const Slice& key); + Slice Finish(); + + private: + void GenerateFilter(); + + const FilterPolicy* policy_; + std::string keys_; // Flattened key contents + std::vector start_; // Starting index in keys_ of each key + std::string result_; // Filter data computed so far + std::vector tmp_keys_; // policy_->CreateFilter() argument + std::vector filter_offsets_; + + // No copying allowed + FilterBlockBuilder(const FilterBlockBuilder&); + void operator=(const FilterBlockBuilder&); +}; + +class FilterBlockReader { + public: + // REQUIRES: "contents" and *policy must stay live while *this is live. + FilterBlockReader(const FilterPolicy* policy, const Slice& contents); + bool KeyMayMatch(uint64_t block_offset, const Slice& key); + + private: + const FilterPolicy* policy_; + const char* data_; // Pointer to filter data (at block-start) + const char* offset_; // Pointer to beginning of offset array (at block-end) + size_t num_; // Number of entries in offset array + size_t base_lg_; // Encoding parameter (see kFilterBaseLg in .cc file) +}; + +} + +#endif // STORAGE_LEVELDB_TABLE_FILTER_BLOCK_H_ diff --git a/table/filter_block_test.cc b/table/filter_block_test.cc new file mode 100644 index 0000000..3a2a07c --- /dev/null +++ b/table/filter_block_test.cc @@ -0,0 +1,128 @@ +// Copyright (c) 2012 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. 
+ +#include "table/filter_block.h" + +#include "leveldb/filter_policy.h" +#include "util/coding.h" +#include "util/hash.h" +#include "util/logging.h" +#include "util/testharness.h" +#include "util/testutil.h" + +namespace leveldb { + +// For testing: emit an array with one hash value per key +class TestHashFilter : public FilterPolicy { + public: + virtual const char* Name() const { + return "TestHashFilter"; + } + + virtual void CreateFilter(const Slice* keys, int n, std::string* dst) const { + for (int i = 0; i < n; i++) { + uint32_t h = Hash(keys[i].data(), keys[i].size(), 1); + PutFixed32(dst, h); + } + } + + virtual bool KeyMayMatch(const Slice& key, const Slice& filter) const { + uint32_t h = Hash(key.data(), key.size(), 1); + for (int i = 0; i + 4 <= filter.size(); i += 4) { + if (h == DecodeFixed32(filter.data() + i)) { + return true; + } + } + return false; + } +}; + +class FilterBlockTest { + public: + TestHashFilter policy_; +}; + +TEST(FilterBlockTest, EmptyBuilder) { + FilterBlockBuilder builder(&policy_); + Slice block = builder.Finish(); + ASSERT_EQ("\\x00\\x00\\x00\\x00\\x0b", EscapeString(block)); + FilterBlockReader reader(&policy_, block); + ASSERT_TRUE(reader.KeyMayMatch(0, "foo")); + ASSERT_TRUE(reader.KeyMayMatch(100000, "foo")); +} + +TEST(FilterBlockTest, SingleChunk) { + FilterBlockBuilder builder(&policy_); + builder.StartBlock(100); + builder.AddKey("foo"); + builder.AddKey("bar"); + builder.AddKey("box"); + builder.StartBlock(200); + builder.AddKey("box"); + builder.StartBlock(300); + builder.AddKey("hello"); + Slice block = builder.Finish(); + FilterBlockReader reader(&policy_, block); + ASSERT_TRUE(reader.KeyMayMatch(100, "foo")); + ASSERT_TRUE(reader.KeyMayMatch(100, "bar")); + ASSERT_TRUE(reader.KeyMayMatch(100, "box")); + ASSERT_TRUE(reader.KeyMayMatch(100, "hello")); + ASSERT_TRUE(reader.KeyMayMatch(100, "foo")); + ASSERT_TRUE(! reader.KeyMayMatch(100, "missing")); + ASSERT_TRUE(! reader.KeyMayMatch(100, "other")); +} + +TEST(FilterBlockTest, MultiChunk) { + FilterBlockBuilder builder(&policy_); + + // First filter + builder.StartBlock(0); + builder.AddKey("foo"); + builder.StartBlock(2000); + builder.AddKey("bar"); + + // Second filter + builder.StartBlock(3100); + builder.AddKey("box"); + + // Third filter is empty + + // Last filter + builder.StartBlock(9000); + builder.AddKey("box"); + builder.AddKey("hello"); + + Slice block = builder.Finish(); + FilterBlockReader reader(&policy_, block); + + // Check first filter + ASSERT_TRUE(reader.KeyMayMatch(0, "foo")); + ASSERT_TRUE(reader.KeyMayMatch(2000, "bar")); + ASSERT_TRUE(! reader.KeyMayMatch(0, "box")); + ASSERT_TRUE(! reader.KeyMayMatch(0, "hello")); + + // Check second filter + ASSERT_TRUE(reader.KeyMayMatch(3100, "box")); + ASSERT_TRUE(! reader.KeyMayMatch(3100, "foo")); + ASSERT_TRUE(! reader.KeyMayMatch(3100, "bar")); + ASSERT_TRUE(! reader.KeyMayMatch(3100, "hello")); + + // Check third filter (empty) + ASSERT_TRUE(! reader.KeyMayMatch(4100, "foo")); + ASSERT_TRUE(! reader.KeyMayMatch(4100, "bar")); + ASSERT_TRUE(! reader.KeyMayMatch(4100, "box")); + ASSERT_TRUE(! reader.KeyMayMatch(4100, "hello")); + + // Check last filter + ASSERT_TRUE(reader.KeyMayMatch(9000, "box")); + ASSERT_TRUE(reader.KeyMayMatch(9000, "hello")); + ASSERT_TRUE(! reader.KeyMayMatch(9000, "foo")); + ASSERT_TRUE(! 
reader.KeyMayMatch(9000, "bar")); +} + +} // namespace leveldb + +int main(int argc, char** argv) { + return leveldb::test::RunAllTests(); +} diff --git a/table/format.cc b/table/format.cc index 25b85a2..cda1dec 100644 --- a/table/format.cc +++ b/table/format.cc @@ -66,10 +66,10 @@ Status Footer::DecodeFrom(Slice* input) { Status ReadBlock(RandomAccessFile* file, const ReadOptions& options, const BlockHandle& handle, - Block** block, - bool* may_cache) { - *block = NULL; - *may_cache = false; + BlockContents* result) { + result->data = Slice(); + result->cachable = false; + result->heap_allocated = false; // Read the block contents as well as the type/crc footer. // See table_builder.cc for the code that built this structure. @@ -105,11 +105,13 @@ Status ReadBlock(RandomAccessFile* file, // Use it directly under the assumption that it will be live // while the file is open. delete[] buf; - *block = new Block(data, n, false /* do not take ownership */); - *may_cache = false; // Do not double-cache + result->data = Slice(data, n); + result->heap_allocated = false; + result->cachable = false; // Do not double-cache } else { - *block = new Block(buf, n, true /* take ownership */); - *may_cache = true; + result->data = Slice(buf, n); + result->heap_allocated = true; + result->cachable = true; } // Ok @@ -127,8 +129,9 @@ Status ReadBlock(RandomAccessFile* file, return Status::Corruption("corrupted compressed block contents"); } delete[] buf; - *block = new Block(ubuf, ulength, true /* take ownership */); - *may_cache = true; + result->data = Slice(ubuf, ulength); + result->heap_allocated = true; + result->cachable = true; break; } default: diff --git a/table/format.h b/table/format.h index 66a15da..6c0b80c 100644 --- a/table/format.h +++ b/table/format.h @@ -83,16 +83,18 @@ static const uint64_t kTableMagicNumber = 0xdb4775248b80fb57ull; // 1-byte type + 32-bit crc static const size_t kBlockTrailerSize = 5; -// Read the block identified by "handle" from "file". On success, -// store a pointer to the heap-allocated result in *block and return -// OK. On failure store NULL in *block and return non-OK. -// On success, stores true in *may_cache if the result may be -// cached, false if it must not be cached. +struct BlockContents { + Slice data; // Actual contents of data + bool cachable; // True iff data can be cached + bool heap_allocated; // True iff caller should delete[] data.data() +}; + +// Read the block identified by "handle" from "file". On failure +// return non-OK. On success fill *result and return OK. extern Status ReadBlock(RandomAccessFile* file, const ReadOptions& options, const BlockHandle& handle, - Block** block, - bool* may_cache); + BlockContents* result); // Implementation details follow. 
Clients should ignore, diff --git a/table/table.cc b/table/table.cc index 07dcffd..dbd6d3a 100644 --- a/table/table.cc +++ b/table/table.cc @@ -5,8 +5,12 @@ #include "leveldb/table.h" #include "leveldb/cache.h" +#include "leveldb/comparator.h" #include "leveldb/env.h" +#include "leveldb/filter_policy.h" +#include "leveldb/options.h" #include "table/block.h" +#include "table/filter_block.h" #include "table/format.h" #include "table/two_level_iterator.h" #include "util/coding.h" @@ -15,6 +19,8 @@ namespace leveldb { struct Table::Rep { ~Rep() { + delete filter; + delete [] filter_data; delete index_block; } @@ -22,6 +28,8 @@ struct Table::Rep { Status status; RandomAccessFile* file; uint64_t cache_id; + FilterBlockReader* filter; + const char* filter_data; BlockHandle metaindex_handle; // Handle to metaindex_block: saved from footer Block* index_block; @@ -47,11 +55,13 @@ Status Table::Open(const Options& options, if (!s.ok()) return s; // Read the index block + BlockContents contents; Block* index_block = NULL; if (s.ok()) { - bool may_cache; // Ignored result - s = ReadBlock(file, ReadOptions(), footer.index_handle(), &index_block, - &may_cache); + s = ReadBlock(file, ReadOptions(), footer.index_handle(), &contents); + if (s.ok()) { + index_block = new Block(contents); + } } if (s.ok()) { @@ -63,7 +73,10 @@ Status Table::Open(const Options& options, rep->metaindex_handle = footer.metaindex_handle(); rep->index_block = index_block; rep->cache_id = (options.block_cache ? options.block_cache->NewId() : 0); + rep->filter_data = NULL; + rep->filter = NULL; *table = new Table(rep); + (*table)->ReadMeta(footer); } else { if (index_block) delete index_block; } @@ -71,6 +84,52 @@ Status Table::Open(const Options& options, return s; } +void Table::ReadMeta(const Footer& footer) { + if (rep_->options.filter_policy == NULL) { + return; // Do not need any metadata + } + + // TODO(sanjay): Skip this if footer.metaindex_handle() size indicates + // it is an empty block. + ReadOptions opt; + BlockContents contents; + if (!ReadBlock(rep_->file, opt, footer.metaindex_handle(), &contents).ok()) { + // Do not propagate errors since meta info is not needed for operation + return; + } + Block* meta = new Block(contents); + + Iterator* iter = meta->NewIterator(BytewiseComparator()); + std::string key = "filter."; + key.append(rep_->options.filter_policy->Name()); + iter->Seek(key); + if (iter->Valid() && iter->key() == Slice(key)) { + ReadFilter(iter->value()); + } + delete iter; + delete meta; +} + +void Table::ReadFilter(const Slice& filter_handle_value) { + Slice v = filter_handle_value; + BlockHandle filter_handle; + if (!filter_handle.DecodeFrom(&v).ok()) { + return; + } + + // We might want to unify with ReadBlock() if we start + // requiring checksum verification in Table::Open. + ReadOptions opt; + BlockContents block; + if (!ReadBlock(rep_->file, opt, filter_handle, &block).ok()) { + return; + } + if (block.heap_allocated) { + rep_->filter_data = block.data.data(); // Will need to delete later + } + rep_->filter = new FilterBlockReader(rep_->options.filter_policy, block.data); +} + Table::~Table() { delete rep_; } @@ -107,7 +166,7 @@ Iterator* Table::BlockReader(void* arg, // can add more features in the future. 
if (s.ok()) { - bool may_cache; + BlockContents contents; if (block_cache != NULL) { char cache_key_buffer[16]; EncodeFixed64(cache_key_buffer, table->rep_->cache_id); @@ -117,14 +176,20 @@ Iterator* Table::BlockReader(void* arg, if (cache_handle != NULL) { block = reinterpret_cast(block_cache->Value(cache_handle)); } else { - s = ReadBlock(table->rep_->file, options, handle, &block, &may_cache); - if (s.ok() && may_cache && options.fill_cache) { - cache_handle = block_cache->Insert( - key, block, block->size(), &DeleteCachedBlock); + s = ReadBlock(table->rep_->file, options, handle, &contents); + if (s.ok()) { + block = new Block(contents); + if (contents.cachable && options.fill_cache) { + cache_handle = block_cache->Insert( + key, block, block->size(), &DeleteCachedBlock); + } } } } else { - s = ReadBlock(table->rep_->file, options, handle, &block, &may_cache); + s = ReadBlock(table->rep_->file, options, handle, &contents); + if (s.ok()) { + block = new Block(contents); + } } } @@ -148,6 +213,39 @@ Iterator* Table::NewIterator(const ReadOptions& options) const { &Table::BlockReader, const_cast(this), options); } +Status Table::InternalGet(const ReadOptions& options, const Slice& k, + void* arg, + void (*saver)(void*, const Slice&, const Slice&)) { + Status s; + Iterator* iiter = rep_->index_block->NewIterator(rep_->options.comparator); + iiter->Seek(k); + if (iiter->Valid()) { + Slice handle_value = iiter->value(); + FilterBlockReader* filter = rep_->filter; + BlockHandle handle; + if (filter != NULL && + handle.DecodeFrom(&handle_value).ok() && + !filter->KeyMayMatch(handle.offset(), k)) { + // Not found + } else { + Slice handle = iiter->value(); + Iterator* block_iter = BlockReader(this, options, iiter->value()); + block_iter->Seek(k); + if (block_iter->Valid()) { + (*saver)(arg, block_iter->key(), block_iter->value()); + } + s = block_iter->status(); + delete block_iter; + } + } + if (s.ok()) { + s = iiter->status(); + } + delete iiter; + return s; +} + + uint64_t Table::ApproximateOffsetOf(const Slice& key) const { Iterator* index_iter = rep_->index_block->NewIterator(rep_->options.comparator); diff --git a/table/table_builder.cc b/table/table_builder.cc index 682ce5b..62002c8 100644 --- a/table/table_builder.cc +++ b/table/table_builder.cc @@ -5,14 +5,15 @@ #include "leveldb/table_builder.h" #include -#include #include "leveldb/comparator.h" #include "leveldb/env.h" +#include "leveldb/filter_policy.h" +#include "leveldb/options.h" #include "table/block_builder.h" +#include "table/filter_block.h" #include "table/format.h" #include "util/coding.h" #include "util/crc32c.h" -#include "util/logging.h" namespace leveldb { @@ -27,6 +28,7 @@ struct TableBuilder::Rep { std::string last_key; int64_t num_entries; bool closed; // Either Finish() or Abandon() has been called. + FilterBlockBuilder* filter_block; // We do not emit the index entry for a block until we have seen the // first key for the next data block. This allows us to use shorter @@ -51,6 +53,8 @@ struct TableBuilder::Rep { index_block(&index_block_options), num_entries(0), closed(false), + filter_block(opt.filter_policy == NULL ? 
NULL + : new FilterBlockBuilder(opt.filter_policy)), pending_index_entry(false) { index_block_options.block_restart_interval = 1; } @@ -58,10 +62,14 @@ struct TableBuilder::Rep { TableBuilder::TableBuilder(const Options& options, WritableFile* file) : rep_(new Rep(options, file)) { + if (rep_->filter_block != NULL) { + rep_->filter_block->StartBlock(0); + } } TableBuilder::~TableBuilder() { assert(rep_->closed); // Catch errors where caller forgot to call Finish() + delete rep_->filter_block; delete rep_; } @@ -98,6 +106,10 @@ void TableBuilder::Add(const Slice& key, const Slice& value) { r->pending_index_entry = false; } + if (r->filter_block != NULL) { + r->filter_block->AddKey(key); + } + r->last_key.assign(key.data(), key.size()); r->num_entries++; r->data_block.Add(key, value); @@ -119,6 +131,9 @@ void TableBuilder::Flush() { r->pending_index_entry = true; r->status = r->file->Flush(); } + if (r->filter_block != NULL) { + r->filter_block->StartBlock(r->offset); + } } void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) { @@ -152,6 +167,15 @@ void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) { break; } } + WriteRawBlock(block_contents, type, handle); + r->compressed_output.clear(); + block->Reset(); +} + +void TableBuilder::WriteRawBlock(const Slice& block_contents, + CompressionType type, + BlockHandle* handle) { + Rep* r = rep_; handle->set_offset(r->offset); handle->set_size(block_contents.size()); r->status = r->file->Append(block_contents); @@ -166,8 +190,6 @@ void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) { r->offset += block_contents.size() + kBlockTrailerSize; } } - r->compressed_output.clear(); - block->Reset(); } Status TableBuilder::status() const { @@ -179,13 +201,32 @@ Status TableBuilder::Finish() { Flush(); assert(!r->closed); r->closed = true; - BlockHandle metaindex_block_handle; - BlockHandle index_block_handle; + + BlockHandle filter_block_handle, metaindex_block_handle, index_block_handle; + + // Write filter block + if (ok() && r->filter_block != NULL) { + WriteRawBlock(r->filter_block->Finish(), kNoCompression, + &filter_block_handle); + } + + // Write metaindex block if (ok()) { BlockBuilder meta_index_block(&r->options); + if (r->filter_block != NULL) { + // Add mapping from "filter.Name" to location of filter data + std::string key = "filter."; + key.append(r->options.filter_policy->Name()); + std::string handle_encoding; + filter_block_handle.EncodeTo(&handle_encoding); + meta_index_block.Add(key, handle_encoding); + } + // TODO(postrelease): Add stats and other meta blocks WriteBlock(&meta_index_block, &metaindex_block_handle); } + + // Write index block if (ok()) { if (r->pending_index_entry) { r->options.comparator->FindShortSuccessor(&r->last_key); @@ -196,6 +237,8 @@ Status TableBuilder::Finish() { } WriteBlock(&r->index_block, &index_block_handle); } + + // Write footer if (ok()) { Footer footer; footer.set_metaindex_handle(metaindex_block_handle); diff --git a/table/table_test.cc b/table/table_test.cc index 0c8e676..57cea25 100644 --- a/table/table_test.cc +++ b/table/table_test.cc @@ -168,8 +168,6 @@ class Constructor { // Construct the data structure from the data in "data" virtual Status FinishImpl(const Options& options, const KVMap& data) = 0; - virtual size_t NumBytes() const = 0; - virtual Iterator* NewIterator() const = 0; virtual const KVMap& data() { return data_; } @@ -185,7 +183,6 @@ class BlockConstructor: public Constructor { explicit BlockConstructor(const Comparator* cmp) 
: Constructor(cmp), comparator_(cmp), - block_size_(-1), block_(NULL) { } ~BlockConstructor() { delete block_; @@ -201,22 +198,21 @@ class BlockConstructor: public Constructor { builder.Add(it->first, it->second); } // Open the block - Slice block_data = builder.Finish(); - block_size_ = block_data.size(); - char* block_data_copy = new char[block_size_]; - memcpy(block_data_copy, block_data.data(), block_size_); - block_ = new Block(block_data_copy, block_size_, true /* take ownership */); + data_ = builder.Finish().ToString(); + BlockContents contents; + contents.data = data_; + contents.cachable = false; + contents.heap_allocated = false; + block_ = new Block(contents); return Status::OK(); } - virtual size_t NumBytes() const { return block_size_; } - virtual Iterator* NewIterator() const { return block_->NewIterator(comparator_); } private: const Comparator* comparator_; - int block_size_; + std::string data_; Block* block_; BlockConstructor(); @@ -253,7 +249,6 @@ class TableConstructor: public Constructor { table_options.comparator = options.comparator; return Table::Open(table_options, source_, sink.contents().size(), &table_); } - virtual size_t NumBytes() const { return source_->Size(); } virtual Iterator* NewIterator() const { return table_->NewIterator(ReadOptions()); @@ -342,10 +337,6 @@ class MemTableConstructor: public Constructor { } return Status::OK(); } - virtual size_t NumBytes() const { - return memtable_->ApproximateMemoryUsage(); - } - virtual Iterator* NewIterator() const { return new KeyConvertingIterator(memtable_->NewIterator()); } @@ -379,13 +370,6 @@ class DBConstructor: public Constructor { } return Status::OK(); } - virtual size_t NumBytes() const { - Range r("", "\xff\xff"); - uint64_t size; - db_->GetApproximateSizes(&r, 1, &size); - return size; - } - virtual Iterator* NewIterator() const { return db_->NewIterator(ReadOptions()); } @@ -809,7 +793,7 @@ TEST(TableTest, ApproximateOffsetOfPlain) { ASSERT_TRUE(Between(c.ApproximateOffsetOf("k05"), 210000, 211000)); ASSERT_TRUE(Between(c.ApproximateOffsetOf("k06"), 510000, 511000)); ASSERT_TRUE(Between(c.ApproximateOffsetOf("k07"), 510000, 511000)); - ASSERT_TRUE(Between(c.ApproximateOffsetOf("xyz"), 610000, 611000)); + ASSERT_TRUE(Between(c.ApproximateOffsetOf("xyz"), 610000, 612000)); } diff --git a/util/bloom.cc b/util/bloom.cc new file mode 100644 index 0000000..d7941cd --- /dev/null +++ b/util/bloom.cc @@ -0,0 +1,95 @@ +// Copyright (c) 2012 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. 
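+//
+// Sketch of the parameter choices made below: for bits_per_key = 10
+// the constructor picks k_ = (size_t)(10 * 0.69) = 6 probes per key.
+// Probe j then tests bit (h + j*delta) % bits, where delta is h
+// rotated right by 17 bits (the double-hashing scheme cited below).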
+ +#include "leveldb/filter_policy.h" + +#include "leveldb/slice.h" +#include "util/hash.h" + +namespace leveldb { + +namespace { +static uint32_t BloomHash(const Slice& key) { + return Hash(key.data(), key.size(), 0xbc9f1d34); +} + +class BloomFilterPolicy : public FilterPolicy { + private: + size_t bits_per_key_; + size_t k_; + + public: + explicit BloomFilterPolicy(int bits_per_key) + : bits_per_key_(bits_per_key) { + // We intentionally round down to reduce probing cost a little bit + k_ = static_cast(bits_per_key * 0.69); // 0.69 =~ ln(2) + if (k_ < 1) k_ = 1; + if (k_ > 30) k_ = 30; + } + + virtual const char* Name() const { + return "leveldb.BuiltinBloomFilter"; + } + + virtual void CreateFilter(const Slice* keys, int n, std::string* dst) const { + // Compute bloom filter size (in both bits and bytes) + size_t bits = n * bits_per_key_; + + // For small n, we can see a very high false positive rate. Fix it + // by enforcing a minimum bloom filter length. + if (bits < 64) bits = 64; + + size_t bytes = (bits + 7) / 8; + bits = bytes * 8; + + const size_t init_size = dst->size(); + dst->resize(init_size + bytes, 0); + dst->push_back(static_cast(k_)); // Remember # of probes in filter + char* array = &(*dst)[init_size]; + for (size_t i = 0; i < n; i++) { + // Use double-hashing to generate a sequence of hash values. + // See analysis in [Kirsch,Mitzenmacher 2006]. + uint32_t h = BloomHash(keys[i]); + const uint32_t delta = (h >> 17) | (h << 15); // Rotate right 17 bits + for (size_t j = 0; j < k_; j++) { + const uint32_t bitpos = h % bits; + array[bitpos/8] |= (1 << (bitpos % 8)); + h += delta; + } + } + } + + virtual bool KeyMayMatch(const Slice& key, const Slice& bloom_filter) const { + const size_t len = bloom_filter.size(); + if (len < 2) return false; + + const char* array = bloom_filter.data(); + const size_t bits = (len - 1) * 8; + + // Use the encoded k so that we can read filters generated by + // bloom filters created using different parameters. + const size_t k = array[len-1]; + if (k > 30) { + // Reserved for potentially new encodings for short bloom filters. + // Consider it a match. + return true; + } + + uint32_t h = BloomHash(key); + const uint32_t delta = (h >> 17) | (h << 15); // Rotate right 17 bits + for (size_t j = 0; j < k; j++) { + const uint32_t bitpos = h % bits; + if ((array[bitpos/8] & (1 << (bitpos % 8))) == 0) return false; + h += delta; + } + return true; + } +}; +} + +const FilterPolicy* NewBloomFilterPolicy(int bits_per_key) { + return new BloomFilterPolicy(bits_per_key); +} + +} // namespace leveldb diff --git a/util/bloom_test.cc b/util/bloom_test.cc new file mode 100644 index 0000000..4a6ea1b --- /dev/null +++ b/util/bloom_test.cc @@ -0,0 +1,159 @@ +// Copyright (c) 2012 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. 
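+//
+// Background for the thresholds below: with 10 bits per key and k = 6
+// probes, the standard approximation (1 - e^(-k/10))^k gives a false
+// positive rate of about 0.8%, so the 2% ceiling asserted in
+// VaryingLengths leaves headroom for imperfect hashing.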
+ +#include "leveldb/filter_policy.h" + +#include "util/logging.h" +#include "util/testharness.h" +#include "util/testutil.h" + +namespace leveldb { + +static const int kVerbose = 1; + +static Slice Key(int i, char* buffer) { + memcpy(buffer, &i, sizeof(i)); + return Slice(buffer, sizeof(i)); +} + +class BloomTest { + private: + const FilterPolicy* policy_; + std::string filter_; + std::vector keys_; + + public: + BloomTest() : policy_(NewBloomFilterPolicy(10)) { } + + ~BloomTest() { + delete policy_; + } + + void Reset() { + keys_.clear(); + filter_.clear(); + } + + void Add(const Slice& s) { + keys_.push_back(s.ToString()); + } + + void Build() { + std::vector key_slices; + for (size_t i = 0; i < keys_.size(); i++) { + key_slices.push_back(Slice(keys_[i])); + } + filter_.clear(); + policy_->CreateFilter(&key_slices[0], key_slices.size(), &filter_); + keys_.clear(); + if (kVerbose >= 2) DumpFilter(); + } + + size_t FilterSize() const { + return filter_.size(); + } + + void DumpFilter() { + fprintf(stderr, "F("); + for (size_t i = 0; i+1 < filter_.size(); i++) { + const unsigned int c = static_cast(filter_[i]); + for (int j = 0; j < 8; j++) { + fprintf(stderr, "%c", (c & (1 <KeyMayMatch(s, filter_); + } + + double FalsePositiveRate() { + char buffer[sizeof(int)]; + int result = 0; + for (int i = 0; i < 10000; i++) { + if (Matches(Key(i + 1000000000, buffer))) { + result++; + } + } + return result / 10000.0; + } +}; + +TEST(BloomTest, EmptyFilter) { + ASSERT_TRUE(! Matches("hello")); + ASSERT_TRUE(! Matches("world")); +} + +TEST(BloomTest, Small) { + Add("hello"); + Add("world"); + ASSERT_TRUE(Matches("hello")); + ASSERT_TRUE(Matches("world")); + ASSERT_TRUE(! Matches("x")); + ASSERT_TRUE(! Matches("foo")); +} + +static int NextLength(int length) { + if (length < 10) { + length += 1; + } else if (length < 100) { + length += 10; + } else if (length < 1000) { + length += 100; + } else { + length += 1000; + } + return length; +} + +TEST(BloomTest, VaryingLengths) { + char buffer[sizeof(int)]; + + // Count number of filters that significantly exceed the false positive rate + int mediocre_filters = 0; + int good_filters = 0; + + for (int length = 1; length <= 10000; length = NextLength(length)) { + Reset(); + for (int i = 0; i < length; i++) { + Add(Key(i, buffer)); + } + Build(); + + ASSERT_LE(FilterSize(), (length * 10 / 8) + 40) << length; + + // All added keys must match + for (int i = 0; i < length; i++) { + ASSERT_TRUE(Matches(Key(i, buffer))) + << "Length " << length << "; key " << i; + } + + // Check false positive rate + double rate = FalsePositiveRate(); + if (kVerbose >= 1) { + fprintf(stderr, "False positives: %5.2f%% @ length = %6d ; bytes = %6d\n", + rate*100.0, length, static_cast(FilterSize())); + } + ASSERT_LE(rate, 0.02); // Must not be over 2% + if (rate > 0.0125) mediocre_filters++; // Allowed, but not too often + else good_filters++; + } + if (kVerbose >= 1) { + fprintf(stderr, "Filters: %d good, %d mediocre\n", + good_filters, mediocre_filters); + } + ASSERT_LE(mediocre_filters, good_filters/5); +} + +// Different bits-per-byte + +} // namespace leveldb + +int main(int argc, char** argv) { + return leveldb::test::RunAllTests(); +} diff --git a/util/filter_policy.cc b/util/filter_policy.cc new file mode 100644 index 0000000..7b045c8 --- /dev/null +++ b/util/filter_policy.cc @@ -0,0 +1,11 @@ +// Copyright (c) 2012 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. 
See the AUTHORS file for names of contributors. + +#include "leveldb/filter_policy.h" + +namespace leveldb { + +FilterPolicy::~FilterPolicy() { } + +} // namespace leveldb diff --git a/util/options.cc b/util/options.cc index bb97838..76af5b9 100644 --- a/util/options.cc +++ b/util/options.cc @@ -21,7 +21,8 @@ Options::Options() block_cache(NULL), block_size(4096), block_restart_interval(16), - compression(kSnappyCompression) { + compression(kSnappyCompression), + filter_policy(NULL) { }