// Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. #include "table/merger.h" #include "include/comparator.h" #include "include/iterator.h" #include "table/iterator_wrapper.h" namespace leveldb { namespace { class MergingIterator : public Iterator { public: MergingIterator(const Comparator* comparator, Iterator** children, int n) : comparator_(comparator), children_(new IteratorWrapper[n]), n_(n), current_(NULL) { for (int i = 0; i < n; i++) { children_[i].Set(children[i]); } } virtual ~MergingIterator() { delete[] children_; } virtual bool Valid() const { return (current_ != NULL); } virtual void SeekToFirst() { for (int i = 0; i < n_; i++) { children_[i].SeekToFirst(); } FindSmallest(); } virtual void SeekToLast() { for (int i = 0; i < n_; i++) { children_[i].SeekToLast(); } FindLargest(); } virtual void Seek(const Slice& target) { for (int i = 0; i < n_; i++) { children_[i].Seek(target); } FindSmallest(); } virtual void Next() { assert(Valid()); current_->Next(); FindSmallest(); } virtual void Prev() { assert(Valid()); current_->Prev(); FindLargest(); } virtual Slice key() const { assert(Valid()); return current_->key(); } virtual Slice value() const { assert(Valid()); return current_->value(); } virtual Status status() const { Status status; for (int i = 0; i < n_; i++) { status = children_[i].status(); if (!status.ok()) { break; } } return status; } private: void FindSmallest(); void FindLargest(); // We might want to use a heap in case there are lots of children. // For now we use a simple array since we expect a very small number // of children in leveldb. const Comparator* comparator_; IteratorWrapper* children_; int n_; IteratorWrapper* current_; }; void MergingIterator::FindSmallest() { IteratorWrapper* smallest = NULL; for (int i = 0; i < n_; i++) { IteratorWrapper* child = &children_[i]; if (child->Valid()) { if (smallest == NULL) { smallest = child; } else if (comparator_->Compare(child->key(), smallest->key()) < 0) { smallest = child; } } } current_ = smallest; } void MergingIterator::FindLargest() { IteratorWrapper* largest = NULL; for (int i = n_-1; i >= 0; i--) { IteratorWrapper* child = &children_[i]; if (child->Valid()) { if (largest == NULL) { largest = child; } else if (comparator_->Compare(child->key(), largest->key()) > 0) { largest = child; } } } current_ = largest; } } Iterator* NewMergingIterator(const Comparator* cmp, Iterator** list, int n) { assert(n >= 0); if (n == 0) { return NewEmptyIterator(); } else if (n == 1) { return list[0]; } else { return new MergingIterator(cmp, list, n); } } }