1 | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
2 | // Use of this source code is governed by a BSD-style license that can be |
3 | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
4 | |
5 | #include "gtest/gtest.h" |
6 | #include "db/log_reader.h" |
7 | #include "db/log_writer.h" |
8 | #include "leveldb/env.h" |
9 | #include "util/coding.h" |
10 | #include "util/crc32c.h" |
11 | #include "util/random.h" |
12 | |
13 | namespace leveldb { |
14 | namespace log { |
15 | |
// Builds a string of exactly n bytes by repeating partial_string and
// trimming any excess.  (If partial_string is empty and n > 0 this loops
// forever, matching the original helper's contract.)
static std::string BigString(const std::string& partial_string, size_t n) {
  std::string out;
  out.reserve(n);
  while (out.size() < n) {
    out += partial_string;
  }
  out.resize(n);
  return out;
}
26 | |
// Renders n in decimal followed by a '.' separator, e.g. 7 -> "7.".
static std::string NumberString(int n) {
  return std::to_string(n) + ".";
}
33 | |
34 | // Return a skewed potentially long string |
35 | static std::string RandomSkewedString(int i, Random* rnd) { |
36 | return BigString(NumberString(i), rnd->Skewed(17)); |
37 | } |
38 | |
39 | class LogTest : public testing::Test { |
40 | public: |
41 | LogTest() |
42 | : reading_(false), |
43 | writer_(new Writer(&dest_)), |
44 | reader_(new Reader(&source_, &report_, true /*checksum*/, |
45 | 0 /*initial_offset*/)) {} |
46 | |
47 | ~LogTest() { |
48 | delete writer_; |
49 | delete reader_; |
50 | } |
51 | |
52 | void ReopenForAppend() { |
53 | delete writer_; |
54 | writer_ = new Writer(&dest_, dest_.contents_.size()); |
55 | } |
56 | |
57 | void Write(const std::string& msg) { |
58 | ASSERT_TRUE(!reading_) << "Write() after starting to read" ; |
59 | writer_->AddRecord(Slice(msg)); |
60 | } |
61 | |
62 | size_t WrittenBytes() const { return dest_.contents_.size(); } |
63 | |
64 | std::string Read() { |
65 | if (!reading_) { |
66 | reading_ = true; |
67 | source_.contents_ = Slice(dest_.contents_); |
68 | } |
69 | std::string scratch; |
70 | Slice record; |
71 | if (reader_->ReadRecord(&record, &scratch)) { |
72 | return record.ToString(); |
73 | } else { |
74 | return "EOF" ; |
75 | } |
76 | } |
77 | |
78 | void IncrementByte(int offset, int delta) { |
79 | dest_.contents_[offset] += delta; |
80 | } |
81 | |
82 | void SetByte(int offset, char new_byte) { |
83 | dest_.contents_[offset] = new_byte; |
84 | } |
85 | |
86 | void ShrinkSize(int bytes) { |
87 | dest_.contents_.resize(dest_.contents_.size() - bytes); |
88 | } |
89 | |
90 | void FixChecksum(int , int len) { |
91 | // Compute crc of type/len/data |
92 | uint32_t crc = crc32c::Value(&dest_.contents_[header_offset + 6], 1 + len); |
93 | crc = crc32c::Mask(crc); |
94 | EncodeFixed32(&dest_.contents_[header_offset], crc); |
95 | } |
96 | |
97 | void ForceError() { source_.force_error_ = true; } |
98 | |
99 | size_t DroppedBytes() const { return report_.dropped_bytes_; } |
100 | |
101 | std::string ReportMessage() const { return report_.message_; } |
102 | |
103 | // Returns OK iff recorded error message contains "msg" |
104 | std::string MatchError(const std::string& msg) const { |
105 | if (report_.message_.find(msg) == std::string::npos) { |
106 | return report_.message_; |
107 | } else { |
108 | return "OK" ; |
109 | } |
110 | } |
111 | |
112 | void WriteInitialOffsetLog() { |
113 | for (int i = 0; i < num_initial_offset_records_; i++) { |
114 | std::string record(initial_offset_record_sizes_[i], |
115 | static_cast<char>('a' + i)); |
116 | Write(record); |
117 | } |
118 | } |
119 | |
120 | void StartReadingAt(uint64_t initial_offset) { |
121 | delete reader_; |
122 | reader_ = new Reader(&source_, &report_, true /*checksum*/, initial_offset); |
123 | } |
124 | |
125 | void CheckOffsetPastEndReturnsNoRecords(uint64_t offset_past_end) { |
126 | WriteInitialOffsetLog(); |
127 | reading_ = true; |
128 | source_.contents_ = Slice(dest_.contents_); |
129 | Reader* offset_reader = new Reader(&source_, &report_, true /*checksum*/, |
130 | WrittenBytes() + offset_past_end); |
131 | Slice record; |
132 | std::string scratch; |
133 | ASSERT_TRUE(!offset_reader->ReadRecord(&record, &scratch)); |
134 | delete offset_reader; |
135 | } |
136 | |
137 | void CheckInitialOffsetRecord(uint64_t initial_offset, |
138 | int expected_record_offset) { |
139 | WriteInitialOffsetLog(); |
140 | reading_ = true; |
141 | source_.contents_ = Slice(dest_.contents_); |
142 | Reader* offset_reader = |
143 | new Reader(&source_, &report_, true /*checksum*/, initial_offset); |
144 | |
145 | // Read all records from expected_record_offset through the last one. |
146 | ASSERT_LT(expected_record_offset, num_initial_offset_records_); |
147 | for (; expected_record_offset < num_initial_offset_records_; |
148 | ++expected_record_offset) { |
149 | Slice record; |
150 | std::string scratch; |
151 | ASSERT_TRUE(offset_reader->ReadRecord(&record, &scratch)); |
152 | ASSERT_EQ(initial_offset_record_sizes_[expected_record_offset], |
153 | record.size()); |
154 | ASSERT_EQ(initial_offset_last_record_offsets_[expected_record_offset], |
155 | offset_reader->LastRecordOffset()); |
156 | ASSERT_EQ((char)('a' + expected_record_offset), record.data()[0]); |
157 | } |
158 | delete offset_reader; |
159 | } |
160 | |
161 | private: |
162 | class StringDest : public WritableFile { |
163 | public: |
164 | Status Close() override { return Status::OK(); } |
165 | Status Flush() override { return Status::OK(); } |
166 | Status Sync() override { return Status::OK(); } |
167 | Status Append(const Slice& slice) override { |
168 | contents_.append(slice.data(), slice.size()); |
169 | return Status::OK(); |
170 | } |
171 | |
172 | std::string contents_; |
173 | }; |
174 | |
175 | class StringSource : public SequentialFile { |
176 | public: |
177 | StringSource() : force_error_(false), returned_partial_(false) {} |
178 | |
179 | Status Read(size_t n, Slice* result, char* scratch) override { |
180 | EXPECT_TRUE(!returned_partial_) << "must not Read() after eof/error" ; |
181 | |
182 | if (force_error_) { |
183 | force_error_ = false; |
184 | returned_partial_ = true; |
185 | return Status::Corruption("read error" ); |
186 | } |
187 | |
188 | if (contents_.size() < n) { |
189 | n = contents_.size(); |
190 | returned_partial_ = true; |
191 | } |
192 | *result = Slice(contents_.data(), n); |
193 | contents_.remove_prefix(n); |
194 | return Status::OK(); |
195 | } |
196 | |
197 | Status Skip(uint64_t n) override { |
198 | if (n > contents_.size()) { |
199 | contents_.clear(); |
200 | return Status::NotFound("in-memory file skipped past end" ); |
201 | } |
202 | |
203 | contents_.remove_prefix(n); |
204 | |
205 | return Status::OK(); |
206 | } |
207 | |
208 | Slice contents_; |
209 | bool force_error_; |
210 | bool returned_partial_; |
211 | }; |
212 | |
213 | class ReportCollector : public Reader::Reporter { |
214 | public: |
215 | ReportCollector() : dropped_bytes_(0) {} |
216 | void Corruption(size_t bytes, const Status& status) override { |
217 | dropped_bytes_ += bytes; |
218 | message_.append(status.ToString()); |
219 | } |
220 | |
221 | size_t dropped_bytes_; |
222 | std::string message_; |
223 | }; |
224 | |
225 | // Record metadata for testing initial offset functionality |
226 | static size_t initial_offset_record_sizes_[]; |
227 | static uint64_t initial_offset_last_record_offsets_[]; |
228 | static int num_initial_offset_records_; |
229 | |
230 | StringDest dest_; |
231 | StringSource source_; |
232 | ReportCollector report_; |
233 | bool reading_; |
234 | Writer* writer_; |
235 | Reader* reader_; |
236 | }; |
237 | |
// Sizes of the records written by WriteInitialOffsetLog(); record i holds
// initial_offset_record_sizes_[i] copies of the byte ('a' + i).
size_t LogTest::initial_offset_record_sizes_[] = {
    10000,  // Two sizable records in first block
    10000,
    2 * log::kBlockSize - 1000,  // Span three blocks
    1,
    13716,                          // Consume all but two bytes of block 3.
    log::kBlockSize - kHeaderSize,  // Consume the entirety of block 4.
};

// Byte offset at which each record above begins in the log; used to check
// Reader::LastRecordOffset().  Each entry is the previous record's start
// plus its header(s) and payload.
uint64_t LogTest::initial_offset_last_record_offsets_[] = {
    0,
    kHeaderSize + 10000,
    2 * (kHeaderSize + 10000),
    2 * (kHeaderSize + 10000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize,
    2 * (kHeaderSize + 10000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize +
        kHeaderSize + 1,
    3 * log::kBlockSize,
};

// LogTest::initial_offset_last_record_offsets_ must be defined before this.
int LogTest::num_initial_offset_records_ =
    sizeof(LogTest::initial_offset_last_record_offsets_) / sizeof(uint64_t);
260 | |
// An empty log immediately reports EOF.
TEST_F(LogTest, Empty) { ASSERT_EQ("EOF", Read()); }
262 | |
// Basic round trip: records (including an empty one) come back in write
// order, followed by a sticky EOF.
TEST_F(LogTest, ReadWrite) {
  Write("foo");
  Write("bar");
  Write("");
  Write("xxxx");
  ASSERT_EQ("foo", Read());
  ASSERT_EQ("bar", Read());
  ASSERT_EQ("", Read());
  ASSERT_EQ("xxxx", Read());
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ("EOF", Read());  // Make sure reads at eof work
}
275 | |
276 | TEST_F(LogTest, ManyBlocks) { |
277 | for (int i = 0; i < 100000; i++) { |
278 | Write(NumberString(i)); |
279 | } |
280 | for (int i = 0; i < 100000; i++) { |
281 | ASSERT_EQ(NumberString(i), Read()); |
282 | } |
283 | ASSERT_EQ("EOF" , Read()); |
284 | } |
285 | |
// Records larger than one block must be fragmented on write and
// reassembled transparently on read.
TEST_F(LogTest, Fragmentation) {
  Write("small");
  Write(BigString("medium", 50000));
  Write(BigString("large", 100000));
  ASSERT_EQ("small", Read());
  ASSERT_EQ(BigString("medium", 50000), Read());
  ASSERT_EQ(BigString("large", 100000), Read());
  ASSERT_EQ("EOF", Read());
}
295 | |
296 | TEST_F(LogTest, MarginalTrailer) { |
297 | // Make a trailer that is exactly the same length as an empty record. |
298 | const int n = kBlockSize - 2 * kHeaderSize; |
299 | Write(BigString("foo" , n)); |
300 | ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes()); |
301 | Write("" ); |
302 | Write("bar" ); |
303 | ASSERT_EQ(BigString("foo" , n), Read()); |
304 | ASSERT_EQ("" , Read()); |
305 | ASSERT_EQ("bar" , Read()); |
306 | ASSERT_EQ("EOF" , Read()); |
307 | } |
308 | |
309 | TEST_F(LogTest, MarginalTrailer2) { |
310 | // Make a trailer that is exactly the same length as an empty record. |
311 | const int n = kBlockSize - 2 * kHeaderSize; |
312 | Write(BigString("foo" , n)); |
313 | ASSERT_EQ(kBlockSize - kHeaderSize, WrittenBytes()); |
314 | Write("bar" ); |
315 | ASSERT_EQ(BigString("foo" , n), Read()); |
316 | ASSERT_EQ("bar" , Read()); |
317 | ASSERT_EQ("EOF" , Read()); |
318 | ASSERT_EQ(0, DroppedBytes()); |
319 | ASSERT_EQ("" , ReportMessage()); |
320 | } |
321 | |
322 | TEST_F(LogTest, ShortTrailer) { |
323 | const int n = kBlockSize - 2 * kHeaderSize + 4; |
324 | Write(BigString("foo" , n)); |
325 | ASSERT_EQ(kBlockSize - kHeaderSize + 4, WrittenBytes()); |
326 | Write("" ); |
327 | Write("bar" ); |
328 | ASSERT_EQ(BigString("foo" , n), Read()); |
329 | ASSERT_EQ("" , Read()); |
330 | ASSERT_EQ("bar" , Read()); |
331 | ASSERT_EQ("EOF" , Read()); |
332 | } |
333 | |
334 | TEST_F(LogTest, AlignedEof) { |
335 | const int n = kBlockSize - 2 * kHeaderSize + 4; |
336 | Write(BigString("foo" , n)); |
337 | ASSERT_EQ(kBlockSize - kHeaderSize + 4, WrittenBytes()); |
338 | ASSERT_EQ(BigString("foo" , n), Read()); |
339 | ASSERT_EQ("EOF" , Read()); |
340 | } |
341 | |
// Reopening an existing log for append must preserve the earlier records.
TEST_F(LogTest, OpenForAppend) {
  Write("hello");
  ReopenForAppend();
  Write("world");
  ASSERT_EQ("hello", Read());
  ASSERT_EQ("world", Read());
  ASSERT_EQ("EOF", Read());
}
350 | |
351 | TEST_F(LogTest, RandomRead) { |
352 | const int N = 500; |
353 | Random write_rnd(301); |
354 | for (int i = 0; i < N; i++) { |
355 | Write(RandomSkewedString(i, &write_rnd)); |
356 | } |
357 | Random read_rnd(301); |
358 | for (int i = 0; i < N; i++) { |
359 | ASSERT_EQ(RandomSkewedString(i, &read_rnd), Read()); |
360 | } |
361 | ASSERT_EQ("EOF" , Read()); |
362 | } |
363 | |
364 | // Tests of all the error paths in log_reader.cc follow: |
365 | |
// A read error from the underlying file is reported and costs a full
// block's worth of dropped bytes.
TEST_F(LogTest, ReadError) {
  Write("foo");
  ForceError();
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(kBlockSize, DroppedBytes());
  ASSERT_EQ("OK", MatchError("read error"));
}
373 | |
// Corrupt the record-type byte (re-fixing the checksum so only the type is
// bad): the reader must report an unknown record type and drop the payload.
TEST_F(LogTest, BadRecordType) {
  Write("foo");
  // Type is stored in header[6]
  IncrementByte(6, 100);
  FixChecksum(0, 3);
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(3, DroppedBytes());
  ASSERT_EQ("OK", MatchError("unknown record type"));
}
383 | |
// A record cut short at the end of the file looks like an in-progress
// write and must be silently ignored.
TEST_F(LogTest, TruncatedTrailingRecordIsIgnored) {
  Write("foo");
  ShrinkSize(4);  // Drop all payload as well as a header byte
  ASSERT_EQ("EOF", Read());
  // Truncated last record is ignored, not treated as an error.
  ASSERT_EQ(0, DroppedBytes());
  ASSERT_EQ("", ReportMessage());
}
392 | |
393 | TEST_F(LogTest, BadLength) { |
394 | const int kPayloadSize = kBlockSize - kHeaderSize; |
395 | Write(BigString("bar" , kPayloadSize)); |
396 | Write("foo" ); |
397 | // Least significant size byte is stored in header[4]. |
398 | IncrementByte(4, 1); |
399 | ASSERT_EQ("foo" , Read()); |
400 | ASSERT_EQ(kBlockSize, DroppedBytes()); |
401 | ASSERT_EQ("OK" , MatchError("bad record length" )); |
402 | } |
403 | |
// A length mismatch caused by truncation at the very end of the file is
// treated as an unfinished write, not a corruption.
TEST_F(LogTest, BadLengthAtEndIsIgnored) {
  Write("foo");
  ShrinkSize(1);
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(0, DroppedBytes());
  ASSERT_EQ("", ReportMessage());
}
411 | |
// Corrupting a checksum byte drops the whole record (header + payload =
// 10 bytes here) with a checksum-mismatch report.
TEST_F(LogTest, ChecksumMismatch) {
  Write("foo");
  IncrementByte(0, 10);
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(10, DroppedBytes());
  ASSERT_EQ("OK", MatchError("checksum mismatch"));
}
419 | |
// A MIDDLE fragment with no preceding FIRST fragment is corruption.
TEST_F(LogTest, UnexpectedMiddleType) {
  Write("foo");
  SetByte(6, kMiddleType);
  FixChecksum(0, 3);
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(3, DroppedBytes());
  ASSERT_EQ("OK", MatchError("missing start"));
}
428 | |
// A LAST fragment with no preceding FIRST fragment is corruption.
TEST_F(LogTest, UnexpectedLastType) {
  Write("foo");
  SetByte(6, kLastType);
  FixChecksum(0, 3);
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(3, DroppedBytes());
  ASSERT_EQ("OK", MatchError("missing start"));
}
437 | |
// A FIRST fragment followed by a FULL record means the first record never
// ended; its bytes are dropped and the FULL record is returned.
TEST_F(LogTest, UnexpectedFullType) {
  Write("foo");
  Write("bar");
  SetByte(6, kFirstType);
  FixChecksum(0, 3);
  ASSERT_EQ("bar", Read());
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(3, DroppedBytes());
  ASSERT_EQ("OK", MatchError("partial record without end"));
}
448 | |
// A FIRST fragment followed by another FIRST fragment likewise abandons
// the earlier partial record and reports it.
TEST_F(LogTest, UnexpectedFirstType) {
  Write("foo");
  Write(BigString("bar", 100000));
  SetByte(6, kFirstType);
  FixChecksum(0, 3);
  ASSERT_EQ(BigString("bar", 100000), Read());
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ(3, DroppedBytes());
  ASSERT_EQ("OK", MatchError("partial record without end"));
}
459 | |
// Truncating away the LAST fragment of a spanning record looks like a
// crash mid-write and must be ignored silently.
TEST_F(LogTest, MissingLastIsIgnored) {
  Write(BigString("bar", kBlockSize));
  // Remove the LAST block, including header.
  ShrinkSize(14);
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ("", ReportMessage());
  ASSERT_EQ(0, DroppedBytes());
}
468 | |
// A LAST fragment truncated mid-payload at end-of-file is also treated as
// an unfinished write, not corruption.
TEST_F(LogTest, PartialLastIsIgnored) {
  Write(BigString("bar", kBlockSize));
  // Cause a bad record length in the LAST block.
  ShrinkSize(1);
  ASSERT_EQ("EOF", Read());
  ASSERT_EQ("", ReportMessage());
  ASSERT_EQ(0, DroppedBytes());
}
477 | |
TEST_F(LogTest, SkipIntoMultiRecord) {
  // Consider a fragmented record:
  //    first(R1), middle(R1), last(R1), first(R2)
  // If initial_offset points to a record after first(R1) but before first(R2)
  // incomplete fragment errors are not actual errors, and must be suppressed
  // until a new first or full record is encountered.
  Write(BigString("foo", 3 * kBlockSize));
  Write("correct");
  StartReadingAt(kBlockSize);  // lands inside R1's middle fragments

  ASSERT_EQ("correct", Read());
  ASSERT_EQ("", ReportMessage());
  ASSERT_EQ(0, DroppedBytes());
  ASSERT_EQ("EOF", Read());
}
493 | |
494 | TEST_F(LogTest, ErrorJoinsRecords) { |
495 | // Consider two fragmented records: |
496 | // first(R1) last(R1) first(R2) last(R2) |
497 | // where the middle two fragments disappear. We do not want |
498 | // first(R1),last(R2) to get joined and returned as a valid record. |
499 | |
500 | // Write records that span two blocks |
501 | Write(BigString("foo" , kBlockSize)); |
502 | Write(BigString("bar" , kBlockSize)); |
503 | Write("correct" ); |
504 | |
505 | // Wipe the middle block |
506 | for (int offset = kBlockSize; offset < 2 * kBlockSize; offset++) { |
507 | SetByte(offset, 'x'); |
508 | } |
509 | |
510 | ASSERT_EQ("correct" , Read()); |
511 | ASSERT_EQ("EOF" , Read()); |
512 | const size_t dropped = DroppedBytes(); |
513 | ASSERT_LE(dropped, 2 * kBlockSize + 100); |
514 | ASSERT_GE(dropped, 2 * kBlockSize); |
515 | } |
516 | |
// The tests below exercise Reader's initial_offset handling against the
// fixed log written by WriteInitialOffsetLog().  CheckInitialOffsetRecord(
// offset, i) expects the first record returned to be record index i.

// Offset 0: everything is visible.
TEST_F(LogTest, ReadStart) { CheckInitialOffsetRecord(0, 0); }

// One byte into record 0: skip to record 1.
TEST_F(LogTest, ReadSecondOneOff) { CheckInitialOffsetRecord(1, 1); }

// Inside record 0's payload: still record 1.
TEST_F(LogTest, ReadSecondTenThousand) { CheckInitialOffsetRecord(10000, 1); }

// Exactly at record 1's header (10000 + kHeaderSize).
TEST_F(LogTest, ReadSecondStart) { CheckInitialOffsetRecord(10007, 1); }

// One byte into record 1: skip to record 2.
TEST_F(LogTest, ReadThirdOneOff) { CheckInitialOffsetRecord(10008, 2); }

// Exactly at record 2's header.
TEST_F(LogTest, ReadThirdStart) { CheckInitialOffsetRecord(20014, 2); }

// One byte into record 2 (which spans three blocks): skip to record 3.
TEST_F(LogTest, ReadFourthOneOff) { CheckInitialOffsetRecord(20015, 3); }

// Offsets landing in record 2's first-block trailer / middle block /
// last block all resolve to record 3 as well.
TEST_F(LogTest, ReadFourthFirstBlockTrailer) {
  CheckInitialOffsetRecord(log::kBlockSize - 4, 3);
}

TEST_F(LogTest, ReadFourthMiddleBlock) {
  CheckInitialOffsetRecord(log::kBlockSize + 1, 3);
}

TEST_F(LogTest, ReadFourthLastBlock) {
  CheckInitialOffsetRecord(2 * log::kBlockSize + 1, 3);
}

// NOTE(review): the 1000 here (vs the 10000 used in the offsets table)
// puts the offset inside record 2 rather than at record 3's exact start;
// the reader still skips forward to record 3 — confirm this is intended.
TEST_F(LogTest, ReadFourthStart) {
  CheckInitialOffsetRecord(
      2 * (kHeaderSize + 1000) + (2 * log::kBlockSize - 1000) + 3 * kHeaderSize,
      3);
}

// Offset inside end-of-block padding skips to the next block's record.
TEST_F(LogTest, ReadInitialOffsetIntoBlockPadding) {
  CheckInitialOffsetRecord(3 * log::kBlockSize - 3, 5);
}

// Starting at or past the end of the log yields no records.
TEST_F(LogTest, ReadEnd) { CheckOffsetPastEndReturnsNoRecords(0); }

TEST_F(LogTest, ReadPastEnd) { CheckOffsetPastEndReturnsNoRecords(5); }
556 | |
557 | } // namespace log |
558 | } // namespace leveldb |
559 | |