1// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file. See the AUTHORS file for names of contributors.
4
5#include "db/db_iter.h"
6
7#include "db/db_impl.h"
8#include "db/dbformat.h"
9#include "db/filename.h"
10#include "leveldb/env.h"
11#include "leveldb/iterator.h"
12#include "port/port.h"
13#include "util/logging.h"
14#include "util/mutexlock.h"
15#include "util/random.h"
16
17namespace leveldb {
18
19#if 0
20static void DumpInternalIter(Iterator* iter) {
21 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
22 ParsedInternalKey k;
23 if (!ParseInternalKey(iter->key(), &k)) {
24 std::fprintf(stderr, "Corrupt '%s'\n", EscapeString(iter->key()).c_str());
25 } else {
26 std::fprintf(stderr, "@ '%s'\n", k.DebugString().c_str());
27 }
28 }
29}
30#endif
31
32namespace {
33
34// Memtables and sstables that make the DB representation contain
35// (userkey,seq,type) => uservalue entries. DBIter
36// combines multiple entries for the same userkey found in the DB
37// representation into a single entry while accounting for sequence
38// numbers, deletion markers, overwrites, etc.
39class DBIter : public Iterator {
40 public:
41 // Which direction is the iterator currently moving?
42 // (1) When moving forward, the internal iterator is positioned at
43 // the exact entry that yields this->key(), this->value()
44 // (2) When moving backwards, the internal iterator is positioned
45 // just before all entries whose user key == this->key().
46 enum Direction { kForward, kReverse };
47
48 DBIter(DBImpl* db, const Comparator* cmp, Iterator* iter, SequenceNumber s,
49 uint32_t seed)
50 : db_(db),
51 user_comparator_(cmp),
52 iter_(iter),
53 sequence_(s),
54 direction_(kForward),
55 valid_(false),
56 rnd_(seed),
57 bytes_until_read_sampling_(RandomCompactionPeriod()) {}
58
59 DBIter(const DBIter&) = delete;
60 DBIter& operator=(const DBIter&) = delete;
61
62 ~DBIter() override { delete iter_; }
63 bool Valid() const override { return valid_; }
64 Slice key() const override {
65 assert(valid_);
66 return (direction_ == kForward) ? ExtractUserKey(iter_->key()) : saved_key_;
67 }
68 Slice value() const override {
69 assert(valid_);
70 return (direction_ == kForward) ? iter_->value() : saved_value_;
71 }
72 Status status() const override {
73 if (status_.ok()) {
74 return iter_->status();
75 } else {
76 return status_;
77 }
78 }
79
80 void Next() override;
81 void Prev() override;
82 void Seek(const Slice& target) override;
83 void SeekToFirst() override;
84 void SeekToLast() override;
85
86 private:
87 void FindNextUserEntry(bool skipping, std::string* skip);
88 void FindPrevUserEntry();
89 bool ParseKey(ParsedInternalKey* key);
90
91 inline void SaveKey(const Slice& k, std::string* dst) {
92 dst->assign(k.data(), k.size());
93 }
94
95 inline void ClearSavedValue() {
96 if (saved_value_.capacity() > 1048576) {
97 std::string empty;
98 swap(empty, saved_value_);
99 } else {
100 saved_value_.clear();
101 }
102 }
103
104 // Picks the number of bytes that can be read until a compaction is scheduled.
105 size_t RandomCompactionPeriod() {
106 return rnd_.Uniform(2 * config::kReadBytesPeriod);
107 }
108
109 DBImpl* db_;
110 const Comparator* const user_comparator_;
111 Iterator* const iter_;
112 SequenceNumber const sequence_;
113 Status status_;
114 std::string saved_key_; // == current key when direction_==kReverse
115 std::string saved_value_; // == current raw value when direction_==kReverse
116 Direction direction_;
117 bool valid_;
118 Random rnd_;
119 size_t bytes_until_read_sampling_;
120};
121
122inline bool DBIter::ParseKey(ParsedInternalKey* ikey) {
123 Slice k = iter_->key();
124
125 size_t bytes_read = k.size() + iter_->value().size();
126 while (bytes_until_read_sampling_ < bytes_read) {
127 bytes_until_read_sampling_ += RandomCompactionPeriod();
128 db_->RecordReadSample(k);
129 }
130 assert(bytes_until_read_sampling_ >= bytes_read);
131 bytes_until_read_sampling_ -= bytes_read;
132
133 if (!ParseInternalKey(k, ikey)) {
134 status_ = Status::Corruption("corrupted internal key in DBIter");
135 return false;
136 } else {
137 return true;
138 }
139}
140
141void DBIter::Next() {
142 assert(valid_);
143
144 if (direction_ == kReverse) { // Switch directions?
145 direction_ = kForward;
146 // iter_ is pointing just before the entries for this->key(),
147 // so advance into the range of entries for this->key() and then
148 // use the normal skipping code below.
149 if (!iter_->Valid()) {
150 iter_->SeekToFirst();
151 } else {
152 iter_->Next();
153 }
154 if (!iter_->Valid()) {
155 valid_ = false;
156 saved_key_.clear();
157 return;
158 }
159 // saved_key_ already contains the key to skip past.
160 } else {
161 // Store in saved_key_ the current key so we skip it below.
162 SaveKey(ExtractUserKey(iter_->key()), &saved_key_);
163
164 // iter_ is pointing to current key. We can now safely move to the next to
165 // avoid checking current key.
166 iter_->Next();
167 if (!iter_->Valid()) {
168 valid_ = false;
169 saved_key_.clear();
170 return;
171 }
172 }
173
174 FindNextUserEntry(true, &saved_key_);
175}
176
177void DBIter::FindNextUserEntry(bool skipping, std::string* skip) {
178 // Loop until we hit an acceptable entry to yield
179 assert(iter_->Valid());
180 assert(direction_ == kForward);
181 do {
182 ParsedInternalKey ikey;
183 if (ParseKey(&ikey) && ikey.sequence <= sequence_) {
184 switch (ikey.type) {
185 case kTypeDeletion:
186 // Arrange to skip all upcoming entries for this key since
187 // they are hidden by this deletion.
188 SaveKey(ikey.user_key, skip);
189 skipping = true;
190 break;
191 case kTypeValue:
192 if (skipping &&
193 user_comparator_->Compare(ikey.user_key, *skip) <= 0) {
194 // Entry hidden
195 } else {
196 valid_ = true;
197 saved_key_.clear();
198 return;
199 }
200 break;
201 }
202 }
203 iter_->Next();
204 } while (iter_->Valid());
205 saved_key_.clear();
206 valid_ = false;
207}
208
209void DBIter::Prev() {
210 assert(valid_);
211
212 if (direction_ == kForward) { // Switch directions?
213 // iter_ is pointing at the current entry. Scan backwards until
214 // the key changes so we can use the normal reverse scanning code.
215 assert(iter_->Valid()); // Otherwise valid_ would have been false
216 SaveKey(ExtractUserKey(iter_->key()), &saved_key_);
217 while (true) {
218 iter_->Prev();
219 if (!iter_->Valid()) {
220 valid_ = false;
221 saved_key_.clear();
222 ClearSavedValue();
223 return;
224 }
225 if (user_comparator_->Compare(ExtractUserKey(iter_->key()), saved_key_) <
226 0) {
227 break;
228 }
229 }
230 direction_ = kReverse;
231 }
232
233 FindPrevUserEntry();
234}
235
236void DBIter::FindPrevUserEntry() {
237 assert(direction_ == kReverse);
238
239 ValueType value_type = kTypeDeletion;
240 if (iter_->Valid()) {
241 do {
242 ParsedInternalKey ikey;
243 if (ParseKey(&ikey) && ikey.sequence <= sequence_) {
244 if ((value_type != kTypeDeletion) &&
245 user_comparator_->Compare(ikey.user_key, saved_key_) < 0) {
246 // We encountered a non-deleted value in entries for previous keys,
247 break;
248 }
249 value_type = ikey.type;
250 if (value_type == kTypeDeletion) {
251 saved_key_.clear();
252 ClearSavedValue();
253 } else {
254 Slice raw_value = iter_->value();
255 if (saved_value_.capacity() > raw_value.size() + 1048576) {
256 std::string empty;
257 swap(empty, saved_value_);
258 }
259 SaveKey(ExtractUserKey(iter_->key()), &saved_key_);
260 saved_value_.assign(raw_value.data(), raw_value.size());
261 }
262 }
263 iter_->Prev();
264 } while (iter_->Valid());
265 }
266
267 if (value_type == kTypeDeletion) {
268 // End
269 valid_ = false;
270 saved_key_.clear();
271 ClearSavedValue();
272 direction_ = kForward;
273 } else {
274 valid_ = true;
275 }
276}
277
278void DBIter::Seek(const Slice& target) {
279 direction_ = kForward;
280 ClearSavedValue();
281 saved_key_.clear();
282 AppendInternalKey(&saved_key_,
283 ParsedInternalKey(target, sequence_, kValueTypeForSeek));
284 iter_->Seek(saved_key_);
285 if (iter_->Valid()) {
286 FindNextUserEntry(false, &saved_key_ /* temporary storage */);
287 } else {
288 valid_ = false;
289 }
290}
291
292void DBIter::SeekToFirst() {
293 direction_ = kForward;
294 ClearSavedValue();
295 iter_->SeekToFirst();
296 if (iter_->Valid()) {
297 FindNextUserEntry(false, &saved_key_ /* temporary storage */);
298 } else {
299 valid_ = false;
300 }
301}
302
303void DBIter::SeekToLast() {
304 direction_ = kReverse;
305 ClearSavedValue();
306 iter_->SeekToLast();
307 FindPrevUserEntry();
308}
309
310} // anonymous namespace
311
312Iterator* NewDBIterator(DBImpl* db, const Comparator* user_key_comparator,
313 Iterator* internal_iter, SequenceNumber sequence,
314 uint32_t seed) {
315 return new DBIter(db, user_key_comparator, internal_iter, sequence, seed);
316}
317
318} // namespace leveldb
319