1// Copyright (c) 2014 The LevelDB Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file. See the AUTHORS file for names of contributors.
4
5#include "gtest/gtest.h"
6#include "db/db_impl.h"
7#include "db/filename.h"
8#include "db/version_set.h"
9#include "db/write_batch_internal.h"
10#include "leveldb/db.h"
11#include "leveldb/env.h"
12#include "leveldb/write_batch.h"
13#include "util/logging.h"
14#include "util/testutil.h"
15
16namespace leveldb {
17
18class RecoveryTest : public testing::Test {
19 public:
20 RecoveryTest() : env_(Env::Default()), db_(nullptr) {
21 dbname_ = testing::TempDir() + "recovery_test";
22 DestroyDB(dbname_, Options());
23 Open();
24 }
25
26 ~RecoveryTest() {
27 Close();
28 DestroyDB(dbname_, Options());
29 }
30
31 DBImpl* dbfull() const { return reinterpret_cast<DBImpl*>(db_); }
32 Env* env() const { return env_; }
33
34 bool CanAppend() {
35 WritableFile* tmp;
36 Status s = env_->NewAppendableFile(CurrentFileName(dbname_), &tmp);
37 delete tmp;
38 if (s.IsNotSupportedError()) {
39 return false;
40 } else {
41 return true;
42 }
43 }
44
45 void Close() {
46 delete db_;
47 db_ = nullptr;
48 }
49
50 Status OpenWithStatus(Options* options = nullptr) {
51 Close();
52 Options opts;
53 if (options != nullptr) {
54 opts = *options;
55 } else {
56 opts.reuse_logs = true; // TODO(sanjay): test both ways
57 opts.create_if_missing = true;
58 }
59 if (opts.env == nullptr) {
60 opts.env = env_;
61 }
62 return DB::Open(opts, dbname_, &db_);
63 }
64
65 void Open(Options* options = nullptr) {
66 ASSERT_LEVELDB_OK(OpenWithStatus(options));
67 ASSERT_EQ(1, NumLogs());
68 }
69
70 Status Put(const std::string& k, const std::string& v) {
71 return db_->Put(WriteOptions(), k, v);
72 }
73
74 std::string Get(const std::string& k, const Snapshot* snapshot = nullptr) {
75 std::string result;
76 Status s = db_->Get(ReadOptions(), k, &result);
77 if (s.IsNotFound()) {
78 result = "NOT_FOUND";
79 } else if (!s.ok()) {
80 result = s.ToString();
81 }
82 return result;
83 }
84
85 std::string ManifestFileName() {
86 std::string current;
87 EXPECT_LEVELDB_OK(
88 ReadFileToString(env_, CurrentFileName(dbname_), &current));
89 size_t len = current.size();
90 if (len > 0 && current[len - 1] == '\n') {
91 current.resize(len - 1);
92 }
93 return dbname_ + "/" + current;
94 }
95
96 std::string LogName(uint64_t number) { return LogFileName(dbname_, number); }
97
98 size_t RemoveLogFiles() {
99 // Linux allows unlinking open files, but Windows does not.
100 // Closing the db allows for file deletion.
101 Close();
102 std::vector<uint64_t> logs = GetFiles(kLogFile);
103 for (size_t i = 0; i < logs.size(); i++) {
104 EXPECT_LEVELDB_OK(env_->RemoveFile(LogName(logs[i]))) << LogName(logs[i]);
105 }
106 return logs.size();
107 }
108
109 void RemoveManifestFile() {
110 ASSERT_LEVELDB_OK(env_->RemoveFile(ManifestFileName()));
111 }
112
113 uint64_t FirstLogFile() { return GetFiles(kLogFile)[0]; }
114
115 std::vector<uint64_t> GetFiles(FileType t) {
116 std::vector<std::string> filenames;
117 EXPECT_LEVELDB_OK(env_->GetChildren(dbname_, &filenames));
118 std::vector<uint64_t> result;
119 for (size_t i = 0; i < filenames.size(); i++) {
120 uint64_t number;
121 FileType type;
122 if (ParseFileName(filenames[i], &number, &type) && type == t) {
123 result.push_back(number);
124 }
125 }
126 return result;
127 }
128
129 int NumLogs() { return GetFiles(kLogFile).size(); }
130
131 int NumTables() { return GetFiles(kTableFile).size(); }
132
133 uint64_t FileSize(const std::string& fname) {
134 uint64_t result;
135 EXPECT_LEVELDB_OK(env_->GetFileSize(fname, &result)) << fname;
136 return result;
137 }
138
139 void CompactMemTable() { dbfull()->TEST_CompactMemTable(); }
140
141 // Directly construct a log file that sets key to val.
142 void MakeLogFile(uint64_t lognum, SequenceNumber seq, Slice key, Slice val) {
143 std::string fname = LogFileName(dbname_, lognum);
144 WritableFile* file;
145 ASSERT_LEVELDB_OK(env_->NewWritableFile(fname, &file));
146 log::Writer writer(file);
147 WriteBatch batch;
148 batch.Put(key, val);
149 WriteBatchInternal::SetSequence(&batch, seq);
150 ASSERT_LEVELDB_OK(writer.AddRecord(WriteBatchInternal::Contents(&batch)));
151 ASSERT_LEVELDB_OK(file->Flush());
152 delete file;
153 }
154
155 private:
156 std::string dbname_;
157 Env* env_;
158 DB* db_;
159};
160
161TEST_F(RecoveryTest, ManifestReused) {
162 if (!CanAppend()) {
163 std::fprintf(stderr,
164 "skipping test because env does not support appending\n");
165 return;
166 }
167 ASSERT_LEVELDB_OK(Put("foo", "bar"));
168 Close();
169 std::string old_manifest = ManifestFileName();
170 Open();
171 ASSERT_EQ(old_manifest, ManifestFileName());
172 ASSERT_EQ("bar", Get("foo"));
173 Open();
174 ASSERT_EQ(old_manifest, ManifestFileName());
175 ASSERT_EQ("bar", Get("foo"));
176}
177
178TEST_F(RecoveryTest, LargeManifestCompacted) {
179 if (!CanAppend()) {
180 std::fprintf(stderr,
181 "skipping test because env does not support appending\n");
182 return;
183 }
184 ASSERT_LEVELDB_OK(Put("foo", "bar"));
185 Close();
186 std::string old_manifest = ManifestFileName();
187
188 // Pad with zeroes to make manifest file very big.
189 {
190 uint64_t len = FileSize(old_manifest);
191 WritableFile* file;
192 ASSERT_LEVELDB_OK(env()->NewAppendableFile(old_manifest, &file));
193 std::string zeroes(3 * 1048576 - static_cast<size_t>(len), 0);
194 ASSERT_LEVELDB_OK(file->Append(zeroes));
195 ASSERT_LEVELDB_OK(file->Flush());
196 delete file;
197 }
198
199 Open();
200 std::string new_manifest = ManifestFileName();
201 ASSERT_NE(old_manifest, new_manifest);
202 ASSERT_GT(10000, FileSize(new_manifest));
203 ASSERT_EQ("bar", Get("foo"));
204
205 Open();
206 ASSERT_EQ(new_manifest, ManifestFileName());
207 ASSERT_EQ("bar", Get("foo"));
208}
209
210TEST_F(RecoveryTest, NoLogFiles) {
211 ASSERT_LEVELDB_OK(Put("foo", "bar"));
212 ASSERT_EQ(1, RemoveLogFiles());
213 Open();
214 ASSERT_EQ("NOT_FOUND", Get("foo"));
215 Open();
216 ASSERT_EQ("NOT_FOUND", Get("foo"));
217}
218
219TEST_F(RecoveryTest, LogFileReuse) {
220 if (!CanAppend()) {
221 std::fprintf(stderr,
222 "skipping test because env does not support appending\n");
223 return;
224 }
225 for (int i = 0; i < 2; i++) {
226 ASSERT_LEVELDB_OK(Put("foo", "bar"));
227 if (i == 0) {
228 // Compact to ensure current log is empty
229 CompactMemTable();
230 }
231 Close();
232 ASSERT_EQ(1, NumLogs());
233 uint64_t number = FirstLogFile();
234 if (i == 0) {
235 ASSERT_EQ(0, FileSize(LogName(number)));
236 } else {
237 ASSERT_LT(0, FileSize(LogName(number)));
238 }
239 Open();
240 ASSERT_EQ(1, NumLogs());
241 ASSERT_EQ(number, FirstLogFile()) << "did not reuse log file";
242 ASSERT_EQ("bar", Get("foo"));
243 Open();
244 ASSERT_EQ(1, NumLogs());
245 ASSERT_EQ(number, FirstLogFile()) << "did not reuse log file";
246 ASSERT_EQ("bar", Get("foo"));
247 }
248}
249
250TEST_F(RecoveryTest, MultipleMemTables) {
251 // Make a large log.
252 const int kNum = 1000;
253 for (int i = 0; i < kNum; i++) {
254 char buf[100];
255 std::snprintf(buf, sizeof(buf), "%050d", i);
256 ASSERT_LEVELDB_OK(Put(buf, buf));
257 }
258 ASSERT_EQ(0, NumTables());
259 Close();
260 ASSERT_EQ(0, NumTables());
261 ASSERT_EQ(1, NumLogs());
262 uint64_t old_log_file = FirstLogFile();
263
264 // Force creation of multiple memtables by reducing the write buffer size.
265 Options opt;
266 opt.reuse_logs = true;
267 opt.write_buffer_size = (kNum * 100) / 2;
268 Open(&opt);
269 ASSERT_LE(2, NumTables());
270 ASSERT_EQ(1, NumLogs());
271 ASSERT_NE(old_log_file, FirstLogFile()) << "must not reuse log";
272 for (int i = 0; i < kNum; i++) {
273 char buf[100];
274 std::snprintf(buf, sizeof(buf), "%050d", i);
275 ASSERT_EQ(buf, Get(buf));
276 }
277}
278
279TEST_F(RecoveryTest, MultipleLogFiles) {
280 ASSERT_LEVELDB_OK(Put("foo", "bar"));
281 Close();
282 ASSERT_EQ(1, NumLogs());
283
284 // Make a bunch of uncompacted log files.
285 uint64_t old_log = FirstLogFile();
286 MakeLogFile(old_log + 1, 1000, "hello", "world");
287 MakeLogFile(old_log + 2, 1001, "hi", "there");
288 MakeLogFile(old_log + 3, 1002, "foo", "bar2");
289
290 // Recover and check that all log files were processed.
291 Open();
292 ASSERT_LE(1, NumTables());
293 ASSERT_EQ(1, NumLogs());
294 uint64_t new_log = FirstLogFile();
295 ASSERT_LE(old_log + 3, new_log);
296 ASSERT_EQ("bar2", Get("foo"));
297 ASSERT_EQ("world", Get("hello"));
298 ASSERT_EQ("there", Get("hi"));
299
300 // Test that previous recovery produced recoverable state.
301 Open();
302 ASSERT_LE(1, NumTables());
303 ASSERT_EQ(1, NumLogs());
304 if (CanAppend()) {
305 ASSERT_EQ(new_log, FirstLogFile());
306 }
307 ASSERT_EQ("bar2", Get("foo"));
308 ASSERT_EQ("world", Get("hello"));
309 ASSERT_EQ("there", Get("hi"));
310
311 // Check that introducing an older log file does not cause it to be re-read.
312 Close();
313 MakeLogFile(old_log + 1, 2000, "hello", "stale write");
314 Open();
315 ASSERT_LE(1, NumTables());
316 ASSERT_EQ(1, NumLogs());
317 if (CanAppend()) {
318 ASSERT_EQ(new_log, FirstLogFile());
319 }
320 ASSERT_EQ("bar2", Get("foo"));
321 ASSERT_EQ("world", Get("hello"));
322 ASSERT_EQ("there", Get("hi"));
323}
324
325TEST_F(RecoveryTest, ManifestMissing) {
326 ASSERT_LEVELDB_OK(Put("foo", "bar"));
327 Close();
328 RemoveManifestFile();
329
330 Status status = OpenWithStatus();
331 ASSERT_TRUE(status.IsCorruption());
332}
333
334} // namespace leveldb
335