1// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file. See the AUTHORS file for names of contributors.
4
5#include "leveldb/table_builder.h"
6
7#include <cassert>
8
9#include "leveldb/comparator.h"
10#include "leveldb/env.h"
11#include "leveldb/filter_policy.h"
12#include "leveldb/options.h"
13#include "table/block_builder.h"
14#include "table/filter_block.h"
15#include "table/format.h"
16#include "util/coding.h"
17#include "util/crc32c.h"
18
19namespace leveldb {
20
21struct TableBuilder::Rep {
22 Rep(const Options& opt, WritableFile* f)
23 : options(opt),
24 index_block_options(opt),
25 file(f),
26 offset(0),
27 data_block(&options),
28 index_block(&index_block_options),
29 num_entries(0),
30 closed(false),
31 filter_block(opt.filter_policy == nullptr
32 ? nullptr
33 : new FilterBlockBuilder(opt.filter_policy)),
34 pending_index_entry(false) {
35 index_block_options.block_restart_interval = 1;
36 }
37
38 Options options;
39 Options index_block_options;
40 WritableFile* file;
41 uint64_t offset;
42 Status status;
43 BlockBuilder data_block;
44 BlockBuilder index_block;
45 std::string last_key;
46 int64_t num_entries;
47 bool closed; // Either Finish() or Abandon() has been called.
48 FilterBlockBuilder* filter_block;
49
50 // We do not emit the index entry for a block until we have seen the
51 // first key for the next data block. This allows us to use shorter
52 // keys in the index block. For example, consider a block boundary
53 // between the keys "the quick brown fox" and "the who". We can use
54 // "the r" as the key for the index block entry since it is >= all
55 // entries in the first block and < all entries in subsequent
56 // blocks.
57 //
58 // Invariant: r->pending_index_entry is true only if data_block is empty.
59 bool pending_index_entry;
60 BlockHandle pending_handle; // Handle to add to index block
61
62 std::string compressed_output;
63};
64
65TableBuilder::TableBuilder(const Options& options, WritableFile* file)
66 : rep_(new Rep(options, file)) {
67 if (rep_->filter_block != nullptr) {
68 rep_->filter_block->StartBlock(0);
69 }
70}
71
72TableBuilder::~TableBuilder() {
73 assert(rep_->closed); // Catch errors where caller forgot to call Finish()
74 delete rep_->filter_block;
75 delete rep_;
76}
77
78Status TableBuilder::ChangeOptions(const Options& options) {
79 // Note: if more fields are added to Options, update
80 // this function to catch changes that should not be allowed to
81 // change in the middle of building a Table.
82 if (options.comparator != rep_->options.comparator) {
83 return Status::InvalidArgument("changing comparator while building table");
84 }
85
86 // Note that any live BlockBuilders point to rep_->options and therefore
87 // will automatically pick up the updated options.
88 rep_->options = options;
89 rep_->index_block_options = options;
90 rep_->index_block_options.block_restart_interval = 1;
91 return Status::OK();
92}
93
94void TableBuilder::Add(const Slice& key, const Slice& value) {
95 Rep* r = rep_;
96 assert(!r->closed);
97 if (!ok()) return;
98 if (r->num_entries > 0) {
99 assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0);
100 }
101
102 if (r->pending_index_entry) {
103 assert(r->data_block.empty());
104 r->options.comparator->FindShortestSeparator(&r->last_key, key);
105 std::string handle_encoding;
106 r->pending_handle.EncodeTo(&handle_encoding);
107 r->index_block.Add(r->last_key, Slice(handle_encoding));
108 r->pending_index_entry = false;
109 }
110
111 if (r->filter_block != nullptr) {
112 r->filter_block->AddKey(key);
113 }
114
115 r->last_key.assign(key.data(), key.size());
116 r->num_entries++;
117 r->data_block.Add(key, value);
118
119 const size_t estimated_block_size = r->data_block.CurrentSizeEstimate();
120 if (estimated_block_size >= r->options.block_size) {
121 Flush();
122 }
123}
124
125void TableBuilder::Flush() {
126 Rep* r = rep_;
127 assert(!r->closed);
128 if (!ok()) return;
129 if (r->data_block.empty()) return;
130 assert(!r->pending_index_entry);
131 WriteBlock(&r->data_block, &r->pending_handle);
132 if (ok()) {
133 r->pending_index_entry = true;
134 r->status = r->file->Flush();
135 }
136 if (r->filter_block != nullptr) {
137 r->filter_block->StartBlock(r->offset);
138 }
139}
140
141void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
142 // File format contains a sequence of blocks where each block has:
143 // block_data: uint8[n]
144 // type: uint8
145 // crc: uint32
146 assert(ok());
147 Rep* r = rep_;
148 Slice raw = block->Finish();
149
150 Slice block_contents;
151 CompressionType type = r->options.compression;
152 // TODO(postrelease): Support more compression options: zlib?
153 switch (type) {
154 case kNoCompression:
155 block_contents = raw;
156 break;
157
158 case kSnappyCompression: {
159 std::string* compressed = &r->compressed_output;
160 if (port::Snappy_Compress(raw.data(), raw.size(), compressed) &&
161 compressed->size() < raw.size() - (raw.size() / 8u)) {
162 block_contents = *compressed;
163 } else {
164 // Snappy not supported, or compressed less than 12.5%, so just
165 // store uncompressed form
166 block_contents = raw;
167 type = kNoCompression;
168 }
169 break;
170 }
171 }
172 WriteRawBlock(block_contents, type, handle);
173 r->compressed_output.clear();
174 block->Reset();
175}
176
177void TableBuilder::WriteRawBlock(const Slice& block_contents,
178 CompressionType type, BlockHandle* handle) {
179 Rep* r = rep_;
180 handle->set_offset(r->offset);
181 handle->set_size(block_contents.size());
182 r->status = r->file->Append(block_contents);
183 if (r->status.ok()) {
184 char trailer[kBlockTrailerSize];
185 trailer[0] = type;
186 uint32_t crc = crc32c::Value(block_contents.data(), block_contents.size());
187 crc = crc32c::Extend(crc, trailer, 1); // Extend crc to cover block type
188 EncodeFixed32(trailer + 1, crc32c::Mask(crc));
189 r->status = r->file->Append(Slice(trailer, kBlockTrailerSize));
190 if (r->status.ok()) {
191 r->offset += block_contents.size() + kBlockTrailerSize;
192 }
193 }
194}
195
196Status TableBuilder::status() const { return rep_->status; }
197
198Status TableBuilder::Finish() {
199 Rep* r = rep_;
200 Flush();
201 assert(!r->closed);
202 r->closed = true;
203
204 BlockHandle filter_block_handle, metaindex_block_handle, index_block_handle;
205
206 // Write filter block
207 if (ok() && r->filter_block != nullptr) {
208 WriteRawBlock(r->filter_block->Finish(), kNoCompression,
209 &filter_block_handle);
210 }
211
212 // Write metaindex block
213 if (ok()) {
214 BlockBuilder meta_index_block(&r->options);
215 if (r->filter_block != nullptr) {
216 // Add mapping from "filter.Name" to location of filter data
217 std::string key = "filter.";
218 key.append(r->options.filter_policy->Name());
219 std::string handle_encoding;
220 filter_block_handle.EncodeTo(&handle_encoding);
221 meta_index_block.Add(key, handle_encoding);
222 }
223
224 // TODO(postrelease): Add stats and other meta blocks
225 WriteBlock(&meta_index_block, &metaindex_block_handle);
226 }
227
228 // Write index block
229 if (ok()) {
230 if (r->pending_index_entry) {
231 r->options.comparator->FindShortSuccessor(&r->last_key);
232 std::string handle_encoding;
233 r->pending_handle.EncodeTo(&handle_encoding);
234 r->index_block.Add(r->last_key, Slice(handle_encoding));
235 r->pending_index_entry = false;
236 }
237 WriteBlock(&r->index_block, &index_block_handle);
238 }
239
240 // Write footer
241 if (ok()) {
242 Footer footer;
243 footer.set_metaindex_handle(metaindex_block_handle);
244 footer.set_index_handle(index_block_handle);
245 std::string footer_encoding;
246 footer.EncodeTo(&footer_encoding);
247 r->status = r->file->Append(footer_encoding);
248 if (r->status.ok()) {
249 r->offset += footer_encoding.size();
250 }
251 }
252 return r->status;
253}
254
255void TableBuilder::Abandon() {
256 Rep* r = rep_;
257 assert(!r->closed);
258 r->closed = true;
259}
260
261uint64_t TableBuilder::NumEntries() const { return rep_->num_entries; }
262
263uint64_t TableBuilder::FileSize() const { return rep_->offset; }
264
265} // namespace leveldb
266