1/**
2 * Copyright 2021 Alibaba, Inc. and its affiliates. All Rights Reserved.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15
16 * \author Jiliang.ljl
17 * \date Mar 2021
18 * \brief Metrics interface.
19 */
20
21#include "proto/proxima_be.pb.h"
22#include "metrics_collector.h"
23
24namespace proxima {
25namespace be {
26namespace metrics {
27
28
29//! QueryMetrics RAII-style reporter
30class QueryMetrics {
31 public:
32 /**
33 * Constructor
34 *
35 * @param type
36 * @param ret REQUIRED to be accessible when QueryMetrics destructs.
37 */
38 QueryMetrics(ProtocolType type, const int *ret) : type_(type), ret_(ret) {}
39
40 //! Update metrics count with request.
41 //! request is not saved and can destruct early than QueryMetrics
42 void update_with_query_request(const proto::QueryRequest &request) {
43 batch_ = GetBatch(request);
44 query_type_ = request.query_type();
45 }
46
47 ~QueryMetrics() {
48 if (batch_ <= 0) {
49 return;
50 }
51 auto rt_us = timer_.micro_seconds();
52 auto single_rt_us = rt_us / batch_;
53 auto &metrics_obj = MetricsCollector::GetInstance();
54 metrics_obj.report_query_rt(type_, batch_, single_rt_us);
55 metrics_obj.report_query_count_by_type(query_type_, batch_);
56 metrics_obj.report_query_batch(batch_);
57 if (*ret_ == 0) {
58 metrics_obj.report_query_success_count(batch_);
59 } else {
60 metrics_obj.report_query_failure_count(batch_);
61 }
62 }
63
64 private:
65 static size_t GetBatch(const proto::QueryRequest &request) {
66 switch (request.query_type()) {
67 case proto::QueryRequest_QueryType_QT_KNN:
68 return request.knn_param().batch_count();
69 default:
70 // to fix warning enum not handled in switch
71 {}
72 }
73 LOG_ERROR("Unexpected query type:%d",
74 static_cast<int>(request.query_type()));
75 return 0;
76 };
77
78 private:
79 ProtocolType type_{ProtocolType::kGrpc};
80 const int *ret_{nullptr};
81 size_t batch_{0};
82 proto::QueryRequest_QueryType query_type_{
83 proto::QueryRequest_QueryType_QT_KNN};
84 ailego::ElapsedTime timer_;
85};
86
87//! GetDocumentMetrics RAII-style reporter
88class GetDocumentMetrics {
89 public:
90 /**
91 * Constructor
92 *
93 * @param type
94 * @param ret REQUIRED to be accessible when QueryMetrics destructs.
95 */
96 GetDocumentMetrics(ProtocolType type, const int *ret)
97 : type_(type), ret_(ret) {}
98
99 ~GetDocumentMetrics() {
100 auto rt_us = timer_.micro_seconds();
101 auto &metrics_obj = MetricsCollector::GetInstance();
102 metrics_obj.report_get_document_rt(type_, rt_us);
103 if (*ret_ == 0) {
104 metrics_obj.report_get_document_success_count();
105 } else {
106 metrics_obj.report_get_document_failure_count();
107 }
108 }
109
110 private:
111 ProtocolType type_{ProtocolType::kGrpc};
112 const int *ret_{nullptr};
113 ailego::ElapsedTime timer_;
114};
115
116//! WriteMetrics RAII-style reporter
117class WriteMetrics {
118 public:
119 /**
120 * Constructor
121 *
122 * @param type
123 * @param ret REQUIRED to be accessible when QueryMetrics destructs.
124 */
125 WriteMetrics(ProtocolType type, const int *ret) : type_(type), ret_(ret) {}
126
127 ~WriteMetrics() {
128 if (!batch_) {
129 return;
130 }
131 auto rt_us = timer_.micro_seconds() / batch_;
132 auto &metrics_obj = MetricsCollector::GetInstance();
133 metrics_obj.report_write_rt(type_, batch_, rt_us);
134 if (insert_doc_count_ > 0) {
135 metrics_obj.report_write_doc_count_by_operation_type(proto::OP_INSERT,
136 insert_doc_count_);
137 }
138 if (update_doc_count_ > 0) {
139 metrics_obj.report_write_doc_count_by_operation_type(proto::OP_UPDATE,
140 update_doc_count_);
141 }
142 if (delete_doc_count_ > 0) {
143 metrics_obj.report_write_doc_count_by_operation_type(proto::OP_DELETE,
144 delete_doc_count_);
145 }
146 metrics_obj.report_write_batch(batch_);
147 if (*ret_ == 0) {
148 metrics_obj.report_write_success_count(batch_);
149 } else {
150 metrics_obj.report_write_failure_count(batch_);
151 }
152 }
153
154 void update_with_write_request(const proto::WriteRequest &req) {
155 size_t insert_count = 0;
156 size_t update_count = 0;
157 size_t delete_count = 0;
158 for (const auto &row : req.rows()) {
159 switch (row.operation_type()) {
160 case proto::OP_INSERT:
161 insert_count++;
162 break;
163 case proto::OP_UPDATE:
164 update_count++;
165 break;
166 case proto::OP_DELETE:
167 delete_count++;
168 break;
169 default:
170 LOG_ERROR("Unknown operation type:%d",
171 static_cast<int>(row.operation_type()));
172 }
173 }
174 insert_doc_count_ = insert_count;
175 update_doc_count_ = update_count;
176 delete_doc_count_ = delete_count;
177 batch_ = req.rows_size();
178 }
179
180 private:
181 ProtocolType type_{ProtocolType::kGrpc};
182 const int *ret_{nullptr};
183 ailego::ElapsedTime timer_;
184 size_t insert_doc_count_{0};
185 size_t update_doc_count_{0};
186 size_t delete_doc_count_{0};
187 size_t batch_{0};
188};
189
190
191} // namespace metrics
192
193} // namespace be
194} // namespace proxima
195