1// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file. See the AUTHORS file for names of contributors.
4//
5// WriteBatch::rep_ :=
6// sequence: fixed64
7// count: fixed32
8// data: record[count]
9// record :=
10// kTypeValue varstring varstring |
11// kTypeDeletion varstring
12// varstring :=
13// len: varint32
14// data: uint8[len]
15
16#include "leveldb/write_batch.h"
17
18#include "db/dbformat.h"
19#include "db/memtable.h"
20#include "db/write_batch_internal.h"
21#include "leveldb/db.h"
22#include "util/coding.h"
23
24namespace leveldb {
25
26// WriteBatch header has an 8-byte sequence number followed by a 4-byte count.
27static const size_t kHeader = 12;
28
29WriteBatch::WriteBatch() { Clear(); }
30
31WriteBatch::~WriteBatch() = default;
32
33WriteBatch::Handler::~Handler() = default;
34
35void WriteBatch::Clear() {
36 rep_.clear();
37 rep_.resize(kHeader);
38}
39
40size_t WriteBatch::ApproximateSize() const { return rep_.size(); }
41
42Status WriteBatch::Iterate(Handler* handler) const {
43 Slice input(rep_);
44 if (input.size() < kHeader) {
45 return Status::Corruption("malformed WriteBatch (too small)");
46 }
47
48 input.remove_prefix(kHeader);
49 Slice key, value;
50 int found = 0;
51 while (!input.empty()) {
52 found++;
53 char tag = input[0];
54 input.remove_prefix(1);
55 switch (tag) {
56 case kTypeValue:
57 if (GetLengthPrefixedSlice(&input, &key) &&
58 GetLengthPrefixedSlice(&input, &value)) {
59 handler->Put(key, value);
60 } else {
61 return Status::Corruption("bad WriteBatch Put");
62 }
63 break;
64 case kTypeDeletion:
65 if (GetLengthPrefixedSlice(&input, &key)) {
66 handler->Delete(key);
67 } else {
68 return Status::Corruption("bad WriteBatch Delete");
69 }
70 break;
71 default:
72 return Status::Corruption("unknown WriteBatch tag");
73 }
74 }
75 if (found != WriteBatchInternal::Count(this)) {
76 return Status::Corruption("WriteBatch has wrong count");
77 } else {
78 return Status::OK();
79 }
80}
81
82int WriteBatchInternal::Count(const WriteBatch* b) {
83 return DecodeFixed32(b->rep_.data() + 8);
84}
85
86void WriteBatchInternal::SetCount(WriteBatch* b, int n) {
87 EncodeFixed32(&b->rep_[8], n);
88}
89
90SequenceNumber WriteBatchInternal::Sequence(const WriteBatch* b) {
91 return SequenceNumber(DecodeFixed64(b->rep_.data()));
92}
93
94void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
95 EncodeFixed64(&b->rep_[0], seq);
96}
97
98void WriteBatch::Put(const Slice& key, const Slice& value) {
99 WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
100 rep_.push_back(static_cast<char>(kTypeValue));
101 PutLengthPrefixedSlice(&rep_, key);
102 PutLengthPrefixedSlice(&rep_, value);
103}
104
105void WriteBatch::Delete(const Slice& key) {
106 WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
107 rep_.push_back(static_cast<char>(kTypeDeletion));
108 PutLengthPrefixedSlice(&rep_, key);
109}
110
111void WriteBatch::Append(const WriteBatch& source) {
112 WriteBatchInternal::Append(this, &source);
113}
114
115namespace {
116class MemTableInserter : public WriteBatch::Handler {
117 public:
118 SequenceNumber sequence_;
119 MemTable* mem_;
120
121 void Put(const Slice& key, const Slice& value) override {
122 mem_->Add(sequence_, kTypeValue, key, value);
123 sequence_++;
124 }
125 void Delete(const Slice& key) override {
126 mem_->Add(sequence_, kTypeDeletion, key, Slice());
127 sequence_++;
128 }
129};
130} // namespace
131
132Status WriteBatchInternal::InsertInto(const WriteBatch* b, MemTable* memtable) {
133 MemTableInserter inserter;
134 inserter.sequence_ = WriteBatchInternal::Sequence(b);
135 inserter.mem_ = memtable;
136 return b->Iterate(&inserter);
137}
138
139void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
140 assert(contents.size() >= kHeader);
141 b->rep_.assign(contents.data(), contents.size());
142}
143
144void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) {
145 SetCount(dst, Count(dst) + Count(src));
146 assert(src->rep_.size() >= kHeader);
147 dst->rep_.append(src->rep_.data() + kHeader, src->rep_.size() - kHeader);
148}
149
150} // namespace leveldb
151