1// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file. See the AUTHORS file for names of contributors.
4
5#include "table/merger.h"
6
7#include "leveldb/comparator.h"
8#include "leveldb/iterator.h"
9#include "table/iterator_wrapper.h"
10
11namespace leveldb {
12
13namespace {
14class MergingIterator : public Iterator {
15 public:
16 MergingIterator(const Comparator* comparator, Iterator** children, int n)
17 : comparator_(comparator),
18 children_(new IteratorWrapper[n]),
19 n_(n),
20 current_(nullptr),
21 direction_(kForward) {
22 for (int i = 0; i < n; i++) {
23 children_[i].Set(children[i]);
24 }
25 }
26
27 ~MergingIterator() override { delete[] children_; }
28
29 bool Valid() const override { return (current_ != nullptr); }
30
31 void SeekToFirst() override {
32 for (int i = 0; i < n_; i++) {
33 children_[i].SeekToFirst();
34 }
35 FindSmallest();
36 direction_ = kForward;
37 }
38
39 void SeekToLast() override {
40 for (int i = 0; i < n_; i++) {
41 children_[i].SeekToLast();
42 }
43 FindLargest();
44 direction_ = kReverse;
45 }
46
47 void Seek(const Slice& target) override {
48 for (int i = 0; i < n_; i++) {
49 children_[i].Seek(target);
50 }
51 FindSmallest();
52 direction_ = kForward;
53 }
54
55 void Next() override {
56 assert(Valid());
57
58 // Ensure that all children are positioned after key().
59 // If we are moving in the forward direction, it is already
60 // true for all of the non-current_ children since current_ is
61 // the smallest child and key() == current_->key(). Otherwise,
62 // we explicitly position the non-current_ children.
63 if (direction_ != kForward) {
64 for (int i = 0; i < n_; i++) {
65 IteratorWrapper* child = &children_[i];
66 if (child != current_) {
67 child->Seek(key());
68 if (child->Valid() &&
69 comparator_->Compare(key(), child->key()) == 0) {
70 child->Next();
71 }
72 }
73 }
74 direction_ = kForward;
75 }
76
77 current_->Next();
78 FindSmallest();
79 }
80
81 void Prev() override {
82 assert(Valid());
83
84 // Ensure that all children are positioned before key().
85 // If we are moving in the reverse direction, it is already
86 // true for all of the non-current_ children since current_ is
87 // the largest child and key() == current_->key(). Otherwise,
88 // we explicitly position the non-current_ children.
89 if (direction_ != kReverse) {
90 for (int i = 0; i < n_; i++) {
91 IteratorWrapper* child = &children_[i];
92 if (child != current_) {
93 child->Seek(key());
94 if (child->Valid()) {
95 // Child is at first entry >= key(). Step back one to be < key()
96 child->Prev();
97 } else {
98 // Child has no entries >= key(). Position at last entry.
99 child->SeekToLast();
100 }
101 }
102 }
103 direction_ = kReverse;
104 }
105
106 current_->Prev();
107 FindLargest();
108 }
109
110 Slice key() const override {
111 assert(Valid());
112 return current_->key();
113 }
114
115 Slice value() const override {
116 assert(Valid());
117 return current_->value();
118 }
119
120 Status status() const override {
121 Status status;
122 for (int i = 0; i < n_; i++) {
123 status = children_[i].status();
124 if (!status.ok()) {
125 break;
126 }
127 }
128 return status;
129 }
130
131 private:
132 // Which direction is the iterator moving?
133 enum Direction { kForward, kReverse };
134
135 void FindSmallest();
136 void FindLargest();
137
138 // We might want to use a heap in case there are lots of children.
139 // For now we use a simple array since we expect a very small number
140 // of children in leveldb.
141 const Comparator* comparator_;
142 IteratorWrapper* children_;
143 int n_;
144 IteratorWrapper* current_;
145 Direction direction_;
146};
147
148void MergingIterator::FindSmallest() {
149 IteratorWrapper* smallest = nullptr;
150 for (int i = 0; i < n_; i++) {
151 IteratorWrapper* child = &children_[i];
152 if (child->Valid()) {
153 if (smallest == nullptr) {
154 smallest = child;
155 } else if (comparator_->Compare(child->key(), smallest->key()) < 0) {
156 smallest = child;
157 }
158 }
159 }
160 current_ = smallest;
161}
162
163void MergingIterator::FindLargest() {
164 IteratorWrapper* largest = nullptr;
165 for (int i = n_ - 1; i >= 0; i--) {
166 IteratorWrapper* child = &children_[i];
167 if (child->Valid()) {
168 if (largest == nullptr) {
169 largest = child;
170 } else if (comparator_->Compare(child->key(), largest->key()) > 0) {
171 largest = child;
172 }
173 }
174 }
175 current_ = largest;
176}
177} // namespace
178
179Iterator* NewMergingIterator(const Comparator* comparator, Iterator** children,
180 int n) {
181 assert(n >= 0);
182 if (n == 0) {
183 return NewEmptyIterator();
184 } else if (n == 1) {
185 return children[0];
186 } else {
187 return new MergingIterator(comparator, children, n);
188 }
189}
190
191} // namespace leveldb
192