1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18// Date 2014/09/24 16:01:08
19
20#ifndef BVAR_REDUCER_H
21#define BVAR_REDUCER_H
22
23#include <limits> // std::numeric_limits
24#include "butil/logging.h" // LOG()
25#include "butil/type_traits.h" // butil::add_cr_non_integral
26#include "butil/class_name.h" // class_name_str
27#include "bvar/variable.h" // Variable
28#include "bvar/detail/combiner.h" // detail::AgentCombiner
29#include "bvar/detail/sampler.h" // ReducerSampler
30#include "bvar/detail/series.h"
31#include "bvar/window.h"
32
33namespace bvar {
34
35// Reduce multiple values into one with `Op': e1 Op e2 Op e3 ...
36// `Op' shall satisfy:
37// - associative: a Op (b Op c) == (a Op b) Op c
38// - commutative: a Op b == b Op a;
39// - no side effects: a Op b never changes if a and b are fixed.
40// otherwise the result is undefined.
41//
// For performance reasons, Op does not return a value; instead it shall
// store the result into its first parameter in place. Namely to add two
// values, "+=" should be implemented rather than "+".
45//
46// Reducer works for non-primitive T which satisfies:
47// - T() should be the identity of Op.
48// - stream << v should compile and put description of v into the stream
49// Example:
50// class MyType {
51// friend std::ostream& operator<<(std::ostream& os, const MyType&);
52// public:
53// MyType() : _x(0) {}
54// explicit MyType(int x) : _x(x) {}
// void operator+=(const MyType& rhs) {
56// _x += rhs._x;
57// }
58// private:
59// int _x;
60// };
61// std::ostream& operator<<(std::ostream& os, const MyType& value) {
62// return os << "MyType{" << value._x << "}";
63// }
64// bvar::Adder<MyType> my_type_sum;
65// my_type_sum << MyType(1) << MyType(2) << MyType(3);
66// LOG(INFO) << my_type_sum; // "MyType{6}"
67
68template <typename T, typename Op, typename InvOp = detail::VoidOp>
69class Reducer : public Variable {
70public:
71 typedef typename detail::AgentCombiner<T, T, Op> combiner_type;
72 typedef typename combiner_type::Agent agent_type;
73 typedef detail::ReducerSampler<Reducer, T, Op, InvOp> sampler_type;
74 class SeriesSampler : public detail::Sampler {
75 public:
76 SeriesSampler(Reducer* owner, const Op& op)
77 : _owner(owner), _series(op) {}
78 ~SeriesSampler() {}
79 void take_sample() override { _series.append(_owner->get_value()); }
80 void describe(std::ostream& os) { _series.describe(os, NULL); }
81 private:
82 Reducer* _owner;
83 detail::Series<T, Op> _series;
84 };
85
86public:
87 // The `identify' must satisfy: identity Op a == a
88 Reducer(typename butil::add_cr_non_integral<T>::type identity = T(),
89 const Op& op = Op(),
90 const InvOp& inv_op = InvOp())
91 : _combiner(identity, identity, op)
92 , _sampler(NULL)
93 , _series_sampler(NULL)
94 , _inv_op(inv_op) {
95 }
96
97 ~Reducer() {
98 // Calling hide() manually is a MUST required by Variable.
99 hide();
100 if (_sampler) {
101 _sampler->destroy();
102 _sampler = NULL;
103 }
104 if (_series_sampler) {
105 _series_sampler->destroy();
106 _series_sampler = NULL;
107 }
108 }
109
110 // Add a value.
111 // Returns self reference for chaining.
112 Reducer& operator<<(typename butil::add_cr_non_integral<T>::type value);
113
114 // Get reduced value.
115 // Notice that this function walks through threads that ever add values
116 // into this reducer. You should avoid calling it frequently.
117 T get_value() const {
118 CHECK(!(butil::is_same<InvOp, detail::VoidOp>::value) || _sampler == NULL)
119 << "You should not call Reducer<" << butil::class_name_str<T>()
120 << ", " << butil::class_name_str<Op>() << ">::get_value() when a"
121 << " Window<> is used because the operator does not have inverse.";
122 return _combiner.combine_agents();
123 }
124
125
126 // Reset the reduced value to T().
127 // Returns the reduced value before reset.
128 T reset() { return _combiner.reset_all_agents(); }
129
130 void describe(std::ostream& os, bool quote_string) const override {
131 if (butil::is_same<T, std::string>::value && quote_string) {
132 os << '"' << get_value() << '"';
133 } else {
134 os << get_value();
135 }
136 }
137
138#ifdef BAIDU_INTERNAL
139 void get_value(boost::any* value) const override { *value = get_value(); }
140#endif
141
142 // True if this reducer is constructed successfully.
143 bool valid() const { return _combiner.valid(); }
144
145 // Get instance of Op.
146 const Op& op() const { return _combiner.op(); }
147 const InvOp& inv_op() const { return _inv_op; }
148
149 sampler_type* get_sampler() {
150 if (NULL == _sampler) {
151 _sampler = new sampler_type(this);
152 _sampler->schedule();
153 }
154 return _sampler;
155 }
156
157 int describe_series(std::ostream& os, const SeriesOptions& options) const override {
158 if (_series_sampler == NULL) {
159 return 1;
160 }
161 if (!options.test_only) {
162 _series_sampler->describe(os);
163 }
164 return 0;
165 }
166
167protected:
168 int expose_impl(const butil::StringPiece& prefix,
169 const butil::StringPiece& name,
170 DisplayFilter display_filter) override {
171 const int rc = Variable::expose_impl(prefix, name, display_filter);
172 if (rc == 0 &&
173 _series_sampler == NULL &&
174 !butil::is_same<InvOp, detail::VoidOp>::value &&
175 !butil::is_same<T, std::string>::value &&
176 FLAGS_save_series) {
177 _series_sampler = new SeriesSampler(this, _combiner.op());
178 _series_sampler->schedule();
179 }
180 return rc;
181 }
182
183private:
184 combiner_type _combiner;
185 sampler_type* _sampler;
186 SeriesSampler* _series_sampler;
187 InvOp _inv_op;
188};
189
190template <typename T, typename Op, typename InvOp>
191inline Reducer<T, Op, InvOp>& Reducer<T, Op, InvOp>::operator<<(
192 typename butil::add_cr_non_integral<T>::type value) {
193 // It's wait-free for most time
194 agent_type* agent = _combiner.get_or_create_tls_agent();
195 if (__builtin_expect(!agent, 0)) {
196 LOG(FATAL) << "Fail to create agent";
197 return *this;
198 }
199 agent->element.modify(_combiner.op(), value);
200 return *this;
201}
202
203// =================== Common reducers ===================
204
205// bvar::Adder<int> sum;
206// sum << 1 << 2 << 3 << 4;
207// LOG(INFO) << sum.get_value(); // 10
208// Commonly used functors
209namespace detail {
210template <typename Tp>
211struct AddTo {
212 void operator()(Tp & lhs,
213 typename butil::add_cr_non_integral<Tp>::type rhs) const
214 { lhs += rhs; }
215};
216template <typename Tp>
217struct MinusFrom {
218 void operator()(Tp & lhs,
219 typename butil::add_cr_non_integral<Tp>::type rhs) const
220 { lhs -= rhs; }
221};
222}
223template <typename T>
224class Adder : public Reducer<T, detail::AddTo<T>, detail::MinusFrom<T> > {
225public:
226 typedef Reducer<T, detail::AddTo<T>, detail::MinusFrom<T> > Base;
227 typedef T value_type;
228 typedef typename Base::sampler_type sampler_type;
229public:
230 Adder() : Base() {}
231 explicit Adder(const butil::StringPiece& name) : Base() {
232 this->expose(name);
233 }
234 Adder(const butil::StringPiece& prefix,
235 const butil::StringPiece& name) : Base() {
236 this->expose_as(prefix, name);
237 }
238 ~Adder() { Variable::hide(); }
239};
240
241// bvar::Maxer<int> max_value;
242// max_value << 1 << 2 << 3 << 4;
243// LOG(INFO) << max_value.get_value(); // 4
244namespace detail {
245template <typename Tp>
246struct MaxTo {
247 void operator()(Tp & lhs,
248 typename butil::add_cr_non_integral<Tp>::type rhs) const {
249 // Use operator< as well.
250 if (lhs < rhs) {
251 lhs = rhs;
252 }
253 }
254};
255class LatencyRecorderBase;
256}
257template <typename T>
258class Maxer : public Reducer<T, detail::MaxTo<T> > {
259public:
260 typedef Reducer<T, detail::MaxTo<T> > Base;
261 typedef T value_type;
262 typedef typename Base::sampler_type sampler_type;
263public:
264 Maxer() : Base(std::numeric_limits<T>::min()) {}
265 explicit Maxer(const butil::StringPiece& name)
266 : Base(std::numeric_limits<T>::min()) {
267 this->expose(name);
268 }
269 Maxer(const butil::StringPiece& prefix, const butil::StringPiece& name)
270 : Base(std::numeric_limits<T>::min()) {
271 this->expose_as(prefix, name);
272 }
273 ~Maxer() { Variable::hide(); }
274private:
275 friend class detail::LatencyRecorderBase;
276 // The following private funcition a now used in LatencyRecorder,
277 // it's dangerous so we don't make them public
278 explicit Maxer(T default_value) : Base(default_value) {
279 }
280 Maxer(T default_value, const butil::StringPiece& prefix,
281 const butil::StringPiece& name)
282 : Base(default_value) {
283 this->expose_as(prefix, name);
284 }
285 Maxer(T default_value, const butil::StringPiece& name) : Base(default_value) {
286 this->expose(name);
287 }
288};
289
290// bvar::Miner<int> min_value;
291// min_value << 1 << 2 << 3 << 4;
292// LOG(INFO) << min_value.get_value(); // 1
293namespace detail {
294
295template <typename Tp>
296struct MinTo {
297 void operator()(Tp & lhs,
298 typename butil::add_cr_non_integral<Tp>::type rhs) const {
299 if (rhs < lhs) {
300 lhs = rhs;
301 }
302 }
303};
304
305} // namespace detail
306
307template <typename T>
308class Miner : public Reducer<T, detail::MinTo<T> > {
309public:
310 typedef Reducer<T, detail::MinTo<T> > Base;
311 typedef T value_type;
312 typedef typename Base::sampler_type sampler_type;
313public:
314 Miner() : Base(std::numeric_limits<T>::max()) {}
315 explicit Miner(const butil::StringPiece& name)
316 : Base(std::numeric_limits<T>::max()) {
317 this->expose(name);
318 }
319 Miner(const butil::StringPiece& prefix, const butil::StringPiece& name)
320 : Base(std::numeric_limits<T>::max()) {
321 this->expose_as(prefix, name);
322 }
323 ~Miner() { Variable::hide(); }
324};
325
326} // namespace bvar
327
328#endif //BVAR_REDUCER_H
329