1 | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
2 | // Use of this source code is governed by a BSD-style license that can be |
3 | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
4 | |
5 | #include <sys/types.h> |
6 | |
7 | #include <atomic> |
8 | #include <cstdio> |
9 | #include <cstdlib> |
10 | |
11 | #include "leveldb/cache.h" |
12 | #include "leveldb/comparator.h" |
13 | #include "leveldb/db.h" |
14 | #include "leveldb/env.h" |
15 | #include "leveldb/filter_policy.h" |
16 | #include "leveldb/write_batch.h" |
17 | #include "port/port.h" |
18 | #include "util/crc32c.h" |
19 | #include "util/histogram.h" |
20 | #include "util/mutexlock.h" |
21 | #include "util/random.h" |
22 | #include "util/testutil.h" |
23 | |
24 | // Comma-separated list of operations to run in the specified order |
25 | // Actual benchmarks: |
26 | // fillseq -- write N values in sequential key order in async mode |
27 | // fillrandom -- write N values in random key order in async mode |
28 | // overwrite -- overwrite N values in random key order in async mode |
//      fillsync      -- write N/1000 values in random key order in sync mode
30 | // fill100K -- write N/1000 100K values in random order in async mode |
31 | // deleteseq -- delete N keys in sequential order |
32 | // deleterandom -- delete N keys in random order |
33 | // readseq -- read N times sequentially |
34 | // readreverse -- read N times in reverse order |
35 | // readrandom -- read N times in random order |
36 | // readmissing -- read N missing keys in random order |
37 | // readhot -- read N times in random order from 1% section of DB |
38 | // seekrandom -- N random seeks |
39 | // seekordered -- N ordered seeks |
40 | // open -- cost of opening a DB |
41 | // crc32c -- repeated crc32c of 4K of data |
42 | // Meta operations: |
43 | // compact -- Compact the entire DB |
44 | // stats -- Print DB stats |
45 | // sstables -- Print sstable info |
46 | // heapprofile -- Dump a heap profile (if supported by this port) |
// Note: the trailing comma leaves an empty final benchmark name, which the
// dispatch loop in Run() deliberately skips without printing an error.
static const char* FLAGS_benchmarks =
    "fillseq,"
    "fillsync,"
    "fillrandom,"
    "overwrite,"
    "readrandom,"
    "readrandom,"  // Extra run to allow previous compactions to quiesce
    "readseq,"
    "readreverse,"
    "compact,"
    "readrandom,"
    "readseq,"
    "readreverse,"
    "fill100K,"
    "crc32c,"
    "snappycomp,"
    "snappyuncomp,";
64 | |
// Number of key/values to place in database
static int FLAGS_num = 1000000;

// Number of read operations to do.  If negative, do FLAGS_num reads.
static int FLAGS_reads = -1;

// Number of concurrent threads to run.
static int FLAGS_threads = 1;

// Size of each value
static int FLAGS_value_size = 100;

// Arrange to generate values that shrink to this fraction of
// their original size after compression
static double FLAGS_compression_ratio = 0.5;

// Print histogram of operation timings
static bool FLAGS_histogram = false;

// Count the number of string comparisons performed
// (installs CountComparator as the DB comparator when true).
static bool FLAGS_comparisons = false;

// Number of bytes to buffer in memtable before compacting
// (initialized to default value by "main")
static int FLAGS_write_buffer_size = 0;

// Number of bytes written to each file.
// (initialized to default value by "main")
static int FLAGS_max_file_size = 0;

// Approximate size of user data packed per block (before compression).
// (initialized to default value by "main")
static int FLAGS_block_size = 0;

// Number of bytes to use as a cache of uncompressed data.
// Negative means use default settings.
static int FLAGS_cache_size = -1;

// Maximum number of files to keep open at the same time (use default if == 0)
static int FLAGS_open_files = 0;

// Bloom filter bits per key.
// Negative means use default settings.
static int FLAGS_bloom_bits = -1;

// Common key prefix length ('a' padding prepended to every generated key).
static int FLAGS_key_prefix = 0;

// If true, do not destroy the existing database.  If you set this
// flag and also specify a benchmark that wants a fresh database, that
// benchmark will fail.
static bool FLAGS_use_existing_db = false;

// If true, reuse existing log/MANIFEST files when re-opening a database.
static bool FLAGS_reuse_logs = false;

// If true, use compression.
static bool FLAGS_compression = true;

