1 | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
2 | // Use of this source code is governed by a BSD-style license that can be |
3 | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
4 | |
5 | #include "db/db_iter.h" |
6 | |
7 | #include "db/db_impl.h" |
8 | #include "db/dbformat.h" |
9 | #include "db/filename.h" |
10 | #include "leveldb/env.h" |
11 | #include "leveldb/iterator.h" |
12 | #include "port/port.h" |
13 | #include "util/logging.h" |
14 | #include "util/mutexlock.h" |
15 | #include "util/random.h" |
16 | |
17 | namespace leveldb { |
18 | |
19 | #if 0 |
20 | static void DumpInternalIter(Iterator* iter) { |
21 | for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { |
22 | ParsedInternalKey k; |
23 | if (!ParseInternalKey(iter->key(), &k)) { |
24 | std::fprintf(stderr, "Corrupt '%s'\n" , EscapeString(iter->key()).c_str()); |
25 | } else { |
26 | std::fprintf(stderr, "@ '%s'\n" , k.DebugString().c_str()); |
27 | } |
28 | } |
29 | } |
30 | #endif |
31 | |
32 | namespace { |
33 | |
34 | // Memtables and sstables that make the DB representation contain |
35 | // (userkey,seq,type) => uservalue entries. DBIter |
36 | // combines multiple entries for the same userkey found in the DB |
37 | // representation into a single entry while accounting for sequence |
38 | // numbers, deletion markers, overwrites, etc. |
39 | class DBIter : public Iterator { |
40 | public: |
41 | // Which direction is the iterator currently moving? |
42 | // (1) When moving forward, the internal iterator is positioned at |
43 | // the exact entry that yields this->key(), this->value() |
44 | // (2) When moving backwards, the internal iterator is positioned |
45 | // just before all entries whose user key == this->key(). |
46 | enum Direction { kForward, kReverse }; |
47 | |
48 | DBIter(DBImpl* db, const Comparator* cmp, Iterator* iter, SequenceNumber s, |
49 | uint32_t seed) |
50 | : db_(db), |
51 | user_comparator_(cmp), |
52 | iter_(iter), |
53 | sequence_(s), |
54 | direction_(kForward), |
55 | valid_(false), |
56 | rnd_(seed), |
57 | bytes_until_read_sampling_(RandomCompactionPeriod()) {} |
58 | |
59 | DBIter(const DBIter&) = delete; |
60 | DBIter& operator=(const DBIter&) = delete; |
61 | |
62 | ~DBIter() override { delete iter_; } |
63 | bool Valid() const override { return valid_; } |
64 | Slice key() const override { |
65 | assert(valid_); |
66 | return (direction_ == kForward) ? ExtractUserKey(iter_->key()) : saved_key_; |
67 | } |
68 | Slice value() const override { |
69 | assert(valid_); |
70 | return (direction_ == kForward) ? iter_->value() : saved_value_; |
71 | } |
72 | Status status() const override { |
73 | if (status_.ok()) { |
74 | return iter_->status(); |
75 | } else { |
76 | return status_; |
77 | } |
78 | } |
79 | |
80 | void Next() override; |
81 | void Prev() override; |
82 | void Seek(const Slice& target) override; |
83 | void SeekToFirst() override; |
84 | void SeekToLast() override; |
85 | |
86 | private: |
87 | void FindNextUserEntry(bool skipping, std::string* skip); |
88 | void FindPrevUserEntry(); |
89 | bool ParseKey(ParsedInternalKey* key); |
90 | |
91 | inline void SaveKey(const Slice& k, std::string* dst) { |
92 | dst->assign(k.data(), k.size()); |
93 | } |
94 | |
95 | inline void ClearSavedValue() { |
96 | if (saved_value_.capacity() > 1048576) { |
97 | std::string empty; |
98 | swap(empty, saved_value_); |
99 | } else { |
100 | saved_value_.clear(); |
101 | } |
102 | } |
103 | |
104 | // Picks the number of bytes that can be read until a compaction is scheduled. |
105 | size_t RandomCompactionPeriod() { |
106 | return rnd_.Uniform(2 * config::kReadBytesPeriod); |
107 | } |
108 | |
109 | DBImpl* db_; |
110 | const Comparator* const user_comparator_; |
111 | Iterator* const iter_; |
112 | SequenceNumber const sequence_; |
113 | Status status_; |
114 | std::string saved_key_; // == current key when direction_==kReverse |
115 | std::string saved_value_; // == current raw value when direction_==kReverse |
116 | Direction direction_; |
117 | bool valid_; |
118 | Random rnd_; |
119 | size_t bytes_until_read_sampling_; |
120 | }; |
121 | |
122 | inline bool DBIter::ParseKey(ParsedInternalKey* ikey) { |
123 | Slice k = iter_->key(); |
124 | |
125 | size_t bytes_read = k.size() + iter_->value().size(); |
126 | while (bytes_until_read_sampling_ < bytes_read) { |
127 | bytes_until_read_sampling_ += RandomCompactionPeriod(); |
128 | db_->RecordReadSample(k); |
129 | } |
130 | assert(bytes_until_read_sampling_ >= bytes_read); |
131 | bytes_until_read_sampling_ -= bytes_read; |
132 | |
133 | if (!ParseInternalKey(k, ikey)) { |
134 | status_ = Status::Corruption("corrupted internal key in DBIter" ); |
135 | return false; |
136 | } else { |
137 | return true; |
138 | } |
139 | } |
140 | |
141 | void DBIter::Next() { |
142 | assert(valid_); |
143 | |
144 | if (direction_ == kReverse) { // Switch directions? |
145 | direction_ = kForward; |
146 | // iter_ is pointing just before the entries for this->key(), |
147 | // so advance into the range of entries for this->key() and then |
148 | // use the normal skipping code below. |
149 | if (!iter_->Valid()) { |
150 | iter_->SeekToFirst(); |
151 | } else { |
152 | iter_->Next(); |
153 | } |
154 | if (!iter_->Valid()) { |
155 | valid_ = false; |
156 | saved_key_.clear(); |
157 | return; |
158 | } |
159 | // saved_key_ already contains the key to skip past. |
160 | } else { |
161 | // Store in saved_key_ the current key so we skip it below. |
162 | SaveKey(ExtractUserKey(iter_->key()), &saved_key_); |
163 | |
164 | // iter_ is pointing to current key. We can now safely move to the next to |
165 | // avoid checking current key. |
166 | iter_->Next(); |
167 | if (!iter_->Valid()) { |
168 | valid_ = false; |
169 | saved_key_.clear(); |
170 | return; |
171 | } |
172 | } |
173 | |
174 | FindNextUserEntry(true, &saved_key_); |
175 | } |
176 | |
177 | void DBIter::FindNextUserEntry(bool skipping, std::string* skip) { |
178 | // Loop until we hit an acceptable entry to yield |
179 | assert(iter_->Valid()); |
180 | assert(direction_ == kForward); |
181 | do { |
182 | ParsedInternalKey ikey; |
183 | if (ParseKey(&ikey) && ikey.sequence <= sequence_) { |
184 | switch (ikey.type) { |
185 | case kTypeDeletion: |
186 | // Arrange to skip all upcoming entries for this key since |
187 | // they are hidden by this deletion. |
188 | SaveKey(ikey.user_key, skip); |
189 | skipping = true; |
190 | break; |
191 | case kTypeValue: |
192 | if (skipping && |
193 | user_comparator_->Compare(ikey.user_key, *skip) <= 0) { |
194 | // Entry hidden |
195 | } else { |
196 | valid_ = true; |
197 | saved_key_.clear(); |
198 | return; |
199 | } |
200 | break; |
201 | } |
202 | } |
203 | iter_->Next(); |
204 | } while (iter_->Valid()); |
205 | saved_key_.clear(); |
206 | valid_ = false; |
207 | } |
208 | |
209 | void DBIter::Prev() { |
210 | assert(valid_); |
211 | |
212 | if (direction_ == kForward) { // Switch directions? |
213 | // iter_ is pointing at the current entry. Scan backwards until |
214 | // the key changes so we can use the normal reverse scanning code. |
215 | assert(iter_->Valid()); // Otherwise valid_ would have been false |
216 | SaveKey(ExtractUserKey(iter_->key()), &saved_key_); |
217 | while (true) { |
218 | iter_->Prev(); |
219 | if (!iter_->Valid()) { |
220 | valid_ = false; |
221 | saved_key_.clear(); |
222 | ClearSavedValue(); |
223 | return; |
224 | } |
225 | if (user_comparator_->Compare(ExtractUserKey(iter_->key()), saved_key_) < |
226 | 0) { |
227 | break; |
228 | } |
229 | } |
230 | direction_ = kReverse; |
231 | } |
232 | |
233 | FindPrevUserEntry(); |
234 | } |
235 | |
236 | void DBIter::FindPrevUserEntry() { |
237 | assert(direction_ == kReverse); |
238 | |
239 | ValueType value_type = kTypeDeletion; |
240 | if (iter_->Valid()) { |
241 | do { |
242 | ParsedInternalKey ikey; |
243 | if (ParseKey(&ikey) && ikey.sequence <= sequence_) { |
244 | if ((value_type != kTypeDeletion) && |
245 | user_comparator_->Compare(ikey.user_key, saved_key_) < 0) { |
246 | // We encountered a non-deleted value in entries for previous keys, |
247 | break; |
248 | } |
249 | value_type = ikey.type; |
250 | if (value_type == kTypeDeletion) { |
251 | saved_key_.clear(); |
252 | ClearSavedValue(); |
253 | } else { |
254 | Slice raw_value = iter_->value(); |
255 | if (saved_value_.capacity() > raw_value.size() + 1048576) { |
256 | std::string empty; |
257 | swap(empty, saved_value_); |
258 | } |
259 | SaveKey(ExtractUserKey(iter_->key()), &saved_key_); |
260 | saved_value_.assign(raw_value.data(), raw_value.size()); |
261 | } |
262 | } |
263 | iter_->Prev(); |
264 | } while (iter_->Valid()); |
265 | } |
266 | |
267 | if (value_type == kTypeDeletion) { |
268 | // End |
269 | valid_ = false; |
270 | saved_key_.clear(); |
271 | ClearSavedValue(); |
272 | direction_ = kForward; |
273 | } else { |
274 | valid_ = true; |
275 | } |
276 | } |
277 | |
278 | void DBIter::Seek(const Slice& target) { |
279 | direction_ = kForward; |
280 | ClearSavedValue(); |
281 | saved_key_.clear(); |
282 | AppendInternalKey(&saved_key_, |
283 | ParsedInternalKey(target, sequence_, kValueTypeForSeek)); |
284 | iter_->Seek(saved_key_); |
285 | if (iter_->Valid()) { |
286 | FindNextUserEntry(false, &saved_key_ /* temporary storage */); |
287 | } else { |
288 | valid_ = false; |
289 | } |
290 | } |
291 | |
292 | void DBIter::SeekToFirst() { |
293 | direction_ = kForward; |
294 | ClearSavedValue(); |
295 | iter_->SeekToFirst(); |
296 | if (iter_->Valid()) { |
297 | FindNextUserEntry(false, &saved_key_ /* temporary storage */); |
298 | } else { |
299 | valid_ = false; |
300 | } |
301 | } |
302 | |
303 | void DBIter::SeekToLast() { |
304 | direction_ = kReverse; |
305 | ClearSavedValue(); |
306 | iter_->SeekToLast(); |
307 | FindPrevUserEntry(); |
308 | } |
309 | |
310 | } // anonymous namespace |
311 | |
312 | Iterator* NewDBIterator(DBImpl* db, const Comparator* user_key_comparator, |
313 | Iterator* internal_iter, SequenceNumber sequence, |
314 | uint32_t seed) { |
315 | return new DBIter(db, user_key_comparator, internal_iter, sequence, seed); |
316 | } |
317 | |
318 | } // namespace leveldb |
319 | |