1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18// Date: 2015/09/15 10:44:17
19
20#ifndef BVAR_DETAIL_PERCENTILE_H
21#define BVAR_DETAIL_PERCENTILE_H
22
23#include <string.h> // memset memcmp
24#include <stdint.h> // uint32_t
25#include <limits> // std::numeric_limits
26#include <ostream> // std::ostream
27#include <algorithm> // std::sort
28#include <math.h> // ceil
29#include "butil/macros.h" // ARRAY_SIZE
30#include "bvar/reducer.h" // Reducer
31#include "bvar/window.h" // Window
32#include "bvar/detail/combiner.h" // AgentCombiner
33#include "bvar/detail/sampler.h" // ReducerSampler
34#include "butil/fast_rand.h"
35
36namespace bvar {
37namespace detail {
38
39// Round of expectation of a rational number |a/b| to a natural number.
40inline unsigned long round_of_expectation(unsigned long a, unsigned long b) {
41 if (BAIDU_UNLIKELY(b == 0)) {
42 return 0;
43 }
44 return a / b + (butil::fast_rand_less_than(b) < a % b);
45}
46
47// Storing latencies inside a interval.
48template <size_t SAMPLE_SIZE>
49class PercentileInterval {
50public:
51 PercentileInterval()
52 : _num_added(0)
53 , _sorted(false)
54 , _num_samples(0) {
55 }
56
57 // Get index-th sample in ascending order.
58 uint32_t get_sample_at(size_t index) {
59 const size_t saved_num = _num_samples;
60 if (index >= saved_num) {
61 if (saved_num == 0) {
62 return 0;
63 }
64 index = saved_num - 1;
65 }
66 if (!_sorted) {
67 std::sort(_samples, _samples + saved_num);
68 _sorted = true;
69 }
70 CHECK_EQ(saved_num, _num_samples) << "You must call get_number() on"
71 " a unchanging PercentileInterval";
72 return _samples[index];
73 }
74
75 // Add samples of another interval. This function tries to make each
76 // sample in merged _samples has (approximately) equal probability to
77 // remain.
78 // This method is invoked when merging ThreadLocalPercentileSamples in to
79 // GlobalPercentileSamples
80 template <size_t size2>
81 void merge(const PercentileInterval<size2> &rhs) {
82 if (rhs._num_added == 0) {
83 return;
84 }
85 BAIDU_CASSERT(SAMPLE_SIZE >= size2,
86 must_merge_small_interval_into_larger_one_currently);
87 CHECK_EQ(rhs._num_samples, rhs._num_added);
88 // Assume that the probability of each sample in |this| is a0/b0 and
89 // the probability of each sample in |rhs| is a1/b1.
90 // We are going to randomly pick some samples from |this| and |rhs| to
91 // satisfy the constraint that each sample stands for the probability
92 // of
93 // * 1 (SAMPLE_SIZE >= |b0 + b1|), which indicates that no sample
94 // has been dropped
95 // * SAMPLE_SIZE / |b0 + b1| (SAMPLE_SIZE < |b0 + b1|)
96 // So we should keep |b0*SAMPLE_SIZE/(b0+b1)| from |this|
97 // |b1*SAMPLE_SIZE/(b0+b1)| from |rhs|.
98 if (_num_added + rhs._num_added <= SAMPLE_SIZE) {
99 // No sample should be dropped
100 CHECK_EQ(_num_samples, _num_added)
101 << "_num_added=" << _num_added
102 << " rhs._num_added" << rhs._num_added
103 << " _num_samples=" << _num_samples
104 << " rhs._num_samples=" << rhs._num_samples
105 << " SAMPLE_SIZE=" << SAMPLE_SIZE
106 << " size2=" << size2;
107 memcpy(_samples + _num_samples, rhs._samples,
108 sizeof(_samples[0]) * rhs._num_samples);
109 _num_samples += rhs._num_samples;
110 } else {
111 // |num_remain| must be less than _num_samples:
112 // if _num_added = _num_samples:
113 // SAMPLE_SIZE / (_num_added + rhs._num_added) < 1 so that
114 // num_remain < _num_added = _num_samples
115 // otherwise:
116 // _num_samples = SAMPLE_SIZE;
117 // _num_added / (_num_added + rhs._num_added) < 1 so that
118 // num_remain < SAMPLE_SIZE = _num_added
119 size_t num_remain = round_of_expectation(
120 _num_added * SAMPLE_SIZE, _num_added + rhs._num_added);
121 CHECK_LE(num_remain, _num_samples);
122 // Randomly drop samples of this
123 for (size_t i = _num_samples; i > num_remain; --i) {
124 _samples[butil::fast_rand_less_than(i)] = _samples[i - 1];
125 }
126 const size_t num_remain_from_rhs = SAMPLE_SIZE - num_remain;
127 CHECK_LE(num_remain_from_rhs, rhs._num_samples);
128 // Have to copy data from rhs to shuffle since it's const
129 DEFINE_SMALL_ARRAY(uint32_t, tmp, rhs._num_samples, 64);
130 memcpy(tmp, rhs._samples, sizeof(uint32_t) * rhs._num_samples);
131 for (size_t i = 0; i < num_remain_from_rhs; ++i) {
132 const int index = butil::fast_rand_less_than(rhs._num_samples - i);
133 _samples[num_remain++] = tmp[index];
134 tmp[index] = tmp[rhs._num_samples - i - 1];
135 }
136 _num_samples = num_remain;
137 CHECK_EQ(_num_samples, SAMPLE_SIZE);
138 }
139 _num_added += rhs._num_added;
140 }
141
142 // Randomly pick n samples from mutable_rhs to |this|
143 template <size_t size2>
144 void merge_with_expectation(PercentileInterval<size2>& mutable_rhs, size_t n) {
145 CHECK(n <= mutable_rhs._num_samples);
146 _num_added += mutable_rhs._num_added;
147 if (_num_samples + n <= SAMPLE_SIZE && n == mutable_rhs._num_samples) {
148 memcpy(_samples + _num_samples, mutable_rhs._samples, sizeof(_samples[0]) * n);
149 _num_samples += n;
150 return;
151 }
152 for (size_t i = 0; i < n; ++i) {
153 size_t index = butil::fast_rand_less_than(mutable_rhs._num_samples - i);
154 if (_num_samples < SAMPLE_SIZE) {
155 _samples[_num_samples++] = mutable_rhs._samples[index];
156 } else {
157 _samples[butil::fast_rand_less_than(_num_samples)]
158 = mutable_rhs._samples[index];
159 }
160 std::swap(mutable_rhs._samples[index],
161 mutable_rhs._samples[mutable_rhs._num_samples - i - 1]);
162 }
163 }
164
165 // Add an unsigned 32-bit latency (what percentile actually accepts) to a
166 // non-full interval, which is invoked by Percentile::operator<< to add a
167 // sample into the ThreadLocalPercentileSamples.
168 // Returns true if the input was stored.
169 bool add32(uint32_t x) {
170 if (BAIDU_UNLIKELY(_num_samples >= SAMPLE_SIZE)) {
171 LOG(ERROR) << "This interval was full";
172 return false;
173 }
174 ++_num_added;
175 _samples[_num_samples++] = x;
176 return true;
177 }
178
179 // Add a signed latency.
180 bool add64(int64_t x) {
181 if (x >= 0) {
182 return add32((uint32_t)x);
183 }
184 return false;
185 }
186
187 // Remove all samples inside.
188 void clear() {
189 _num_added = 0;
190 _sorted = false;
191 _num_samples = 0;
192 }
193
194 // True if no more room for new samples.
195 bool full() const { return _num_samples == SAMPLE_SIZE; }
196
197 // True if there's no samples.
198 bool empty() const { return !_num_samples; }
199
200 // #samples ever added by calling add*()
201 uint32_t added_count() const { return _num_added; }
202
203 // #samples stored.
204 uint32_t sample_count() const { return _num_samples; }
205
206 // For debuggin.
207 void describe(std::ostream &os) const {
208 os << "(num_added=" << added_count() << ")[";
209 for (size_t j = 0; j < _num_samples; ++j) {
210 os << ' ' << _samples[j];
211 }
212 os << " ]";
213 }
214
215 // True if two PercentileInterval are exactly same, namely same # of added and
216 // same samples, mainly for debuggin.
217 bool operator==(const PercentileInterval& rhs) const {
218 return (_num_added == rhs._num_added &&
219 _num_samples == rhs._num_samples &&
220 memcmp(_samples, rhs._samples, _num_samples * sizeof(uint32_t)) == 0);
221 }
222
223private:
224template <size_t size2> friend class PercentileInterval;
225 BAIDU_CASSERT(SAMPLE_SIZE <= 65536, SAMPLE_SIZE_must_be_16bit);
226
227 uint32_t _num_added;
228 bool _sorted;
229 uint16_t _num_samples;
230 uint32_t _samples[SAMPLE_SIZE];
231};
232
// Number of PercentileIntervals held by every PercentileSamples.
// (A namespace-scope const already has internal linkage.)
const size_t NUM_INTERVALS = 32;

// Forward declaration of the functor befriended by PercentileSamples;
// older compilers (gcc 3.4) require it to be declared before that friend
// declaration.
class AddLatency;
237
238// Group of PercentileIntervals.
239template <size_t SAMPLE_SIZE_IN>
240class PercentileSamples {
241public:
242friend class AddLatency;
243
244 static const size_t SAMPLE_SIZE = SAMPLE_SIZE_IN;
245
246 PercentileSamples() {
247 memset(this, 0, sizeof(*this));
248 }
249
250 ~PercentileSamples() {
251 for (size_t i = 0; i < NUM_INTERVALS; ++i) {
252 if (_intervals[i]) {
253 delete _intervals[i];
254 }
255 }
256 }
257
258 // Copy-construct from another PercentileSamples.
259 // Copy/assigning happen at per-second scale. should be OK.
260 PercentileSamples(const PercentileSamples& rhs) {
261 _num_added = rhs._num_added;
262 for (size_t i = 0; i < NUM_INTERVALS; ++i) {
263 if (rhs._intervals[i] && !rhs._intervals[i]->empty()) {
264 _intervals[i] = new PercentileInterval<SAMPLE_SIZE>(*rhs._intervals[i]);
265 } else {
266 _intervals[i] = NULL;
267 }
268 }
269 }
270
271 // Assign from another PercentileSamples.
272 // Notice that we keep empty _intervals to avoid future allocations.
273 void operator=(const PercentileSamples& rhs) {
274 _num_added = rhs._num_added;
275 for (size_t i = 0; i < NUM_INTERVALS; ++i) {
276 if (rhs._intervals[i] && !rhs._intervals[i]->empty()) {
277 get_interval_at(i) = *rhs._intervals[i];
278 } else if (_intervals[i]) {
279 _intervals[i]->clear();
280 }
281 }
282 }
283
284 // Get the `ratio'-ile value. E.g. 0.99 means 99%-ile value.
285 // Since we store samples in different intervals internally. We first
286 // address the interval by multiplying ratio with _num_added, then
287 // find the sample inside the interval. We've tested an alternative
288 // method that store all samples together w/o any intervals (or in another
289 // word, only one interval), the method is much simpler but is not as
290 // stable as current impl. CDF plotted by the method changes dramatically
291 // from seconds to seconds. It seems that separating intervals probably
292 // keep more long-tail values.
293 uint32_t get_number(double ratio) {
294 size_t n = (size_t)ceil(ratio * _num_added);
295 if (n > _num_added) {
296 n = _num_added;
297 } else if (n == 0) {
298 return 0;
299 }
300 for (size_t i = 0; i < NUM_INTERVALS; ++i) {
301 if (_intervals[i] == NULL) {
302 continue;
303 }
304 PercentileInterval<SAMPLE_SIZE>& invl = *_intervals[i];
305 if (n <= invl.added_count()) {
306 size_t sample_n = n * invl.sample_count() / invl.added_count();
307 size_t sample_index = (sample_n ? sample_n - 1 : 0);
308 return invl.get_sample_at(sample_index);
309 }
310 n -= invl.added_count();
311 }
312 CHECK(false) << "Can't reach here";
313 return std::numeric_limits<uint32_t>::max();
314 }
315
316 // Add samples in another PercentileSamples.
317 template <size_t size2>
318 void merge(const PercentileSamples<size2> &rhs) {
319 _num_added += rhs._num_added;
320 for (size_t i = 0; i < NUM_INTERVALS; ++i) {
321 if (rhs._intervals[i] && !rhs._intervals[i]->empty()) {
322 get_interval_at(i).merge(*rhs._intervals[i]);
323 }
324 }
325 }
326
327 // Combine multiple into a single PercentileSamples
328 template <typename Iterator>
329 void combine_of(const Iterator& begin, const Iterator& end) {
330 if (_num_added) {
331 // Very unlikely
332 for (size_t i = 0; i < NUM_INTERVALS; ++i) {
333 if (_intervals[i]) {
334 _intervals[i]->clear();
335 }
336 }
337 _num_added = 0;
338 }
339
340 for (Iterator iter = begin; iter != end; ++iter) {
341 _num_added += iter->_num_added;
342 }
343
344 // Calculate probabilities for each interval
345 for (size_t i = 0; i < NUM_INTERVALS; ++i) {
346 size_t total = 0;
347 size_t total_sample=0;
348 for (Iterator iter = begin; iter != end; ++iter) {
349 if (iter->_intervals[i]) {
350 total += iter->_intervals[i]->added_count();
351 total_sample += iter->_intervals[i]->sample_count();
352 }
353 }
354 if (total == 0) {
355 // Empty interval
356 continue;
357 }
358
359
360 // Consider that sub interval took |a| samples out of |b| totally,
361 // each sample won the probability of |a/b| according to the
362 // algorithm of add32(), now we should pick some samples into the
363 // combined interval that satisfied each sample has the
364 // probability of |SAMPLE_SIZE/total|, so each sample has the
365 // probability of |(SAMPLE_SIZE*b)/(a*total) to remain and the
366 // expected number of samples in this interval is
367 // |(SAMPLE_SIZE*b)/total|
368 for (Iterator iter = begin; iter != end; ++iter) {
369 if (!iter->_intervals[i] || iter->_intervals[i]->empty()) {
370 continue;
371 }
372 typename butil::add_reference<BAIDU_TYPEOF(*(iter->_intervals[i]))>::type
373 invl = *(iter->_intervals[i]);
374 if (total <= SAMPLE_SIZE) {
375 get_interval_at(i).merge_with_expectation(
376 invl, invl.sample_count());
377 continue;
378 }
379 // Each
380 const size_t b = invl.added_count();
381 const size_t remain = std::min(
382 round_of_expectation(b * SAMPLE_SIZE, total),
383 (size_t)invl.sample_count());
384 get_interval_at(i).merge_with_expectation(invl, remain);
385 }
386 }
387 }
388
389 // For debuggin.
390 void describe(std::ostream &os) const {
391 os << this << "{num_added=" << _num_added;
392 for (size_t i = 0; i < NUM_INTERVALS; ++i) {
393 if (_intervals[i] && !_intervals[i]->empty()) {
394 os << " interval[" << i << "]=";
395 _intervals[i]->describe(os);
396 }
397 }
398 os << '}';
399 }
400
401 // True if intervals of two PercentileSamples are exactly same.
402 bool operator==(const PercentileSamples& rhs) const {
403 for (size_t i = 0; i < NUM_INTERVALS; ++i) {
404 if (_intervals != rhs._intervals[i]) {
405 return false;
406 }
407 }
408 return true;
409 }
410
411private:
412template <size_t size1> friend class PercentileSamples;
413
414 // Get/create interval on-demand.
415 PercentileInterval<SAMPLE_SIZE>& get_interval_at(size_t index) {
416 if (_intervals[index] == NULL) {
417 _intervals[index] = new PercentileInterval<SAMPLE_SIZE>;
418 }
419 return *_intervals[index];
420 }
421 // sum of _num_added of all intervals. we update this value after any
422 // changes to intervals inside to make it O(1)-time accessible.
423 size_t _num_added;
424 PercentileInterval<SAMPLE_SIZE>* _intervals[NUM_INTERVALS];
425};
426
427template <size_t sz> const size_t PercentileSamples<sz>::SAMPLE_SIZE;
428
429template <size_t size>
430std::ostream &operator<<(std::ostream &os, const PercentileInterval<size> &p) {
431 p.describe(os);
432 return os;
433}
434
435template <size_t size>
436std::ostream &operator<<(std::ostream &os, const PercentileSamples<size> &p) {
437 p.describe(os);
438 return os;
439}
440
441// NOTE: we intentionally minus 2 uint32_t from sample-size to make the struct
442// size be power of 2 and more friendly to memory allocators.
443typedef PercentileSamples<254> GlobalPercentileSamples;
444typedef PercentileSamples<30> ThreadLocalPercentileSamples;
445
446// A specialized reducer for finding the percentile of latencies.
447// NOTE: DON'T use it directly, use LatencyRecorder instead.
448class Percentile {
449public:
450 struct AddPercentileSamples {
451 template <size_t size1, size_t size2>
452 void operator()(PercentileSamples<size1> &b1,
453 const PercentileSamples<size2> &b2) const {
454 b1.merge(b2);
455 }
456 };
457
458 typedef GlobalPercentileSamples value_type;
459 typedef ReducerSampler<Percentile,
460 GlobalPercentileSamples,
461 AddPercentileSamples, VoidOp> sampler_type;
462 typedef AgentCombiner <GlobalPercentileSamples,
463 ThreadLocalPercentileSamples,
464 AddPercentileSamples> combiner_type;
465 typedef combiner_type::Agent agent_type;
466 Percentile();
467 ~Percentile();
468
469 AddPercentileSamples op() const { return AddPercentileSamples(); }
470 VoidOp inv_op() const { return VoidOp(); }
471
472 // The sampler for windows over percentile.
473 sampler_type* get_sampler() {
474 if (NULL == _sampler) {
475 _sampler = new sampler_type(this);
476 _sampler->schedule();
477 }
478 return _sampler;
479 }
480
481 value_type reset();
482
483 value_type get_value() const;
484
485 Percentile& operator<<(int64_t latency);
486
487 bool valid() const { return _combiner != NULL && _combiner->valid(); }
488
489 // This name is useful for warning negative latencies in operator<<
490 void set_debug_name(const butil::StringPiece& name) {
491 _debug_name.assign(name.data(), name.size());
492 }
493
494private:
495 DISALLOW_COPY_AND_ASSIGN(Percentile);
496
497 combiner_type* _combiner;
498 sampler_type* _sampler;
499 std::string _debug_name;
500};
501
502} // namespace detail
503} // namespace bvar
504
505#endif //BVAR_DETAIL_PERCENTILE_H
506