From 740d8b3d009462eed28a94ea516d667e735b0dfc Mon Sep 17 00:00:00 2001 From: "dgrogan@chromium.org" Date: Sat, 28 May 2011 00:53:58 +0000 Subject: [PATCH] Update from upstream @21551990 * Patch LevelDB to build for OSX and iOS * Fix race condition in memtable iterator deletion. * Other small fixes. git-svn-id: https://leveldb.googlecode.com/svn/trunk@29 62dab493-f737-651d-591e-8d6aee1b9529 --- Makefile | 45 ++++++++++++-- db/db_bench.cc | 20 +++++++ db/db_impl.cc | 36 ++++++++--- db/db_impl.h | 4 -- db/db_test.cc | 95 +++++++++++++++++++++++++++++ db/log_test.cc | 2 +- db/memtable.cc | 31 ++++------ port/port.h | 2 + port/port_osx.cc | 50 ++++++++++++++++ port/port_osx.h | 125 +++++++++++++++++++++++++++++++++++++++ table/iterator.cc | 1 - table/iterator_wrapper.h | 11 ++-- util/cache.cc | 2 + 13 files changed, 380 insertions(+), 44 deletions(-) create mode 100644 port/port_osx.cc create mode 100644 port/port_osx.h diff --git a/Makefile b/Makefile index 43ac23d..5eadd72 100644 --- a/Makefile +++ b/Makefile @@ -8,7 +8,21 @@ CC = g++ #OPT = -O2 -DNDEBUG OPT = -g2 -CFLAGS = -c -DLEVELDB_PLATFORM_POSIX -I. -I./include -std=c++0x $(OPT) +UNAME := $(shell uname) + +ifeq ($(UNAME), Darwin) +# To build for iOS, set PLATFORM=IOS. +ifndef PLATFORM +PLATFORM=OSX +endif # PLATFORM +PLATFORM_CFLAGS = -DLEVELDB_PLATFORM_OSX +PORT_MODULE = port_osx.o +else # UNAME +PLATFORM_CFLAGS = -DLEVELDB_PLATFORM_POSIX -std=c++0x +PORT_MODULE = port_posix.o +endif # UNAME + +CFLAGS = -c -I. -I./include $(PLATFORM_CFLAGS) $(OPT) LDFLAGS=-lpthread @@ -26,7 +40,7 @@ LIBOBJECTS = \ ./db/version_edit.o \ ./db/version_set.o \ ./db/write_batch.o \ - ./port/port_posix.o \ + ./port/$(PORT_MODULE) \ ./table/block.o \ ./table/block_builder.o \ ./table/format.o \ @@ -69,13 +83,25 @@ TESTS = \ PROGRAMS = db_bench $(TESTS) -all: $(PROGRAMS) +LIBRARY = libleveldb.a + +ifeq ($(PLATFORM), IOS) +# Only XCode can build executable applications for iOS. +all: $(LIBRARY) +else +all: $(PROGRAMS) $(LIBRARY) +endif check: $(TESTS) for t in $(TESTS); do echo "***** Running $$t"; ./$$t || exit 1; done clean: - rm -f $(PROGRAMS) */*.o + -rm -f $(PROGRAMS) $(LIBRARY) */*.o ios-x86/*/*.o ios-arm/*/*.o + -rmdir -p ios-x86/* ios-arm/* + +$(LIBRARY): $(LIBOBJECTS) + rm -f $@ + $(AR) -rs $@ $(LIBOBJECTS) db_bench: db/db_bench.o $(LIBOBJECTS) $(TESTUTIL) $(CC) $(LDFLAGS) db/db_bench.o $(LIBOBJECTS) $(TESTUTIL) -o $@ @@ -122,8 +148,19 @@ version_edit_test: db/version_edit_test.o $(LIBOBJECTS) $(TESTHARNESS) write_batch_test: db/write_batch_test.o $(LIBOBJECTS) $(TESTHARNESS) $(CC) $(LDFLAGS) db/write_batch_test.o $(LIBOBJECTS) $(TESTHARNESS) -o $@ +ifeq ($(PLATFORM), IOS) +# For iOS, create universal object files to be used on both the simulator and +# a device. +.cc.o: + mkdir -p ios-x86/$(dir $@) + $(CC) $(CFLAGS) -isysroot /Developer/Platforms/iPhoneSimulator.platform/Developer/SDKs/iPhoneSimulator4.3.sdk -arch i686 $< -o ios-x86/$@ + mkdir -p ios-arm/$(dir $@) + $(CC) $(CFLAGS) -isysroot /Developer/Platforms/iPhoneOS.platform/Developer/SDKs/iPhoneOS4.3.sdk -arch armv6 -arch armv7 $< -o ios-arm/$@ + lipo ios-x86/$@ ios-arm/$@ -create -output $@ +else .cc.o: $(CC) $(CFLAGS) $< -o $@ +endif # TODO(gabor): dependencies for .o files # TODO(gabor): Build library diff --git a/db/db_bench.cc b/db/db_bench.cc index b5fd679..b24179d 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -29,6 +29,7 @@ // readrandom -- read N times in random order // readhot -- read N times in random order from 1% section of DB // crc32c -- repeated crc32c of 4K of data +// acquireload -- load N*1000 times // Meta operations: // compact -- Compact the entire DB // stats -- Print DB stats @@ -50,6 +51,7 @@ static const char* FLAGS_benchmarks = "crc32c," "snappycomp," "snappyuncomp," + "acquireload," ; // Number of key/values to place in database @@ -382,6 +384,8 @@ class Benchmark { Compact(); } else if (name == Slice("crc32c")) { Crc32c(4096, "(4K per op)"); + } else if (name == Slice("acquireload")) { + AcquireLoad(); } else if (name == Slice("snappycomp")) { SnappyCompress(); } else if (name == Slice("snappyuncomp")) { @@ -420,6 +424,22 @@ class Benchmark { message_ = label; } + void AcquireLoad() { + int dummy; + port::AtomicPointer ap(&dummy); + int count = 0; + void *ptr = NULL; + message_ = "(each op is 1000 loads)"; + while (count < 100000) { + for (int i = 0; i < 1000; i++) { + ptr = ap.Acquire_Load(); + } + count++; + FinishedSingleOp(); + } + if (ptr == NULL) exit(1); // Disable unused variable warning. + } + void SnappyCompress() { Slice input = gen_.Generate(Options().block_size); int64_t bytes = 0; diff --git a/db/db_impl.cc b/db/db_impl.cc index baf9299..9b139ce 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -875,22 +875,49 @@ Status DBImpl::DoCompactionWork(CompactionState* compact) { return status; } +namespace { +struct IterState { + port::Mutex* mu; + Version* version; + MemTable* mem; + MemTable* imm; +}; + +static void CleanupIteratorState(void* arg1, void* arg2) { + IterState* state = reinterpret_cast(arg1); + state->mu->Lock(); + state->mem->Unref(); + if (state->imm != NULL) state->imm->Unref(); + state->version->Unref(); + state->mu->Unlock(); + delete state; +} +} + Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, SequenceNumber* latest_snapshot) { + IterState* cleanup = new IterState; mutex_.Lock(); *latest_snapshot = versions_->LastSequence(); // Collect together all needed child iterators std::vector list; list.push_back(mem_->NewIterator()); + mem_->Ref(); if (imm_ != NULL) { list.push_back(imm_->NewIterator()); + imm_->Ref(); } versions_->current()->AddIterators(options, &list); Iterator* internal_iter = NewMergingIterator(&internal_comparator_, &list[0], list.size()); versions_->current()->Ref(); - internal_iter->RegisterCleanup(&DBImpl::Unref, this, versions_->current()); + + cleanup->mu = &mutex_; + cleanup->mem = mem_; + cleanup->imm = imm_; + cleanup->version = versions_->current(); + internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, NULL); mutex_.Unlock(); return internal_iter; @@ -937,13 +964,6 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options) { : latest_snapshot)); } -void DBImpl::Unref(void* arg1, void* arg2) { - DBImpl* impl = reinterpret_cast(arg1); - Version* v = reinterpret_cast(arg2); - MutexLock l(&impl->mutex_); - v->Unref(); -} - const Snapshot* DBImpl::GetSnapshot() { MutexLock l(&mutex_); return snapshots_.New(versions_->LastSequence()); diff --git a/db/db_impl.h b/db/db_impl.h index 7699d8c..c23ae00 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -77,10 +77,6 @@ class DBImpl : public DB { // Delete any unneeded files and stale in-memory entries. void DeleteObsoleteFiles(); - // Called when an iterator over a particular version of the - // descriptor goes away. - static void Unref(void* arg1, void* arg2); - // Compact the in-memory write buffer to disk. Switches to a new // log-file/memtable and writes a new descriptor iff successful. Status CompactMemTable(); diff --git a/db/db_test.cc b/db/db_test.cc index 06565b2..42e70cf 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -801,6 +801,101 @@ TEST(DBTest, DBOpen_Options) { db = NULL; } +// Multi-threaded test: +namespace { + +static const int kNumThreads = 4; +static const int kTestSeconds = 10; +static const int kNumKeys = 1000; + +struct MTState { + DBTest* test; + port::AtomicPointer stop; + port::AtomicPointer counter[kNumThreads]; + port::AtomicPointer thread_done[kNumThreads]; +}; + +struct MTThread { + MTState* state; + int id; +}; + +static void MTThreadBody(void* arg) { + MTThread* t = reinterpret_cast(arg); + DB* db = t->state->test->db_; + uintptr_t counter = 0; + fprintf(stderr, "... starting thread %d\n", t->id); + Random rnd(1000 + t->id); + std::string value; + char valbuf[1500]; + while (t->state->stop.Acquire_Load() == NULL) { + t->state->counter[t->id].Release_Store(reinterpret_cast(counter)); + + int key = rnd.Uniform(kNumKeys); + char keybuf[20]; + snprintf(keybuf, sizeof(keybuf), "%016d", key); + + if (rnd.OneIn(2)) { + // Write values of the form . + // We add some padding for force compactions. + snprintf(valbuf, sizeof(valbuf), "%d.%d.%-1000d", + key, t->id, static_cast(counter)); + ASSERT_OK(db->Put(WriteOptions(), Slice(keybuf), Slice(valbuf))); + } else { + // Read a value and verify that it matches the pattern written above. + Status s = db->Get(ReadOptions(), Slice(keybuf), &value); + if (s.IsNotFound()) { + // Key has not yet been written + } else { + // Check that the writer thread counter is >= the counter in the value + ASSERT_OK(s); + int k, w, c; + ASSERT_EQ(3, sscanf(value.c_str(), "%d.%d.%d", &k, &w, &c)) << value; + ASSERT_EQ(k, key); + ASSERT_GE(w, 0); + ASSERT_LT(w, kNumThreads); + ASSERT_LE(c, reinterpret_cast( + t->state->counter[w].Acquire_Load())); + } + } + counter++; + } + t->state->thread_done[t->id].Release_Store(t); + fprintf(stderr, "... stopping thread %d after %d ops\n", t->id, int(counter)); +} + +} + +TEST(DBTest, MultiThreaded) { + // Initialize state + MTState mt; + mt.test = this; + mt.stop.Release_Store(0); + for (int id = 0; id < kNumThreads; id++) { + mt.counter[id].Release_Store(0); + mt.thread_done[id].Release_Store(0); + } + + // Start threads + MTThread thread[kNumThreads]; + for (int id = 0; id < kNumThreads; id++) { + thread[id].state = &mt; + thread[id].id = id; + env_->StartThread(MTThreadBody, &thread[id]); + } + + // Let them run for a while + env_->SleepForMicroseconds(kTestSeconds * 1000000); + + // Stop the threads and wait for them to finish + mt.stop.Release_Store(&mt); + for (int id = 0; id < kNumThreads; id++) { + while (mt.thread_done[id].Acquire_Load() == NULL) { + env_->SleepForMicroseconds(100000); + } + } +} + namespace { typedef std::map KVMap; } diff --git a/db/log_test.cc b/db/log_test.cc index 040bdff..06e0893 100644 --- a/db/log_test.cc +++ b/db/log_test.cc @@ -76,7 +76,7 @@ class LogTest { return Status::OK(); } - virtual Status Skip(size_t n) { + virtual Status Skip(uint64_t n) { if (n > contents_.size()) { contents_.clear(); return Status::NotFound("in-memory file skipepd past end"); diff --git a/db/memtable.cc b/db/memtable.cc index 9c25f6d..687900a 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -50,33 +50,24 @@ static const char* EncodeKey(std::string* scratch, const Slice& target) { class MemTableIterator: public Iterator { public: - explicit MemTableIterator(MemTable* mem, MemTable::Table* table) { - mem_ = mem; - iter_ = new MemTable::Table::Iterator(table); - mem->Ref(); - } - virtual ~MemTableIterator() { - delete iter_; - mem_->Unref(); - } + explicit MemTableIterator(MemTable::Table* table) : iter_(table) { } - virtual bool Valid() const { return iter_->Valid(); } - virtual void Seek(const Slice& k) { iter_->Seek(EncodeKey(&tmp_, k)); } - virtual void SeekToFirst() { iter_->SeekToFirst(); } - virtual void SeekToLast() { iter_->SeekToLast(); } - virtual void Next() { iter_->Next(); } - virtual void Prev() { iter_->Prev(); } - virtual Slice key() const { return GetLengthPrefixedSlice(iter_->key()); } + virtual bool Valid() const { return iter_.Valid(); } + virtual void Seek(const Slice& k) { iter_.Seek(EncodeKey(&tmp_, k)); } + virtual void SeekToFirst() { iter_.SeekToFirst(); } + virtual void SeekToLast() { iter_.SeekToLast(); } + virtual void Next() { iter_.Next(); } + virtual void Prev() { iter_.Prev(); } + virtual Slice key() const { return GetLengthPrefixedSlice(iter_.key()); } virtual Slice value() const { - Slice key_slice = GetLengthPrefixedSlice(iter_->key()); + Slice key_slice = GetLengthPrefixedSlice(iter_.key()); return GetLengthPrefixedSlice(key_slice.data() + key_slice.size()); } virtual Status status() const { return Status::OK(); } private: - MemTable* mem_; - MemTable::Table::Iterator* iter_; + MemTable::Table::Iterator iter_; std::string tmp_; // For passing to EncodeKey // No copying allowed @@ -85,7 +76,7 @@ class MemTableIterator: public Iterator { }; Iterator* MemTable::NewIterator() { - return new MemTableIterator(this, &table_); + return new MemTableIterator(&table_); } void MemTable::Add(SequenceNumber s, ValueType type, diff --git a/port/port.h b/port/port.h index 816826b..e35db23 100644 --- a/port/port.h +++ b/port/port.h @@ -16,6 +16,8 @@ # include "port/port_chromium.h" #elif defined(LEVELDB_PLATFORM_ANDROID) # include "port/port_android.h" +#elif defined(LEVELDB_PLATFORM_OSX) +# include "port/port_osx.h" #endif #endif // STORAGE_LEVELDB_PORT_PORT_H_ diff --git a/port/port_osx.cc b/port/port_osx.cc new file mode 100644 index 0000000..4ab9e31 --- /dev/null +++ b/port/port_osx.cc @@ -0,0 +1,50 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#include "port_osx.h" + +#include +#include +#include +#include "util/logging.h" + +namespace leveldb { +namespace port { + +static void PthreadCall(const char* label, int result) { + if (result != 0) { + fprintf(stderr, "pthread %s: %s\n", label, strerror(result)); + abort(); + } +} + +Mutex::Mutex() { PthreadCall("init mutex", pthread_mutex_init(&mu_, NULL)); } + +Mutex::~Mutex() { PthreadCall("destroy mutex", pthread_mutex_destroy(&mu_)); } + +void Mutex::Lock() { PthreadCall("lock", pthread_mutex_lock(&mu_)); } + +void Mutex::Unlock() { PthreadCall("unlock", pthread_mutex_unlock(&mu_)); } + +CondVar::CondVar(Mutex* mu) + : mu_(mu) { + PthreadCall("init cv", pthread_cond_init(&cv_, NULL)); +} + +CondVar::~CondVar() { PthreadCall("destroy cv", pthread_cond_destroy(&cv_)); } + +void CondVar::Wait() { + PthreadCall("wait", pthread_cond_wait(&cv_, &mu_->mu_)); +} + +void CondVar::Signal() { + PthreadCall("signal", pthread_cond_signal(&cv_)); +} + +void CondVar::SignalAll() { + PthreadCall("broadcast", pthread_cond_broadcast(&cv_)); +} + +} +} diff --git a/port/port_osx.h b/port/port_osx.h new file mode 100644 index 0000000..5524c6c --- /dev/null +++ b/port/port_osx.h @@ -0,0 +1,125 @@ +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. +// +// See port_example.h for documentation for the following types/functions. + +#ifndef STORAGE_LEVELDB_PORT_PORT_OSX_H_ +#define STORAGE_LEVELDB_PORT_PORT_OSX_H_ + +#include +#include +#include +#include + +#include + +namespace leveldb { + +// The following 4 methods implemented here for the benefit of env_posix.cc. +inline size_t fread_unlocked(void *a, size_t b, size_t c, FILE *d) { + return fread(a, b, c, d); +} + +inline size_t fwrite_unlocked(const void *a, size_t b, size_t c, FILE *d) { + return fwrite(a, b, c, d); +} + +inline int fflush_unlocked(FILE *f) { + return fflush(f); +} + +inline int fdatasync(int fd) { + return fsync(fd); +} + +namespace port { + +static const bool kLittleEndian = (__DARWIN_BYTE_ORDER == __DARWIN_LITTLE_ENDIAN); + +// ------------------ Threading ------------------- + +// A Mutex represents an exclusive lock. +class Mutex { + public: + Mutex(); + ~Mutex(); + + void Lock(); + void Unlock(); + void AssertHeld() { } + + private: + friend class CondVar; + pthread_mutex_t mu_; + + // No copying + Mutex(const Mutex&); + void operator=(const Mutex&); +}; + +class CondVar { + public: + explicit CondVar(Mutex* mu); + ~CondVar(); + + void Wait(); + void Signal(); + void SignalAll(); + + private: + pthread_cond_t cv_; + Mutex* mu_; +}; + +inline void MemoryBarrier() { +#if defined(__GNUC__) && (defined(__i386__) || defined(__x86_64__)) + // See http://gcc.gnu.org/ml/gcc/2003-04/msg01180.html for a discussion on + // this idiom. Also see http://en.wikipedia.org/wiki/Memory_ordering. + __asm__ __volatile__("" : : : "memory"); +#else + OSMemoryBarrier(); +#endif +} + +class AtomicPointer { + private: + void* ptr_; + public: + AtomicPointer() { } + explicit AtomicPointer(void* p) : ptr_(p) {} + inline void* Acquire_Load() const { + void* ptr = ptr_; + MemoryBarrier(); + return ptr; + } + inline void Release_Store(void* v) { + MemoryBarrier(); + ptr_ = v; + } + inline void* NoBarrier_Load() const { + return ptr_; + } + inline void NoBarrier_Store(void* v) { + ptr_ = v; + } +}; + +inline bool Snappy_Compress(const char* input, size_t input_length, + std::string* output) { + return false; +} + +inline bool Snappy_Uncompress(const char* input_data, size_t input_length, + std::string* output) { + return false; +} + +inline bool GetHeapProfile(void (*func)(void*, const char*, int), void* arg) { + return false; +} + +} +} + +#endif // STORAGE_LEVELDB_PORT_PORT_OSX_H_ diff --git a/table/iterator.cc b/table/iterator.cc index 4ddd55f..33bc8a2 100644 --- a/table/iterator.cc +++ b/table/iterator.cc @@ -3,7 +3,6 @@ // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "leveldb/iterator.h" -#include "util/logging.h" namespace leveldb { diff --git a/table/iterator_wrapper.h b/table/iterator_wrapper.h index 158d3a7..d8ca2b3 100644 --- a/table/iterator_wrapper.h +++ b/table/iterator_wrapper.h @@ -12,10 +12,6 @@ namespace leveldb { // This can help avoid virtual function calls and also gives better // cache locality. class IteratorWrapper { - private: - Iterator* iter_; - bool valid_; - Slice key_; public: IteratorWrapper(): iter_(NULL), valid_(false) { } explicit IteratorWrapper(Iterator* iter): iter_(NULL) { @@ -56,9 +52,12 @@ class IteratorWrapper { key_ = iter_->key(); } } + + Iterator* iter_; + bool valid_; + Slice key_; }; -} - +} // namespace leveldb #endif // STORAGE_LEVELDB_TABLE_ITERATOR_WRAPPER_H_ diff --git a/util/cache.cc b/util/cache.cc index d8a4426..968e6a0 100644 --- a/util/cache.cc +++ b/util/cache.cc @@ -4,6 +4,8 @@ #if defined(LEVELDB_PLATFORM_POSIX) || defined(LEVELDB_PLATFORM_ANDROID) #include +#elif defined(LEVELDB_PLATFORM_OSX) +#include #elif defined(LEVELDB_PLATFORM_CHROMIUM) #include "base/hash_tables.h" #else