1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | // Date: 2015/09/15 10:44:17 |
19 | |
20 | #ifndef BVAR_DETAIL_PERCENTILE_H |
21 | #define BVAR_DETAIL_PERCENTILE_H |
22 | |
23 | #include <string.h> // memset memcmp |
24 | #include <stdint.h> // uint32_t |
25 | #include <limits> // std::numeric_limits |
26 | #include <ostream> // std::ostream |
27 | #include <algorithm> // std::sort |
28 | #include <math.h> // ceil |
29 | #include "butil/macros.h" // ARRAY_SIZE |
30 | #include "bvar/reducer.h" // Reducer |
31 | #include "bvar/window.h" // Window |
32 | #include "bvar/detail/combiner.h" // AgentCombiner |
33 | #include "bvar/detail/sampler.h" // ReducerSampler |
34 | #include "butil/fast_rand.h" |
35 | |
36 | namespace bvar { |
37 | namespace detail { |
38 | |
39 | // Round of expectation of a rational number |a/b| to a natural number. |
40 | inline unsigned long round_of_expectation(unsigned long a, unsigned long b) { |
41 | if (BAIDU_UNLIKELY(b == 0)) { |
42 | return 0; |
43 | } |
44 | return a / b + (butil::fast_rand_less_than(b) < a % b); |
45 | } |
46 | |
47 | // Storing latencies inside a interval. |
48 | template <size_t SAMPLE_SIZE> |
49 | class PercentileInterval { |
50 | public: |
51 | PercentileInterval() |
52 | : _num_added(0) |
53 | , _sorted(false) |
54 | , _num_samples(0) { |
55 | } |
56 | |
57 | // Get index-th sample in ascending order. |
58 | uint32_t get_sample_at(size_t index) { |
59 | const size_t saved_num = _num_samples; |
60 | if (index >= saved_num) { |
61 | if (saved_num == 0) { |
62 | return 0; |
63 | } |
64 | index = saved_num - 1; |
65 | } |
66 | if (!_sorted) { |
67 | std::sort(_samples, _samples + saved_num); |
68 | _sorted = true; |
69 | } |
70 | CHECK_EQ(saved_num, _num_samples) << "You must call get_number() on" |
71 | " a unchanging PercentileInterval" ; |
72 | return _samples[index]; |
73 | } |
74 | |
75 | // Add samples of another interval. This function tries to make each |
76 | // sample in merged _samples has (approximately) equal probability to |
77 | // remain. |
78 | // This method is invoked when merging ThreadLocalPercentileSamples in to |
79 | // GlobalPercentileSamples |
80 | template <size_t size2> |
81 | void merge(const PercentileInterval<size2> &rhs) { |
82 | if (rhs._num_added == 0) { |
83 | return; |
84 | } |
85 | BAIDU_CASSERT(SAMPLE_SIZE >= size2, |
86 | must_merge_small_interval_into_larger_one_currently); |
87 | CHECK_EQ(rhs._num_samples, rhs._num_added); |
88 | // Assume that the probability of each sample in |this| is a0/b0 and |
89 | // the probability of each sample in |rhs| is a1/b1. |
90 | // We are going to randomly pick some samples from |this| and |rhs| to |
91 | // satisfy the constraint that each sample stands for the probability |
92 | // of |
93 | // * 1 (SAMPLE_SIZE >= |b0 + b1|), which indicates that no sample |
94 | // has been dropped |
95 | // * SAMPLE_SIZE / |b0 + b1| (SAMPLE_SIZE < |b0 + b1|) |
96 | // So we should keep |b0*SAMPLE_SIZE/(b0+b1)| from |this| |
97 | // |b1*SAMPLE_SIZE/(b0+b1)| from |rhs|. |
98 | if (_num_added + rhs._num_added <= SAMPLE_SIZE) { |
99 | // No sample should be dropped |
100 | CHECK_EQ(_num_samples, _num_added) |
101 | << "_num_added=" << _num_added |
102 | << " rhs._num_added" << rhs._num_added |
103 | << " _num_samples=" << _num_samples |
104 | << " rhs._num_samples=" << rhs._num_samples |
105 | << " SAMPLE_SIZE=" << SAMPLE_SIZE |
106 | << " size2=" << size2; |
107 | memcpy(_samples + _num_samples, rhs._samples, |
108 | sizeof(_samples[0]) * rhs._num_samples); |
109 | _num_samples += rhs._num_samples; |
110 | } else { |
111 | // |num_remain| must be less than _num_samples: |
112 | // if _num_added = _num_samples: |
113 | // SAMPLE_SIZE / (_num_added + rhs._num_added) < 1 so that |
114 | // num_remain < _num_added = _num_samples |
115 | // otherwise: |
116 | // _num_samples = SAMPLE_SIZE; |
117 | // _num_added / (_num_added + rhs._num_added) < 1 so that |
118 | // num_remain < SAMPLE_SIZE = _num_added |
119 | size_t num_remain = round_of_expectation( |
120 | _num_added * SAMPLE_SIZE, _num_added + rhs._num_added); |
121 | CHECK_LE(num_remain, _num_samples); |
122 | // Randomly drop samples of this |
123 | for (size_t i = _num_samples; i > num_remain; --i) { |
124 | _samples[butil::fast_rand_less_than(i)] = _samples[i - 1]; |
125 | } |
126 | const size_t num_remain_from_rhs = SAMPLE_SIZE - num_remain; |
127 | CHECK_LE(num_remain_from_rhs, rhs._num_samples); |
128 | // Have to copy data from rhs to shuffle since it's const |
129 | DEFINE_SMALL_ARRAY(uint32_t, tmp, rhs._num_samples, 64); |
130 | memcpy(tmp, rhs._samples, sizeof(uint32_t) * rhs._num_samples); |
131 | for (size_t i = 0; i < num_remain_from_rhs; ++i) { |
132 | const int index = butil::fast_rand_less_than(rhs._num_samples - i); |
133 | _samples[num_remain++] = tmp[index]; |
134 | tmp[index] = tmp[rhs._num_samples - i - 1]; |
135 | } |
136 | _num_samples = num_remain; |
137 | CHECK_EQ(_num_samples, SAMPLE_SIZE); |
138 | } |
139 | _num_added += rhs._num_added; |
140 | } |
141 | |
142 | // Randomly pick n samples from mutable_rhs to |this| |
143 | template <size_t size2> |
144 | void merge_with_expectation(PercentileInterval<size2>& mutable_rhs, size_t n) { |
145 | CHECK(n <= mutable_rhs._num_samples); |
146 | _num_added += mutable_rhs._num_added; |
147 | if (_num_samples + n <= SAMPLE_SIZE && n == mutable_rhs._num_samples) { |
148 | memcpy(_samples + _num_samples, mutable_rhs._samples, sizeof(_samples[0]) * n); |
149 | _num_samples += n; |
150 | return; |
151 | } |
152 | for (size_t i = 0; i < n; ++i) { |
153 | size_t index = butil::fast_rand_less_than(mutable_rhs._num_samples - i); |
154 | if (_num_samples < SAMPLE_SIZE) { |
155 | _samples[_num_samples++] = mutable_rhs._samples[index]; |
156 | } else { |
157 | _samples[butil::fast_rand_less_than(_num_samples)] |
158 | = mutable_rhs._samples[index]; |
159 | } |
160 | std::swap(mutable_rhs._samples[index], |
161 | mutable_rhs._samples[mutable_rhs._num_samples - i - 1]); |
162 | } |
163 | } |
164 | |
165 | // Add an unsigned 32-bit latency (what percentile actually accepts) to a |
166 | // non-full interval, which is invoked by Percentile::operator<< to add a |
167 | // sample into the ThreadLocalPercentileSamples. |
168 | // Returns true if the input was stored. |
169 | bool add32(uint32_t x) { |
170 | if (BAIDU_UNLIKELY(_num_samples >= SAMPLE_SIZE)) { |
171 | LOG(ERROR) << "This interval was full" ; |
172 | return false; |
173 | } |
174 | ++_num_added; |
175 | _samples[_num_samples++] = x; |
176 | return true; |
177 | } |
178 | |
179 | // Add a signed latency. |
180 | bool add64(int64_t x) { |
181 | if (x >= 0) { |
182 | return add32((uint32_t)x); |
183 | } |
184 | return false; |
185 | } |
186 | |
187 | // Remove all samples inside. |
188 | void clear() { |
189 | _num_added = 0; |
190 | _sorted = false; |
191 | _num_samples = 0; |
192 | } |
193 | |
194 | // True if no more room for new samples. |
195 | bool full() const { return _num_samples == SAMPLE_SIZE; } |
196 | |
197 | // True if there's no samples. |
198 | bool empty() const { return !_num_samples; } |
199 | |
200 | // #samples ever added by calling add*() |
201 | uint32_t added_count() const { return _num_added; } |
202 | |
203 | // #samples stored. |
204 | uint32_t sample_count() const { return _num_samples; } |
205 | |
206 | // For debuggin. |
207 | void describe(std::ostream &os) const { |
208 | os << "(num_added=" << added_count() << ")[" ; |
209 | for (size_t j = 0; j < _num_samples; ++j) { |
210 | os << ' ' << _samples[j]; |
211 | } |
212 | os << " ]" ; |
213 | } |
214 | |
215 | // True if two PercentileInterval are exactly same, namely same # of added and |
216 | // same samples, mainly for debuggin. |
217 | bool operator==(const PercentileInterval& rhs) const { |
218 | return (_num_added == rhs._num_added && |
219 | _num_samples == rhs._num_samples && |
220 | memcmp(_samples, rhs._samples, _num_samples * sizeof(uint32_t)) == 0); |
221 | } |
222 | |
223 | private: |
224 | template <size_t size2> friend class PercentileInterval; |
225 | BAIDU_CASSERT(SAMPLE_SIZE <= 65536, SAMPLE_SIZE_must_be_16bit); |
226 | |
227 | uint32_t _num_added; |
228 | bool _sorted; |
229 | uint16_t _num_samples; |
230 | uint32_t _samples[SAMPLE_SIZE]; |
231 | }; |
232 | |
// This declaration is a must for gcc 3.4
// (PercentileSamples names AddLatency as a friend).
class AddLatency;

// Number of interval slots held by each PercentileSamples.
static const size_t NUM_INTERVALS = 32;
237 | |
238 | // Group of PercentileIntervals. |
239 | template <size_t SAMPLE_SIZE_IN> |
240 | class PercentileSamples { |
241 | public: |
242 | friend class AddLatency; |
243 | |
244 | static const size_t SAMPLE_SIZE = SAMPLE_SIZE_IN; |
245 | |
246 | PercentileSamples() { |
247 | memset(this, 0, sizeof(*this)); |
248 | } |
249 | |
250 | ~PercentileSamples() { |
251 | for (size_t i = 0; i < NUM_INTERVALS; ++i) { |
252 | if (_intervals[i]) { |
253 | delete _intervals[i]; |
254 | } |
255 | } |
256 | } |
257 | |
258 | // Copy-construct from another PercentileSamples. |
259 | // Copy/assigning happen at per-second scale. should be OK. |
260 | PercentileSamples(const PercentileSamples& rhs) { |
261 | _num_added = rhs._num_added; |
262 | for (size_t i = 0; i < NUM_INTERVALS; ++i) { |
263 | if (rhs._intervals[i] && !rhs._intervals[i]->empty()) { |
264 | _intervals[i] = new PercentileInterval<SAMPLE_SIZE>(*rhs._intervals[i]); |
265 | } else { |
266 | _intervals[i] = NULL; |
267 | } |
268 | } |
269 | } |
270 | |
271 | // Assign from another PercentileSamples. |
272 | // Notice that we keep empty _intervals to avoid future allocations. |
273 | void operator=(const PercentileSamples& rhs) { |
274 | _num_added = rhs._num_added; |
275 | for (size_t i = 0; i < NUM_INTERVALS; ++i) { |
276 | if (rhs._intervals[i] && !rhs._intervals[i]->empty()) { |
277 | get_interval_at(i) = *rhs._intervals[i]; |
278 | } else if (_intervals[i]) { |
279 | _intervals[i]->clear(); |
280 | } |
281 | } |
282 | } |
283 | |
284 | // Get the `ratio'-ile value. E.g. 0.99 means 99%-ile value. |
285 | // Since we store samples in different intervals internally. We first |
286 | // address the interval by multiplying ratio with _num_added, then |
287 | // find the sample inside the interval. We've tested an alternative |
288 | // method that store all samples together w/o any intervals (or in another |
289 | // word, only one interval), the method is much simpler but is not as |
290 | // stable as current impl. CDF plotted by the method changes dramatically |
291 | // from seconds to seconds. It seems that separating intervals probably |
292 | // keep more long-tail values. |
293 | uint32_t get_number(double ratio) { |
294 | size_t n = (size_t)ceil(ratio * _num_added); |
295 | if (n > _num_added) { |
296 | n = _num_added; |
297 | } else if (n == 0) { |
298 | return 0; |
299 | } |
300 | for (size_t i = 0; i < NUM_INTERVALS; ++i) { |
301 | if (_intervals[i] == NULL) { |
302 | continue; |
303 | } |
304 | PercentileInterval<SAMPLE_SIZE>& invl = *_intervals[i]; |
305 | if (n <= invl.added_count()) { |
306 | size_t sample_n = n * invl.sample_count() / invl.added_count(); |
307 | size_t sample_index = (sample_n ? sample_n - 1 : 0); |
308 | return invl.get_sample_at(sample_index); |
309 | } |
310 | n -= invl.added_count(); |
311 | } |
312 | CHECK(false) << "Can't reach here" ; |
313 | return std::numeric_limits<uint32_t>::max(); |
314 | } |
315 | |
316 | // Add samples in another PercentileSamples. |
317 | template <size_t size2> |
318 | void merge(const PercentileSamples<size2> &rhs) { |
319 | _num_added += rhs._num_added; |
320 | for (size_t i = 0; i < NUM_INTERVALS; ++i) { |
321 | if (rhs._intervals[i] && !rhs._intervals[i]->empty()) { |
322 | get_interval_at(i).merge(*rhs._intervals[i]); |
323 | } |
324 | } |
325 | } |
326 | |
327 | // Combine multiple into a single PercentileSamples |
328 | template <typename Iterator> |
329 | void combine_of(const Iterator& begin, const Iterator& end) { |
330 | if (_num_added) { |
331 | // Very unlikely |
332 | for (size_t i = 0; i < NUM_INTERVALS; ++i) { |
333 | if (_intervals[i]) { |
334 | _intervals[i]->clear(); |
335 | } |
336 | } |
337 | _num_added = 0; |
338 | } |
339 | |
340 | for (Iterator iter = begin; iter != end; ++iter) { |
341 | _num_added += iter->_num_added; |
342 | } |
343 | |
344 | // Calculate probabilities for each interval |
345 | for (size_t i = 0; i < NUM_INTERVALS; ++i) { |
346 | size_t total = 0; |
347 | size_t total_sample=0; |
348 | for (Iterator iter = begin; iter != end; ++iter) { |
349 | if (iter->_intervals[i]) { |
350 | total += iter->_intervals[i]->added_count(); |
351 | total_sample += iter->_intervals[i]->sample_count(); |
352 | } |
353 | } |
354 | if (total == 0) { |
355 | // Empty interval |
356 | continue; |
357 | } |
358 | |
359 | |
360 | // Consider that sub interval took |a| samples out of |b| totally, |
361 | // each sample won the probability of |a/b| according to the |
362 | // algorithm of add32(), now we should pick some samples into the |
363 | // combined interval that satisfied each sample has the |
364 | // probability of |SAMPLE_SIZE/total|, so each sample has the |
365 | // probability of |(SAMPLE_SIZE*b)/(a*total) to remain and the |
366 | // expected number of samples in this interval is |
367 | // |(SAMPLE_SIZE*b)/total| |
368 | for (Iterator iter = begin; iter != end; ++iter) { |
369 | if (!iter->_intervals[i] || iter->_intervals[i]->empty()) { |
370 | continue; |
371 | } |
372 | typename butil::add_reference<BAIDU_TYPEOF(*(iter->_intervals[i]))>::type |
373 | invl = *(iter->_intervals[i]); |
374 | if (total <= SAMPLE_SIZE) { |
375 | get_interval_at(i).merge_with_expectation( |
376 | invl, invl.sample_count()); |
377 | continue; |
378 | } |
379 | // Each |
380 | const size_t b = invl.added_count(); |
381 | const size_t remain = std::min( |
382 | round_of_expectation(b * SAMPLE_SIZE, total), |
383 | (size_t)invl.sample_count()); |
384 | get_interval_at(i).merge_with_expectation(invl, remain); |
385 | } |
386 | } |
387 | } |
388 | |
389 | // For debuggin. |
390 | void describe(std::ostream &os) const { |
391 | os << this << "{num_added=" << _num_added; |
392 | for (size_t i = 0; i < NUM_INTERVALS; ++i) { |
393 | if (_intervals[i] && !_intervals[i]->empty()) { |
394 | os << " interval[" << i << "]=" ; |
395 | _intervals[i]->describe(os); |
396 | } |
397 | } |
398 | os << '}'; |
399 | } |
400 | |
401 | // True if intervals of two PercentileSamples are exactly same. |
402 | bool operator==(const PercentileSamples& rhs) const { |
403 | for (size_t i = 0; i < NUM_INTERVALS; ++i) { |
404 | if (_intervals != rhs._intervals[i]) { |
405 | return false; |
406 | } |
407 | } |
408 | return true; |
409 | } |
410 | |
411 | private: |
412 | template <size_t size1> friend class PercentileSamples; |
413 | |
414 | // Get/create interval on-demand. |
415 | PercentileInterval<SAMPLE_SIZE>& get_interval_at(size_t index) { |
416 | if (_intervals[index] == NULL) { |
417 | _intervals[index] = new PercentileInterval<SAMPLE_SIZE>; |
418 | } |
419 | return *_intervals[index]; |
420 | } |
421 | // sum of _num_added of all intervals. we update this value after any |
422 | // changes to intervals inside to make it O(1)-time accessible. |
423 | size_t _num_added; |
424 | PercentileInterval<SAMPLE_SIZE>* _intervals[NUM_INTERVALS]; |
425 | }; |
426 | |
427 | template <size_t sz> const size_t PercentileSamples<sz>::SAMPLE_SIZE; |
428 | |
429 | template <size_t size> |
430 | std::ostream &operator<<(std::ostream &os, const PercentileInterval<size> &p) { |
431 | p.describe(os); |
432 | return os; |
433 | } |
434 | |
435 | template <size_t size> |
436 | std::ostream &operator<<(std::ostream &os, const PercentileSamples<size> &p) { |
437 | p.describe(os); |
438 | return os; |
439 | } |
440 | |
441 | // NOTE: we intentionally minus 2 uint32_t from sample-size to make the struct |
442 | // size be power of 2 and more friendly to memory allocators. |
443 | typedef PercentileSamples<254> GlobalPercentileSamples; |
444 | typedef PercentileSamples<30> ThreadLocalPercentileSamples; |
445 | |
446 | // A specialized reducer for finding the percentile of latencies. |
447 | // NOTE: DON'T use it directly, use LatencyRecorder instead. |
448 | class Percentile { |
449 | public: |
450 | struct AddPercentileSamples { |
451 | template <size_t size1, size_t size2> |
452 | void operator()(PercentileSamples<size1> &b1, |
453 | const PercentileSamples<size2> &b2) const { |
454 | b1.merge(b2); |
455 | } |
456 | }; |
457 | |
458 | typedef GlobalPercentileSamples value_type; |
459 | typedef ReducerSampler<Percentile, |
460 | GlobalPercentileSamples, |
461 | AddPercentileSamples, VoidOp> sampler_type; |
462 | typedef AgentCombiner <GlobalPercentileSamples, |
463 | ThreadLocalPercentileSamples, |
464 | AddPercentileSamples> combiner_type; |
465 | typedef combiner_type::Agent agent_type; |
466 | Percentile(); |
467 | ~Percentile(); |
468 | |
469 | AddPercentileSamples op() const { return AddPercentileSamples(); } |
470 | VoidOp inv_op() const { return VoidOp(); } |
471 | |
472 | // The sampler for windows over percentile. |
473 | sampler_type* get_sampler() { |
474 | if (NULL == _sampler) { |
475 | _sampler = new sampler_type(this); |
476 | _sampler->schedule(); |
477 | } |
478 | return _sampler; |
479 | } |
480 | |
481 | value_type reset(); |
482 | |
483 | value_type get_value() const; |
484 | |
485 | Percentile& operator<<(int64_t latency); |
486 | |
487 | bool valid() const { return _combiner != NULL && _combiner->valid(); } |
488 | |
489 | // This name is useful for warning negative latencies in operator<< |
490 | void set_debug_name(const butil::StringPiece& name) { |
491 | _debug_name.assign(name.data(), name.size()); |
492 | } |
493 | |
494 | private: |
495 | DISALLOW_COPY_AND_ASSIGN(Percentile); |
496 | |
497 | combiner_type* _combiner; |
498 | sampler_type* _sampler; |
499 | std::string _debug_name; |
500 | }; |
501 | |
502 | } // namespace detail |
503 | } // namespace bvar |
504 | |
505 | #endif //BVAR_DETAIL_PERCENTILE_H |
506 | |