1// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file. See the AUTHORS file for names of contributors.
4
5#include "gtest/gtest.h"
6#include "db/log_reader.h"
7#include "db/log_writer.h"
8#include "leveldb/env.h"
9#include "util/coding.h"
10#include "util/crc32c.h"
11#include "util/random.h"
12
13namespace leveldb {
14namespace log {
15
16// Construct a string of the specified length made out of the supplied
17// partial string.
18static std::string BigString(const std::string& partial_string, size_t n) {
19 std::string result;
20 while (result.size() < n) {
21 result.append(partial_string);
22 }
23 result.resize(n);
24 return result;
25}
26
27// Construct a string from a number
28static std::string NumberString(int n) {
29 char buf[50];
30 std::snprintf(buf, sizeof(buf), "%d.", n);
31 return std::string(buf);
32}
33
34// Return a skewed potentially long string
35static std::string RandomSkewedString(int i, Random* rnd) {
36 return BigString(NumberString(i), rnd->Skewed(17));
37}
38
39class LogTest : public testing::Test {
40 public:
41 LogTest()
42 : reading_(false),
43 writer_(new Writer(&dest_)),
44 reader_(new Reader(&source_, &report_, true /*checksum*/,
45 0 /*initial_offset*/)) {}
46
47 ~LogTest() {
48 delete writer_;
49 delete reader_;
50 }
51
52 void ReopenForAppend() {
53 delete writer_;
54 writer_ = new Writer(&dest_, dest_.contents_.size());
55 }
56
57 void Write(const std::string& msg) {
58 ASSERT_TRUE(!reading_) << "Write() after starting to read";
59 writer_->AddRecord(Slice(msg));
60 }
61
62 size_t WrittenBytes() const { return dest_.contents_.size(); }
63
64 std::string Read() {
65 if (!reading_) {
66 reading_ = true;
67 source_.contents_ = Slice(dest_.contents_);
68 }
69 std::string scratch;
70 Slice record;
71 if (reader_->ReadRecord(&record, &scratch)) {
72 return record.ToString();
73 } else {
74 return "EOF";
75 }
76 }
77
78 void IncrementByte(int offset, int delta) {
79 dest_.contents_[offset] += delta;
80 }
81
82 void SetByte(int offset, char new_byte) {
83 dest_.contents_[offset] = new_byte;
84 }
85
86 void ShrinkSize(int bytes) {
87 dest_.contents_.resize(dest_.contents_.size() - bytes);
88 }
89
90 void FixChecksum(int header_offset, int len) {
91 // Compute crc of type/len/data
92 uint32_t crc = crc32c::Value(&dest_.contents_[header_offset + 6], 1 + len);
93 crc = crc32c::Mask(crc);
94 EncodeFixed32(&dest_.contents_[header_offset], crc);
95 }
96
97 void ForceError() { source_.force_error_ = true; }
98
99 size_t DroppedBytes() const { return report_.dropped_bytes_; }
100
101 std::string ReportMessage() const { return report_.message_; }
102
103 // Returns OK iff recorded error message contains "msg"
104 std::string MatchError(const std::string& msg) const {
105 if (report_.message_.find(msg) == std::string::npos) {
106 return report_.message_;
107 } else {
108 return "OK";
109 }
110 }
111
112 void WriteInitialOffsetLog() {
113 for (int i = 0; i < num_initial_offset_records_; i++) {
114 std::string record(initial_offset_record_sizes_[i],
115 static_cast<char>('a' + i));
116 Write(record);
117 }
118 }
119
120 void StartReadingAt(uint64_t initial_offset) {
121 delete reader_;
122 reader_ = new Reader(&source_, &report_, true /*checksum*/, initial_offset);
123 }
124
125 void CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end) {
126 WriteInitialOffsetLog();
127 reading_ = true;
128 source_.contents_ = Slice(dest_.contents_);
129 Reader* offset_reader = new Reader(&source_, &report_, true /*checksum*/,
130 WrittenBytes() + offset_past_end);
131 Slice record;
132 std::string scratch;
133 ASSERT_TRUE(!offset_reader->ReadRecord(&record, &scratch));
134 delete offset_reader;
135 }
136
137 void CheckInitialOffsetRecord(uint64_t initial_offset,
138 int expected_record_offset) {
139 WriteInitialOffsetLog();
140 reading_ = true;
141 source_.contents_ = Slice(dest_.contents_);
142 Reader* offset_reader =
143 new Reader(&source_, &report_, true /*checksum*/, initial_offset);
144
145 // Read all records from expected_record_offset through the last one.
146 ASSERT_LT(expected_record_offset, num_initial_offset_records_);
147 for (; expected_record_offset < num_initial_offset_records_;
148 ++expected_record_offset) {
149 Slice record;
150 std::string scratch;
151 ASSERT_TRUE(offset_reader->ReadRecord(&record, &scratch));
152 ASSERT_EQ(initial_offset_record_sizes_[expected_record_offset],
153 record.size());
154 ASSERT_EQ(initial_offset_last_record_offsets_[expected_record_offset],
155 offset_reader->LastRecordOffset());
156 ASSERT_EQ((char)('a' + expected_record_offset), record.data()[0]);
157 }
158 delete offset_reader;
159 }
160
161 private:
162 class StringDest : public WritableFile {
163 public:
164 Status Close() override { return Status::OK(); }
165 Status Flush() override { return Status::OK(); }
166 Status Sync() override { return Status::OK(); }
167 Status Append(const Slice& slice) override {
168 contents_.append(slice.data(), slice.size());
169 return Status::OK();
170 }
171
172 std::string contents_;
173 };
174
175 class StringSource : public SequentialFile {
176 public:
177 StringSource() : force_error_(false), returned_partial_(false) {}
178
179 Status Read(size_t n, Slice* result, char* scratch) override {
180 EXPECT_TRUE(!returned_partial_) << "must not Read() after eof/error";
181
182 if (force_error_) {
183 force_error_ = false;
184 returned_partial_ = true;
185 return Status::Corruption("read error");
186 }
187
188 if (contents_.size() < n) {
189 n = contents_.size();
190 returned_partial_ = true;
191 }
192 *result = Slice(contents_.data(), n);
193 contents_.remove_prefix(n);
194 return Status::OK();
195 }
196
197 Status Skip(uint64_t n) override {
198 if (n > contents_.size()) {
199 contents_.clear();
200 return Status::NotFound("in-memory file skipped past end");
201 }
202
203 contents_.remove_prefix(n);
204
205 return Status::OK();
206 }
207
208 Slice contents_;
209 bool force_error_;
210 bool returned_partial_;
211 };
212
213 class ReportCollector : public Reader::Reporter {
214 public:
215 ReportCollector() : dropped_bytes_(0) {}
216 void Corruption(size_t bytes, const Status& status) override {
217 dropped_bytes_ += bytes;
218 message_.append(status.ToString());
219 }
220
221 size_t dropped_bytes_;
222 std::string message_;
223 };
224
225 // Record metadata for testing initial offset functionality
226 static size_t initial_offset_record_sizes_[];
227 static uint64_t initial_offset_last_record_offsets_[];
228 static int num_initial_offset_records_;
229
230 StringDest dest_;
231 StringSource source_;
232 ReportCollector report_;
233 bool reading_;
234 Writer* writer_;
235 Reader* reader_;
236};
237
238size_t LogTest::initial_offset_record_sizes_[] = {
239 10000, // Two sizable records in first block
240 10000,
241 2 * log::kBlockSize - 1000, // Span three blocks
242 1,
243 13716, // Consume all but two bytes of block 3.
244 log::kBlockSize - kHeaderSize, // Consume the entirety of block 4.
245};
246
247uint64_t LogTest::initial_offset_last_record_offsets_[] = {
248 0,
249 kHeaderSize + 10000,
250 2 * (kHeaderSize + 10000),
251 2 * (kHeaderSize + 10000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize,
252 2 * (kHeaderSize + 10000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize +
253 kHeaderSize + 1,
254 3 * log::kBlockSize,
255};
256
257// LogTest::initial_offset_last_record_offsets_ must be defined before this.
258int LogTest::num_initial_offset_records_ =
259 sizeof(LogTest::initial_offset_last_record_offsets_) / sizeof(uint64_t);
260
261TEST_F(LogTest, Empty) { ASSERT_EQ("EOF", Read()); }
262
263TEST_F(LogTest, ReadWrite) {
264 Write("foo");
265 Write("bar");
266 Write("");
267 Write("xxxx");
268 ASSERT_EQ("foo", Read());
269 ASSERT_EQ("bar", Read());
270 ASSERT_EQ("", Read());
271 ASSERT_EQ("xxxx", Read());
272 ASSERT_EQ("EOF", Read());
273 ASSERT_EQ("EOF", Read()); // Make sure reads at eof work
274}
275
276TEST_F(LogTest, ManyBlocks) {
277 for (int i = 0; i < 100000; i++) {
278 Write(NumberString(i));
279 }
280 for (int i = 0; i < 100000; i++) {
281 ASSERT_EQ(NumberString(i), Read());
282 }
283 ASSERT_EQ("EOF", Read());
284}
285
286TEST_F(LogTest, Fragmentation) {
287 Write("small");
288 Write(BigString("medium", 50000));
289 Write(BigString("large", 100000));
290 ASSERT_EQ("small", Read());
291 ASSERT_EQ(BigString("medium", 50000), Read());
292 ASSERT_EQ(BigString("large", 100000), Read());
293 ASSERT_EQ("EOF", Read());
294}
295
296TEST_F(LogTest, MarginalTrailer) {
297 // Make a trailer that is exactly the same length as an empty record.
298 const int n = kBlockSize - 2 * kHeaderSize;
299 Write(BigString("foo", n));
300 ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes());
301 Write("");
302 Write("bar");
303 ASSERT_EQ(BigString("foo", n), Read());
304 ASSERT_EQ("", Read());
305 ASSERT_EQ("bar", Read());
306 ASSERT_EQ("EOF", Read());
307}
308
309TEST_F(LogTest, MarginalTrailer2) {
310 // Make a trailer that is exactly the same length as an empty record.
311 const int n = kBlockSize - 2 * kHeaderSize;
312 Write(BigString("foo", n));
313 ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes());
314 Write("bar");
315 ASSERT_EQ(BigString("foo", n), Read());
316 ASSERT_EQ("bar", Read());
317 ASSERT_EQ("EOF", Read());
318 ASSERT_EQ(0, DroppedBytes());
319 ASSERT_EQ("", ReportMessage());
320}
321
322TEST_F(LogTest, ShortTrailer) {
323 const int n = kBlockSize - 2 * kHeaderSize + 4;
324 Write(BigString("foo", n));
325 ASSERT_EQ(kBlockSize - kHeaderSize + 4, WrittenBytes());
326 Write("");
327 Write("bar");
328 ASSERT_EQ(BigString("foo", n), Read());
329 ASSERT_EQ("", Read());
330 ASSERT_EQ("bar", Read());
331 ASSERT_EQ("EOF", Read());
332}
333
334TEST_F(LogTest, AlignedEof) {
335 const int n = kBlockSize - 2 * kHeaderSize + 4;
336 Write(BigString("foo", n));
337 ASSERT_EQ(kBlockSize - kHeaderSize + 4, WrittenBytes());
338 ASSERT_EQ(BigString("foo", n), Read());
339 ASSERT_EQ("EOF", Read());
340}
341
342TEST_F(LogTest, OpenForAppend) {
343 Write("hello");
344 ReopenForAppend();
345 Write("world");
346 ASSERT_EQ("hello", Read());
347 ASSERT_EQ("world", Read());
348 ASSERT_EQ("EOF", Read());
349}
350
351TEST_F(LogTest, RandomRead) {
352 const int N = 500;
353 Random write_rnd(301);
354 for (int i = 0; i < N; i++) {
355 Write(RandomSkewedString(i, &write_rnd));
356 }
357 Random read_rnd(301);
358 for (int i = 0; i < N; i++) {
359 ASSERT_EQ(RandomSkewedString(i, &read_rnd), Read());
360 }
361 ASSERT_EQ("EOF", Read());
362}
363
364// Tests of all the error paths in log_reader.cc follow:
365
366TEST_F(LogTest, ReadError) {
367 Write("foo");
368 ForceError();
369 ASSERT_EQ("EOF", Read());
370 ASSERT_EQ(kBlockSize, DroppedBytes());
371 ASSERT_EQ("OK", MatchError("read error"));
372}
373
374TEST_F(LogTest, BadRecordType) {
375 Write("foo");
376 // Type is stored in header[6]
377 IncrementByte(6, 100);
378 FixChecksum(0, 3);
379 ASSERT_EQ("EOF", Read());
380 ASSERT_EQ(3, DroppedBytes());
381 ASSERT_EQ("OK", MatchError("unknown record type"));
382}
383
384TEST_F(LogTest, TruncatedTrailingRecordIsIgnored) {
385 Write("foo");
386 ShrinkSize(4); // Drop all payload as well as a header byte
387 ASSERT_EQ("EOF", Read());
388 // Truncated last record is ignored, not treated as an error.
389 ASSERT_EQ(0, DroppedBytes());
390 ASSERT_EQ("", ReportMessage());
391}
392
393TEST_F(LogTest, BadLength) {
394 const int kPayloadSize = kBlockSize - kHeaderSize;
395 Write(BigString("bar", kPayloadSize));
396 Write("foo");
397 // Least significant size byte is stored in header[4].
398 IncrementByte(4, 1);
399 ASSERT_EQ("foo", Read());
400 ASSERT_EQ(kBlockSize, DroppedBytes());
401 ASSERT_EQ("OK", MatchError("bad record length"));
402}
403
404TEST_F(LogTest, BadLengthAtEndIsIgnored) {
405 Write("foo");
406 ShrinkSize(1);
407 ASSERT_EQ("EOF", Read());
408 ASSERT_EQ(0, DroppedBytes());
409 ASSERT_EQ("", ReportMessage());
410}
411
412TEST_F(LogTest, ChecksumMismatch) {
413 Write("foo");
414 IncrementByte(0, 10);
415 ASSERT_EQ("EOF", Read());
416 ASSERT_EQ(10, DroppedBytes());
417 ASSERT_EQ("OK", MatchError("checksum mismatch"));
418}
419
420TEST_F(LogTest, UnexpectedMiddleType) {
421 Write("foo");
422 SetByte(6, kMiddleType);
423 FixChecksum(0, 3);
424 ASSERT_EQ("EOF", Read());
425 ASSERT_EQ(3, DroppedBytes());
426 ASSERT_EQ("OK", MatchError("missing start"));
427}
428
429TEST_F(LogTest, UnexpectedLastType) {
430 Write("foo");
431 SetByte(6, kLastType);
432 FixChecksum(0, 3);
433 ASSERT_EQ("EOF", Read());
434 ASSERT_EQ(3, DroppedBytes());
435 ASSERT_EQ("OK", MatchError("missing start"));
436}
437
438TEST_F(LogTest, UnexpectedFullType) {
439 Write("foo");
440 Write("bar");
441 SetByte(6, kFirstType);
442 FixChecksum(0, 3);
443 ASSERT_EQ("bar", Read());
444 ASSERT_EQ("EOF", Read());
445 ASSERT_EQ(3, DroppedBytes());
446 ASSERT_EQ("OK", MatchError("partial record without end"));
447}
448
449TEST_F(LogTest, UnexpectedFirstType) {
450 Write("foo");
451 Write(BigString("bar", 100000));
452 SetByte(6, kFirstType);
453 FixChecksum(0, 3);
454 ASSERT_EQ(BigString("bar", 100000), Read());
455 ASSERT_EQ("EOF", Read());
456 ASSERT_EQ(3, DroppedBytes());
457 ASSERT_EQ("OK", MatchError("partial record without end"));
458}
459
460TEST_F(LogTest, MissingLastIsIgnored) {
461 Write(BigString("bar", kBlockSize));
462 // Remove the LAST block, including header.
463 ShrinkSize(14);
464 ASSERT_EQ("EOF", Read());
465 ASSERT_EQ("", ReportMessage());
466 ASSERT_EQ(0, DroppedBytes());
467}
468
469TEST_F(LogTest, PartialLastIsIgnored) {
470 Write(BigString("bar", kBlockSize));
471 // Cause a bad record length in the LAST block.
472 ShrinkSize(1);
473 ASSERT_EQ("EOF", Read());
474 ASSERT_EQ("", ReportMessage());
475 ASSERT_EQ(0, DroppedBytes());
476}
477
478TEST_F(LogTest, SkipIntoMultiRecord) {
479 // Consider a fragmented record:
480 // first(R1), middle(R1), last(R1), first(R2)
481 // If initial_offset points to a record after first(R1) but before first(R2)
482 // incomplete fragment errors are not actual errors, and must be suppressed
483 // until a new first or full record is encountered.
484 Write(BigString("foo", 3 * kBlockSize));
485 Write("correct");
486 StartReadingAt(kBlockSize);
487
488 ASSERT_EQ("correct", Read());
489 ASSERT_EQ("", ReportMessage());
490 ASSERT_EQ(0, DroppedBytes());
491 ASSERT_EQ("EOF", Read());
492}
493
494TEST_F(LogTest, ErrorJoinsRecords) {
495 // Consider two fragmented records:
496 // first(R1) last(R1) first(R2) last(R2)
497 // where the middle two fragments disappear. We do not want
498 // first(R1),last(R2) to get joined and returned as a valid record.
499
500 // Write records that span two blocks
501 Write(BigString("foo", kBlockSize));
502 Write(BigString("bar", kBlockSize));
503 Write("correct");
504
505 // Wipe the middle block
506 for (int offset = kBlockSize; offset < 2 * kBlockSize; offset++) {
507 SetByte(offset, 'x');
508 }
509
510 ASSERT_EQ("correct", Read());
511 ASSERT_EQ("EOF", Read());
512 const size_t dropped = DroppedBytes();
513 ASSERT_LE(dropped, 2 * kBlockSize + 100);
514 ASSERT_GE(dropped, 2 * kBlockSize);
515}
516
517TEST_F(LogTest, ReadStart) { CheckInitialOffsetRecord(0, 0); }
518
519TEST_F(LogTest, ReadSecondOneOff) { CheckInitialOffsetRecord(1, 1); }
520
521TEST_F(LogTest, ReadSecondTenThousand) { CheckInitialOffsetRecord(10000, 1); }
522
523TEST_F(LogTest, ReadSecondStart) { CheckInitialOffsetRecord(10007, 1); }
524
525TEST_F(LogTest, ReadThirdOneOff) { CheckInitialOffsetRecord(10008, 2); }
526
527TEST_F(LogTest, ReadThirdStart) { CheckInitialOffsetRecord(20014, 2); }
528
529TEST_F(LogTest, ReadFourthOneOff) { CheckInitialOffsetRecord(20015, 3); }
530
531TEST_F(LogTest, ReadFourthFirstBlockTrailer) {
532 CheckInitialOffsetRecord(log::kBlockSize - 4, 3);
533}
534
535TEST_F(LogTest, ReadFourthMiddleBlock) {
536 CheckInitialOffsetRecord(log::kBlockSize + 1, 3);
537}
538
539TEST_F(LogTest, ReadFourthLastBlock) {
540 CheckInitialOffsetRecord(2 * log::kBlockSize + 1, 3);
541}
542
543TEST_F(LogTest, ReadFourthStart) {
544 CheckInitialOffsetRecord(
545 2 * (kHeaderSize + 1000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize,
546 3);
547}
548
549TEST_F(LogTest, ReadInitialOffsetIntoBlockPadding) {
550 CheckInitialOffsetRecord(3 * log::kBlockSize - 3, 5);
551}
552
553TEST_F(LogTest, ReadEnd) { CheckOffsetPastEndReturnsNoRecords(0); }
554
555TEST_F(LogTest, ReadPastEnd) { CheckOffsetPastEndReturnsNoRecords(5); }
556
557} // namespace log
558} // namespace leveldb
559