1 | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
2 | // Use of this source code is governed by a BSD-style license that can be |
3 | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
4 | |
5 | #include "db/db_impl.h" |
6 | |
7 | #include <algorithm> |
8 | #include <atomic> |
9 | #include <cstdint> |
10 | #include <cstdio> |
11 | #include <set> |
12 | #include <string> |
13 | #include <vector> |
14 | |
15 | #include "db/builder.h" |
16 | #include "db/db_iter.h" |
17 | #include "db/dbformat.h" |
18 | #include "db/filename.h" |
19 | #include "db/log_reader.h" |
20 | #include "db/log_writer.h" |
21 | #include "db/memtable.h" |
22 | #include "db/table_cache.h" |
23 | #include "db/version_set.h" |
24 | #include "db/write_batch_internal.h" |
25 | #include "leveldb/db.h" |
26 | #include "leveldb/env.h" |
27 | #include "leveldb/status.h" |
28 | #include "leveldb/table.h" |
29 | #include "leveldb/table_builder.h" |
30 | #include "port/port.h" |
31 | #include "table/block.h" |
32 | #include "table/merger.h" |
33 | #include "table/two_level_iterator.h" |
34 | #include "util/coding.h" |
35 | #include "util/logging.h" |
36 | #include "util/mutexlock.h" |
37 | |
38 | namespace leveldb { |
39 | |
// Number of file descriptors held back from options.max_open_files for uses
// other than the table cache. SanitizeOptions adds it to the lower bound of
// max_open_files, so TableCacheSize never drops below 64.
constexpr int kNumNonTableCacheFiles = 10;
41 | |
42 | // Information kept for every waiting writer |
43 | struct DBImpl::Writer { |
44 | explicit Writer(port::Mutex* mu) |
45 | : batch(nullptr), sync(false), done(false), cv(mu) {} |
46 | |
47 | Status status; |
48 | WriteBatch* batch; |
49 | bool sync; |
50 | bool done; |
51 | port::CondVar cv; |
52 | }; |
53 | |
54 | struct DBImpl::CompactionState { |
55 | // Files produced by compaction |
56 | struct Output { |
57 | uint64_t number; |
58 | uint64_t file_size; |
59 | InternalKey smallest, largest; |
60 | }; |
61 | |
62 | Output* current_output() { return &outputs[outputs.size() - 1]; } |
63 | |
64 | explicit CompactionState(Compaction* c) |
65 | : compaction(c), |
66 | smallest_snapshot(0), |
67 | outfile(nullptr), |
68 | builder(nullptr), |
69 | total_bytes(0) {} |
70 | |
71 | Compaction* const compaction; |
72 | |
73 | // Sequence numbers < smallest_snapshot are not significant since we |
74 | // will never have to service a snapshot below smallest_snapshot. |
75 | // Therefore if we have seen a sequence number S <= smallest_snapshot, |
76 | // we can drop all entries for the same key with sequence numbers < S. |
77 | SequenceNumber smallest_snapshot; |
78 | |
79 | std::vector<Output> outputs; |
80 | |
81 | // State kept for output being generated |
82 | WritableFile* outfile; |
83 | TableBuilder* builder; |
84 | |
85 | uint64_t total_bytes; |
86 | }; |
87 | |
// Clamps *ptr into [minvalue, maxvalue]. Used to force user-supplied options
// into reasonable bounds.
//
// The comparison is done in T (the option field's type), not in V (the type
// of the bounds). Casting the field to V used to narrow large values. For
// example, a size_t write_buffer_size of SIZE_MAX or 2^31+k typically wrapped
// to a negative or small int and was clipped to minvalue instead of maxvalue.
//
// Precondition: minvalue <= maxvalue, and both are representable in T.
template <class T, class V>
static void ClipToRange(T* ptr, V minvalue, V maxvalue) {
  const T lo = static_cast<T>(minvalue);
  const T hi = static_cast<T>(maxvalue);
  if (*ptr > hi) *ptr = hi;
  if (*ptr < lo) *ptr = lo;
}
94 | Options SanitizeOptions(const std::string& dbname, |
95 | const InternalKeyComparator* icmp, |
96 | const InternalFilterPolicy* ipolicy, |
97 | const Options& src) { |
98 | Options result = src; |
99 | result.comparator = icmp; |
100 | result.filter_policy = (src.filter_policy != nullptr) ? ipolicy : nullptr; |
101 | ClipToRange(&result.max_open_files, 64 + kNumNonTableCacheFiles, 50000); |
102 | ClipToRange(&result.write_buffer_size, 64 << 10, 1 << 30); |
103 | ClipToRange(&result.max_file_size, 1 << 20, 1 << 30); |
104 | ClipToRange(&result.block_size, 1 << 10, 4 << 20); |
105 | if (result.info_log == nullptr) { |
106 | // Open a log file in the same directory as the db |
107 | src.env->CreateDir(dbname); // In case it does not exist |
108 | src.env->RenameFile(InfoLogFileName(dbname), OldInfoLogFileName(dbname)); |
109 | Status s = src.env->NewLogger(InfoLogFileName(dbname), &result.info_log); |
110 | if (!s.ok()) { |
111 | // No place suitable for logging |
112 | result.info_log = nullptr; |
113 | } |
114 | } |
115 | if (result.block_cache == nullptr) { |
116 | result.block_cache = NewLRUCache(8 << 20); |
117 | } |
118 | return result; |
119 | } |
120 | |
121 | static int TableCacheSize(const Options& sanitized_options) { |
122 | // Reserve ten files or so for other uses and give the rest to TableCache. |
123 | return sanitized_options.max_open_files - kNumNonTableCacheFiles; |
124 | } |
125 | |
// Initializes the in-memory members of a DBImpl for the database `dbname`.
// options_ holds a sanitized copy of `raw_options` (see SanitizeOptions),
// which may create an info logger and/or a block cache on the caller's behalf.
// owns_info_log_ / owns_cache_ record whether that happened, so the destructor
// deletes only objects this instance created.
// NOTE(review): members are initialized in their declaration order in
// db_impl.h, not in the order written here. table_cache_ reads dbname_ and
// options_, and versions_ reads table_cache_. Confirm the header declares them
// accordingly before reordering anything.
DBImpl::DBImpl(const Options& raw_options, const std::string& dbname)
    : env_(raw_options.env),
      internal_comparator_(raw_options.comparator),
      internal_filter_policy_(raw_options.filter_policy),
      options_(SanitizeOptions(dbname, &internal_comparator_,
                               &internal_filter_policy_, raw_options)),
      // True iff SanitizeOptions substituted its own logger / cache.
      owns_info_log_(options_.info_log != raw_options.info_log),
      owns_cache_(options_.block_cache != raw_options.block_cache),
      dbname_(dbname),
      table_cache_(new TableCache(dbname_, options_, TableCacheSize(options_))),
      // Acquired later by Recover().
      db_lock_(nullptr),
      shutting_down_(false),
      background_work_finished_signal_(&mutex_),
      mem_(nullptr),
      imm_(nullptr),
      has_imm_(false),
      logfile_(nullptr),
      logfile_number_(0),
      log_(nullptr),
      seed_(0),
      tmp_batch_(new WriteBatch),
      background_compaction_scheduled_(false),
      manual_compaction_(nullptr),
      versions_(new VersionSet(dbname_, &options_, table_cache_,
                               &internal_comparator_)) {}
151 | |
// Shuts the database down. It waits for any scheduled background compaction to
// finish, releases the DB lock file, and frees everything this object owns.
// The deletion order below is intentional; do not reorder casually.
DBImpl::~DBImpl() {
  // Wait for background work to finish. Once shutting_down_ is set,
  // MaybeScheduleCompaction() schedules nothing new. BackgroundCall() clears
  // background_compaction_scheduled_ and signals when the in-flight run ends.
  mutex_.Lock();
  shutting_down_.store(true, std::memory_order_release);
  while (background_compaction_scheduled_) {
    background_work_finished_signal_.Wait();
  }
  mutex_.Unlock();

  // Release the file lock taken in Recover().
  if (db_lock_ != nullptr) {
    env_->UnlockFile(db_lock_);
  }

  delete versions_;
  // Memtables are released through Unref() rather than deleted directly.
  if (mem_ != nullptr) mem_->Unref();
  if (imm_ != nullptr) imm_->Unref();
  delete tmp_batch_;
  delete log_;
  delete logfile_;
  delete table_cache_;

  // Only free the logger / cache if SanitizeOptions() created them.
  if (owns_info_log_) {
    delete options_.info_log;
  }
  if (owns_cache_) {
    delete options_.block_cache;
  }
}
180 | |
181 | Status DBImpl::NewDB() { |
182 | VersionEdit new_db; |
183 | new_db.SetComparatorName(user_comparator()->Name()); |
184 | new_db.SetLogNumber(0); |
185 | new_db.SetNextFile(2); |
186 | new_db.SetLastSequence(0); |
187 | |
188 | const std::string manifest = DescriptorFileName(dbname_, 1); |
189 | WritableFile* file; |
190 | Status s = env_->NewWritableFile(manifest, &file); |
191 | if (!s.ok()) { |
192 | return s; |
193 | } |
194 | { |
195 | log::Writer log(file); |
196 | std::string record; |
197 | new_db.EncodeTo(&record); |
198 | s = log.AddRecord(record); |
199 | if (s.ok()) { |
200 | s = file->Sync(); |
201 | } |
202 | if (s.ok()) { |
203 | s = file->Close(); |
204 | } |
205 | } |
206 | delete file; |
207 | if (s.ok()) { |
208 | // Make "CURRENT" file that points to the new manifest file. |
209 | s = SetCurrentFile(env_, dbname_, 1); |
210 | } else { |
211 | env_->RemoveFile(manifest); |
212 | } |
213 | return s; |
214 | } |
215 | |
216 | void DBImpl::MaybeIgnoreError(Status* s) const { |
217 | if (s->ok() || options_.paranoid_checks) { |
218 | // No change needed |
219 | } else { |
220 | Log(options_.info_log, "Ignoring error %s" , s->ToString().c_str()); |
221 | *s = Status::OK(); |
222 | } |
223 | } |
224 | |
225 | void DBImpl::RemoveObsoleteFiles() { |
226 | mutex_.AssertHeld(); |
227 | |
228 | if (!bg_error_.ok()) { |
229 | // After a background error, we don't know whether a new version may |
230 | // or may not have been committed, so we cannot safely garbage collect. |
231 | return; |
232 | } |
233 | |
234 | // Make a set of all of the live files |
235 | std::set<uint64_t> live = pending_outputs_; |
236 | versions_->AddLiveFiles(&live); |
237 | |
238 | std::vector<std::string> filenames; |
239 | env_->GetChildren(dbname_, &filenames); // Ignoring errors on purpose |
240 | uint64_t number; |
241 | FileType type; |
242 | std::vector<std::string> files_to_delete; |
243 | for (std::string& filename : filenames) { |
244 | if (ParseFileName(filename, &number, &type)) { |
245 | bool keep = true; |
246 | switch (type) { |
247 | case kLogFile: |
248 | keep = ((number >= versions_->LogNumber()) || |
249 | (number == versions_->PrevLogNumber())); |
250 | break; |
251 | case kDescriptorFile: |
252 | // Keep my manifest file, and any newer incarnations' |
253 | // (in case there is a race that allows other incarnations) |
254 | keep = (number >= versions_->ManifestFileNumber()); |
255 | break; |
256 | case kTableFile: |
257 | keep = (live.find(number) != live.end()); |
258 | break; |
259 | case kTempFile: |
260 | // Any temp files that are currently being written to must |
261 | // be recorded in pending_outputs_, which is inserted into "live" |
262 | keep = (live.find(number) != live.end()); |
263 | break; |
264 | case kCurrentFile: |
265 | case kDBLockFile: |
266 | case kInfoLogFile: |
267 | keep = true; |
268 | break; |
269 | } |
270 | |
271 | if (!keep) { |
272 | files_to_delete.push_back(std::move(filename)); |
273 | if (type == kTableFile) { |
274 | table_cache_->Evict(number); |
275 | } |
276 | Log(options_.info_log, "Delete type=%d #%lld\n" , static_cast<int>(type), |
277 | static_cast<unsigned long long>(number)); |
278 | } |
279 | } |
280 | } |
281 | |
282 | // While deleting all files unblock other threads. All files being deleted |
283 | // have unique names which will not collide with newly created files and |
284 | // are therefore safe to delete while allowing other threads to proceed. |
285 | mutex_.Unlock(); |
286 | for (const std::string& filename : files_to_delete) { |
287 | env_->RemoveFile(dbname_ + "/" + filename); |
288 | } |
289 | mutex_.Lock(); |
290 | } |
291 | |
292 | Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) { |
293 | mutex_.AssertHeld(); |
294 | |
295 | // Ignore error from CreateDir since the creation of the DB is |
296 | // committed only when the descriptor is created, and this directory |
297 | // may already exist from a previous failed creation attempt. |
298 | env_->CreateDir(dbname_); |
299 | assert(db_lock_ == nullptr); |
300 | Status s = env_->LockFile(LockFileName(dbname_), &db_lock_); |
301 | if (!s.ok()) { |
302 | return s; |
303 | } |
304 | |
305 | if (!env_->FileExists(CurrentFileName(dbname_))) { |
306 | if (options_.create_if_missing) { |
307 | Log(options_.info_log, "Creating DB %s since it was missing." , |
308 | dbname_.c_str()); |
309 | s = NewDB(); |
310 | if (!s.ok()) { |
311 | return s; |
312 | } |
313 | } else { |
314 | return Status::InvalidArgument( |
315 | dbname_, "does not exist (create_if_missing is false)" ); |
316 | } |
317 | } else { |
318 | if (options_.error_if_exists) { |
319 | return Status::InvalidArgument(dbname_, |
320 | "exists (error_if_exists is true)" ); |
321 | } |
322 | } |
323 | |
324 | s = versions_->Recover(save_manifest); |
325 | if (!s.ok()) { |
326 | return s; |
327 | } |
328 | SequenceNumber max_sequence(0); |
329 | |
330 | // Recover from all newer log files than the ones named in the |
331 | // descriptor (new log files may have been added by the previous |
332 | // incarnation without registering them in the descriptor). |
333 | // |
334 | // Note that PrevLogNumber() is no longer used, but we pay |
335 | // attention to it in case we are recovering a database |
336 | // produced by an older version of leveldb. |
337 | const uint64_t min_log = versions_->LogNumber(); |
338 | const uint64_t prev_log = versions_->PrevLogNumber(); |
339 | std::vector<std::string> filenames; |
340 | s = env_->GetChildren(dbname_, &filenames); |
341 | if (!s.ok()) { |
342 | return s; |
343 | } |
344 | std::set<uint64_t> expected; |
345 | versions_->AddLiveFiles(&expected); |
346 | uint64_t number; |
347 | FileType type; |
348 | std::vector<uint64_t> logs; |
349 | for (size_t i = 0; i < filenames.size(); i++) { |
350 | if (ParseFileName(filenames[i], &number, &type)) { |
351 | expected.erase(number); |
352 | if (type == kLogFile && ((number >= min_log) || (number == prev_log))) |
353 | logs.push_back(number); |
354 | } |
355 | } |
356 | if (!expected.empty()) { |
357 | char buf[50]; |
358 | std::snprintf(buf, sizeof(buf), "%d missing files; e.g." , |
359 | static_cast<int>(expected.size())); |
360 | return Status::Corruption(buf, TableFileName(dbname_, *(expected.begin()))); |
361 | } |
362 | |
363 | // Recover in the order in which the logs were generated |
364 | std::sort(logs.begin(), logs.end()); |
365 | for (size_t i = 0; i < logs.size(); i++) { |
366 | s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit, |
367 | &max_sequence); |
368 | if (!s.ok()) { |
369 | return s; |
370 | } |
371 | |
372 | // The previous incarnation may not have written any MANIFEST |
373 | // records after allocating this log number. So we manually |
374 | // update the file number allocation counter in VersionSet. |
375 | versions_->MarkFileNumberUsed(logs[i]); |
376 | } |
377 | |
378 | if (versions_->LastSequence() < max_sequence) { |
379 | versions_->SetLastSequence(max_sequence); |
380 | } |
381 | |
382 | return Status::OK(); |
383 | } |
384 | |
// Replays the write batches stored in log file `log_number` into memtables.
//
// Whenever the memtable grows past options_.write_buffer_size, it is flushed
// to a level-0 table recorded in `edit`, and *save_manifest is set.
// *max_sequence is raised to the largest sequence number seen. When
// `last_log` is true, options_.reuse_logs is set, and nothing was flushed
// during this replay, the log file and the replayed memtable may be adopted as
// logfile_/log_/mem_ instead of being flushed.
//
// Corrupted records are logged and skipped. With paranoid_checks, the first
// corruption (or other error) stops the replay and is returned.
// Requires mutex_ held.
Status DBImpl::RecoverLogFile(uint64_t log_number, bool last_log,
                              bool* save_manifest, VersionEdit* edit,
                              SequenceNumber* max_sequence) {
  // Receives corruption reports from log::Reader. It logs each one and, when
  // `status` is non-null, records the first as an error.
  struct LogReporter : public log::Reader::Reporter {
    Env* env;
    Logger* info_log;
    const char* fname;
    Status* status;  // null if options_.paranoid_checks==false
    void Corruption(size_t bytes, const Status& s) override {
      Log(info_log, "%s%s: dropping %d bytes; %s",
          (this->status == nullptr ? "(ignoring error) " : ""), fname,
          static_cast<int>(bytes), s.ToString().c_str());
      if (this->status != nullptr && this->status->ok()) *this->status = s;
    }
  };

  mutex_.AssertHeld();

  // Open the log file
  std::string fname = LogFileName(dbname_, log_number);
  SequentialFile* file;
  Status status = env_->NewSequentialFile(fname, &file);
  if (!status.ok()) {
    // Without paranoid_checks an unopenable log is skipped (returns OK).
    MaybeIgnoreError(&status);
    return status;
  }

  // Create the log reader.
  LogReporter reporter;
  reporter.env = env_;
  reporter.info_log = options_.info_log;
  reporter.fname = fname.c_str();
  reporter.status = (options_.paranoid_checks ? &status : nullptr);
  // We intentionally make log::Reader do checksumming even if
  // paranoid_checks==false so that corruptions cause entire commits
  // to be skipped instead of propagating bad information (like overly
  // large sequence numbers).
  log::Reader reader(file, &reporter, true /*checksum*/, 0 /*initial_offset*/);
  Log(options_.info_log, "Recovering log #%llu",
      (unsigned long long)log_number);

  // Read all the records and add to a memtable. The loop also stops as soon
  // as `status` becomes non-OK (including via the reporter above).
  std::string scratch;
  Slice record;
  WriteBatch batch;
  int compactions = 0;  // memtables flushed to level-0 during this replay
  MemTable* mem = nullptr;
  while (reader.ReadRecord(&record, &scratch) && status.ok()) {
    // NOTE(review): 12 appears to be the WriteBatch header size (8-byte
    // sequence + 4-byte count); confirm against write_batch.cc.
    if (record.size() < 12) {
      reporter.Corruption(record.size(),
                          Status::Corruption("log record too small"));
      continue;
    }
    WriteBatchInternal::SetContents(&batch, record);

    // Lazily create the memtable (and again after each flush below).
    if (mem == nullptr) {
      mem = new MemTable(internal_comparator_);
      mem->Ref();
    }
    status = WriteBatchInternal::InsertInto(&batch, mem);
    MaybeIgnoreError(&status);
    if (!status.ok()) {
      break;
    }
    // Highest sequence number used by this batch's entries.
    const SequenceNumber last_seq = WriteBatchInternal::Sequence(&batch) +
                                    WriteBatchInternal::Count(&batch) - 1;
    if (last_seq > *max_sequence) {
      *max_sequence = last_seq;
    }

    // Flush to a level-0 table once the memtable exceeds the write buffer.
    if (mem->ApproximateMemoryUsage() > options_.write_buffer_size) {
      compactions++;
      *save_manifest = true;
      status = WriteLevel0Table(mem, edit, nullptr);
      mem->Unref();
      mem = nullptr;
      if (!status.ok()) {
        // Reflect errors immediately so that conditions like full
        // file-systems cause the DB::Open() to fail.
        break;
      }
    }
  }

  delete file;

  // See if we should keep reusing the last log file.
  if (status.ok() && options_.reuse_logs && last_log && compactions == 0) {
    assert(logfile_ == nullptr);
    assert(log_ == nullptr);
    assert(mem_ == nullptr);
    uint64_t lfile_size;
    // If either call fails, `mem` is left in place and flushed below.
    if (env_->GetFileSize(fname, &lfile_size).ok() &&
        env_->NewAppendableFile(fname, &logfile_).ok()) {
      Log(options_.info_log, "Reusing old log %s \n", fname.c_str());
      // Presumably lets the writer continue after the existing lfile_size
      // bytes; confirm against log::Writer.
      log_ = new log::Writer(logfile_, lfile_size);
      logfile_number_ = log_number;
      if (mem != nullptr) {
        // Hand the replayed memtable (and its reference) over to mem_.
        mem_ = mem;
        mem = nullptr;
      } else {
        // mem can be nullptr if lognum exists but was empty.
        mem_ = new MemTable(internal_comparator_);
        mem_->Ref();
      }
    }
  }

  if (mem != nullptr) {
    // mem did not get reused; compact it.
    if (status.ok()) {
      *save_manifest = true;
      status = WriteLevel0Table(mem, edit, nullptr);
    }
    mem->Unref();
  }

  return status;
}
504 | |
505 | Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit, |
506 | Version* base) { |
507 | mutex_.AssertHeld(); |
508 | const uint64_t start_micros = env_->NowMicros(); |
509 | FileMetaData meta; |
510 | meta.number = versions_->NewFileNumber(); |
511 | pending_outputs_.insert(meta.number); |
512 | Iterator* iter = mem->NewIterator(); |
513 | Log(options_.info_log, "Level-0 table #%llu: started" , |
514 | (unsigned long long)meta.number); |
515 | |
516 | Status s; |
517 | { |
518 | mutex_.Unlock(); |
519 | s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta); |
520 | mutex_.Lock(); |
521 | } |
522 | |
523 | Log(options_.info_log, "Level-0 table #%llu: %lld bytes %s" , |
524 | (unsigned long long)meta.number, (unsigned long long)meta.file_size, |
525 | s.ToString().c_str()); |
526 | delete iter; |
527 | pending_outputs_.erase(meta.number); |
528 | |
529 | // Note that if file_size is zero, the file has been deleted and |
530 | // should not be added to the manifest. |
531 | int level = 0; |
532 | if (s.ok() && meta.file_size > 0) { |
533 | const Slice min_user_key = meta.smallest.user_key(); |
534 | const Slice max_user_key = meta.largest.user_key(); |
535 | if (base != nullptr) { |
536 | level = base->PickLevelForMemTableOutput(min_user_key, max_user_key); |
537 | } |
538 | edit->AddFile(level, meta.number, meta.file_size, meta.smallest, |
539 | meta.largest); |
540 | } |
541 | |
542 | CompactionStats stats; |
543 | stats.micros = env_->NowMicros() - start_micros; |
544 | stats.bytes_written = meta.file_size; |
545 | stats_[level].Add(stats); |
546 | return s; |
547 | } |
548 | |
549 | void DBImpl::CompactMemTable() { |
550 | mutex_.AssertHeld(); |
551 | assert(imm_ != nullptr); |
552 | |
553 | // Save the contents of the memtable as a new Table |
554 | VersionEdit edit; |
555 | Version* base = versions_->current(); |
556 | base->Ref(); |
557 | Status s = WriteLevel0Table(imm_, &edit, base); |
558 | base->Unref(); |
559 | |
560 | if (s.ok() && shutting_down_.load(std::memory_order_acquire)) { |
561 | s = Status::IOError("Deleting DB during memtable compaction" ); |
562 | } |
563 | |
564 | // Replace immutable memtable with the generated Table |
565 | if (s.ok()) { |
566 | edit.SetPrevLogNumber(0); |
567 | edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed |
568 | s = versions_->LogAndApply(&edit, &mutex_); |
569 | } |
570 | |
571 | if (s.ok()) { |
572 | // Commit to the new state |
573 | imm_->Unref(); |
574 | imm_ = nullptr; |
575 | has_imm_.store(false, std::memory_order_release); |
576 | RemoveObsoleteFiles(); |
577 | } else { |
578 | RecordBackgroundError(s); |
579 | } |
580 | } |
581 | |
582 | void DBImpl::CompactRange(const Slice* begin, const Slice* end) { |
583 | int max_level_with_files = 1; |
584 | { |
585 | MutexLock l(&mutex_); |
586 | Version* base = versions_->current(); |
587 | for (int level = 1; level < config::kNumLevels; level++) { |
588 | if (base->OverlapInLevel(level, begin, end)) { |
589 | max_level_with_files = level; |
590 | } |
591 | } |
592 | } |
593 | TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap |
594 | for (int level = 0; level < max_level_with_files; level++) { |
595 | TEST_CompactRange(level, begin, end); |
596 | } |
597 | } |
598 | |
599 | void DBImpl::TEST_CompactRange(int level, const Slice* begin, |
600 | const Slice* end) { |
601 | assert(level >= 0); |
602 | assert(level + 1 < config::kNumLevels); |
603 | |
604 | InternalKey begin_storage, end_storage; |
605 | |
606 | ManualCompaction manual; |
607 | manual.level = level; |
608 | manual.done = false; |
609 | if (begin == nullptr) { |
610 | manual.begin = nullptr; |
611 | } else { |
612 | begin_storage = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek); |
613 | manual.begin = &begin_storage; |
614 | } |
615 | if (end == nullptr) { |
616 | manual.end = nullptr; |
617 | } else { |
618 | end_storage = InternalKey(*end, 0, static_cast<ValueType>(0)); |
619 | manual.end = &end_storage; |
620 | } |
621 | |
622 | MutexLock l(&mutex_); |
623 | while (!manual.done && !shutting_down_.load(std::memory_order_acquire) && |
624 | bg_error_.ok()) { |
625 | if (manual_compaction_ == nullptr) { // Idle |
626 | manual_compaction_ = &manual; |
627 | MaybeScheduleCompaction(); |
628 | } else { // Running either my compaction or another compaction. |
629 | background_work_finished_signal_.Wait(); |
630 | } |
631 | } |
632 | if (manual_compaction_ == &manual) { |
633 | // Cancel my manual compaction since we aborted early for some reason. |
634 | manual_compaction_ = nullptr; |
635 | } |
636 | } |
637 | |
638 | Status DBImpl::TEST_CompactMemTable() { |
639 | // nullptr batch means just wait for earlier writes to be done |
640 | Status s = Write(WriteOptions(), nullptr); |
641 | if (s.ok()) { |
642 | // Wait until the compaction completes |
643 | MutexLock l(&mutex_); |
644 | while (imm_ != nullptr && bg_error_.ok()) { |
645 | background_work_finished_signal_.Wait(); |
646 | } |
647 | if (imm_ != nullptr) { |
648 | s = bg_error_; |
649 | } |
650 | } |
651 | return s; |
652 | } |
653 | |
654 | void DBImpl::RecordBackgroundError(const Status& s) { |
655 | mutex_.AssertHeld(); |
656 | if (bg_error_.ok()) { |
657 | bg_error_ = s; |
658 | background_work_finished_signal_.SignalAll(); |
659 | } |
660 | } |
661 | |
662 | void DBImpl::MaybeScheduleCompaction() { |
663 | mutex_.AssertHeld(); |
664 | if (background_compaction_scheduled_) { |
665 | // Already scheduled |
666 | } else if (shutting_down_.load(std::memory_order_acquire)) { |
667 | // DB is being deleted; no more background compactions |
668 | } else if (!bg_error_.ok()) { |
669 | // Already got an error; no more changes |
670 | } else if (imm_ == nullptr && manual_compaction_ == nullptr && |
671 | !versions_->NeedsCompaction()) { |
672 | // No work to be done |
673 | } else { |
674 | background_compaction_scheduled_ = true; |
675 | env_->Schedule(&DBImpl::BGWork, this); |
676 | } |
677 | } |
678 | |
679 | void DBImpl::BGWork(void* db) { |
680 | reinterpret_cast<DBImpl*>(db)->BackgroundCall(); |
681 | } |
682 | |
683 | void DBImpl::BackgroundCall() { |
684 | MutexLock l(&mutex_); |
685 | assert(background_compaction_scheduled_); |
686 | if (shutting_down_.load(std::memory_order_acquire)) { |
687 | // No more background work when shutting down. |
688 | } else if (!bg_error_.ok()) { |
689 | // No more background work after a background error. |
690 | } else { |
691 | BackgroundCompaction(); |
692 | } |
693 | |
694 | background_compaction_scheduled_ = false; |
695 | |
696 | // Previous compaction may have produced too many files in a level, |
697 | // so reschedule another compaction if needed. |
698 | MaybeScheduleCompaction(); |
699 | background_work_finished_signal_.SignalAll(); |
700 | } |
701 | |
702 | void DBImpl::BackgroundCompaction() { |
703 | mutex_.AssertHeld(); |
704 | |
705 | if (imm_ != nullptr) { |
706 | CompactMemTable(); |
707 | return; |
708 | } |
709 | |
710 | Compaction* c; |
711 | bool is_manual = (manual_compaction_ != nullptr); |
712 | InternalKey manual_end; |
713 | if (is_manual) { |
714 | ManualCompaction* m = manual_compaction_; |
715 | c = versions_->CompactRange(m->level, m->begin, m->end); |
716 | m->done = (c == nullptr); |
717 | if (c != nullptr) { |
718 | manual_end = c->input(0, c->num_input_files(0) - 1)->largest; |
719 | } |
720 | Log(options_.info_log, |
721 | "Manual compaction at level-%d from %s .. %s; will stop at %s\n" , |
722 | m->level, (m->begin ? m->begin->DebugString().c_str() : "(begin)" ), |
723 | (m->end ? m->end->DebugString().c_str() : "(end)" ), |
724 | (m->done ? "(end)" : manual_end.DebugString().c_str())); |
725 | } else { |
726 | c = versions_->PickCompaction(); |
727 | } |
728 | |
729 | Status status; |
730 | if (c == nullptr) { |
731 | // Nothing to do |
732 | } else if (!is_manual && c->IsTrivialMove()) { |
733 | // Move file to next level |
734 | assert(c->num_input_files(0) == 1); |
735 | FileMetaData* f = c->input(0, 0); |
736 | c->edit()->RemoveFile(c->level(), f->number); |
737 | c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest, |
738 | f->largest); |
739 | status = versions_->LogAndApply(c->edit(), &mutex_); |
740 | if (!status.ok()) { |
741 | RecordBackgroundError(status); |
742 | } |
743 | VersionSet::LevelSummaryStorage tmp; |
744 | Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n" , |
745 | static_cast<unsigned long long>(f->number), c->level() + 1, |
746 | static_cast<unsigned long long>(f->file_size), |
747 | status.ToString().c_str(), versions_->LevelSummary(&tmp)); |
748 | } else { |
749 | CompactionState* compact = new CompactionState(c); |
750 | status = DoCompactionWork(compact); |
751 | if (!status.ok()) { |
752 | RecordBackgroundError(status); |
753 | } |
754 | CleanupCompaction(compact); |
755 | c->ReleaseInputs(); |
756 | RemoveObsoleteFiles(); |
757 | } |
758 | delete c; |
759 | |
760 | if (status.ok()) { |
761 | // Done |
762 | } else if (shutting_down_.load(std::memory_order_acquire)) { |
763 | // Ignore compaction errors found during shutting down |
764 | } else { |
765 | Log(options_.info_log, "Compaction error: %s" , status.ToString().c_str()); |
766 | } |
767 | |
768 | if (is_manual) { |
769 | ManualCompaction* m = manual_compaction_; |
770 | if (!status.ok()) { |
771 | m->done = true; |
772 | } |
773 | if (!m->done) { |
774 | // We only compacted part of the requested range. Update *m |
775 | // to the range that is left to be compacted. |
776 | m->tmp_storage = manual_end; |
777 | m->begin = &m->tmp_storage; |
778 | } |
779 | manual_compaction_ = nullptr; |
780 | } |
781 | } |
782 | |
783 | void DBImpl::CleanupCompaction(CompactionState* compact) { |
784 | mutex_.AssertHeld(); |
785 | if (compact->builder != nullptr) { |
786 | // May happen if we get a shutdown call in the middle of compaction |
787 | compact->builder->Abandon(); |
788 | delete compact->builder; |
789 | } else { |
790 | assert(compact->outfile == nullptr); |
791 | } |
792 | delete compact->outfile; |
793 | for (size_t i = 0; i < compact->outputs.size(); i++) { |
794 | const CompactionState::Output& out = compact->outputs[i]; |
795 | pending_outputs_.erase(out.number); |
796 | } |
797 | delete compact; |
798 | } |
799 | |
800 | Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) { |
801 | assert(compact != nullptr); |
802 | assert(compact->builder == nullptr); |
803 | uint64_t file_number; |
804 | { |
805 | mutex_.Lock(); |
806 | file_number = versions_->NewFileNumber(); |
807 | pending_outputs_.insert(file_number); |
808 | CompactionState::Output out; |
809 | out.number = file_number; |
810 | out.smallest.Clear(); |
811 | out.largest.Clear(); |
812 | compact->outputs.push_back(out); |
813 | mutex_.Unlock(); |
814 | } |
815 | |
816 | // Make the output file |
817 | std::string fname = TableFileName(dbname_, file_number); |
818 | Status s = env_->NewWritableFile(fname, &compact->outfile); |
819 | if (s.ok()) { |
820 | compact->builder = new TableBuilder(options_, compact->outfile); |
821 | } |
822 | return s; |
823 | } |
824 | |
825 | Status DBImpl::FinishCompactionOutputFile(CompactionState* compact, |
826 | Iterator* input) { |
827 | assert(compact != nullptr); |
828 | assert(compact->outfile != nullptr); |
829 | assert(compact->builder != nullptr); |
830 | |
831 | const uint64_t output_number = compact->current_output()->number; |
832 | assert(output_number != 0); |
833 | |
834 | // Check for iterator errors |
835 | Status s = input->status(); |
836 | const uint64_t current_entries = compact->builder->NumEntries(); |
837 | if (s.ok()) { |
838 | s = compact->builder->Finish(); |
839 | } else { |
840 | compact->builder->Abandon(); |
841 | } |
842 | const uint64_t current_bytes = compact->builder->FileSize(); |
843 | compact->current_output()->file_size = current_bytes; |
844 | compact->total_bytes += current_bytes; |
845 | delete compact->builder; |
846 | compact->builder = nullptr; |
847 | |
848 | // Finish and check for file errors |
849 | if (s.ok()) { |
850 | s = compact->outfile->Sync(); |
851 | } |
852 | if (s.ok()) { |
853 | s = compact->outfile->Close(); |
854 | } |
855 | delete compact->outfile; |
856 | compact->outfile = nullptr; |
857 | |
858 | if (s.ok() && current_entries > 0) { |
859 | // Verify that the table is usable |
860 | Iterator* iter = |
861 | table_cache_->NewIterator(ReadOptions(), output_number, current_bytes); |
862 | s = iter->status(); |
863 | delete iter; |
864 | if (s.ok()) { |
865 | Log(options_.info_log, "Generated table #%llu@%d: %lld keys, %lld bytes" , |
866 | (unsigned long long)output_number, compact->compaction->level(), |
867 | (unsigned long long)current_entries, |
868 | (unsigned long long)current_bytes); |
869 | } |
870 | } |
871 | return s; |
872 | } |
873 | |
874 | Status DBImpl::InstallCompactionResults(CompactionState* compact) { |
875 | mutex_.AssertHeld(); |
876 | Log(options_.info_log, "Compacted %d@%d + %d@%d files => %lld bytes" , |
877 | compact->compaction->num_input_files(0), compact->compaction->level(), |
878 | compact->compaction->num_input_files(1), compact->compaction->level() + 1, |
879 | static_cast<long long>(compact->total_bytes)); |
880 | |
881 | // Add compaction outputs |
882 | compact->compaction->AddInputDeletions(compact->compaction->edit()); |
883 | const int level = compact->compaction->level(); |
884 | for (size_t i = 0; i < compact->outputs.size(); i++) { |
885 | const CompactionState::Output& out = compact->outputs[i]; |
886 | compact->compaction->edit()->AddFile(level + 1, out.number, out.file_size, |
887 | out.smallest, out.largest); |
888 | } |
889 | return versions_->LogAndApply(compact->compaction->edit(), &mutex_); |
890 | } |
891 | |
// Performs the merge step of the compaction described by |compact|.
//
// All input files of compact->compaction are read through one merging
// iterator in internal-key order. Each entry is handled in one of two ways:
//   - dropped, if it is shadowed by a newer entry for the same user key that
//     every snapshot can already see, or if it is a deletion marker that no
//     longer hides anything;
//   - otherwise appended to the current output table.
// Output tables roll over when Compaction::ShouldStopBefore() says so, or when
// they reach Compaction::MaxOutputFileSize().
//
// Statistics are charged to level compaction->level() + 1. On success the
// outputs are installed via InstallCompactionResults(). Any failure is
// recorded with RecordBackgroundError(). A shutdown observed mid-merge is
// reported as an IOError.
//
// Locking: must be called with mutex_ held. The mutex is released while the
// merge runs, re-acquired only briefly to flush a pending immutable memtable,
// and held again when this function returns.
Status DBImpl::DoCompactionWork(CompactionState* compact) {
  const uint64_t start_micros = env_->NowMicros();
  int64_t imm_micros = 0;  // Micros spent doing imm_ compactions

  Log(options_.info_log, "Compacting %d@%d + %d@%d files",
      compact->compaction->num_input_files(0), compact->compaction->level(),
      compact->compaction->num_input_files(1),
      compact->compaction->level() + 1);

  assert(versions_->NumLevelFiles(compact->compaction->level()) > 0);
  assert(compact->builder == nullptr);
  assert(compact->outfile == nullptr);
  // smallest_snapshot is the oldest sequence number a reader may still ask
  // for: the oldest live snapshot, or the latest sequence if none exist. The
  // drop rules below only discard entries at or below this sequence.
  if (snapshots_.empty()) {
    compact->smallest_snapshot = versions_->LastSequence();
  } else {
    compact->smallest_snapshot = snapshots_.oldest()->sequence_number();
  }

  // Built under the lock; consumed without it.
  Iterator* input = versions_->MakeInputIterator(compact->compaction);

  // Release mutex while we're actually doing the compaction work
  mutex_.Unlock();

  input->SeekToFirst();
  Status status;
  ParsedInternalKey ikey;
  // Per-user-key tracking: the user key currently being processed, and the
  // sequence number of the previous (newer) entry seen for it.
  std::string current_user_key;
  bool has_current_user_key = false;
  SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
  while (input->Valid() && !shutting_down_.load(std::memory_order_acquire)) {
    // Prioritize immutable compaction work
    if (has_imm_.load(std::memory_order_relaxed)) {
      const uint64_t imm_start = env_->NowMicros();
      mutex_.Lock();
      // Re-check under the lock: has_imm_ is only a cheap hint.
      if (imm_ != nullptr) {
        CompactMemTable();
        // Wake up MakeRoomForWrite() if necessary.
        background_work_finished_signal_.SignalAll();
      }
      mutex_.Unlock();
      // Excluded from this compaction's reported time below.
      imm_micros += (env_->NowMicros() - imm_start);
    }

    Slice key = input->key();
    // Start a new output table before |key| if the compaction asks for a
    // split here (and a table is currently open).
    if (compact->compaction->ShouldStopBefore(key) &&
        compact->builder != nullptr) {
      status = FinishCompactionOutputFile(compact, input);
      if (!status.ok()) {
        break;
      }
    }

    // Handle key/value, add to state, etc.
    bool drop = false;
    if (!ParseInternalKey(key, &ikey)) {
      // Do not hide error keys
      // Reset tracking so the unparseable entry cannot shadow anything.
      current_user_key.clear();
      has_current_user_key = false;
      last_sequence_for_key = kMaxSequenceNumber;
    } else {
      if (!has_current_user_key ||
          user_comparator()->Compare(ikey.user_key, Slice(current_user_key)) !=
              0) {
        // First occurrence of this user key
        current_user_key.assign(ikey.user_key.data(), ikey.user_key.size());
        has_current_user_key = true;
        last_sequence_for_key = kMaxSequenceNumber;
      }

      if (last_sequence_for_key <= compact->smallest_snapshot) {
        // Hidden by a newer entry for same user key
        drop = true;  // (A)
      } else if (ikey.type == kTypeDeletion &&
                 ikey.sequence <= compact->smallest_snapshot &&
                 compact->compaction->IsBaseLevelForKey(ikey.user_key)) {
        // For this user key:
        // (1) there is no data in higher levels
        // (2) data in lower levels will have larger sequence numbers
        // (3) data in layers that are being compacted here and have
        //     smaller sequence numbers will be dropped in the next
        //     few iterations of this loop (by rule (A) above).
        // Therefore this deletion marker is obsolete and can be dropped.
        drop = true;
      }

      last_sequence_for_key = ikey.sequence;
    }
#if 0
    Log(options_.info_log,
        "  Compact: %s, seq %d, type: %d %d, drop: %d, is_base: %d, "
        "%d smallest_snapshot: %d",
        ikey.user_key.ToString().c_str(),
        (int)ikey.sequence, ikey.type, kTypeValue, drop,
        compact->compaction->IsBaseLevelForKey(ikey.user_key),
        (int)last_sequence_for_key, (int)compact->smallest_snapshot);
#endif

    if (!drop) {
      // Open output file if necessary
      if (compact->builder == nullptr) {
        status = OpenCompactionOutputFile(compact);
        if (!status.ok()) {
          break;
        }
      }
      // Keys arrive in sorted order, so the first key added is the table's
      // smallest and the most recent one is its largest.
      if (compact->builder->NumEntries() == 0) {
        compact->current_output()->smallest.DecodeFrom(key);
      }
      compact->current_output()->largest.DecodeFrom(key);
      compact->builder->Add(key, input->value());

      // Close output file if it is big enough
      if (compact->builder->FileSize() >=
          compact->compaction->MaxOutputFileSize()) {
        status = FinishCompactionOutputFile(compact, input);
        if (!status.ok()) {
          break;
        }
      }
    }

    input->Next();
  }

  // A merge cut short by shutdown must not be installed.
  if (status.ok() && shutting_down_.load(std::memory_order_acquire)) {
    status = Status::IOError("Deleting DB during compaction");
  }
  // Flush the last, partially filled output table.
  if (status.ok() && compact->builder != nullptr) {
    status = FinishCompactionOutputFile(compact, input);
  }
  // Surface any read error the input iterator encountered.
  if (status.ok()) {
    status = input->status();
  }
  delete input;
  input = nullptr;

  // Wall time of this compaction, excluding nested memtable flushes.
  CompactionStats stats;
  stats.micros = env_->NowMicros() - start_micros - imm_micros;
  // Bytes read: input files from both the base level (0) and next level (1).
  for (int which = 0; which < 2; which++) {
    for (int i = 0; i < compact->compaction->num_input_files(which); i++) {
      stats.bytes_read += compact->compaction->input(which, i)->file_size;
    }
  }
  for (size_t i = 0; i < compact->outputs.size(); i++) {
    stats.bytes_written += compact->outputs[i].file_size;
  }

  mutex_.Lock();
  stats_[compact->compaction->level() + 1].Add(stats);

  if (status.ok()) {
    status = InstallCompactionResults(compact);
  }
  if (!status.ok()) {
    RecordBackgroundError(status);
  }
  VersionSet::LevelSummaryStorage tmp;
  Log(options_.info_log, "compacted to: %s", versions_->LevelSummary(&tmp));
  return status;
}
1052 | |
1053 | namespace { |
1054 | |
1055 | struct IterState { |
1056 | port::Mutex* const mu; |
1057 | Version* const version GUARDED_BY(mu); |
1058 | MemTable* const mem GUARDED_BY(mu); |
1059 | MemTable* const imm GUARDED_BY(mu); |
1060 | |
1061 | IterState(port::Mutex* mutex, MemTable* mem, MemTable* imm, Version* version) |
1062 | : mu(mutex), version(version), mem(mem), imm(imm) {} |
1063 | }; |
1064 | |
1065 | static void CleanupIteratorState(void* arg1, void* arg2) { |
1066 | IterState* state = reinterpret_cast<IterState*>(arg1); |
1067 | state->mu->Lock(); |
1068 | state->mem->Unref(); |
1069 | if (state->imm != nullptr) state->imm->Unref(); |
1070 | state->version->Unref(); |
1071 | state->mu->Unlock(); |
1072 | delete state; |
1073 | } |
1074 | |
1075 | } // anonymous namespace |
1076 | |
1077 | Iterator* DBImpl::NewInternalIterator(const ReadOptions& options, |
1078 | SequenceNumber* latest_snapshot, |
1079 | uint32_t* seed) { |
1080 | mutex_.Lock(); |
1081 | *latest_snapshot = versions_->LastSequence(); |
1082 | |
1083 | // Collect together all needed child iterators |
1084 | std::vector<Iterator*> list; |
1085 | list.push_back(mem_->NewIterator()); |
1086 | mem_->Ref(); |
1087 | if (imm_ != nullptr) { |
1088 | list.push_back(imm_->NewIterator()); |
1089 | imm_->Ref(); |
1090 | } |
1091 | versions_->current()->AddIterators(options, &list); |
1092 | Iterator* internal_iter = |
1093 | NewMergingIterator(&internal_comparator_, &list[0], list.size()); |
1094 | versions_->current()->Ref(); |
1095 | |
1096 | IterState* cleanup = new IterState(&mutex_, mem_, imm_, versions_->current()); |
1097 | internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr); |
1098 | |
1099 | *seed = ++seed_; |
1100 | mutex_.Unlock(); |
1101 | return internal_iter; |
1102 | } |
1103 | |
1104 | Iterator* DBImpl::TEST_NewInternalIterator() { |
1105 | SequenceNumber ignored; |
1106 | uint32_t ignored_seed; |
1107 | return NewInternalIterator(ReadOptions(), &ignored, &ignored_seed); |
1108 | } |
1109 | |
1110 | int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() { |
1111 | MutexLock l(&mutex_); |
1112 | return versions_->MaxNextLevelOverlappingBytes(); |
1113 | } |
1114 | |
// Looks up |key| as of options.snapshot, or as of the latest sequence number
// when no snapshot is given. Sources are consulted in order: the active
// memtable, the immutable memtable (if any), then the current Version's
// table files. The first source that answers supplies the result in *value
// and the returned Status.
//
// mutex_ is held only to choose the snapshot and to pin (Ref) mem_, imm_ and
// the current Version. It is released for the lookups themselves and
// re-acquired to apply read statistics and drop the references.
Status DBImpl::Get(const ReadOptions& options, const Slice& key,
                   std::string* value) {
  Status s;
  MutexLock l(&mutex_);
  SequenceNumber snapshot;
  if (options.snapshot != nullptr) {
    snapshot =
        static_cast<const SnapshotImpl*>(options.snapshot)->sequence_number();
  } else {
    snapshot = versions_->LastSequence();
  }

  // Take references so these objects stay alive while the lock is dropped.
  MemTable* mem = mem_;
  MemTable* imm = imm_;
  Version* current = versions_->current();
  mem->Ref();
  if (imm != nullptr) imm->Ref();
  current->Ref();

  // Only a lookup that reached the table files produces stats worth applying.
  bool have_stat_update = false;
  Version::GetStats stats;

  // Unlock while reading from files and memtables
  {
    mutex_.Unlock();
    // First look in the memtable, then in the immutable memtable (if any).
    LookupKey lkey(key, snapshot);
    if (mem->Get(lkey, value, &s)) {
      // Done
    } else if (imm != nullptr && imm->Get(lkey, value, &s)) {
      // Done
    } else {
      s = current->Get(options, lkey, value, &stats);
      have_stat_update = true;
    }
    mutex_.Lock();
  }

  // If UpdateStats() reports that the read stats warrant it, schedule a
  // compaction.
  if (have_stat_update && current->UpdateStats(stats)) {
    MaybeScheduleCompaction();
  }
  mem->Unref();
  if (imm != nullptr) imm->Unref();
  current->Unref();
  return s;
}
1161 | |
1162 | Iterator* DBImpl::NewIterator(const ReadOptions& options) { |
1163 | SequenceNumber latest_snapshot; |
1164 | uint32_t seed; |
1165 | Iterator* iter = NewInternalIterator(options, &latest_snapshot, &seed); |
1166 | return NewDBIterator(this, user_comparator(), iter, |
1167 | (options.snapshot != nullptr |
1168 | ? static_cast<const SnapshotImpl*>(options.snapshot) |
1169 | ->sequence_number() |
1170 | : latest_snapshot), |
1171 | seed); |
1172 | } |
1173 | |
1174 | void DBImpl::RecordReadSample(Slice key) { |
1175 | MutexLock l(&mutex_); |
1176 | if (versions_->current()->RecordReadSample(key)) { |
1177 | MaybeScheduleCompaction(); |
1178 | } |
1179 | } |
1180 | |
1181 | const Snapshot* DBImpl::GetSnapshot() { |
1182 | MutexLock l(&mutex_); |
1183 | return snapshots_.New(versions_->LastSequence()); |
1184 | } |
1185 | |
1186 | void DBImpl::ReleaseSnapshot(const Snapshot* snapshot) { |
1187 | MutexLock l(&mutex_); |
1188 | snapshots_.Delete(static_cast<const SnapshotImpl*>(snapshot)); |
1189 | } |
1190 | |
1191 | // Convenience methods |
1192 | Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) { |
1193 | return DB::Put(o, key, val); |
1194 | } |
1195 | |
1196 | Status DBImpl::Delete(const WriteOptions& options, const Slice& key) { |
1197 | return DB::Delete(options, key); |
1198 | } |
1199 | |
// Applies |updates| to the database.
//
// Every caller enqueues a Writer in writers_ and waits until either:
//   - another thread has already completed its write (w.done), in which case
//     that thread's status is returned; or
//   - it reaches the front of the queue and becomes the leader.
//
// The leader makes room in the memtable and merges compatible queued batches
// into one group (BuildBatchGroup). It appends the group to the log, syncing
// the log file if options.sync is set, and inserts the group into mem_. It
// then marks every writer in the group done with the shared status and wakes
// the next queued writer.
//
// A nullptr |updates| writes nothing. It only asks MakeRoomForWrite() to
// force a memtable switch.
Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
  Writer w(&mutex_);
  w.batch = updates;
  w.sync = options.sync;
  w.done = false;

  MutexLock l(&mutex_);
  writers_.push_back(&w);
  // Wait until a leader has handled this write or this writer is the leader.
  while (!w.done && &w != writers_.front()) {
    w.cv.Wait();
  }
  if (w.done) {
    return w.status;
  }

  // May temporarily unlock and wait.
  Status status = MakeRoomForWrite(updates == nullptr);
  uint64_t last_sequence = versions_->LastSequence();
  Writer* last_writer = &w;
  if (status.ok() && updates != nullptr) {  // nullptr batch is for compactions
    WriteBatch* write_batch = BuildBatchGroup(&last_writer);
    // The group occupies the next Count() sequence numbers.
    WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
    last_sequence += WriteBatchInternal::Count(write_batch);

    // Add to log and apply to memtable.  We can release the lock
    // during this phase since &w is currently responsible for logging
    // and protects against concurrent loggers and concurrent writes
    // into mem_.
    {
      mutex_.Unlock();
      status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
      bool sync_error = false;
      if (status.ok() && options.sync) {
        status = logfile_->Sync();
        if (!status.ok()) {
          sync_error = true;
        }
      }
      if (status.ok()) {
        status = WriteBatchInternal::InsertInto(write_batch, mem_);
      }
      mutex_.Lock();
      if (sync_error) {
        // The state of the log file is indeterminate: the log record we
        // just added may or may not show up when the DB is re-opened.
        // So we force the DB into a mode where all future writes fail.
        RecordBackgroundError(status);
      }
    }
    // tmp_batch_ is reused by later groups, so empty it after use.
    if (write_batch == tmp_batch_) tmp_batch_->Clear();

    versions_->SetLastSequence(last_sequence);
  }

  // Pop and complete every writer up to and including last_writer; they all
  // share this leader's status.
  while (true) {
    Writer* ready = writers_.front();
    writers_.pop_front();
    if (ready != &w) {
      ready->status = status;
      ready->done = true;
      ready->cv.Signal();
    }
    if (ready == last_writer) break;
  }

  // Notify new head of write queue
  if (!writers_.empty()) {
    writers_.front()->cv.Signal();
  }

  return status;
}
1272 | |
1273 | // REQUIRES: Writer list must be non-empty |
1274 | // REQUIRES: First writer must have a non-null batch |
1275 | WriteBatch* DBImpl::BuildBatchGroup(Writer** last_writer) { |
1276 | mutex_.AssertHeld(); |
1277 | assert(!writers_.empty()); |
1278 | Writer* first = writers_.front(); |
1279 | WriteBatch* result = first->batch; |
1280 | assert(result != nullptr); |
1281 | |
1282 | size_t size = WriteBatchInternal::ByteSize(first->batch); |
1283 | |
1284 | // Allow the group to grow up to a maximum size, but if the |
1285 | // original write is small, limit the growth so we do not slow |
1286 | // down the small write too much. |
1287 | size_t max_size = 1 << 20; |
1288 | if (size <= (128 << 10)) { |
1289 | max_size = size + (128 << 10); |
1290 | } |
1291 | |
1292 | *last_writer = first; |
1293 | std::deque<Writer*>::iterator iter = writers_.begin(); |
1294 | ++iter; // Advance past "first" |
1295 | for (; iter != writers_.end(); ++iter) { |
1296 | Writer* w = *iter; |
1297 | if (w->sync && !first->sync) { |
1298 | // Do not include a sync write into a batch handled by a non-sync write. |
1299 | break; |
1300 | } |
1301 | |
1302 | if (w->batch != nullptr) { |
1303 | size += WriteBatchInternal::ByteSize(w->batch); |
1304 | if (size > max_size) { |
1305 | // Do not make batch too big |
1306 | break; |
1307 | } |
1308 | |
1309 | // Append to *result |
1310 | if (result == first->batch) { |
1311 | // Switch to temporary batch instead of disturbing caller's batch |
1312 | result = tmp_batch_; |
1313 | assert(WriteBatchInternal::Count(result) == 0); |
1314 | WriteBatchInternal::Append(result, first->batch); |
1315 | } |
1316 | WriteBatchInternal::Append(result, w->batch); |
1317 | } |
1318 | *last_writer = w; |
1319 | } |
1320 | return result; |
1321 | } |
1322 | |
1323 | // REQUIRES: mutex_ is held |
1324 | // REQUIRES: this thread is currently at the front of the writer queue |
1325 | Status DBImpl::MakeRoomForWrite(bool force) { |
1326 | mutex_.AssertHeld(); |
1327 | assert(!writers_.empty()); |
1328 | bool allow_delay = !force; |
1329 | Status s; |
1330 | while (true) { |
1331 | if (!bg_error_.ok()) { |
1332 | // Yield previous error |
1333 | s = bg_error_; |
1334 | break; |
1335 | } else if (allow_delay && versions_->NumLevelFiles(0) >= |
1336 | config::kL0_SlowdownWritesTrigger) { |
1337 | // We are getting close to hitting a hard limit on the number of |
1338 | // L0 files. Rather than delaying a single write by several |
1339 | // seconds when we hit the hard limit, start delaying each |
1340 | // individual write by 1ms to reduce latency variance. Also, |
1341 | // this delay hands over some CPU to the compaction thread in |
1342 | // case it is sharing the same core as the writer. |
1343 | mutex_.Unlock(); |
1344 | env_->SleepForMicroseconds(1000); |
1345 | allow_delay = false; // Do not delay a single write more than once |
1346 | mutex_.Lock(); |
1347 | } else if (!force && |
1348 | (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) { |
1349 | // There is room in current memtable |
1350 | break; |
1351 | } else if (imm_ != nullptr) { |
1352 | // We have filled up the current memtable, but the previous |
1353 | // one is still being compacted, so we wait. |
1354 | Log(options_.info_log, "Current memtable full; waiting...\n" ); |
1355 | background_work_finished_signal_.Wait(); |
1356 | } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) { |
1357 | // There are too many level-0 files. |
1358 | Log(options_.info_log, "Too many L0 files; waiting...\n" ); |
1359 | background_work_finished_signal_.Wait(); |
1360 | } else { |
1361 | // Attempt to switch to a new memtable and trigger compaction of old |
1362 | assert(versions_->PrevLogNumber() == 0); |
1363 | uint64_t new_log_number = versions_->NewFileNumber(); |
1364 | WritableFile* lfile = nullptr; |
1365 | s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile); |
1366 | if (!s.ok()) { |
1367 | // Avoid chewing through file number space in a tight loop. |
1368 | versions_->ReuseFileNumber(new_log_number); |
1369 | break; |
1370 | } |
1371 | delete log_; |
1372 | delete logfile_; |
1373 | logfile_ = lfile; |
1374 | logfile_number_ = new_log_number; |
1375 | log_ = new log::Writer(lfile); |
1376 | imm_ = mem_; |
1377 | has_imm_.store(true, std::memory_order_release); |
1378 | mem_ = new MemTable(internal_comparator_); |
1379 | mem_->Ref(); |
1380 | force = false; // Do not force another compaction if have room |
1381 | MaybeScheduleCompaction(); |
1382 | } |
1383 | } |
1384 | return s; |
1385 | } |
1386 | |
1387 | bool DBImpl::GetProperty(const Slice& property, std::string* value) { |
1388 | value->clear(); |
1389 | |
1390 | MutexLock l(&mutex_); |
1391 | Slice in = property; |
1392 | Slice prefix("leveldb." ); |
1393 | if (!in.starts_with(prefix)) return false; |
1394 | in.remove_prefix(prefix.size()); |
1395 | |
1396 | if (in.starts_with("num-files-at-level" )) { |
1397 | in.remove_prefix(strlen("num-files-at-level" )); |
1398 | uint64_t level; |
1399 | bool ok = ConsumeDecimalNumber(&in, &level) && in.empty(); |
1400 | if (!ok || level >= config::kNumLevels) { |
1401 | return false; |
1402 | } else { |
1403 | char buf[100]; |
1404 | std::snprintf(buf, sizeof(buf), "%d" , |
1405 | versions_->NumLevelFiles(static_cast<int>(level))); |
1406 | *value = buf; |
1407 | return true; |
1408 | } |
1409 | } else if (in == "stats" ) { |
1410 | char buf[200]; |
1411 | std::snprintf(buf, sizeof(buf), |
1412 | " Compactions\n" |
1413 | "Level Files Size(MB) Time(sec) Read(MB) Write(MB)\n" |
1414 | "--------------------------------------------------\n" ); |
1415 | value->append(buf); |
1416 | for (int level = 0; level < config::kNumLevels; level++) { |
1417 | int files = versions_->NumLevelFiles(level); |
1418 | if (stats_[level].micros > 0 || files > 0) { |
1419 | std::snprintf(buf, sizeof(buf), "%3d %8d %8.0f %9.0f %8.0f %9.0f\n" , |
1420 | level, files, versions_->NumLevelBytes(level) / 1048576.0, |
1421 | stats_[level].micros / 1e6, |
1422 | stats_[level].bytes_read / 1048576.0, |
1423 | stats_[level].bytes_written / 1048576.0); |
1424 | value->append(buf); |
1425 | } |
1426 | } |
1427 | return true; |
1428 | } else if (in == "sstables" ) { |
1429 | *value = versions_->current()->DebugString(); |
1430 | return true; |
1431 | } else if (in == "approximate-memory-usage" ) { |
1432 | size_t total_usage = options_.block_cache->TotalCharge(); |
1433 | if (mem_) { |
1434 | total_usage += mem_->ApproximateMemoryUsage(); |
1435 | } |
1436 | if (imm_) { |
1437 | total_usage += imm_->ApproximateMemoryUsage(); |
1438 | } |
1439 | char buf[50]; |
1440 | std::snprintf(buf, sizeof(buf), "%llu" , |
1441 | static_cast<unsigned long long>(total_usage)); |
1442 | value->append(buf); |
1443 | return true; |
1444 | } |
1445 | |
1446 | return false; |
1447 | } |
1448 | |
1449 | void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) { |
1450 | // TODO(opt): better implementation |
1451 | MutexLock l(&mutex_); |
1452 | Version* v = versions_->current(); |
1453 | v->Ref(); |
1454 | |
1455 | for (int i = 0; i < n; i++) { |
1456 | // Convert user_key into a corresponding internal key. |
1457 | InternalKey k1(range[i].start, kMaxSequenceNumber, kValueTypeForSeek); |
1458 | InternalKey k2(range[i].limit, kMaxSequenceNumber, kValueTypeForSeek); |
1459 | uint64_t start = versions_->ApproximateOffsetOf(v, k1); |
1460 | uint64_t limit = versions_->ApproximateOffsetOf(v, k2); |
1461 | sizes[i] = (limit >= start ? limit - start : 0); |
1462 | } |
1463 | |
1464 | v->Unref(); |
1465 | } |
1466 | |
1467 | // Default implementations of convenience methods that subclasses of DB |
1468 | // can call if they wish |
1469 | Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { |
1470 | WriteBatch batch; |
1471 | batch.Put(key, value); |
1472 | return Write(opt, &batch); |
1473 | } |
1474 | |
1475 | Status DB::Delete(const WriteOptions& opt, const Slice& key) { |
1476 | WriteBatch batch; |
1477 | batch.Delete(key); |
1478 | return Write(opt, &batch); |
1479 | } |
1480 | |
// Out-of-line, defaulted definition of the DB destructor.
DB::~DB() = default;
1482 | |
// Opens the database at |dbname|. On success, *dbptr is set to a new DBImpl.
// On failure, *dbptr stays nullptr and the partially constructed DBImpl is
// deleted. (Presumably the caller owns *dbptr on success; confirm against
// the public DB documentation.)
//
// Steps, all performed with impl->mutex_ held:
//   1. Recover() existing state (it handles create_if_missing and
//      error_if_exists).
//   2. If recovery left no memtable, create a new log file and an empty
//      memtable, and record the new log number in |edit|.
//   3. If recovery asked for it, persist |edit| via LogAndApply, with no
//      previous log.
//   4. Delete obsolete files and possibly schedule a compaction.
Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
  *dbptr = nullptr;

  DBImpl* impl = new DBImpl(options, dbname);
  impl->mutex_.Lock();
  VersionEdit edit;
  // Recover handles create_if_missing, error_if_exists
  bool save_manifest = false;
  Status s = impl->Recover(&edit, &save_manifest);
  if (s.ok() && impl->mem_ == nullptr) {
    // Create new log and a corresponding memtable.
    uint64_t new_log_number = impl->versions_->NewFileNumber();
    WritableFile* lfile;
    s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
                                     &lfile);
    if (s.ok()) {
      edit.SetLogNumber(new_log_number);
      impl->logfile_ = lfile;
      impl->logfile_number_ = new_log_number;
      impl->log_ = new log::Writer(lfile);
      impl->mem_ = new MemTable(impl->internal_comparator_);
      impl->mem_->Ref();
    }
  }
  if (s.ok() && save_manifest) {
    edit.SetPrevLogNumber(0);  // No older logs needed after recovery.
    edit.SetLogNumber(impl->logfile_number_);
    s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
  }
  if (s.ok()) {
    impl->RemoveObsoleteFiles();
    impl->MaybeScheduleCompaction();
  }
  impl->mutex_.Unlock();
  if (s.ok()) {
    assert(impl->mem_ != nullptr);
    *dbptr = impl;
  } else {
    delete impl;
  }
  return s;
}
1525 | |
// Out-of-line, defaulted definition of the Snapshot destructor.
Snapshot::~Snapshot() = default;
1527 | |
1528 | Status DestroyDB(const std::string& dbname, const Options& options) { |
1529 | Env* env = options.env; |
1530 | std::vector<std::string> filenames; |
1531 | Status result = env->GetChildren(dbname, &filenames); |
1532 | if (!result.ok()) { |
1533 | // Ignore error in case directory does not exist |
1534 | return Status::OK(); |
1535 | } |
1536 | |
1537 | FileLock* lock; |
1538 | const std::string lockname = LockFileName(dbname); |
1539 | result = env->LockFile(lockname, &lock); |
1540 | if (result.ok()) { |
1541 | uint64_t number; |
1542 | FileType type; |
1543 | for (size_t i = 0; i < filenames.size(); i++) { |
1544 | if (ParseFileName(filenames[i], &number, &type) && |
1545 | type != kDBLockFile) { // Lock file will be deleted at end |
1546 | Status del = env->RemoveFile(dbname + "/" + filenames[i]); |
1547 | if (result.ok() && !del.ok()) { |
1548 | result = del; |
1549 | } |
1550 | } |
1551 | } |
1552 | env->UnlockFile(lock); // Ignore error since state is already gone |
1553 | env->RemoveFile(lockname); |
1554 | env->RemoveDir(dbname); // Ignore error in case dir contains other files |
1555 | } |
1556 | return result; |
1557 | } |
1558 | |
1559 | } // namespace leveldb |
1560 | |