1 | // Licensed to the Apache Software Foundation (ASF) under one |
2 | // or more contributor license agreements. See the NOTICE file |
3 | // distributed with this work for additional information |
4 | // regarding copyright ownership. The ASF licenses this file |
5 | // to you under the Apache License, Version 2.0 (the |
6 | // "License"); you may not use this file except in compliance |
7 | // with the License. You may obtain a copy of the License at |
8 | // |
9 | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | // |
11 | // Unless required by applicable law or agreed to in writing, |
12 | // software distributed under the License is distributed on an |
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | // KIND, either express or implied. See the License for the |
15 | // specific language governing permissions and limitations |
16 | // under the License. |
17 | |
18 | |
19 | #ifndef BRPC_CONTROLLER_H |
20 | #define BRPC_CONTROLLER_H |
21 | |
22 | // To brpc developers: This is a header included by user, don't depend |
23 | // on internal structures, use opaque pointers instead. |
24 | |
25 | #include <gflags/gflags.h> // Users often need gflags |
26 | #include "butil/intrusive_ptr.hpp" // butil::intrusive_ptr |
27 | #include "bthread/errno.h" // Redefine errno |
28 | #include "butil/endpoint.h" // butil::EndPoint |
29 | #include "butil/iobuf.h" // butil::IOBuf |
30 | #include "bthread/types.h" // bthread_id_t |
31 | #include "brpc/options.pb.h" // CompressType |
32 | #include "brpc/errno.pb.h" // error code |
33 | #include "brpc/http_header.h" // HttpHeader |
34 | #include "brpc/authenticator.h" // AuthContext |
35 | #include "brpc/socket_id.h" // SocketId |
36 | #include "brpc/stream.h" // StreamId |
37 | #include "brpc/stream_creator.h" // StreamCreator |
38 | #include "brpc/protocol.h" // Protocol |
39 | #include "brpc/traceprintf.h" |
40 | #include "brpc/reloadable_flags.h" |
41 | #include "brpc/closure_guard.h" // User often needs this |
42 | #include "brpc/callback.h" |
43 | #include "brpc/progressive_attachment.h" // ProgressiveAttachment |
44 | #include "brpc/progressive_reader.h" // ProgressiveReader |
45 | #include "brpc/grpc.h" |
46 | |
47 | // EAUTH is defined in MAC |
48 | #ifndef EAUTH |
49 | #define EAUTH ERPCAUTH |
50 | #endif |
51 | |
52 | extern "C" { |
53 | #ifndef USE_MESALINK |
54 | struct x509_st; |
55 | #else |
56 | #include <mesalink/openssl/x509.h> |
57 | #define x509_st X509 |
58 | #endif |
59 | } |
60 | |
61 | namespace brpc { |
62 | class Span; |
63 | class Server; |
64 | class SharedLoadBalancer; |
65 | class ExcludedServers; |
66 | class RPCSender; |
67 | class StreamSettings; |
68 | class SampledRequest; |
69 | class MongoContext; |
70 | class RetryPolicy; |
71 | class InputMessageBase; |
72 | class ThriftStub; |
73 | namespace policy { |
74 | class OnServerStreamCreated; |
75 | void ProcessMongoRequest(InputMessageBase*); |
76 | void ProcessThriftRequest(InputMessageBase*); |
77 | } |
78 | namespace schan { |
79 | class Sender; |
80 | class SubDone; |
81 | } |
82 | |
// Parameter names used when serializing to / parsing from idl services.
struct IdlNames {
    // Both members must point at string constants.
    const char* request_name;
    const char* response_name;
};
88 | |
89 | extern const IdlNames idl_single_req_single_res; |
90 | extern const IdlNames idl_single_req_multi_res; |
91 | extern const IdlNames idl_multi_req_single_res; |
92 | extern const IdlNames idl_multi_req_multi_res; |
93 | |
94 | // The identifier to be associated with a RPC call. |
95 | typedef bthread_id_t CallId; |
96 | |
// Ways of stopping a progressive attachment.
enum StopStyle {
    FORCE_STOP = 0,
    WAIT_FOR_STOP = 1,
};
102 | |
// Sentinel timestamp value. Controller::latency_us() compares the recorded
// end time against it; when they are equal, latency is measured up to the
// current time instead.
const int32_t UNSET_MAGIC_NUM = -123456789;
104 | |
105 | // A Controller mediates a single method call. The primary purpose of |
106 | // the controller is to provide a way to manipulate settings per RPC-call |
107 | // and to find out about RPC-level errors. |
108 | class Controller : public google::protobuf::RpcController/*non-copyable*/ { |
109 | friend class Channel; |
110 | friend class ParallelChannel; |
111 | friend class ParallelChannelDone; |
112 | friend class ControllerPrivateAccessor; |
113 | friend class ServerPrivateAccessor; |
114 | friend class SelectiveChannel; |
115 | friend class ThriftStub; |
116 | friend class schan::Sender; |
117 | friend class schan::SubDone; |
118 | friend class policy::OnServerStreamCreated; |
119 | friend int StreamCreate(StreamId*, Controller&, const StreamOptions*); |
120 | friend int StreamAccept(StreamId*, Controller&, const StreamOptions*); |
121 | friend void policy::ProcessMongoRequest(InputMessageBase*); |
122 | friend void policy::ProcessThriftRequest(InputMessageBase*); |
123 | // << Flags >> |
124 | static const uint32_t FLAGS_IGNORE_EOVERCROWDED = 1; |
125 | static const uint32_t FLAGS_SECURITY_MODE = (1 << 1); |
126 | static const uint32_t FLAGS_ADDED_CONCURRENCY = (1 << 2); |
127 | static const uint32_t FLAGS_READ_PROGRESSIVELY = (1 << 3); |
128 | static const uint32_t FLAGS_PROGRESSIVE_READER = (1 << 4); |
129 | static const uint32_t FLAGS_BACKUP_REQUEST = (1 << 5); |
130 | // Let _done delete the correlation_id, used by combo channels to |
131 | // make lifetime of the correlation_id more flexible. |
132 | static const uint32_t FLAGS_DESTROY_CID_IN_DONE = (1 << 7); |
133 | static const uint32_t FLAGS_CLOSE_CONNECTION = (1 << 8); |
134 | static const uint32_t FLAGS_LOG_ID = (1 << 9); // log_id is set |
135 | static const uint32_t FLAGS_REQUEST_CODE = (1 << 10); |
136 | static const uint32_t FLAGS_PB_BYTES_TO_BASE64 = (1 << 11); |
137 | static const uint32_t FLAGS_ALLOW_DONE_TO_RUN_IN_PLACE = (1 << 12); |
138 | static const uint32_t FLAGS_USED_BY_RPC = (1 << 13); |
139 | static const uint32_t FLAGS_REQUEST_WITH_AUTH = (1 << 15); |
140 | static const uint32_t FLAGS_PB_JSONIFY_EMPTY_ARRAY = (1 << 16); |
141 | static const uint32_t FLAGS_ENABLED_CIRCUIT_BREAKER = (1 << 17); |
142 | static const uint32_t FLAGS_ALWAYS_PRINT_PRIMITIVE_FIELDS = (1 << 18); |
143 | static const uint32_t FLAGS_HEALTH_CHECK_CALL = (1 << 19); |
144 | |
145 | public: |
146 | Controller(); |
147 | ~Controller(); |
148 | |
149 | // ------------------------------------------------------------------ |
150 | // Client-side methods |
151 | // These calls shall be made from the client side only. Their results |
152 | // are undefined on the server side (may crash). |
153 | // ------------------------------------------------------------------ |
154 | |
155 | // Set/get timeout in milliseconds for the RPC call. Use |
156 | // ChannelOptions.timeout_ms on unset. |
157 | void set_timeout_ms(int64_t timeout_ms); |
158 | int64_t timeout_ms() const { return _timeout_ms; } |
159 | |
160 | // Set/get the delay to send backup request in milliseconds. Use |
161 | // ChannelOptions.backup_request_ms on unset. |
162 | void set_backup_request_ms(int64_t timeout_ms); |
163 | int64_t backup_request_ms() const { return _backup_request_ms; } |
164 | |
165 | // Set/get maximum times of retrying. Use ChannelOptions.max_retry on unset. |
166 | // <=0 means no retry. |
167 | // Conditions of retrying: |
168 | // * The connection is broken. No retry if the connection is still on. |
169 | // Use backup_request if you want to issue another request after some |
170 | // time. |
171 | // * Not timed out. |
172 | // * retried_count() < max_retry(). |
173 | // * Retry may work for the error. E.g. No retry when the request is |
174 | // incorrect (EREQUEST), retrying is pointless. |
175 | void set_max_retry(int max_retry); |
176 | int max_retry() const { return _max_retry; } |
177 | |
178 | // Get number of retries. |
179 | int retried_count() const { return _current_call.nretry; } |
180 | |
181 | // True if a backup request was sent during the RPC. |
182 | bool has_backup_request() const { return has_flag(FLAGS_BACKUP_REQUEST); } |
183 | |
184 | // This function has different meanings in client and server side. |
185 | // In client side it gets latency of the RPC call. While in server side, |
186 | // it gets queue time before server processes the RPC call. |
187 | int64_t latency_us() const { |
188 | if (_end_time_us == UNSET_MAGIC_NUM) { |
189 | return butil::cpuwide_time_us() - _begin_time_us; |
190 | } |
191 | return _end_time_us - _begin_time_us; |
192 | } |
193 | |
194 | // Response of the RPC call (passed to CallMethod) |
195 | google::protobuf::Message* response() const { return _response; } |
196 | |
197 | // An identifier to send to server along with request. This is widely used |
198 | // throughout baidu's servers to tag a searching session (a series of |
199 | // queries following the topology of servers) with a same log_id. |
200 | void set_log_id(uint64_t log_id); |
201 | |
202 | // Set type of service: http://en.wikipedia.org/wiki/Type_of_service |
203 | // Current implementation has limits: If the connection is already |
204 | // established, this setting has no effect until the connection is broken |
205 | // and re-connected. And because of connection sharing, setting different |
206 | // tos to a single connection is undefined. |
207 | void set_type_of_service(short tos) { _tos = tos; } |
208 | |
209 | // Set type of connections for sending RPC. |
210 | // Use ChannelOptions.connection_type on unset. |
211 | void set_connection_type(ConnectionType type) { _connection_type = type; } |
212 | |
213 | // Set compression method for request. |
214 | void set_request_compress_type(CompressType t) { _request_compress_type = t; } |
215 | |
216 | // Required by some load balancers. |
217 | void set_request_code(uint64_t request_code) { |
218 | add_flag(FLAGS_REQUEST_CODE); |
219 | _request_code = request_code; |
220 | } |
221 | bool has_request_code() const { return has_flag(FLAGS_REQUEST_CODE); } |
222 | uint64_t request_code() const { return _request_code; } |
223 | |
224 | // Mutable header of http request. |
225 | HttpHeader& http_request() { |
226 | if (_http_request == NULL) { |
227 | _http_request = new HttpHeader; |
228 | } |
229 | return *_http_request; |
230 | } |
231 | bool has_http_request() const { return _http_request; } |
232 | HttpHeader* release_http_request() { |
233 | HttpHeader* const tmp = _http_request; |
234 | _http_request = NULL; |
235 | return tmp; |
236 | } |
237 | |
238 | // User attached data or body of http request, which is wired to network |
239 | // directly instead of being serialized into protobuf messages. |
240 | butil::IOBuf& request_attachment() { return _request_attachment; } |
241 | |
242 | ConnectionType connection_type() const { return _connection_type; } |
243 | // Get the called method. May-be NULL for non-pb services. |
244 | const google::protobuf::MethodDescriptor* method() const { return _method; } |
245 | |
246 | // Get the controllers for accessing sub channels in combo channels. |
247 | // Ordinary channel: |
248 | // sub_count() is 0 and sub() is always NULL. |
249 | // ParallelChannel/PartitionChannel: |
250 | // sub_count() is #sub-channels and sub(i) is the controller for |
251 | // accessing i-th sub channel inside ParallelChannel, if i is outside |
252 | // [0, sub_count() - 1], sub(i) is NULL. |
253 | // NOTE: You must test sub() against NULL, ALWAYS. Even if i is inside |
254 | // range, sub(i) can still be NULL: |
255 | // * the rpc call may fail and terminate before accessing the sub channel |
256 | // * the sub channel was skipped |
257 | // SelectiveChannel/DynamicPartitionChannel: |
258 | // sub_count() is always 1 and sub(0) is the controller of successful |
259 | // or last call to sub channels. |
260 | int sub_count() const; |
261 | const Controller* sub(int index) const; |
262 | |
263 | // Get/own SampledRequest for sending dumped requests. |
264 | // Deleted along with controller. |
265 | void reset_sampled_request(SampledRequest* req); |
266 | const SampledRequest* sampled_request() { return _sampled_request; } |
267 | |
268 | // Attach a StreamCreator to this RPC. Notice that the ownership of sc has |
269 | // been transferred to cntl, and sc->DestroyStreamCreator() would be called |
270 | // only once to destroy sc. |
271 | void set_stream_creator(StreamCreator* sc); |
272 | |
273 | // Make the RPC end when the HTTP response has complete headers and let |
274 | // user read the remaining body by using ReadProgressiveAttachmentBy(). |
275 | void response_will_be_read_progressively() { add_flag(FLAGS_READ_PROGRESSIVELY); } |
276 | // True if response_will_be_read_progressively() was called. |
277 | bool is_response_read_progressively() const { return has_flag(FLAGS_READ_PROGRESSIVELY); } |
278 | |
279 | // Read the remaining body after RPC: |
280 | // - This function can only be called once. |
281 | // - If user called response_will_be_read_progressively() but |
282 | // ReadProgressiveAttachmentBy(), controller will set a reader ignoring |
283 | // all bytes read before self's Reset() or dtor. |
284 | // - If user did not call response_will_be_read_progressively() and calls |
285 | // ReadProgressiveAttachmentBy(), the reader is Destroyed() immediately. |
286 | // - Any error occurred will destroy the reader by calling r->Destroy(). |
287 | // - r->Destroy() is guaranteed to be called once and only once. |
288 | void ReadProgressiveAttachmentBy(ProgressiveReader* r); |
289 | |
290 | // True if ReadProgressiveAttachmentBy() was ever called successfully. |
291 | bool has_progressive_reader() const { return has_flag(FLAGS_PROGRESSIVE_READER); } |
292 | |
293 | // RPC may fail with EOVERCROWDED if the socket to write is too full |
294 | // (limited by -socket_max_unwritten_bytes). In some scenarios, user |
295 | // may wish to suppress the error completely. To do this, call this |
296 | // method before doing the RPC. |
297 | void ignore_eovercrowded() { add_flag(FLAGS_IGNORE_EOVERCROWDED); } |
298 | |
299 | // Set if the field of bytes in protobuf message should be encoded |
300 | // to base64 string in HTTP request. |
301 | void set_pb_bytes_to_base64(bool f) { set_flag(FLAGS_PB_BYTES_TO_BASE64, f); } |
302 | bool has_pb_bytes_to_base64() const { return has_flag(FLAGS_PB_BYTES_TO_BASE64); } |
303 | |
304 | // Set if convert the repeated field that has no entry to a empty array |
305 | // of json in HTTP response. |
306 | void set_pb_jsonify_empty_array(bool f) { set_flag(FLAGS_PB_JSONIFY_EMPTY_ARRAY, f); } |
307 | bool has_pb_jsonify_empty_array() const { return has_flag(FLAGS_PB_JSONIFY_EMPTY_ARRAY); } |
308 | |
309 | // Whether to always print primitive fields. By default proto3 primitive |
310 | // fields with default values will be omitted in JSON output. For example, an |
311 | // int32 field set to 0 will be omitted. Set this flag to true will override |
312 | // the default behavior and print primitive fields regardless of their values. |
313 | void set_always_print_primitive_fields(bool f) { set_flag(FLAGS_ALWAYS_PRINT_PRIMITIVE_FIELDS, f); } |
314 | bool has_always_print_primitive_fields() const { return has_flag(FLAGS_ALWAYS_PRINT_PRIMITIVE_FIELDS); } |
315 | |
316 | |
317 | // Tell RPC that done of the RPC can be run in the same thread where |
318 | // the RPC is issued, otherwise done is always run in a different thread. |
319 | // In current implementation, this option only affects RPC that fails |
320 | // before sending the request. |
321 | // This option is *rarely* needed by ordinary users. Don't set this option |
322 | // if you don't know the consequences. Read implementions in channel.cpp |
323 | // and controller.cpp to know more. |
324 | void allow_done_to_run_in_place() |
325 | { add_flag(FLAGS_ALLOW_DONE_TO_RUN_IN_PLACE); } |
326 | // True iff above method was called. |
327 | bool is_done_allowed_to_run_in_place() const |
328 | { return has_flag(FLAGS_ALLOW_DONE_TO_RUN_IN_PLACE); } |
329 | |
330 | // ------------------------------------------------------------------------ |
331 | // Server-side methods. |
332 | // These calls shall be made from the server side only. Their results are |
333 | // undefined on the client side (may crash). |
334 | // ------------------------------------------------------------------------ |
335 | |
336 | // Returns true if the client canceled the RPC or the connection has broken, |
337 | // so the server may as well give up on replying to it. The server should still |
338 | // call the final "done" callback. |
339 | // Note: Reaching deadline of the RPC would not affect this function, which means |
340 | // even if deadline has been reached, this function may still return false. |
341 | bool IsCanceled() const override; |
342 | |
343 | // Asks that the given callback be called when the RPC is canceled or the |
344 | // connection has broken. The callback will always be called exactly once. |
345 | // If the RPC completes without being canceled/broken connection, the callback |
346 | // will be called after completion. If the RPC has already been canceled/broken |
347 | // when NotifyOnCancel() is called, the callback will be called immediately. |
348 | // |
349 | // NotifyOnCancel() must be called no more than once per request. |
350 | void NotifyOnCancel(google::protobuf::Closure* callback) override; |
351 | |
352 | // Returns the authenticated result. NULL if there is no authentication |
353 | const AuthContext* auth_context() const { return _auth_context; } |
354 | |
355 | // Whether the underlying channel is using SSL |
356 | bool is_ssl() const; |
357 | |
358 | // Get the peer certificate, which can be printed by ostream |
359 | x509_st* get_peer_certificate() const; |
360 | |
361 | // Mutable header of http response. |
362 | HttpHeader& http_response() { |
363 | if (_http_response == NULL) { |
364 | _http_response = new HttpHeader; |
365 | } |
366 | return *_http_response; |
367 | } |
368 | bool has_http_response() const { return _http_response; } |
369 | HttpHeader* release_http_response() { |
370 | HttpHeader* const tmp = _http_response; |
371 | _http_response = NULL; |
372 | return tmp; |
373 | } |
374 | |
375 | // User attached data or body of http response, which is wired to network |
376 | // directly instead of being serialized into protobuf messages. |
377 | butil::IOBuf& response_attachment() { return _response_attachment; } |
378 | |
379 | // Create a ProgressiveAttachment to write (often after RPC). |
380 | // If `stop_style' is FORCE_STOP, the underlying socket will be failed |
381 | // immediately when the socket becomes idle or server is stopped. |
382 | // Default value of `stop_style' is WAIT_FOR_STOP. |
383 | ProgressiveAttachment* |
384 | CreateProgressiveAttachment(StopStyle stop_style = WAIT_FOR_STOP); |
385 | bool has_progressive_writer() const { return _wpa != NULL; } |
386 | |
387 | // Set compression method for response. |
388 | void set_response_compress_type(CompressType t) { _response_compress_type = t; } |
389 | |
390 | // Non-zero when this RPC call is traced (by rpcz or rig). |
391 | // NOTE: Only valid at server-side, always zero at client-side. |
392 | uint64_t trace_id() const; |
393 | uint64_t span_id() const; |
394 | |
395 | // Tell RPC to close the connection instead of sending back response. |
396 | // If this controller was not SetFailed() before, ErrorCode() will be |
397 | // set to ECLOSE. |
398 | // NOTE: the underlying connection is not closed immediately. |
399 | void CloseConnection(const char* reason_fmt, ...); |
400 | |
401 | // True if CloseConnection() was called. |
402 | bool IsCloseConnection() const { return has_flag(FLAGS_CLOSE_CONNECTION); } |
403 | |
404 | // ServerOptions.security_mode is turned on, and the RPC is from |
405 | // connections accepted from port (rather than internal_port) |
406 | bool is_security_mode() const { return has_flag(FLAGS_SECURITY_MODE); } |
407 | |
408 | // The server running this RPC session. |
409 | // Always NULL at client-side. |
410 | const Server* server() const { return _server; } |
411 | |
412 | // Get the data attached to current RPC session. The data is created by |
413 | // ServerOptions.session_local_data_factory and reused between different |
414 | // RPC. If factory is NULL, this method returns NULL. |
415 | void* session_local_data(); |
416 | |
417 | // Get the data attached to a mongo session(practically a socket). |
418 | MongoContext* mongo_session_data() { return _mongo_session_data.get(); } |
419 | |
420 | // ------------------------------------------------------------------- |
421 | // Both-side methods. |
422 | // Following methods can be called from both client and server. But they |
423 | // may have different or opposite semantics. |
424 | // ------------------------------------------------------------------- |
425 | |
426 | // Client-side: successful or last server called. Accessible from |
427 | // PackXXXRequest() in protocols. |
428 | // Server-side: returns the client sending the request |
429 | butil::EndPoint remote_side() const { return _remote_side; } |
430 | |
431 | // Client-side: the local address for talking with server, undefined until |
432 | // this RPC succeeds (because the connection may not be established |
433 | // before RPC). |
434 | // Server-side: the address that clients access. |
435 | butil::EndPoint local_side() const { return _local_side; } |
436 | |
437 | // Protocol of the request sent by client or received by server. |
438 | ProtocolType request_protocol() const { return _request_protocol; } |
439 | |
440 | // Resets the Controller to its initial state so that it may be reused in |
441 | // a new call. Must NOT be called while an RPC is in progress. |
442 | void Reset() override { |
443 | ResetNonPods(); |
444 | ResetPods(); |
445 | } |
446 | |
447 | // Causes Failed() to return true on the client side. "reason" will be |
448 | // incorporated into the message returned by ErrorText(). |
449 | // NOTE: Change http_response().status_code() according to `error_code' |
450 | // as well if the protocol is HTTP. If you want to overwrite the |
451 | // status_code, call http_response().set_status_code() after SetFailed() |
452 | // (rather than before SetFailed) |
453 | void SetFailed(const std::string& reason) override; |
454 | void SetFailed(int error_code, const char* reason_fmt, ...) |
455 | __attribute__ ((__format__ (__printf__, 3, 4))); |
456 | |
457 | // After a call has finished, returns true if the RPC call failed. |
458 | // The response to Channel is undefined when Failed() is true. |
459 | // Calling Failed() before a call has finished is undefined. |
460 | bool Failed() const override; |
461 | |
462 | // If Failed() is true, return description of the errors. |
463 | // NOTE: ErrorText() != berror(ErrorCode()). |
464 | std::string ErrorText() const override; |
465 | |
466 | // Last error code. Equals 0 iff Failed() is false. |
467 | // If there's retry, latter code overwrites former one. |
468 | int ErrorCode() const { return _error_code; } |
469 | |
470 | // Getters: |
471 | bool has_log_id() const { return has_flag(FLAGS_LOG_ID); } |
472 | uint64_t log_id() const { return _log_id; } |
473 | CompressType request_compress_type() const { return _request_compress_type; } |
474 | CompressType response_compress_type() const { return _response_compress_type; } |
475 | const HttpHeader& http_request() const |
476 | { return _http_request != NULL ? *_http_request : DefaultHttpHeader(); } |
477 | |
478 | const HttpHeader& http_response() const |
479 | { return _http_response != NULL ? *_http_response : DefaultHttpHeader(); } |
480 | |
481 | const butil::IOBuf& request_attachment() const { return _request_attachment; } |
482 | const butil::IOBuf& response_attachment() const { return _response_attachment; } |
483 | |
484 | // Return true if the remote side creates a stream. |
485 | bool has_remote_stream() { return _remote_stream_settings != NULL; } |
486 | |
487 | // The id to cancel RPC call or join response. |
488 | CallId call_id(); |
489 | |
490 | // Get/set idl names. Notice that the names must be string-constant. |
491 | // int32_t Echo(EchoRequest req, EchoResponse res); |
492 | // ^ ^ |
493 | // request_name response_name |
494 | void set_idl_names(const IdlNames& names) { _idl_names = names; } |
495 | IdlNames idl_names() const { return _idl_names; } |
496 | |
497 | // Get/set idl result. The type is limited to be integral. |
498 | // int32_t Echo(EchoRequest req, EchoResponse res); |
499 | // ^ |
500 | // result |
501 | void set_idl_result(int64_t result) { _idl_result = result; } |
502 | int64_t idl_result() const { return _idl_result; } |
503 | |
504 | const std::string& thrift_method_name() { return _thrift_method_name; } |
505 | |
506 | // Get sock option. .e.g get vip info through ttm kernel module hook, |
507 | int GetSockOption(int level, int optname, void* optval, socklen_t* optlen); |
508 | |
509 | // Get deadline of this RPC (since the Epoch in microseconds). |
510 | // -1 means no deadline. |
511 | int64_t deadline_us() const { return _deadline_us; } |
512 | |
513 | private: |
514 | struct CompletionInfo { |
515 | CallId id; // call_id of the corresponding request |
516 | bool responded; // triggered by a response rather than other errors |
517 | }; |
518 | |
519 | // Call this method when receiving response/failure. If RPC failed, |
520 | // it will try to retry this RPC. Otherwise, it calls user `done' |
521 | // if it exists and destroys the correlation_id. Note that |
522 | // the correlation_id MUST have been locked before this call. |
523 | // Parameter `new_bthread': |
524 | // false - Run this function in the current bthread/pthread. Note that |
525 | // it could last for a long time or even block the caller (as |
526 | // it contains user's `done') |
527 | // true - Creates a new bthread to run this function and returns to |
528 | // the caller immediately |
529 | // Parameter `id': |
530 | // It will be used to checked against `_correlation_id' and |
531 | // `_current_call.nretry'. If not matched, nothing will happen, |
532 | // which means this event has been processed before |
533 | // Parameter `saved_error': |
534 | // If the above check failed, `_error_code' will be reverted to this |
535 | void OnVersionedRPCReturned(const CompletionInfo&, |
536 | bool new_bthread, int saved_error); |
537 | |
538 | static void* RunEndRPC(void* arg); |
539 | void EndRPC(const CompletionInfo&); |
540 | |
541 | static int HandleSocketFailed(bthread_id_t, void* data, int error_code, |
542 | const std::string& error_text); |
543 | void HandleSendFailed(); |
544 | |
545 | static int RunOnCancel(bthread_id_t, void* data, int error_code); |
546 | |
547 | void set_auth_context(const AuthContext* ctx); |
548 | |
549 | // MongoContext is created by ParseMongoRequest when the first msg comes |
550 | // over a socket, then stored in MongoContextMessage of the socket. cntl |
551 | // gets a shared reference of the data in PocessMongoRequest. When socket |
552 | // is recycled, the container, AKA MongoContextMessage is destroyed, which |
553 | // has no infuluence on the cntl(s) who already gets the shared reference |
554 | // of the MongoContext. The MongoContext will not be recycled until both |
555 | // the container(MongoContextMessage) and all related cntl(s) are recycled. |
556 | void set_mongo_session_data(MongoContext* data); |
557 | |
558 | // Reset POD/non-POD fields. |
559 | void ResetPods(); |
560 | void ResetNonPods(); |
561 | |
562 | void StartCancel() override; |
563 | |
564 | // Using fixed start_realtime_us (microseconds since the Epoch) gives |
565 | // more accurate deadline. |
566 | void IssueRPC(int64_t start_realtime_us); |
567 | |
568 | struct ClientSettings { |
569 | int32_t timeout_ms; |
570 | int32_t backup_request_ms; |
571 | int max_retry; |
572 | int32_t tos; |
573 | ConnectionType connection_type; |
574 | CompressType request_compress_type; |
575 | uint64_t log_id; |
576 | bool has_request_code; |
577 | int64_t request_code; |
578 | }; |
579 | |
580 | void SaveClientSettings(ClientSettings*) const; |
581 | void ApplyClientSettings(const ClientSettings&); |
582 | |
583 | bool FailedInline() const { return _error_code; } |
584 | |
585 | CallId get_id(int nretry) const { |
586 | CallId id = { _correlation_id.value + nretry + 1 }; |
587 | return id; |
588 | } |
589 | |
590 | // Tell RPC that this particular call is used to do health check. |
591 | bool is_health_check_call() const { return has_flag(FLAGS_HEALTH_CHECK_CALL); } |
592 | |
593 | public: |
594 | CallId current_id() const { |
595 | CallId id = { _correlation_id.value + _current_call.nretry + 1 }; |
596 | return id; |
597 | } |
598 | private: |
599 | |
600 | // Append server information to `_error_text' |
601 | void AppendServerIdentiy(); |
602 | |
603 | // Contexts for tracking and ending a sent request. |
604 | // One RPC to a channel may send several requests due to retrying. |
605 | struct Call { |
606 | Call() { Reset(); } |
607 | Call(Call*); //move semantics |
608 | ~Call(); |
609 | void Reset(); |
610 | void OnComplete(Controller* c, int error_code, bool responded, bool end_of_rpc); |
611 | |
612 | int nretry; // sent in nretry-th retry. |
613 | bool need_feedback; // The LB needs feedback. |
614 | bool enable_circuit_breaker; // The channel enabled circuit_breaker |
615 | bool touched_by_stream_creator; |
616 | SocketId peer_id; // main server id |
617 | int64_t begin_time_us; // sent real time. |
618 | // The actual `Socket' for sending RPC. It's socket id will be |
619 | // exactly the same as `peer_id' if `_connection_type' is |
620 | // CONNECTION_TYPE_SINGLE. Otherwise, it may be a temporary |
621 | // socket fetched from socket pool |
622 | SocketUniquePtr sending_sock; |
623 | StreamUserData* stream_user_data; |
624 | }; |
625 | |
626 | void HandleStreamConnection(Socket *host_socket); |
627 | |
628 | bool SingleServer() const { return _single_server_id != INVALID_SOCKET_ID; } |
629 | |
630 | void SubmitSpan(); |
631 | |
632 | void OnRPCBegin(int64_t begin_time_us) { |
633 | _begin_time_us = begin_time_us; |
634 | // make latency_us() return 0 when RPC is not over |
635 | _end_time_us = begin_time_us; |
636 | } |
637 | |
638 | void OnRPCEnd(int64_t end_time_us) { |
639 | _end_time_us = end_time_us; |
640 | } |
641 | |
642 | static void RunDoneInBackupThread(void*); |
643 | void DoneInBackupThread(); |
644 | |
645 | // Utilities for manipulating _flags |
646 | inline void add_flag(uint32_t f) { _flags |= f; } |
647 | inline void clear_flag(uint32_t f) { _flags &= ~f; } |
648 | inline void set_flag(uint32_t f, bool t) |
649 | { return t ? add_flag(f) : clear_flag(f); } |
650 | inline bool has_flag(uint32_t f) const { return _flags & f; } |
651 | |
652 | void set_used_by_rpc() { add_flag(FLAGS_USED_BY_RPC); } |
653 | bool is_used_by_rpc() const { return has_flag(FLAGS_USED_BY_RPC); } |
654 | |
655 | bool has_enabled_circuit_breaker() const { |
656 | return has_flag(FLAGS_ENABLED_CIRCUIT_BREAKER); |
657 | } |
658 | |
659 | std::string& protocol_param() { return _thrift_method_name; } |
660 | const std::string& protocol_param() const { return _thrift_method_name; } |
661 | |
662 | private: |
663 | // NOTE: align and group fields to make Controller as compact as possible. |
664 | |
665 | Span* _span; |
666 | uint32_t _flags; // all boolean fields inside Controller |
667 | int32_t _error_code; |
668 | std::string _error_text; |
669 | butil::EndPoint _remote_side; |
670 | butil::EndPoint _local_side; |
671 | |
672 | void* _session_local_data; |
673 | const Server* _server; |
674 | bthread_id_t _oncancel_id; |
675 | const AuthContext* _auth_context; // Authentication result |
676 | butil::intrusive_ptr<MongoContext> _mongo_session_data; |
677 | SampledRequest* _sampled_request; |
678 | |
679 | ProtocolType _request_protocol; |
680 | // Some of them are copied from `Channel' which might be destroyed |
681 | // after CallMethod. |
682 | int _max_retry; |
683 | const RetryPolicy* _retry_policy; |
684 | // Synchronization object for one RPC call. It remains unchanged even |
685 | // when retry happens. Synchronous RPC will wait on this id. |
686 | CallId _correlation_id; |
687 | |
688 | ConnectionType _connection_type; |
689 | |
690 | // Used by ParallelChannel |
691 | int _fail_limit; |
692 | |
693 | uint32_t _pipelined_count; |
694 | |
695 | // [Timeout related] |
696 | int32_t _timeout_ms; |
697 | int32_t _connect_timeout_ms; |
698 | int32_t _backup_request_ms; |
699 | // Deadline of this RPC (since the Epoch in microseconds). |
700 | int64_t _deadline_us; |
701 | // Timer registered to trigger RPC timeout event |
702 | bthread_timer_t _timeout_id; |
703 | |
704 | // Begin/End time of a single RPC call (since Epoch in microseconds) |
705 | int64_t _begin_time_us; |
706 | int64_t _end_time_us; |
707 | short _tos; // Type of service. |
708 | // The index of parse function which `InputMessenger' will use |
709 | int _preferred_index; |
710 | CompressType _request_compress_type; |
711 | CompressType _response_compress_type; |
712 | uint64_t _log_id; |
713 | int _pchan_sub_count; |
714 | google::protobuf::Message* _response; |
715 | google::protobuf::Closure* _done; |
716 | RPCSender* _sender; |
717 | uint64_t _request_code; |
718 | SocketId _single_server_id; |
719 | butil::intrusive_ptr<SharedLoadBalancer> _lb; |
720 | |
721 | // for passing parameters to created bthread, don't modify it otherwhere. |
722 | CompletionInfo _tmp_completion_info; |
723 | |
724 | Call _current_call; |
725 | Call* _unfinished_call; |
726 | ExcludedServers* _accessed; |
727 | |
728 | StreamCreator* _stream_creator; |
729 | |
730 | // Fields will be used when making requests |
731 | Protocol::PackRequest _pack_request; |
732 | const google::protobuf::MethodDescriptor* _method; |
733 | const Authenticator* _auth; |
734 | butil::IOBuf _request_buf; |
735 | IdlNames _idl_names; |
736 | int64_t _idl_result; |
737 | |
738 | HttpHeader* _http_request; |
739 | HttpHeader* _http_response; |
740 | |
741 | // Fields with large size but low access frequency |
742 | butil::IOBuf _request_attachment; |
743 | butil::IOBuf _response_attachment; |
744 | |
745 | // Writable progressive attachment |
746 | butil::intrusive_ptr<ProgressiveAttachment> _wpa; |
747 | // Readable progressive attachment |
748 | butil::intrusive_ptr<ReadableProgressiveAttachment> _rpa; |
749 | |
750 | // TODO: Replace following fields with StreamCreator |
751 | // Defined at client side |
752 | StreamId _request_stream; |
753 | // Defined at server side |
754 | StreamId _response_stream; |
755 | // Defined at both sides |
756 | StreamSettings *_remote_stream_settings; |
757 | |
758 | // Thrift method name, only used when thrift protocol enabled |
759 | std::string _thrift_method_name; |
760 | }; |
761 | |
762 | // Advises the RPC system that the caller desires that the RPC call be |
763 | // canceled. If the call is canceled, the "done" callback will still be |
764 | // called and the Controller will indicate that the call failed at that |
765 | // time. |
766 | void StartCancel(CallId id); |
767 | |
768 | // Suspend until the RPC finishes. |
769 | void Join(CallId id); |
770 | |
771 | // Get a global closure for doing nothing. Used in semi-synchronous |
772 | // RPC calls. Example: |
773 | // stub1.method1(&cntl1, &request1, &response1, brpc::DoNothing()); |
774 | // stub2.method2(&cntl2, &request2, &response2, brpc::DoNothing()); |
775 | // ... |
776 | // brpc::Join(cntl1.call_id()); |
777 | // brpc::Join(cntl2.call_id()); |
778 | google::protobuf::Closure* DoNothing(); |
779 | |
780 | // Convert non-web symbols to web equivalence. |
781 | void WebEscape(const std::string& source, std::string* output); |
782 | |
783 | // True if Ctrl-C is ever pressed. |
784 | bool IsAskedToQuit(); |
785 | |
786 | // Send Ctrl-C to current process. |
787 | void AskToQuit(); |
788 | |
789 | } // namespace brpc |
790 | |
791 | |
792 | #endif // BRPC_CONTROLLER_H |
793 | |