1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | // Date 2014/09/24 16:01:08 |
19 | |
20 | #ifndef BVAR_REDUCER_H |
21 | #define BVAR_REDUCER_H |
22 | |
23 | #include <limits> // std::numeric_limits |
24 | #include "butil/logging.h" // LOG() |
25 | #include "butil/type_traits.h" // butil::add_cr_non_integral |
26 | #include "butil/class_name.h" // class_name_str |
27 | #include "bvar/variable.h" // Variable |
28 | #include "bvar/detail/combiner.h" // detail::AgentCombiner |
29 | #include "bvar/detail/sampler.h" // ReducerSampler |
30 | #include "bvar/detail/series.h" |
31 | #include "bvar/window.h" |
32 | |
33 | namespace bvar { |
34 | |
35 | // Reduce multiple values into one with `Op': e1 Op e2 Op e3 ... |
36 | // `Op' shall satisfy: |
37 | // - associative: a Op (b Op c) == (a Op b) Op c |
38 | // - commutative: a Op b == b Op a; |
39 | // - no side effects: a Op b never changes if a and b are fixed. |
40 | // otherwise the result is undefined. |
41 | // |
// For performance reasons, Op does not return a value; instead it shall
// store the result into its first parameter in-place. Namely, to add two
// values, "+=" should be implemented rather than "+".
45 | // |
46 | // Reducer works for non-primitive T which satisfies: |
47 | // - T() should be the identity of Op. |
48 | // - stream << v should compile and put description of v into the stream |
49 | // Example: |
50 | // class MyType { |
51 | // friend std::ostream& operator<<(std::ostream& os, const MyType&); |
52 | // public: |
53 | // MyType() : _x(0) {} |
54 | // explicit MyType(int x) : _x(x) {} |
//     void operator+=(const MyType& rhs) {
//         _x += rhs._x;
//     }
58 | // private: |
59 | // int _x; |
60 | // }; |
61 | // std::ostream& operator<<(std::ostream& os, const MyType& value) { |
62 | // return os << "MyType{" << value._x << "}"; |
63 | // } |
64 | // bvar::Adder<MyType> my_type_sum; |
65 | // my_type_sum << MyType(1) << MyType(2) << MyType(3); |
66 | // LOG(INFO) << my_type_sum; // "MyType{6}" |
67 | |
68 | template <typename T, typename Op, typename InvOp = detail::VoidOp> |
69 | class Reducer : public Variable { |
70 | public: |
71 | typedef typename detail::AgentCombiner<T, T, Op> combiner_type; |
72 | typedef typename combiner_type::Agent agent_type; |
73 | typedef detail::ReducerSampler<Reducer, T, Op, InvOp> sampler_type; |
74 | class SeriesSampler : public detail::Sampler { |
75 | public: |
76 | SeriesSampler(Reducer* owner, const Op& op) |
77 | : _owner(owner), _series(op) {} |
78 | ~SeriesSampler() {} |
79 | void take_sample() override { _series.append(_owner->get_value()); } |
80 | void describe(std::ostream& os) { _series.describe(os, NULL); } |
81 | private: |
82 | Reducer* _owner; |
83 | detail::Series<T, Op> _series; |
84 | }; |
85 | |
86 | public: |
87 | // The `identify' must satisfy: identity Op a == a |
88 | Reducer(typename butil::add_cr_non_integral<T>::type identity = T(), |
89 | const Op& op = Op(), |
90 | const InvOp& inv_op = InvOp()) |
91 | : _combiner(identity, identity, op) |
92 | , _sampler(NULL) |
93 | , _series_sampler(NULL) |
94 | , _inv_op(inv_op) { |
95 | } |
96 | |
97 | ~Reducer() { |
98 | // Calling hide() manually is a MUST required by Variable. |
99 | hide(); |
100 | if (_sampler) { |
101 | _sampler->destroy(); |
102 | _sampler = NULL; |
103 | } |
104 | if (_series_sampler) { |
105 | _series_sampler->destroy(); |
106 | _series_sampler = NULL; |
107 | } |
108 | } |
109 | |
110 | // Add a value. |
111 | // Returns self reference for chaining. |
112 | Reducer& operator<<(typename butil::add_cr_non_integral<T>::type value); |
113 | |
114 | // Get reduced value. |
115 | // Notice that this function walks through threads that ever add values |
116 | // into this reducer. You should avoid calling it frequently. |
117 | T get_value() const { |
118 | CHECK(!(butil::is_same<InvOp, detail::VoidOp>::value) || _sampler == NULL) |
119 | << "You should not call Reducer<" << butil::class_name_str<T>() |
120 | << ", " << butil::class_name_str<Op>() << ">::get_value() when a" |
121 | << " Window<> is used because the operator does not have inverse." ; |
122 | return _combiner.combine_agents(); |
123 | } |
124 | |
125 | |
126 | // Reset the reduced value to T(). |
127 | // Returns the reduced value before reset. |
128 | T reset() { return _combiner.reset_all_agents(); } |
129 | |
130 | void describe(std::ostream& os, bool quote_string) const override { |
131 | if (butil::is_same<T, std::string>::value && quote_string) { |
132 | os << '"' << get_value() << '"'; |
133 | } else { |
134 | os << get_value(); |
135 | } |
136 | } |
137 | |
138 | #ifdef BAIDU_INTERNAL |
139 | void get_value(boost::any* value) const override { *value = get_value(); } |
140 | #endif |
141 | |
142 | // True if this reducer is constructed successfully. |
143 | bool valid() const { return _combiner.valid(); } |
144 | |
145 | // Get instance of Op. |
146 | const Op& op() const { return _combiner.op(); } |
147 | const InvOp& inv_op() const { return _inv_op; } |
148 | |
149 | sampler_type* get_sampler() { |
150 | if (NULL == _sampler) { |
151 | _sampler = new sampler_type(this); |
152 | _sampler->schedule(); |
153 | } |
154 | return _sampler; |
155 | } |
156 | |
157 | int describe_series(std::ostream& os, const SeriesOptions& options) const override { |
158 | if (_series_sampler == NULL) { |
159 | return 1; |
160 | } |
161 | if (!options.test_only) { |
162 | _series_sampler->describe(os); |
163 | } |
164 | return 0; |
165 | } |
166 | |
167 | protected: |
168 | int expose_impl(const butil::StringPiece& prefix, |
169 | const butil::StringPiece& name, |
170 | DisplayFilter display_filter) override { |
171 | const int rc = Variable::expose_impl(prefix, name, display_filter); |
172 | if (rc == 0 && |
173 | _series_sampler == NULL && |
174 | !butil::is_same<InvOp, detail::VoidOp>::value && |
175 | !butil::is_same<T, std::string>::value && |
176 | FLAGS_save_series) { |
177 | _series_sampler = new SeriesSampler(this, _combiner.op()); |
178 | _series_sampler->schedule(); |
179 | } |
180 | return rc; |
181 | } |
182 | |
183 | private: |
184 | combiner_type _combiner; |
185 | sampler_type* _sampler; |
186 | SeriesSampler* _series_sampler; |
187 | InvOp _inv_op; |
188 | }; |
189 | |
190 | template <typename T, typename Op, typename InvOp> |
191 | inline Reducer<T, Op, InvOp>& Reducer<T, Op, InvOp>::operator<<( |
192 | typename butil::add_cr_non_integral<T>::type value) { |
193 | // It's wait-free for most time |
194 | agent_type* agent = _combiner.get_or_create_tls_agent(); |
195 | if (__builtin_expect(!agent, 0)) { |
196 | LOG(FATAL) << "Fail to create agent" ; |
197 | return *this; |
198 | } |
199 | agent->element.modify(_combiner.op(), value); |
200 | return *this; |
201 | } |
202 | |
203 | // =================== Common reducers =================== |
204 | |
205 | // bvar::Adder<int> sum; |
206 | // sum << 1 << 2 << 3 << 4; |
207 | // LOG(INFO) << sum.get_value(); // 10 |
208 | // Commonly used functors |
209 | namespace detail { |
210 | template <typename Tp> |
211 | struct AddTo { |
212 | void operator()(Tp & lhs, |
213 | typename butil::add_cr_non_integral<Tp>::type rhs) const |
214 | { lhs += rhs; } |
215 | }; |
216 | template <typename Tp> |
217 | struct MinusFrom { |
218 | void operator()(Tp & lhs, |
219 | typename butil::add_cr_non_integral<Tp>::type rhs) const |
220 | { lhs -= rhs; } |
221 | }; |
222 | } |
223 | template <typename T> |
224 | class Adder : public Reducer<T, detail::AddTo<T>, detail::MinusFrom<T> > { |
225 | public: |
226 | typedef Reducer<T, detail::AddTo<T>, detail::MinusFrom<T> > Base; |
227 | typedef T value_type; |
228 | typedef typename Base::sampler_type sampler_type; |
229 | public: |
230 | Adder() : Base() {} |
231 | explicit Adder(const butil::StringPiece& name) : Base() { |
232 | this->expose(name); |
233 | } |
234 | Adder(const butil::StringPiece& prefix, |
235 | const butil::StringPiece& name) : Base() { |
236 | this->expose_as(prefix, name); |
237 | } |
238 | ~Adder() { Variable::hide(); } |
239 | }; |
240 | |
241 | // bvar::Maxer<int> max_value; |
242 | // max_value << 1 << 2 << 3 << 4; |
243 | // LOG(INFO) << max_value.get_value(); // 4 |
244 | namespace detail { |
245 | template <typename Tp> |
246 | struct MaxTo { |
247 | void operator()(Tp & lhs, |
248 | typename butil::add_cr_non_integral<Tp>::type rhs) const { |
249 | // Use operator< as well. |
250 | if (lhs < rhs) { |
251 | lhs = rhs; |
252 | } |
253 | } |
254 | }; |
255 | class LatencyRecorderBase; |
256 | } |
257 | template <typename T> |
258 | class Maxer : public Reducer<T, detail::MaxTo<T> > { |
259 | public: |
260 | typedef Reducer<T, detail::MaxTo<T> > Base; |
261 | typedef T value_type; |
262 | typedef typename Base::sampler_type sampler_type; |
263 | public: |
264 | Maxer() : Base(std::numeric_limits<T>::min()) {} |
265 | explicit Maxer(const butil::StringPiece& name) |
266 | : Base(std::numeric_limits<T>::min()) { |
267 | this->expose(name); |
268 | } |
269 | Maxer(const butil::StringPiece& prefix, const butil::StringPiece& name) |
270 | : Base(std::numeric_limits<T>::min()) { |
271 | this->expose_as(prefix, name); |
272 | } |
273 | ~Maxer() { Variable::hide(); } |
274 | private: |
275 | friend class detail::LatencyRecorderBase; |
276 | // The following private funcition a now used in LatencyRecorder, |
277 | // it's dangerous so we don't make them public |
278 | explicit Maxer(T default_value) : Base(default_value) { |
279 | } |
280 | Maxer(T default_value, const butil::StringPiece& prefix, |
281 | const butil::StringPiece& name) |
282 | : Base(default_value) { |
283 | this->expose_as(prefix, name); |
284 | } |
285 | Maxer(T default_value, const butil::StringPiece& name) : Base(default_value) { |
286 | this->expose(name); |
287 | } |
288 | }; |
289 | |
290 | // bvar::Miner<int> min_value; |
291 | // min_value << 1 << 2 << 3 << 4; |
292 | // LOG(INFO) << min_value.get_value(); // 1 |
293 | namespace detail { |
294 | |
295 | template <typename Tp> |
296 | struct MinTo { |
297 | void operator()(Tp & lhs, |
298 | typename butil::add_cr_non_integral<Tp>::type rhs) const { |
299 | if (rhs < lhs) { |
300 | lhs = rhs; |
301 | } |
302 | } |
303 | }; |
304 | |
305 | } // namespace detail |
306 | |
307 | template <typename T> |
308 | class Miner : public Reducer<T, detail::MinTo<T> > { |
309 | public: |
310 | typedef Reducer<T, detail::MinTo<T> > Base; |
311 | typedef T value_type; |
312 | typedef typename Base::sampler_type sampler_type; |
313 | public: |
314 | Miner() : Base(std::numeric_limits<T>::max()) {} |
315 | explicit Miner(const butil::StringPiece& name) |
316 | : Base(std::numeric_limits<T>::max()) { |
317 | this->expose(name); |
318 | } |
319 | Miner(const butil::StringPiece& prefix, const butil::StringPiece& name) |
320 | : Base(std::numeric_limits<T>::max()) { |
321 | this->expose_as(prefix, name); |
322 | } |
323 | ~Miner() { Variable::hide(); } |
324 | }; |
325 | |
326 | } // namespace bvar |
327 | |
328 | #endif //BVAR_REDUCER_H |
329 | |