// Use the db with the following name.
// NOTE(review): left null here; presumably main() substitutes a default
// path before Benchmark runs — the assignment is outside this chunk.
static const char* FLAGS_db = nullptr;
126 | |
127 | namespace leveldb { |
128 | |
namespace {
// Environment used for all file and thread operations.  Initialized to
// null; NOTE(review): assumed to be assigned before any benchmark runs
// (likely in main, outside this chunk) — Stats::Start() dereferences it
// unconditionally.
leveldb::Env* g_env = nullptr;
131 | |
132 | class CountComparator : public Comparator { |
133 | public: |
134 | CountComparator(const Comparator* wrapped) : wrapped_(wrapped) {} |
135 | ~CountComparator() override {} |
136 | int Compare(const Slice& a, const Slice& b) const override { |
137 | count_.fetch_add(1, std::memory_order_relaxed); |
138 | return wrapped_->Compare(a, b); |
139 | } |
140 | const char* Name() const override { return wrapped_->Name(); } |
141 | void FindShortestSeparator(std::string* start, |
142 | const Slice& limit) const override { |
143 | wrapped_->FindShortestSeparator(start, limit); |
144 | } |
145 | |
146 | void FindShortSuccessor(std::string* key) const override { |
147 | return wrapped_->FindShortSuccessor(key); |
148 | } |
149 | |
150 | size_t comparisons() const { return count_.load(std::memory_order_relaxed); } |
151 | |
152 | void reset() { count_.store(0, std::memory_order_relaxed); } |
153 | |
154 | private: |
155 | mutable std::atomic<size_t> count_{0}; |
156 | const Comparator* const wrapped_; |
157 | }; |
158 | |
// Helper for quickly generating random data.
class RandomGenerator {
 private:
  std::string data_;  // >=1MB pool of pseudo-random, compressible bytes
  int pos_;           // next read offset into data_

 public:
  RandomGenerator() {
    // We use a limited amount of data over and over again and ensure
    // that it is larger than the compression window (32KB), and also
    // large enough to serve all typical value sizes we want to write.
    Random rnd(301);
    std::string piece;
    while (data_.size() < 1048576) {
      // Add a short fragment that is as compressible as specified
      // by FLAGS_compression_ratio.
      test::CompressibleString(&rnd, FLAGS_compression_ratio, 100, &piece);
      data_.append(piece);
    }
    pos_ = 0;
  }

  // Returns `len` bytes from the pool, advancing the cursor and wrapping
  // back to the start when fewer than `len` bytes remain.  The returned
  // slice aliases internal storage; requests larger than the whole pool
  // cannot be served (asserted below).
  Slice Generate(size_t len) {
    if (pos_ + len > data_.size()) {
      pos_ = 0;
      assert(len < data_.size());
    }
    pos_ += len;
    return Slice(data_.data() + pos_ - len, len);
  }
};
190 | |
191 | class KeyBuffer { |
192 | public: |
193 | KeyBuffer() { |
194 | assert(FLAGS_key_prefix < sizeof(buffer_)); |
195 | memset(buffer_, 'a', FLAGS_key_prefix); |
196 | } |
197 | KeyBuffer& operator=(KeyBuffer& other) = delete; |
198 | KeyBuffer(KeyBuffer& other) = delete; |
199 | |
200 | void Set(int k) { |
201 | std::snprintf(buffer_ + FLAGS_key_prefix, |
202 | sizeof(buffer_) - FLAGS_key_prefix, "%016d" , k); |
203 | } |
204 | |
205 | Slice slice() const { return Slice(buffer_, FLAGS_key_prefix + 16); } |
206 | |
207 | private: |
208 | char buffer_[1024]; |
209 | }; |
210 | |
211 | #if defined(__linux) |
212 | static Slice TrimSpace(Slice s) { |
213 | size_t start = 0; |
214 | while (start < s.size() && isspace(s[start])) { |
215 | start++; |
216 | } |
217 | size_t limit = s.size(); |
218 | while (limit > start && isspace(s[limit - 1])) { |
219 | limit--; |
220 | } |
221 | return Slice(s.data() + start, limit - start); |
222 | } |
223 | #endif |
224 | |
225 | static void AppendWithSpace(std::string* str, Slice msg) { |
226 | if (msg.empty()) return; |
227 | if (!str->empty()) { |
228 | str->push_back(' '); |
229 | } |
230 | str->append(msg.data(), msg.size()); |
231 | } |
232 | |
233 | class Stats { |
234 | private: |
235 | double start_; |
236 | double finish_; |
237 | double seconds_; |
238 | int done_; |
239 | int next_report_; |
240 | int64_t bytes_; |
241 | double last_op_finish_; |
242 | Histogram hist_; |
243 | std::string message_; |
244 | |
245 | public: |
246 | Stats() { Start(); } |
247 | |
248 | void Start() { |
249 | next_report_ = 100; |
250 | hist_.Clear(); |
251 | done_ = 0; |
252 | bytes_ = 0; |
253 | seconds_ = 0; |
254 | message_.clear(); |
255 | start_ = finish_ = last_op_finish_ = g_env->NowMicros(); |
256 | } |
257 | |
258 | void Merge(const Stats& other) { |
259 | hist_.Merge(other.hist_); |
260 | done_ += other.done_; |
261 | bytes_ += other.bytes_; |
262 | seconds_ += other.seconds_; |
263 | if (other.start_ < start_) start_ = other.start_; |
264 | if (other.finish_ > finish_) finish_ = other.finish_; |
265 | |
266 | // Just keep the messages from one thread |
267 | if (message_.empty()) message_ = other.message_; |
268 | } |
269 | |
270 | void Stop() { |
271 | finish_ = g_env->NowMicros(); |
272 | seconds_ = (finish_ - start_) * 1e-6; |
273 | } |
274 | |
275 | void AddMessage(Slice msg) { AppendWithSpace(&message_, msg); } |
276 | |
277 | void FinishedSingleOp() { |
278 | if (FLAGS_histogram) { |
279 | double now = g_env->NowMicros(); |
280 | double micros = now - last_op_finish_; |
281 | hist_.Add(micros); |
282 | if (micros > 20000) { |
283 | std::fprintf(stderr, "long op: %.1f micros%30s\r" , micros, "" ); |
284 | std::fflush(stderr); |
285 | } |
286 | last_op_finish_ = now; |
287 | } |
288 | |
289 | done_++; |
290 | if (done_ >= next_report_) { |
291 | if (next_report_ < 1000) |
292 | next_report_ += 100; |
293 | else if (next_report_ < 5000) |
294 | next_report_ += 500; |
295 | else if (next_report_ < 10000) |
296 | next_report_ += 1000; |
297 | else if (next_report_ < 50000) |
298 | next_report_ += 5000; |
299 | else if (next_report_ < 100000) |
300 | next_report_ += 10000; |
301 | else if (next_report_ < 500000) |
302 | next_report_ += 50000; |
303 | else |
304 | next_report_ += 100000; |
305 | std::fprintf(stderr, "... finished %d ops%30s\r" , done_, "" ); |
306 | std::fflush(stderr); |
307 | } |
308 | } |
309 | |
310 | void AddBytes(int64_t n) { bytes_ += n; } |
311 | |
312 | void Report(const Slice& name) { |
313 | // Pretend at least one op was done in case we are running a benchmark |
314 | // that does not call FinishedSingleOp(). |
315 | if (done_ < 1) done_ = 1; |
316 | |
317 | std::string ; |
318 | if (bytes_ > 0) { |
319 | // Rate is computed on actual elapsed time, not the sum of per-thread |
320 | // elapsed times. |
321 | double elapsed = (finish_ - start_) * 1e-6; |
322 | char rate[100]; |
323 | std::snprintf(rate, sizeof(rate), "%6.1f MB/s" , |
324 | (bytes_ / 1048576.0) / elapsed); |
325 | extra = rate; |
326 | } |
327 | AppendWithSpace(&extra, message_); |
328 | |
329 | std::fprintf(stdout, "%-12s : %11.3f micros/op;%s%s\n" , |
330 | name.ToString().c_str(), seconds_ * 1e6 / done_, |
331 | (extra.empty() ? "" : " " ), extra.c_str()); |
332 | if (FLAGS_histogram) { |
333 | std::fprintf(stdout, "Microseconds per op:\n%s\n" , |
334 | hist_.ToString().c_str()); |
335 | } |
336 | std::fflush(stdout); |
337 | } |
338 | }; |
339 | |
340 | // State shared by all concurrent executions of the same benchmark. |
341 | struct SharedState { |
342 | port::Mutex mu; |
343 | port::CondVar cv GUARDED_BY(mu); |
344 | int total GUARDED_BY(mu); |
345 | |
346 | // Each thread goes through the following states: |
347 | // (1) initializing |
348 | // (2) waiting for others to be initialized |
349 | // (3) running |
350 | // (4) done |
351 | |
352 | int num_initialized GUARDED_BY(mu); |
353 | int num_done GUARDED_BY(mu); |
354 | bool start GUARDED_BY(mu); |
355 | |
356 | SharedState(int total) |
357 | : cv(&mu), total(total), num_initialized(0), num_done(0), start(false) {} |
358 | }; |
359 | |
// Per-thread state for concurrent executions of the same benchmark.
struct ThreadState {
  int tid;      // 0..n-1 when running in n threads
  Random rand;  // Has different seeds for different threads
  Stats stats;
  SharedState* shared;  // null until assigned by RunBenchmark

  ThreadState(int index, int seed) : tid(index), rand(seed), shared(nullptr) {}
};
369 | |
370 | } // namespace |
371 | |
class Benchmark {
 private:
  Cache* cache_;                        // shared block cache (null if --cache_size < 0)
  const FilterPolicy* filter_policy_;   // bloom policy (null if --bloom_bits < 0)
  DB* db_;                              // currently open database, or null
  int num_;                             // op count for the current benchmark
  int value_size_;                      // value size for the current benchmark
  int entries_per_batch_;               // writes grouped per WriteBatch
  WriteOptions write_options_;          // reset per benchmark (sync for fillsync)
  int reads_;                           // read count for the current benchmark
  int heap_counter_;                    // sequence number for heap profile files
  CountComparator count_comparator_;    // installed when --comparisons is set
  int total_thread_count_;              // drives unique, reproducible RNG seeds
385 | |
386 | void () { |
387 | const int kKeySize = 16 + FLAGS_key_prefix; |
388 | PrintEnvironment(); |
389 | std::fprintf(stdout, "Keys: %d bytes each\n" , kKeySize); |
390 | std::fprintf( |
391 | stdout, "Values: %d bytes each (%d bytes after compression)\n" , |
392 | FLAGS_value_size, |
393 | static_cast<int>(FLAGS_value_size * FLAGS_compression_ratio + 0.5)); |
394 | std::fprintf(stdout, "Entries: %d\n" , num_); |
395 | std::fprintf(stdout, "RawSize: %.1f MB (estimated)\n" , |
396 | ((static_cast<int64_t>(kKeySize + FLAGS_value_size) * num_) / |
397 | 1048576.0)); |
398 | std::fprintf( |
399 | stdout, "FileSize: %.1f MB (estimated)\n" , |
400 | (((kKeySize + FLAGS_value_size * FLAGS_compression_ratio) * num_) / |
401 | 1048576.0)); |
402 | PrintWarnings(); |
403 | std::fprintf(stdout, "------------------------------------------------\n" ); |
404 | } |
405 | |
  // Emits warnings about build configurations that would skew results:
  // optimization disabled, assertions enabled, Snappy missing or
  // ineffective.
  void PrintWarnings() {
#if defined(__GNUC__) && !defined(__OPTIMIZE__)
    std::fprintf(
        stdout,
        "WARNING: Optimization is disabled: benchmarks unnecessarily slow\n");
#endif
#ifndef NDEBUG
    std::fprintf(
        stdout,
        "WARNING: Assertions are enabled; benchmarks unnecessarily slow\n");
#endif

    // See if snappy is working by attempting to compress a compressible string
    const char text[] = "yyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyyy";
    std::string compressed;
    if (!port::Snappy_Compress(text, sizeof(text), &compressed)) {
      std::fprintf(stdout, "WARNING: Snappy compression is not enabled\n");
    } else if (compressed.size() >= sizeof(text)) {
      std::fprintf(stdout, "WARNING: Snappy compression is not effective\n");
    }
  }
427 | |
  // Prints the LevelDB version and, on Linux, the current date plus CPU
  // model and cache size parsed from /proc/cpuinfo.
  void PrintEnvironment() {
    std::fprintf(stderr, "LevelDB: version %d.%d\n", kMajorVersion,
                 kMinorVersion);

#if defined(__linux)
    time_t now = time(nullptr);
    std::fprintf(stderr, "Date: %s",
                 ctime(&now));  // ctime() adds newline

    FILE* cpuinfo = std::fopen("/proc/cpuinfo", "r");
    if (cpuinfo != nullptr) {
      char line[1000];
      int num_cpus = 0;
      std::string cpu_type;
      std::string cache_size;
      while (fgets(line, sizeof(line), cpuinfo) != nullptr) {
        const char* sep = strchr(line, ':');
        if (sep == nullptr) {
          continue;
        }
        // Key is everything before the character preceding ':';
        // NOTE(review): this drops one byte before the colon — presumably
        // the tab cpuinfo pads keys with; TrimSpace removes the rest.
        Slice key = TrimSpace(Slice(line, sep - 1 - line));
        Slice val = TrimSpace(Slice(sep + 1));
        if (key == "model name") {
          // One "model name" line per logical CPU.
          ++num_cpus;
          cpu_type = val.ToString();
        } else if (key == "cache size") {
          cache_size = val.ToString();
        }
      }
      std::fclose(cpuinfo);
      std::fprintf(stderr, "CPU: %d * %s\n", num_cpus, cpu_type.c_str());
      std::fprintf(stderr, "CPUCache: %s\n", cache_size.c_str());
    }
#endif
  }
463 | |
 public:
  // Builds the shared cache and filter policy from the flags, removes
  // heap profile files left over from a previous run, and (unless
  // --use_existing_db) destroys any existing database at FLAGS_db.
  Benchmark()
      : cache_(FLAGS_cache_size >= 0 ? NewLRUCache(FLAGS_cache_size) : nullptr),
        filter_policy_(FLAGS_bloom_bits >= 0
                           ? NewBloomFilterPolicy(FLAGS_bloom_bits)
                           : nullptr),
        db_(nullptr),
        num_(FLAGS_num),
        value_size_(FLAGS_value_size),
        entries_per_batch_(1),
        reads_(FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads),
        heap_counter_(0),
        count_comparator_(BytewiseComparator()),
        total_thread_count_(0) {
    std::vector<std::string> files;
    g_env->GetChildren(FLAGS_db, &files);
    for (size_t i = 0; i < files.size(); i++) {
      // Stale "heap-NNNN" dumps from HeapProfile() of an earlier run.
      if (Slice(files[i]).starts_with("heap-")) {
        g_env->RemoveFile(std::string(FLAGS_db) + "/" + files[i]);
      }
    }
    if (!FLAGS_use_existing_db) {
      DestroyDB(FLAGS_db, Options());
    }
  }
489 | |
  ~Benchmark() {
    // Close the DB first: Open() hands it cache_ and filter_policy_
    // through Options, so they must outlive it.
    delete db_;
    delete cache_;
    delete filter_policy_;
  }
495 | |
  // Main driver: parses the comma-separated FLAGS_benchmarks list and runs
  // each entry in order, resetting per-benchmark parameters between runs
  // and recreating the database for benchmarks that need a fresh one.
  void Run() {
    PrintHeader();
    Open();

    const char* benchmarks = FLAGS_benchmarks;
    while (benchmarks != nullptr) {
      // Split off the next comma-separated benchmark name.
      const char* sep = strchr(benchmarks, ',');
      Slice name;
      if (sep == nullptr) {
        name = benchmarks;
        benchmarks = nullptr;
      } else {
        name = Slice(benchmarks, sep - benchmarks);
        benchmarks = sep + 1;
      }

      // Reset parameters that may be overridden below
      num_ = FLAGS_num;
      reads_ = (FLAGS_reads < 0 ? FLAGS_num : FLAGS_reads);
      value_size_ = FLAGS_value_size;
      entries_per_batch_ = 1;
      write_options_ = WriteOptions();

      void (Benchmark::*method)(ThreadState*) = nullptr;
      bool fresh_db = false;
      int num_threads = FLAGS_threads;

      // Map the benchmark name to its method and parameter overrides.
      // Meta operations (heapprofile/stats/sstables) run inline and leave
      // method == nullptr.
      if (name == Slice("open")) {
        method = &Benchmark::OpenBench;
        num_ /= 10000;
        if (num_ < 1) num_ = 1;
      } else if (name == Slice("fillseq")) {
        fresh_db = true;
        method = &Benchmark::WriteSeq;
      } else if (name == Slice("fillbatch")) {
        fresh_db = true;
        entries_per_batch_ = 1000;
        method = &Benchmark::WriteSeq;
      } else if (name == Slice("fillrandom")) {
        fresh_db = true;
        method = &Benchmark::WriteRandom;
      } else if (name == Slice("overwrite")) {
        fresh_db = false;
        method = &Benchmark::WriteRandom;
      } else if (name == Slice("fillsync")) {
        fresh_db = true;
        num_ /= 1000;
        write_options_.sync = true;
        method = &Benchmark::WriteRandom;
      } else if (name == Slice("fill100K")) {
        fresh_db = true;
        num_ /= 1000;
        value_size_ = 100 * 1000;
        method = &Benchmark::WriteRandom;
      } else if (name == Slice("readseq")) {
        method = &Benchmark::ReadSequential;
      } else if (name == Slice("readreverse")) {
        method = &Benchmark::ReadReverse;
      } else if (name == Slice("readrandom")) {
        method = &Benchmark::ReadRandom;
      } else if (name == Slice("readmissing")) {
        method = &Benchmark::ReadMissing;
      } else if (name == Slice("seekrandom")) {
        method = &Benchmark::SeekRandom;
      } else if (name == Slice("seekordered")) {
        method = &Benchmark::SeekOrdered;
      } else if (name == Slice("readhot")) {
        method = &Benchmark::ReadHot;
      } else if (name == Slice("readrandomsmall")) {
        reads_ /= 1000;
        method = &Benchmark::ReadRandom;
      } else if (name == Slice("deleteseq")) {
        method = &Benchmark::DeleteSeq;
      } else if (name == Slice("deleterandom")) {
        method = &Benchmark::DeleteRandom;
      } else if (name == Slice("readwhilewriting")) {
        num_threads++;  // Add extra thread for writing
        method = &Benchmark::ReadWhileWriting;
      } else if (name == Slice("compact")) {
        method = &Benchmark::Compact;
      } else if (name == Slice("crc32c")) {
        method = &Benchmark::Crc32c;
      } else if (name == Slice("snappycomp")) {
        method = &Benchmark::SnappyCompress;
      } else if (name == Slice("snappyuncomp")) {
        method = &Benchmark::SnappyUncompress;
      } else if (name == Slice("heapprofile")) {
        HeapProfile();
      } else if (name == Slice("stats")) {
        PrintStats("leveldb.stats");
      } else if (name == Slice("sstables")) {
        PrintStats("leveldb.sstables");
      } else {
        if (!name.empty()) {  // No error message for empty name
          std::fprintf(stderr, "unknown benchmark '%s'\n",
                       name.ToString().c_str());
        }
      }

      if (fresh_db) {
        if (FLAGS_use_existing_db) {
          std::fprintf(stdout, "%-12s : skipped (--use_existing_db is true)\n",
                       name.ToString().c_str());
          method = nullptr;
        } else {
          // Recreate the database from scratch for fill benchmarks.
          delete db_;
          db_ = nullptr;
          DestroyDB(FLAGS_db, Options());
          Open();
        }
      }

      if (method != nullptr) {
        RunBenchmark(num_threads, name, method);
      }
    }
  }
613 | |
 private:
  // Arguments handed to each benchmark thread via ThreadBody().
  struct ThreadArg {
    Benchmark* bm;                            // owning Benchmark instance
    SharedState* shared;                      // coordination state for all threads
    ThreadState* thread;                      // this thread's private state
    void (Benchmark::*method)(ThreadState*);  // benchmark body to execute
  };
621 | |
  // Entry point for each benchmark thread.  Registers itself as
  // initialized, blocks until the coordinator sets shared->start, runs the
  // benchmark method bracketed by stats Start/Stop, then registers
  // completion.  Signals wake both the coordinator (in RunBenchmark) and
  // any peers still waiting on the start barrier.
  static void ThreadBody(void* v) {
    ThreadArg* arg = reinterpret_cast<ThreadArg*>(v);
    SharedState* shared = arg->shared;
    ThreadState* thread = arg->thread;
    {
      MutexLock l(&shared->mu);
      shared->num_initialized++;
      if (shared->num_initialized >= shared->total) {
        // Last thread to arrive: wake the coordinator.
        shared->cv.SignalAll();
      }
      while (!shared->start) {
        shared->cv.Wait();
      }
    }

    thread->stats.Start();
    (arg->bm->*(arg->method))(thread);
    thread->stats.Stop();

    {
      MutexLock l(&shared->mu);
      shared->num_done++;
      if (shared->num_done >= shared->total) {
        shared->cv.SignalAll();
      }
    }
  }
649 | |
  // Spawns n threads all running `method`, releases them simultaneously
  // once every thread has initialized, waits for completion, merges the
  // per-thread stats into thread 0's, and reports under `name`.
  void RunBenchmark(int n, Slice name,
                    void (Benchmark::*method)(ThreadState*)) {
    SharedState shared(n);

    ThreadArg* arg = new ThreadArg[n];
    for (int i = 0; i < n; i++) {
      arg[i].bm = this;
      arg[i].method = method;
      arg[i].shared = &shared;
      ++total_thread_count_;
      // Seed the thread's random state deterministically based upon thread
      // creation across all benchmarks. This ensures that the seeds are unique
      // but reproducible when rerunning the same set of benchmarks.
      arg[i].thread = new ThreadState(i, /*seed=*/1000 + total_thread_count_);
      arg[i].thread->shared = &shared;
      g_env->StartThread(ThreadBody, &arg[i]);
    }

    // Wait for all threads to reach the start barrier...
    shared.mu.Lock();
    while (shared.num_initialized < n) {
      shared.cv.Wait();
    }

    // ...then release them all at once and wait for completion.
    shared.start = true;
    shared.cv.SignalAll();
    while (shared.num_done < n) {
      shared.cv.Wait();
    }
    shared.mu.Unlock();

    for (int i = 1; i < n; i++) {
      arg[0].thread->stats.Merge(arg[i].thread->stats);
    }
    arg[0].thread->stats.Report(name);
    if (FLAGS_comparisons) {
      fprintf(stdout, "Comparisons: %zu\n", count_comparator_.comparisons());
      count_comparator_.reset();
      fflush(stdout);
    }

    for (int i = 0; i < n; i++) {
      delete arg[i].thread;
    }
    delete[] arg;
  }
695 | |
696 | void Crc32c(ThreadState* thread) { |
697 | // Checksum about 500MB of data total |
698 | const int size = 4096; |
699 | const char* label = "(4K per op)" ; |
700 | std::string data(size, 'x'); |
701 | int64_t bytes = 0; |
702 | uint32_t crc = 0; |
703 | while (bytes < 500 * 1048576) { |
704 | crc = crc32c::Value(data.data(), size); |
705 | thread->stats.FinishedSingleOp(); |
706 | bytes += size; |
707 | } |
708 | // Print so result is not dead |
709 | std::fprintf(stderr, "... crc=0x%x\r" , static_cast<unsigned int>(crc)); |
710 | |
711 | thread->stats.AddBytes(bytes); |
712 | thread->stats.AddMessage(label); |
713 | } |
714 | |
  // Compresses the same block_size-sized input repeatedly until ~1GB has
  // been consumed; reports the achieved output ratio as a message.
  void SnappyCompress(ThreadState* thread) {
    RandomGenerator gen;
    Slice input = gen.Generate(Options().block_size);
    int64_t bytes = 0;
    int64_t produced = 0;
    bool ok = true;
    std::string compressed;
    while (ok && bytes < 1024 * 1048576) {  // Compress 1G
      ok = port::Snappy_Compress(input.data(), input.size(), &compressed);
      produced += compressed.size();
      bytes += input.size();
      thread->stats.FinishedSingleOp();
    }

    if (!ok) {
      thread->stats.AddMessage("(snappy failure)");
    } else {
      char buf[100];
      std::snprintf(buf, sizeof(buf), "(output: %.1f%%)",
                    (produced * 100.0) / bytes);
      thread->stats.AddMessage(buf);
      thread->stats.AddBytes(bytes);
    }
  }
739 | |
  // Compresses one block once, then repeatedly decompresses it until
  // ~1GB of (uncompressed) data has been produced.
  void SnappyUncompress(ThreadState* thread) {
    RandomGenerator gen;
    Slice input = gen.Generate(Options().block_size);
    std::string compressed;
    bool ok = port::Snappy_Compress(input.data(), input.size(), &compressed);
    int64_t bytes = 0;
    char* uncompressed = new char[input.size()];
    while (ok && bytes < 1024 * 1048576) {  // Uncompress 1G
      ok = port::Snappy_Uncompress(compressed.data(), compressed.size(),
                                   uncompressed);
      bytes += input.size();
      thread->stats.FinishedSingleOp();
    }
    delete[] uncompressed;

    if (!ok) {
      thread->stats.AddMessage("(snappy failure)");
    } else {
      thread->stats.AddBytes(bytes);
    }
  }
761 | |
  // Opens the database at FLAGS_db with options derived from the command
  // line flags.  Requires db_ == nullptr; exits the process on failure.
  void Open() {
    assert(db_ == nullptr);
    Options options;
    options.env = g_env;
    options.create_if_missing = !FLAGS_use_existing_db;
    options.block_cache = cache_;
    options.write_buffer_size = FLAGS_write_buffer_size;
    options.max_file_size = FLAGS_max_file_size;
    options.block_size = FLAGS_block_size;
    if (FLAGS_comparisons) {
      // Instrumented comparator so RunBenchmark can report comparison counts.
      options.comparator = &count_comparator_;
    }
    options.max_open_files = FLAGS_open_files;
    options.filter_policy = filter_policy_;
    options.reuse_logs = FLAGS_reuse_logs;
    options.compression =
        FLAGS_compression ? kSnappyCompression : kNoCompression;
    Status s = DB::Open(options, FLAGS_db, &db_);
    if (!s.ok()) {
      std::fprintf(stderr, "open error: %s\n", s.ToString().c_str());
      std::exit(1);
    }
  }
785 | |
786 | void OpenBench(ThreadState* thread) { |
787 | for (int i = 0; i < num_; i++) { |
788 | delete db_; |
789 | Open(); |
790 | thread->stats.FinishedSingleOp(); |
791 | } |
792 | } |
793 | |
  // Write num_ values in sequential key order (see DoWrite).
  void WriteSeq(ThreadState* thread) { DoWrite(thread, true); }
795 | |
  // Write num_ values in random key order (see DoWrite).
  void WriteRandom(ThreadState* thread) { DoWrite(thread, false); }
797 | |
  // Shared implementation of the fill benchmarks.  Writes num_ values,
  // entries_per_batch_ per WriteBatch; keys are sequential when `seq`,
  // otherwise uniform in [0, FLAGS_num).  Exits the process on any write
  // error.
  void DoWrite(ThreadState* thread, bool seq) {
    if (num_ != FLAGS_num) {
      // Make the report show the benchmark's non-default op count.
      char msg[100];
      std::snprintf(msg, sizeof(msg), "(%d ops)", num_);
      thread->stats.AddMessage(msg);
    }

    RandomGenerator gen;
    WriteBatch batch;
    Status s;
    int64_t bytes = 0;
    KeyBuffer key;
    for (int i = 0; i < num_; i += entries_per_batch_) {
      batch.Clear();
      for (int j = 0; j < entries_per_batch_; j++) {
        const int k = seq ? i + j : thread->rand.Uniform(FLAGS_num);
        key.Set(k);
        batch.Put(key.slice(), gen.Generate(value_size_));
        bytes += value_size_ + key.slice().size();
        thread->stats.FinishedSingleOp();
      }
      s = db_->Write(write_options_, &batch);
      if (!s.ok()) {
        std::fprintf(stderr, "put error: %s\n", s.ToString().c_str());
        std::exit(1);
      }
    }
    thread->stats.AddBytes(bytes);
  }
827 | |
828 | void ReadSequential(ThreadState* thread) { |
829 | Iterator* iter = db_->NewIterator(ReadOptions()); |
830 | int i = 0; |
831 | int64_t bytes = 0; |
832 | for (iter->SeekToFirst(); i < reads_ && iter->Valid(); iter->Next()) { |
833 | bytes += iter->key().size() + iter->value().size(); |
834 | thread->stats.FinishedSingleOp(); |
835 | ++i; |
836 | } |
837 | delete iter; |
838 | thread->stats.AddBytes(bytes); |
839 | } |
840 | |
841 | void ReadReverse(ThreadState* thread) { |
842 | Iterator* iter = db_->NewIterator(ReadOptions()); |
843 | int i = 0; |
844 | int64_t bytes = 0; |
845 | for (iter->SeekToLast(); i < reads_ && iter->Valid(); iter->Prev()) { |
846 | bytes += iter->key().size() + iter->value().size(); |
847 | thread->stats.FinishedSingleOp(); |
848 | ++i; |
849 | } |
850 | delete iter; |
851 | thread->stats.AddBytes(bytes); |
852 | } |
853 | |
854 | void ReadRandom(ThreadState* thread) { |
855 | ReadOptions options; |
856 | std::string value; |
857 | int found = 0; |
858 | KeyBuffer key; |
859 | for (int i = 0; i < reads_; i++) { |
860 | const int k = thread->rand.Uniform(FLAGS_num); |
861 | key.Set(k); |
862 | if (db_->Get(options, key.slice(), &value).ok()) { |
863 | found++; |
864 | } |
865 | thread->stats.FinishedSingleOp(); |
866 | } |
867 | char msg[100]; |
868 | std::snprintf(msg, sizeof(msg), "(%d of %d found)" , found, num_); |
869 | thread->stats.AddMessage(msg); |
870 | } |
871 | |
872 | void ReadMissing(ThreadState* thread) { |
873 | ReadOptions options; |
874 | std::string value; |
875 | KeyBuffer key; |
876 | for (int i = 0; i < reads_; i++) { |
877 | const int k = thread->rand.Uniform(FLAGS_num); |
878 | key.Set(k); |
879 | Slice s = Slice(key.slice().data(), key.slice().size() - 1); |
880 | db_->Get(options, s, &value); |
881 | thread->stats.FinishedSingleOp(); |
882 | } |
883 | } |
884 | |
885 | void ReadHot(ThreadState* thread) { |
886 | ReadOptions options; |
887 | std::string value; |
888 | const int range = (FLAGS_num + 99) / 100; |
889 | KeyBuffer key; |
890 | for (int i = 0; i < reads_; i++) { |
891 | const int k = thread->rand.Uniform(range); |
892 | key.Set(k); |
893 | db_->Get(options, key.slice(), &value); |
894 | thread->stats.FinishedSingleOp(); |
895 | } |
896 | } |
897 | |
898 | void SeekRandom(ThreadState* thread) { |
899 | ReadOptions options; |
900 | int found = 0; |
901 | KeyBuffer key; |
902 | for (int i = 0; i < reads_; i++) { |
903 | Iterator* iter = db_->NewIterator(options); |
904 | const int k = thread->rand.Uniform(FLAGS_num); |
905 | key.Set(k); |
906 | iter->Seek(key.slice()); |
907 | if (iter->Valid() && iter->key() == key.slice()) found++; |
908 | delete iter; |
909 | thread->stats.FinishedSingleOp(); |
910 | } |
911 | char msg[100]; |
912 | snprintf(msg, sizeof(msg), "(%d of %d found)" , found, num_); |
913 | thread->stats.AddMessage(msg); |
914 | } |
915 | |
916 | void SeekOrdered(ThreadState* thread) { |
917 | ReadOptions options; |
918 | Iterator* iter = db_->NewIterator(options); |
919 | int found = 0; |
920 | int k = 0; |
921 | KeyBuffer key; |
922 | for (int i = 0; i < reads_; i++) { |
923 | k = (k + (thread->rand.Uniform(100))) % FLAGS_num; |
924 | key.Set(k); |
925 | iter->Seek(key.slice()); |
926 | if (iter->Valid() && iter->key() == key.slice()) found++; |
927 | thread->stats.FinishedSingleOp(); |
928 | } |
929 | delete iter; |
930 | char msg[100]; |
931 | std::snprintf(msg, sizeof(msg), "(%d of %d found)" , found, num_); |
932 | thread->stats.AddMessage(msg); |
933 | } |
934 | |
935 | void DoDelete(ThreadState* thread, bool seq) { |
936 | RandomGenerator gen; |
937 | WriteBatch batch; |
938 | Status s; |
939 | KeyBuffer key; |
940 | for (int i = 0; i < num_; i += entries_per_batch_) { |
941 | batch.Clear(); |
942 | for (int j = 0; j < entries_per_batch_; j++) { |
943 | const int k = seq ? i + j : (thread->rand.Uniform(FLAGS_num)); |
944 | key.Set(k); |
945 | batch.Delete(key.slice()); |
946 | thread->stats.FinishedSingleOp(); |
947 | } |
948 | s = db_->Write(write_options_, &batch); |
949 | if (!s.ok()) { |
950 | std::fprintf(stderr, "del error: %s\n" , s.ToString().c_str()); |
951 | std::exit(1); |
952 | } |
953 | } |
954 | } |
955 | |
  // Benchmark: delete keys 0..num_-1 in ascending order.
  void DeleteSeq(ThreadState* thread) { DoDelete(thread, true); }
957 | |
  // Benchmark: delete num_ uniformly random keys from [0, FLAGS_num).
  void DeleteRandom(ThreadState* thread) { DoDelete(thread, false); }
959 | |
  // Benchmark: measures read performance under concurrent write load.
  // Thread 0 issues a continuous stream of random Puts; every other thread
  // runs ReadRandom.  Only the reader threads' stats are meaningful: the
  // writer resets its own stats when the readers finish.
  void ReadWhileWriting(ThreadState* thread) {
    if (thread->tid > 0) {
      ReadRandom(thread);
    } else {
      // Special thread that keeps writing until other threads are done.
      RandomGenerator gen;
      KeyBuffer key;
      while (true) {
        {
          MutexLock l(&thread->shared->mu);
          // The +1 accounts for this writer thread itself, which will
          // only report done after leaving this loop.
          if (thread->shared->num_done + 1 >= thread->shared->num_initialized) {
            // Other threads have finished
            break;
          }
        }

        // Overwrite a random existing key with a fresh value.
        const int k = thread->rand.Uniform(FLAGS_num);
        key.Set(k);
        Status s =
            db_->Put(write_options_, key.slice(), gen.Generate(value_size_));
        if (!s.ok()) {
          std::fprintf(stderr, "put error: %s\n", s.ToString().c_str());
          std::exit(1);
        }
      }

      // Do not count any of the preceding work/delay in stats.
      thread->stats.Start();
    }
  }
990 | |
  // Meta operation: compact the entire key range of the database.
  void Compact(ThreadState* thread) { db_->CompactRange(nullptr, nullptr); }
992 | |
993 | void PrintStats(const char* key) { |
994 | std::string stats; |
995 | if (!db_->GetProperty(key, &stats)) { |
996 | stats = "(failed)" ; |
997 | } |
998 | std::fprintf(stdout, "\n%s\n" , stats.c_str()); |
999 | } |
1000 | |
1001 | static void WriteToFile(void* arg, const char* buf, int n) { |
1002 | reinterpret_cast<WritableFile*>(arg)->Append(Slice(buf, n)); |
1003 | } |
1004 | |
1005 | void HeapProfile() { |
1006 | char fname[100]; |
1007 | std::snprintf(fname, sizeof(fname), "%s/heap-%04d" , FLAGS_db, |
1008 | ++heap_counter_); |
1009 | WritableFile* file; |
1010 | Status s = g_env->NewWritableFile(fname, &file); |
1011 | if (!s.ok()) { |
1012 | std::fprintf(stderr, "%s\n" , s.ToString().c_str()); |
1013 | return; |
1014 | } |
1015 | bool ok = port::GetHeapProfile(WriteToFile, file); |
1016 | delete file; |
1017 | if (!ok) { |
1018 | std::fprintf(stderr, "heap profiling not supported\n" ); |
1019 | g_env->RemoveFile(fname); |
1020 | } |
1021 | } |
1022 | }; |
1023 | |
1024 | } // namespace leveldb |
1025 | |
1026 | int main(int argc, char** argv) { |
1027 | FLAGS_write_buffer_size = leveldb::Options().write_buffer_size; |
1028 | FLAGS_max_file_size = leveldb::Options().max_file_size; |
1029 | FLAGS_block_size = leveldb::Options().block_size; |
1030 | FLAGS_open_files = leveldb::Options().max_open_files; |
1031 | std::string default_db_path; |
1032 | |
1033 | for (int i = 1; i < argc; i++) { |
1034 | double d; |
1035 | int n; |
1036 | char junk; |
1037 | if (leveldb::Slice(argv[i]).starts_with("--benchmarks=" )) { |
1038 | FLAGS_benchmarks = argv[i] + strlen("--benchmarks=" ); |
1039 | } else if (sscanf(argv[i], "--compression_ratio=%lf%c" , &d, &junk) == 1) { |
1040 | FLAGS_compression_ratio = d; |
1041 | } else if (sscanf(argv[i], "--histogram=%d%c" , &n, &junk) == 1 && |
1042 | (n == 0 || n == 1)) { |
1043 | FLAGS_histogram = n; |
1044 | } else if (sscanf(argv[i], "--comparisons=%d%c" , &n, &junk) == 1 && |
1045 | (n == 0 || n == 1)) { |
1046 | FLAGS_comparisons = n; |
1047 | } else if (sscanf(argv[i], "--use_existing_db=%d%c" , &n, &junk) == 1 && |
1048 | (n == 0 || n == 1)) { |
1049 | FLAGS_use_existing_db = n; |
1050 | } else if (sscanf(argv[i], "--reuse_logs=%d%c" , &n, &junk) == 1 && |
1051 | (n == 0 || n == 1)) { |
1052 | FLAGS_reuse_logs = n; |
1053 | } else if (sscanf(argv[i], "--compression=%d%c" , &n, &junk) == 1 && |
1054 | (n == 0 || n == 1)) { |
1055 | FLAGS_compression = n; |
1056 | } else if (sscanf(argv[i], "--num=%d%c" , &n, &junk) == 1) { |
1057 | FLAGS_num = n; |
1058 | } else if (sscanf(argv[i], "--reads=%d%c" , &n, &junk) == 1) { |
1059 | FLAGS_reads = n; |
1060 | } else if (sscanf(argv[i], "--threads=%d%c" , &n, &junk) == 1) { |
1061 | FLAGS_threads = n; |
1062 | } else if (sscanf(argv[i], "--value_size=%d%c" , &n, &junk) == 1) { |
1063 | FLAGS_value_size = n; |
1064 | } else if (sscanf(argv[i], "--write_buffer_size=%d%c" , &n, &junk) == 1) { |
1065 | FLAGS_write_buffer_size = n; |
1066 | } else if (sscanf(argv[i], "--max_file_size=%d%c" , &n, &junk) == 1) { |
1067 | FLAGS_max_file_size = n; |
1068 | } else if (sscanf(argv[i], "--block_size=%d%c" , &n, &junk) == 1) { |
1069 | FLAGS_block_size = n; |
1070 | } else if (sscanf(argv[i], "--key_prefix=%d%c" , &n, &junk) == 1) { |
1071 | FLAGS_key_prefix = n; |
1072 | } else if (sscanf(argv[i], "--cache_size=%d%c" , &n, &junk) == 1) { |
1073 | FLAGS_cache_size = n; |
1074 | } else if (sscanf(argv[i], "--bloom_bits=%d%c" , &n, &junk) == 1) { |
1075 | FLAGS_bloom_bits = n; |
1076 | } else if (sscanf(argv[i], "--open_files=%d%c" , &n, &junk) == 1) { |
1077 | FLAGS_open_files = n; |
1078 | } else if (strncmp(argv[i], "--db=" , 5) == 0) { |
1079 | FLAGS_db = argv[i] + 5; |
1080 | } else { |
1081 | std::fprintf(stderr, "Invalid flag '%s'\n" , argv[i]); |
1082 | std::exit(1); |
1083 | } |
1084 | } |
1085 | |
1086 | leveldb::g_env = leveldb::Env::Default(); |
1087 | |
1088 | // Choose a location for the test database if none given with --db=<path> |
1089 | if (FLAGS_db == nullptr) { |
1090 | leveldb::g_env->GetTestDirectory(&default_db_path); |
1091 | default_db_path += "/dbbench" ; |
1092 | FLAGS_db = default_db_path.c_str(); |
1093 | } |
1094 | |
1095 | leveldb::Benchmark benchmark; |
1096 | benchmark.Run(); |
1097 | return 0; |
1098 | } |
1099 | |