1// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file. See the AUTHORS file for names of contributors.
4
5#include <sys/types.h>
6
7#include "gtest/gtest.h"
8#include "db/db_impl.h"
9#include "db/filename.h"
10#include "db/log_format.h"
11#include "db/version_set.h"
12#include "leveldb/cache.h"
13#include "leveldb/db.h"
14#include "leveldb/table.h"
15#include "leveldb/write_batch.h"
16#include "util/logging.h"
17#include "util/testutil.h"
18
19namespace leveldb {
20
// Length argument passed to test::RandomString() for every generated value.
static constexpr int kValueSize = 1000;
22
23class CorruptionTest : public testing::Test {
24 public:
25 CorruptionTest()
26 : db_(nullptr),
27 dbname_("/memenv/corruption_test"),
28 tiny_cache_(NewLRUCache(100)) {
29 options_.env = &env_;
30 options_.block_cache = tiny_cache_;
31 DestroyDB(dbname_, options_);
32
33 options_.create_if_missing = true;
34 Reopen();
35 options_.create_if_missing = false;
36 }
37
38 ~CorruptionTest() {
39 delete db_;
40 delete tiny_cache_;
41 }
42
43 Status TryReopen() {
44 delete db_;
45 db_ = nullptr;
46 return DB::Open(options_, dbname_, &db_);
47 }
48
49 void Reopen() { ASSERT_LEVELDB_OK(TryReopen()); }
50
51 void RepairDB() {
52 delete db_;
53 db_ = nullptr;
54 ASSERT_LEVELDB_OK(::leveldb::RepairDB(dbname_, options_));
55 }
56
57 void Build(int n) {
58 std::string key_space, value_space;
59 WriteBatch batch;
60 for (int i = 0; i < n; i++) {
61 // if ((i % 100) == 0) std::fprintf(stderr, "@ %d of %d\n", i, n);
62 Slice key = Key(i, &key_space);
63 batch.Clear();
64 batch.Put(key, Value(i, &value_space));
65 WriteOptions options;
66 // Corrupt() doesn't work without this sync on windows; stat reports 0 for
67 // the file size.
68 if (i == n - 1) {
69 options.sync = true;
70 }
71 ASSERT_LEVELDB_OK(db_->Write(options, &batch));
72 }
73 }
74
75 void Check(int min_expected, int max_expected) {
76 int next_expected = 0;
77 int missed = 0;
78 int bad_keys = 0;
79 int bad_values = 0;
80 int correct = 0;
81 std::string value_space;
82 Iterator* iter = db_->NewIterator(ReadOptions());
83 for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
84 uint64_t key;
85 Slice in(iter->key());
86 if (in == "" || in == "~") {
87 // Ignore boundary keys.
88 continue;
89 }
90 if (!ConsumeDecimalNumber(&in, &key) || !in.empty() ||
91 key < next_expected) {
92 bad_keys++;
93 continue;
94 }
95 missed += (key - next_expected);
96 next_expected = key + 1;
97 if (iter->value() != Value(key, &value_space)) {
98 bad_values++;
99 } else {
100 correct++;
101 }
102 }
103 delete iter;
104
105 std::fprintf(
106 stderr,
107 "expected=%d..%d; got=%d; bad_keys=%d; bad_values=%d; missed=%d\n",
108 min_expected, max_expected, correct, bad_keys, bad_values, missed);
109 ASSERT_LE(min_expected, correct);
110 ASSERT_GE(max_expected, correct);
111 }
112
113 void Corrupt(FileType filetype, int offset, int bytes_to_corrupt) {
114 // Pick file to corrupt
115 std::vector<std::string> filenames;
116 ASSERT_LEVELDB_OK(env_.target()->GetChildren(dbname_, &filenames));
117 uint64_t number;
118 FileType type;
119 std::string fname;
120 int picked_number = -1;
121 for (size_t i = 0; i < filenames.size(); i++) {
122 if (ParseFileName(filenames[i], &number, &type) && type == filetype &&
123 int(number) > picked_number) { // Pick latest file
124 fname = dbname_ + "/" + filenames[i];
125 picked_number = number;
126 }
127 }
128 ASSERT_TRUE(!fname.empty()) << filetype;
129
130 uint64_t file_size;
131 ASSERT_LEVELDB_OK(env_.target()->GetFileSize(fname, &file_size));
132
133 if (offset < 0) {
134 // Relative to end of file; make it absolute
135 if (-offset > file_size) {
136 offset = 0;
137 } else {
138 offset = file_size + offset;
139 }
140 }
141 if (offset > file_size) {
142 offset = file_size;
143 }
144 if (offset + bytes_to_corrupt > file_size) {
145 bytes_to_corrupt = file_size - offset;
146 }
147
148 // Do it
149 std::string contents;
150 Status s = ReadFileToString(env_.target(), fname, &contents);
151 ASSERT_TRUE(s.ok()) << s.ToString();
152 for (int i = 0; i < bytes_to_corrupt; i++) {
153 contents[i + offset] ^= 0x80;
154 }
155 s = WriteStringToFile(env_.target(), contents, fname);
156 ASSERT_TRUE(s.ok()) << s.ToString();
157 }
158
159 int Property(const std::string& name) {
160 std::string property;
161 int result;
162 if (db_->GetProperty(name, &property) &&
163 sscanf(property.c_str(), "%d", &result) == 1) {
164 return result;
165 } else {
166 return -1;
167 }
168 }
169
170 // Return the ith key
171 Slice Key(int i, std::string* storage) {
172 char buf[100];
173 std::snprintf(buf, sizeof(buf), "%016d", i);
174 storage->assign(buf, strlen(buf));
175 return Slice(*storage);
176 }
177
178 // Return the value to associate with the specified key
179 Slice Value(int k, std::string* storage) {
180 Random r(k);
181 return test::RandomString(&r, kValueSize, storage);
182 }
183
184 test::ErrorEnv env_;
185 Options options_;
186 DB* db_;
187
188 private:
189 std::string dbname_;
190 Cache* tiny_cache_;
191};
192
193TEST_F(CorruptionTest, Recovery) {
194 Build(100);
195 Check(100, 100);
196 Corrupt(kLogFile, 19, 1); // WriteBatch tag for first record
197 Corrupt(kLogFile, log::kBlockSize + 1000, 1); // Somewhere in second block
198 Reopen();
199
200 // The 64 records in the first two log blocks are completely lost.
201 Check(36, 36);
202}
203
204TEST_F(CorruptionTest, RecoverWriteError) {
205 env_.writable_file_error_ = true;
206 Status s = TryReopen();
207 ASSERT_TRUE(!s.ok());
208}
209
210TEST_F(CorruptionTest, NewFileErrorDuringWrite) {
211 // Do enough writing to force minor compaction
212 env_.writable_file_error_ = true;
213 const int num = 3 + (Options().write_buffer_size / kValueSize);
214 std::string value_storage;
215 Status s;
216 for (int i = 0; s.ok() && i < num; i++) {
217 WriteBatch batch;
218 batch.Put("a", Value(100, &value_storage));
219 s = db_->Write(WriteOptions(), &batch);
220 }
221 ASSERT_TRUE(!s.ok());
222 ASSERT_GE(env_.num_writable_file_errors_, 1);
223 env_.writable_file_error_ = false;
224 Reopen();
225}
226
227TEST_F(CorruptionTest, TableFile) {
228 Build(100);
229 DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
230 dbi->TEST_CompactMemTable();
231 dbi->TEST_CompactRange(0, nullptr, nullptr);
232 dbi->TEST_CompactRange(1, nullptr, nullptr);
233
234 Corrupt(kTableFile, 100, 1);
235 Check(90, 99);
236}
237
238TEST_F(CorruptionTest, TableFileRepair) {
239 options_.block_size = 2 * kValueSize; // Limit scope of corruption
240 options_.paranoid_checks = true;
241 Reopen();
242 Build(100);
243 DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
244 dbi->TEST_CompactMemTable();
245 dbi->TEST_CompactRange(0, nullptr, nullptr);
246 dbi->TEST_CompactRange(1, nullptr, nullptr);
247
248 Corrupt(kTableFile, 100, 1);
249 RepairDB();
250 Reopen();
251 Check(95, 99);
252}
253
254TEST_F(CorruptionTest, TableFileIndexData) {
255 Build(10000); // Enough to build multiple Tables
256 DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
257 dbi->TEST_CompactMemTable();
258
259 Corrupt(kTableFile, -2000, 500);
260 Reopen();
261 Check(5000, 9999);
262}
263
264TEST_F(CorruptionTest, MissingDescriptor) {
265 Build(1000);
266 RepairDB();
267 Reopen();
268 Check(1000, 1000);
269}
270
271TEST_F(CorruptionTest, SequenceNumberRecovery) {
272 ASSERT_LEVELDB_OK(db_->Put(WriteOptions(), "foo", "v1"));
273 ASSERT_LEVELDB_OK(db_->Put(WriteOptions(), "foo", "v2"));
274 ASSERT_LEVELDB_OK(db_->Put(WriteOptions(), "foo", "v3"));
275 ASSERT_LEVELDB_OK(db_->Put(WriteOptions(), "foo", "v4"));
276 ASSERT_LEVELDB_OK(db_->Put(WriteOptions(), "foo", "v5"));
277 RepairDB();
278 Reopen();
279 std::string v;
280 ASSERT_LEVELDB_OK(db_->Get(ReadOptions(), "foo", &v));
281 ASSERT_EQ("v5", v);
282 // Write something. If sequence number was not recovered properly,
283 // it will be hidden by an earlier write.
284 ASSERT_LEVELDB_OK(db_->Put(WriteOptions(), "foo", "v6"));
285 ASSERT_LEVELDB_OK(db_->Get(ReadOptions(), "foo", &v));
286 ASSERT_EQ("v6", v);
287 Reopen();
288 ASSERT_LEVELDB_OK(db_->Get(ReadOptions(), "foo", &v));
289 ASSERT_EQ("v6", v);
290}
291
292TEST_F(CorruptionTest, CorruptedDescriptor) {
293 ASSERT_LEVELDB_OK(db_->Put(WriteOptions(), "foo", "hello"));
294 DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
295 dbi->TEST_CompactMemTable();
296 dbi->TEST_CompactRange(0, nullptr, nullptr);
297
298 Corrupt(kDescriptorFile, 0, 1000);
299 Status s = TryReopen();
300 ASSERT_TRUE(!s.ok());
301
302 RepairDB();
303 Reopen();
304 std::string v;
305 ASSERT_LEVELDB_OK(db_->Get(ReadOptions(), "foo", &v));
306 ASSERT_EQ("hello", v);
307}
308
309TEST_F(CorruptionTest, CompactionInputError) {
310 Build(10);
311 DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
312 dbi->TEST_CompactMemTable();
313 const int last = config::kMaxMemCompactLevel;
314 ASSERT_EQ(1, Property("leveldb.num-files-at-level" + NumberToString(last)));
315
316 Corrupt(kTableFile, 100, 1);
317 Check(5, 9);
318
319 // Force compactions by writing lots of values
320 Build(10000);
321 Check(10000, 10000);
322}
323
324TEST_F(CorruptionTest, CompactionInputErrorParanoid) {
325 options_.paranoid_checks = true;
326 options_.write_buffer_size = 512 << 10;
327 Reopen();
328 DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
329
330 // Make multiple inputs so we need to compact.
331 for (int i = 0; i < 2; i++) {
332 Build(10);
333 dbi->TEST_CompactMemTable();
334 Corrupt(kTableFile, 100, 1);
335 env_.SleepForMicroseconds(100000);
336 }
337 dbi->CompactRange(nullptr, nullptr);
338
339 // Write must fail because of corrupted table
340 std::string tmp1, tmp2;
341 Status s = db_->Put(WriteOptions(), Key(5, &tmp1), Value(5, &tmp2));
342 ASSERT_TRUE(!s.ok()) << "write did not fail in corrupted paranoid db";
343}
344
345TEST_F(CorruptionTest, UnrelatedKeys) {
346 Build(10);
347 DBImpl* dbi = reinterpret_cast<DBImpl*>(db_);
348 dbi->TEST_CompactMemTable();
349 Corrupt(kTableFile, 100, 1);
350
351 std::string tmp1, tmp2;
352 ASSERT_LEVELDB_OK(
353 db_->Put(WriteOptions(), Key(1000, &tmp1), Value(1000, &tmp2)));
354 std::string v;
355 ASSERT_LEVELDB_OK(db_->Get(ReadOptions(), Key(1000, &tmp1), &v));
356 ASSERT_EQ(Value(1000, &tmp2).ToString(), v);
357 dbi->TEST_CompactMemTable();
358 ASSERT_LEVELDB_OK(db_->Get(ReadOptions(), Key(1000, &tmp1), &v));
359 ASSERT_EQ(Value(1000, &tmp2).ToString(), v);
360}
361
362} // namespace leveldb
363