1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18
19#ifndef BRPC_CONTROLLER_H
20#define BRPC_CONTROLLER_H
21
22// To brpc developers: This is a header included by user, don't depend
23// on internal structures, use opaque pointers instead.
24
25#include <gflags/gflags.h> // Users often need gflags
26#include "butil/intrusive_ptr.hpp" // butil::intrusive_ptr
27#include "bthread/errno.h" // Redefine errno
28#include "butil/endpoint.h" // butil::EndPoint
29#include "butil/iobuf.h" // butil::IOBuf
30#include "bthread/types.h" // bthread_id_t
31#include "brpc/options.pb.h" // CompressType
32#include "brpc/errno.pb.h" // error code
33#include "brpc/http_header.h" // HttpHeader
34#include "brpc/authenticator.h" // AuthContext
35#include "brpc/socket_id.h" // SocketId
36#include "brpc/stream.h" // StreamId
37#include "brpc/stream_creator.h" // StreamCreator
38#include "brpc/protocol.h" // Protocol
39#include "brpc/traceprintf.h"
40#include "brpc/reloadable_flags.h"
41#include "brpc/closure_guard.h" // User often needs this
42#include "brpc/callback.h"
43#include "brpc/progressive_attachment.h" // ProgressiveAttachment
44#include "brpc/progressive_reader.h" // ProgressiveReader
45#include "brpc/grpc.h"
46
47// EAUTH is defined in MAC
48#ifndef EAUTH
49#define EAUTH ERPCAUTH
50#endif
51
52extern "C" {
53#ifndef USE_MESALINK
54struct x509_st;
55#else
56#include <mesalink/openssl/x509.h>
57#define x509_st X509
58#endif
59}
60
61namespace brpc {
62class Span;
63class Server;
64class SharedLoadBalancer;
65class ExcludedServers;
66class RPCSender;
67class StreamSettings;
68class SampledRequest;
69class MongoContext;
70class RetryPolicy;
71class InputMessageBase;
72class ThriftStub;
73namespace policy {
74class OnServerStreamCreated;
75void ProcessMongoRequest(InputMessageBase*);
76void ProcessThriftRequest(InputMessageBase*);
77}
78namespace schan {
79class Sender;
80class SubDone;
81}
82
// For serializing/parsing from idl services.
// Holds the names of the request and response arguments of an idl method
// (see Controller::set_idl_names()).
struct IdlNames {
    const char* request_name;   // must be string-constant
    const char* response_name;  // must be string-constant
};
88
89extern const IdlNames idl_single_req_single_res;
90extern const IdlNames idl_single_req_multi_res;
91extern const IdlNames idl_multi_req_single_res;
92extern const IdlNames idl_multi_req_multi_res;
93
94// The identifier to be associated with a RPC call.
95typedef bthread_id_t CallId;
96
// Styles for stopping progressive attachment, passed to
// Controller::CreateProgressiveAttachment().
enum StopStyle {
    // The underlying socket will be failed immediately when the socket
    // becomes idle or server is stopped.
    FORCE_STOP,
    // Default style used by CreateProgressiveAttachment().
    WAIT_FOR_STOP,
};
102
// Sentinel compared against Controller::_end_time_us by latency_us(): while
// the end time equals this value, latency is measured up to the current time.
const int32_t UNSET_MAGIC_NUM = -123456789;
104
105// A Controller mediates a single method call. The primary purpose of
106// the controller is to provide a way to manipulate settings per RPC-call
107// and to find out about RPC-level errors.
108class Controller : public google::protobuf::RpcController/*non-copyable*/ {
109friend class Channel;
110friend class ParallelChannel;
111friend class ParallelChannelDone;
112friend class ControllerPrivateAccessor;
113friend class ServerPrivateAccessor;
114friend class SelectiveChannel;
115friend class ThriftStub;
116friend class schan::Sender;
117friend class schan::SubDone;
118friend class policy::OnServerStreamCreated;
119friend int StreamCreate(StreamId*, Controller&, const StreamOptions*);
120friend int StreamAccept(StreamId*, Controller&, const StreamOptions*);
121friend void policy::ProcessMongoRequest(InputMessageBase*);
122friend void policy::ProcessThriftRequest(InputMessageBase*);
123 // << Flags >>
124 static const uint32_t FLAGS_IGNORE_EOVERCROWDED = 1;
125 static const uint32_t FLAGS_SECURITY_MODE = (1 << 1);
126 static const uint32_t FLAGS_ADDED_CONCURRENCY = (1 << 2);
127 static const uint32_t FLAGS_READ_PROGRESSIVELY = (1 << 3);
128 static const uint32_t FLAGS_PROGRESSIVE_READER = (1 << 4);
129 static const uint32_t FLAGS_BACKUP_REQUEST = (1 << 5);
130 // Let _done delete the correlation_id, used by combo channels to
131 // make lifetime of the correlation_id more flexible.
132 static const uint32_t FLAGS_DESTROY_CID_IN_DONE = (1 << 7);
133 static const uint32_t FLAGS_CLOSE_CONNECTION = (1 << 8);
134 static const uint32_t FLAGS_LOG_ID = (1 << 9); // log_id is set
135 static const uint32_t FLAGS_REQUEST_CODE = (1 << 10);
136 static const uint32_t FLAGS_PB_BYTES_TO_BASE64 = (1 << 11);
137 static const uint32_t FLAGS_ALLOW_DONE_TO_RUN_IN_PLACE = (1 << 12);
138 static const uint32_t FLAGS_USED_BY_RPC = (1 << 13);
139 static const uint32_t FLAGS_REQUEST_WITH_AUTH = (1 << 15);
140 static const uint32_t FLAGS_PB_JSONIFY_EMPTY_ARRAY = (1 << 16);
141 static const uint32_t FLAGS_ENABLED_CIRCUIT_BREAKER = (1 << 17);
142 static const uint32_t FLAGS_ALWAYS_PRINT_PRIMITIVE_FIELDS = (1 << 18);
143 static const uint32_t FLAGS_HEALTH_CHECK_CALL = (1 << 19);
144
145public:
146 Controller();
147 ~Controller();
148
149 // ------------------------------------------------------------------
150 // Client-side methods
151 // These calls shall be made from the client side only. Their results
152 // are undefined on the server side (may crash).
153 // ------------------------------------------------------------------
154
155 // Set/get timeout in milliseconds for the RPC call. Use
156 // ChannelOptions.timeout_ms on unset.
157 void set_timeout_ms(int64_t timeout_ms);
158 int64_t timeout_ms() const { return _timeout_ms; }
159
160 // Set/get the delay to send backup request in milliseconds. Use
161 // ChannelOptions.backup_request_ms on unset.
162 void set_backup_request_ms(int64_t timeout_ms);
163 int64_t backup_request_ms() const { return _backup_request_ms; }
164
165 // Set/get maximum times of retrying. Use ChannelOptions.max_retry on unset.
166 // <=0 means no retry.
167 // Conditions of retrying:
168 // * The connection is broken. No retry if the connection is still on.
169 // Use backup_request if you want to issue another request after some
170 // time.
171 // * Not timed out.
172 // * retried_count() < max_retry().
173 // * Retry may work for the error. E.g. No retry when the request is
174 // incorrect (EREQUEST), retrying is pointless.
175 void set_max_retry(int max_retry);
176 int max_retry() const { return _max_retry; }
177
178 // Get number of retries.
179 int retried_count() const { return _current_call.nretry; }
180
181 // True if a backup request was sent during the RPC.
182 bool has_backup_request() const { return has_flag(FLAGS_BACKUP_REQUEST); }
183
184 // This function has different meanings in client and server side.
185 // In client side it gets latency of the RPC call. While in server side,
186 // it gets queue time before server processes the RPC call.
187 int64_t latency_us() const {
188 if (_end_time_us == UNSET_MAGIC_NUM) {
189 return butil::cpuwide_time_us() - _begin_time_us;
190 }
191 return _end_time_us - _begin_time_us;
192 }
193
194 // Response of the RPC call (passed to CallMethod)
195 google::protobuf::Message* response() const { return _response; }
196
197 // An identifier to send to server along with request. This is widely used
198 // throughout baidu's servers to tag a searching session (a series of
199 // queries following the topology of servers) with a same log_id.
200 void set_log_id(uint64_t log_id);
201
202 // Set type of service: http://en.wikipedia.org/wiki/Type_of_service
203 // Current implementation has limits: If the connection is already
204 // established, this setting has no effect until the connection is broken
205 // and re-connected. And because of connection sharing, setting different
206 // tos to a single connection is undefined.
207 void set_type_of_service(short tos) { _tos = tos; }
208
209 // Set type of connections for sending RPC.
210 // Use ChannelOptions.connection_type on unset.
211 void set_connection_type(ConnectionType type) { _connection_type = type; }
212
213 // Set compression method for request.
214 void set_request_compress_type(CompressType t) { _request_compress_type = t; }
215
216 // Required by some load balancers.
217 void set_request_code(uint64_t request_code) {
218 add_flag(FLAGS_REQUEST_CODE);
219 _request_code = request_code;
220 }
221 bool has_request_code() const { return has_flag(FLAGS_REQUEST_CODE); }
222 uint64_t request_code() const { return _request_code; }
223
224 // Mutable header of http request.
225 HttpHeader& http_request() {
226 if (_http_request == NULL) {
227 _http_request = new HttpHeader;
228 }
229 return *_http_request;
230 }
231 bool has_http_request() const { return _http_request; }
232 HttpHeader* release_http_request() {
233 HttpHeader* const tmp = _http_request;
234 _http_request = NULL;
235 return tmp;
236 }
237
238 // User attached data or body of http request, which is wired to network
239 // directly instead of being serialized into protobuf messages.
240 butil::IOBuf& request_attachment() { return _request_attachment; }
241
242 ConnectionType connection_type() const { return _connection_type; }
243 // Get the called method. May-be NULL for non-pb services.
244 const google::protobuf::MethodDescriptor* method() const { return _method; }
245
246 // Get the controllers for accessing sub channels in combo channels.
247 // Ordinary channel:
248 // sub_count() is 0 and sub() is always NULL.
249 // ParallelChannel/PartitionChannel:
250 // sub_count() is #sub-channels and sub(i) is the controller for
251 // accessing i-th sub channel inside ParallelChannel, if i is outside
252 // [0, sub_count() - 1], sub(i) is NULL.
253 // NOTE: You must test sub() against NULL, ALWAYS. Even if i is inside
254 // range, sub(i) can still be NULL:
255 // * the rpc call may fail and terminate before accessing the sub channel
256 // * the sub channel was skipped
257 // SelectiveChannel/DynamicPartitionChannel:
258 // sub_count() is always 1 and sub(0) is the controller of successful
259 // or last call to sub channels.
260 int sub_count() const;
261 const Controller* sub(int index) const;
262
263 // Get/own SampledRequest for sending dumped requests.
264 // Deleted along with controller.
265 void reset_sampled_request(SampledRequest* req);
266 const SampledRequest* sampled_request() { return _sampled_request; }
267
268 // Attach a StreamCreator to this RPC. Notice that the ownership of sc has
269 // been transferred to cntl, and sc->DestroyStreamCreator() would be called
270 // only once to destroy sc.
271 void set_stream_creator(StreamCreator* sc);
272
273 // Make the RPC end when the HTTP response has complete headers and let
274 // user read the remaining body by using ReadProgressiveAttachmentBy().
275 void response_will_be_read_progressively() { add_flag(FLAGS_READ_PROGRESSIVELY); }
276 // True if response_will_be_read_progressively() was called.
277 bool is_response_read_progressively() const { return has_flag(FLAGS_READ_PROGRESSIVELY); }
278
279 // Read the remaining body after RPC:
280 // - This function can only be called once.
    // - If user called response_will_be_read_progressively() but never
    //   calls ReadProgressiveAttachmentBy(), controller will set a reader
    //   ignoring all bytes read before self's Reset() or dtor.
284 // - If user did not call response_will_be_read_progressively() and calls
285 // ReadProgressiveAttachmentBy(), the reader is Destroyed() immediately.
286 // - Any error occurred will destroy the reader by calling r->Destroy().
287 // - r->Destroy() is guaranteed to be called once and only once.
288 void ReadProgressiveAttachmentBy(ProgressiveReader* r);
289
290 // True if ReadProgressiveAttachmentBy() was ever called successfully.
291 bool has_progressive_reader() const { return has_flag(FLAGS_PROGRESSIVE_READER); }
292
293 // RPC may fail with EOVERCROWDED if the socket to write is too full
294 // (limited by -socket_max_unwritten_bytes). In some scenarios, user
295 // may wish to suppress the error completely. To do this, call this
296 // method before doing the RPC.
297 void ignore_eovercrowded() { add_flag(FLAGS_IGNORE_EOVERCROWDED); }
298
299 // Set if the field of bytes in protobuf message should be encoded
300 // to base64 string in HTTP request.
301 void set_pb_bytes_to_base64(bool f) { set_flag(FLAGS_PB_BYTES_TO_BASE64, f); }
302 bool has_pb_bytes_to_base64() const { return has_flag(FLAGS_PB_BYTES_TO_BASE64); }
303
    // Set whether a repeated field that has no entry is converted to an
    // empty JSON array in HTTP response.
306 void set_pb_jsonify_empty_array(bool f) { set_flag(FLAGS_PB_JSONIFY_EMPTY_ARRAY, f); }
307 bool has_pb_jsonify_empty_array() const { return has_flag(FLAGS_PB_JSONIFY_EMPTY_ARRAY); }
308
309 // Whether to always print primitive fields. By default proto3 primitive
310 // fields with default values will be omitted in JSON output. For example, an
    // int32 field set to 0 will be omitted. Setting this flag to true overrides
    // the default behavior and prints primitive fields regardless of their values.
313 void set_always_print_primitive_fields(bool f) { set_flag(FLAGS_ALWAYS_PRINT_PRIMITIVE_FIELDS, f); }
314 bool has_always_print_primitive_fields() const { return has_flag(FLAGS_ALWAYS_PRINT_PRIMITIVE_FIELDS); }
315
316
317 // Tell RPC that done of the RPC can be run in the same thread where
318 // the RPC is issued, otherwise done is always run in a different thread.
319 // In current implementation, this option only affects RPC that fails
320 // before sending the request.
321 // This option is *rarely* needed by ordinary users. Don't set this option
    // if you don't know the consequences. Read implementations in channel.cpp
323 // and controller.cpp to know more.
324 void allow_done_to_run_in_place()
325 { add_flag(FLAGS_ALLOW_DONE_TO_RUN_IN_PLACE); }
326 // True iff above method was called.
327 bool is_done_allowed_to_run_in_place() const
328 { return has_flag(FLAGS_ALLOW_DONE_TO_RUN_IN_PLACE); }
329
330 // ------------------------------------------------------------------------
331 // Server-side methods.
332 // These calls shall be made from the server side only. Their results are
333 // undefined on the client side (may crash).
334 // ------------------------------------------------------------------------
335
336 // Returns true if the client canceled the RPC or the connection has broken,
337 // so the server may as well give up on replying to it. The server should still
338 // call the final "done" callback.
339 // Note: Reaching deadline of the RPC would not affect this function, which means
340 // even if deadline has been reached, this function may still return false.
341 bool IsCanceled() const override;
342
343 // Asks that the given callback be called when the RPC is canceled or the
344 // connection has broken. The callback will always be called exactly once.
345 // If the RPC completes without being canceled/broken connection, the callback
346 // will be called after completion. If the RPC has already been canceled/broken
347 // when NotifyOnCancel() is called, the callback will be called immediately.
348 //
349 // NotifyOnCancel() must be called no more than once per request.
350 void NotifyOnCancel(google::protobuf::Closure* callback) override;
351
352 // Returns the authenticated result. NULL if there is no authentication
353 const AuthContext* auth_context() const { return _auth_context; }
354
355 // Whether the underlying channel is using SSL
356 bool is_ssl() const;
357
358 // Get the peer certificate, which can be printed by ostream
359 x509_st* get_peer_certificate() const;
360
361 // Mutable header of http response.
362 HttpHeader& http_response() {
363 if (_http_response == NULL) {
364 _http_response = new HttpHeader;
365 }
366 return *_http_response;
367 }
368 bool has_http_response() const { return _http_response; }
369 HttpHeader* release_http_response() {
370 HttpHeader* const tmp = _http_response;
371 _http_response = NULL;
372 return tmp;
373 }
374
375 // User attached data or body of http response, which is wired to network
376 // directly instead of being serialized into protobuf messages.
377 butil::IOBuf& response_attachment() { return _response_attachment; }
378
379 // Create a ProgressiveAttachment to write (often after RPC).
380 // If `stop_style' is FORCE_STOP, the underlying socket will be failed
381 // immediately when the socket becomes idle or server is stopped.
382 // Default value of `stop_style' is WAIT_FOR_STOP.
383 ProgressiveAttachment*
384 CreateProgressiveAttachment(StopStyle stop_style = WAIT_FOR_STOP);
385 bool has_progressive_writer() const { return _wpa != NULL; }
386
387 // Set compression method for response.
388 void set_response_compress_type(CompressType t) { _response_compress_type = t; }
389
390 // Non-zero when this RPC call is traced (by rpcz or rig).
391 // NOTE: Only valid at server-side, always zero at client-side.
392 uint64_t trace_id() const;
393 uint64_t span_id() const;
394
395 // Tell RPC to close the connection instead of sending back response.
396 // If this controller was not SetFailed() before, ErrorCode() will be
397 // set to ECLOSE.
398 // NOTE: the underlying connection is not closed immediately.
399 void CloseConnection(const char* reason_fmt, ...);
400
401 // True if CloseConnection() was called.
402 bool IsCloseConnection() const { return has_flag(FLAGS_CLOSE_CONNECTION); }
403
404 // ServerOptions.security_mode is turned on, and the RPC is from
405 // connections accepted from port (rather than internal_port)
406 bool is_security_mode() const { return has_flag(FLAGS_SECURITY_MODE); }
407
408 // The server running this RPC session.
409 // Always NULL at client-side.
410 const Server* server() const { return _server; }
411
412 // Get the data attached to current RPC session. The data is created by
413 // ServerOptions.session_local_data_factory and reused between different
414 // RPC. If factory is NULL, this method returns NULL.
415 void* session_local_data();
416
417 // Get the data attached to a mongo session(practically a socket).
418 MongoContext* mongo_session_data() { return _mongo_session_data.get(); }
419
420 // -------------------------------------------------------------------
421 // Both-side methods.
422 // Following methods can be called from both client and server. But they
423 // may have different or opposite semantics.
424 // -------------------------------------------------------------------
425
426 // Client-side: successful or last server called. Accessible from
427 // PackXXXRequest() in protocols.
428 // Server-side: returns the client sending the request
429 butil::EndPoint remote_side() const { return _remote_side; }
430
431 // Client-side: the local address for talking with server, undefined until
432 // this RPC succeeds (because the connection may not be established
433 // before RPC).
434 // Server-side: the address that clients access.
435 butil::EndPoint local_side() const { return _local_side; }
436
437 // Protocol of the request sent by client or received by server.
438 ProtocolType request_protocol() const { return _request_protocol; }
439
    // Resets the Controller to its initial state so that it may be reused in
    // a new call. Must NOT be called while an RPC is in progress.
    void Reset() override {
        // Non-POD fields are reset first, then the POD fields.
        ResetNonPods();
        ResetPods();
    }
446
447 // Causes Failed() to return true on the client side. "reason" will be
448 // incorporated into the message returned by ErrorText().
449 // NOTE: Change http_response().status_code() according to `error_code'
450 // as well if the protocol is HTTP. If you want to overwrite the
451 // status_code, call http_response().set_status_code() after SetFailed()
452 // (rather than before SetFailed)
453 void SetFailed(const std::string& reason) override;
454 void SetFailed(int error_code, const char* reason_fmt, ...)
455 __attribute__ ((__format__ (__printf__, 3, 4)));
456
457 // After a call has finished, returns true if the RPC call failed.
458 // The response to Channel is undefined when Failed() is true.
459 // Calling Failed() before a call has finished is undefined.
460 bool Failed() const override;
461
462 // If Failed() is true, return description of the errors.
463 // NOTE: ErrorText() != berror(ErrorCode()).
464 std::string ErrorText() const override;
465
466 // Last error code. Equals 0 iff Failed() is false.
467 // If there's retry, latter code overwrites former one.
468 int ErrorCode() const { return _error_code; }
469
470 // Getters:
471 bool has_log_id() const { return has_flag(FLAGS_LOG_ID); }
472 uint64_t log_id() const { return _log_id; }
473 CompressType request_compress_type() const { return _request_compress_type; }
474 CompressType response_compress_type() const { return _response_compress_type; }
475 const HttpHeader& http_request() const
476 { return _http_request != NULL ? *_http_request : DefaultHttpHeader(); }
477
478 const HttpHeader& http_response() const
479 { return _http_response != NULL ? *_http_response : DefaultHttpHeader(); }
480
481 const butil::IOBuf& request_attachment() const { return _request_attachment; }
482 const butil::IOBuf& response_attachment() const { return _response_attachment; }
483
484 // Return true if the remote side creates a stream.
485 bool has_remote_stream() { return _remote_stream_settings != NULL; }
486
487 // The id to cancel RPC call or join response.
488 CallId call_id();
489
490 // Get/set idl names. Notice that the names must be string-constant.
491 // int32_t Echo(EchoRequest req, EchoResponse res);
492 // ^ ^
493 // request_name response_name
494 void set_idl_names(const IdlNames& names) { _idl_names = names; }
495 IdlNames idl_names() const { return _idl_names; }
496
497 // Get/set idl result. The type is limited to be integral.
498 // int32_t Echo(EchoRequest req, EchoResponse res);
499 // ^
500 // result
501 void set_idl_result(int64_t result) { _idl_result = result; }
502 int64_t idl_result() const { return _idl_result; }
503
504 const std::string& thrift_method_name() { return _thrift_method_name; }
505
    // Get sock option, e.g. get vip info through ttm kernel module hook.
507 int GetSockOption(int level, int optname, void* optval, socklen_t* optlen);
508
509 // Get deadline of this RPC (since the Epoch in microseconds).
510 // -1 means no deadline.
511 int64_t deadline_us() const { return _deadline_us; }
512
513private:
514 struct CompletionInfo {
515 CallId id; // call_id of the corresponding request
516 bool responded; // triggered by a response rather than other errors
517 };
518
519 // Call this method when receiving response/failure. If RPC failed,
520 // it will try to retry this RPC. Otherwise, it calls user `done'
521 // if it exists and destroys the correlation_id. Note that
522 // the correlation_id MUST have been locked before this call.
523 // Parameter `new_bthread':
524 // false - Run this function in the current bthread/pthread. Note that
525 // it could last for a long time or even block the caller (as
526 // it contains user's `done')
527 // true - Creates a new bthread to run this function and returns to
528 // the caller immediately
529 // Parameter `id':
    //   It will be checked against `_correlation_id' and
531 // `_current_call.nretry'. If not matched, nothing will happen,
532 // which means this event has been processed before
533 // Parameter `saved_error':
534 // If the above check failed, `_error_code' will be reverted to this
535 void OnVersionedRPCReturned(const CompletionInfo&,
536 bool new_bthread, int saved_error);
537
538 static void* RunEndRPC(void* arg);
539 void EndRPC(const CompletionInfo&);
540
541 static int HandleSocketFailed(bthread_id_t, void* data, int error_code,
542 const std::string& error_text);
543 void HandleSendFailed();
544
545 static int RunOnCancel(bthread_id_t, void* data, int error_code);
546
547 void set_auth_context(const AuthContext* ctx);
548
549 // MongoContext is created by ParseMongoRequest when the first msg comes
550 // over a socket, then stored in MongoContextMessage of the socket. cntl
    // gets a shared reference of the data in ProcessMongoRequest. When socket
    // is recycled, the container, AKA MongoContextMessage is destroyed, which
    // has no influence on the cntl(s) who already gets the shared reference
554 // of the MongoContext. The MongoContext will not be recycled until both
555 // the container(MongoContextMessage) and all related cntl(s) are recycled.
556 void set_mongo_session_data(MongoContext* data);
557
558 // Reset POD/non-POD fields.
559 void ResetPods();
560 void ResetNonPods();
561
562 void StartCancel() override;
563
564 // Using fixed start_realtime_us (microseconds since the Epoch) gives
565 // more accurate deadline.
566 void IssueRPC(int64_t start_realtime_us);
567
568 struct ClientSettings {
569 int32_t timeout_ms;
570 int32_t backup_request_ms;
571 int max_retry;
572 int32_t tos;
573 ConnectionType connection_type;
574 CompressType request_compress_type;
575 uint64_t log_id;
576 bool has_request_code;
577 int64_t request_code;
578 };
579
580 void SaveClientSettings(ClientSettings*) const;
581 void ApplyClientSettings(const ClientSettings&);
582
583 bool FailedInline() const { return _error_code; }
584
    // Build the id of the request sent in the `nretry'-th retry: the value
    // of `_correlation_id' plus nretry + 1.
    CallId get_id(int nretry) const {
        CallId id = { _correlation_id.value + nretry + 1 };
        return id;
    }
589
    // True if this particular call is used to do health check.
591 bool is_health_check_call() const { return has_flag(FLAGS_HEALTH_CHECK_CALL); }
592
593public:
594 CallId current_id() const {
595 CallId id = { _correlation_id.value + _current_call.nretry + 1 };
596 return id;
597 }
598private:
599
600 // Append server information to `_error_text'
601 void AppendServerIdentiy();
602
    // Contexts for tracking and ending a sent request.
    // One RPC to a channel may send several requests due to retrying.
    struct Call {
        // A freshly constructed Call starts from the state set by Reset().
        Call() { Reset(); }
        Call(Call*); //move semantics
        ~Call();
        void Reset();
        void OnComplete(Controller* c, int error_code, bool responded, bool end_of_rpc);

        int nretry;                     // sent in nretry-th retry.
        bool need_feedback;             // The LB needs feedback.
        bool enable_circuit_breaker;    // The channel enabled circuit_breaker
        bool touched_by_stream_creator;
        SocketId peer_id;               // main server id
        int64_t begin_time_us;          // sent real time.
        // The actual `Socket' for sending RPC. Its socket id will be
        // exactly the same as `peer_id' if `_connection_type' is
        // CONNECTION_TYPE_SINGLE. Otherwise, it may be a temporary
        // socket fetched from socket pool
        SocketUniquePtr sending_sock;
        StreamUserData* stream_user_data;
    };
625
626 void HandleStreamConnection(Socket *host_socket);
627
628 bool SingleServer() const { return _single_server_id != INVALID_SOCKET_ID; }
629
630 void SubmitSpan();
631
632 void OnRPCBegin(int64_t begin_time_us) {
633 _begin_time_us = begin_time_us;
634 // make latency_us() return 0 when RPC is not over
635 _end_time_us = begin_time_us;
636 }
637
638 void OnRPCEnd(int64_t end_time_us) {
639 _end_time_us = end_time_us;
640 }
641
642 static void RunDoneInBackupThread(void*);
643 void DoneInBackupThread();
644
645 // Utilities for manipulating _flags
646 inline void add_flag(uint32_t f) { _flags |= f; }
647 inline void clear_flag(uint32_t f) { _flags &= ~f; }
648 inline void set_flag(uint32_t f, bool t)
649 { return t ? add_flag(f) : clear_flag(f); }
650 inline bool has_flag(uint32_t f) const { return _flags & f; }
651
652 void set_used_by_rpc() { add_flag(FLAGS_USED_BY_RPC); }
653 bool is_used_by_rpc() const { return has_flag(FLAGS_USED_BY_RPC); }
654
655 bool has_enabled_circuit_breaker() const {
656 return has_flag(FLAGS_ENABLED_CIRCUIT_BREAKER);
657 }
658
659 std::string& protocol_param() { return _thrift_method_name; }
660 const std::string& protocol_param() const { return _thrift_method_name; }
661
662private:
663 // NOTE: align and group fields to make Controller as compact as possible.
664
665 Span* _span;
666 uint32_t _flags; // all boolean fields inside Controller
667 int32_t _error_code;
668 std::string _error_text;
669 butil::EndPoint _remote_side;
670 butil::EndPoint _local_side;
671
672 void* _session_local_data;
673 const Server* _server;
674 bthread_id_t _oncancel_id;
675 const AuthContext* _auth_context; // Authentication result
676 butil::intrusive_ptr<MongoContext> _mongo_session_data;
677 SampledRequest* _sampled_request;
678
679 ProtocolType _request_protocol;
680 // Some of them are copied from `Channel' which might be destroyed
681 // after CallMethod.
682 int _max_retry;
683 const RetryPolicy* _retry_policy;
684 // Synchronization object for one RPC call. It remains unchanged even
685 // when retry happens. Synchronous RPC will wait on this id.
686 CallId _correlation_id;
687
688 ConnectionType _connection_type;
689
690 // Used by ParallelChannel
691 int _fail_limit;
692
693 uint32_t _pipelined_count;
694
695 // [Timeout related]
696 int32_t _timeout_ms;
697 int32_t _connect_timeout_ms;
698 int32_t _backup_request_ms;
699 // Deadline of this RPC (since the Epoch in microseconds).
700 int64_t _deadline_us;
701 // Timer registered to trigger RPC timeout event
702 bthread_timer_t _timeout_id;
703
704 // Begin/End time of a single RPC call (since Epoch in microseconds)
705 int64_t _begin_time_us;
706 int64_t _end_time_us;
707 short _tos; // Type of service.
708 // The index of parse function which `InputMessenger' will use
709 int _preferred_index;
710 CompressType _request_compress_type;
711 CompressType _response_compress_type;
712 uint64_t _log_id;
713 int _pchan_sub_count;
714 google::protobuf::Message* _response;
715 google::protobuf::Closure* _done;
716 RPCSender* _sender;
717 uint64_t _request_code;
718 SocketId _single_server_id;
719 butil::intrusive_ptr<SharedLoadBalancer> _lb;
720
    // for passing parameters to created bthread, don't modify it elsewhere.
722 CompletionInfo _tmp_completion_info;
723
724 Call _current_call;
725 Call* _unfinished_call;
726 ExcludedServers* _accessed;
727
728 StreamCreator* _stream_creator;
729
730 // Fields will be used when making requests
731 Protocol::PackRequest _pack_request;
732 const google::protobuf::MethodDescriptor* _method;
733 const Authenticator* _auth;
734 butil::IOBuf _request_buf;
735 IdlNames _idl_names;
736 int64_t _idl_result;
737
738 HttpHeader* _http_request;
739 HttpHeader* _http_response;
740
741 // Fields with large size but low access frequency
742 butil::IOBuf _request_attachment;
743 butil::IOBuf _response_attachment;
744
745 // Writable progressive attachment
746 butil::intrusive_ptr<ProgressiveAttachment> _wpa;
747 // Readable progressive attachment
748 butil::intrusive_ptr<ReadableProgressiveAttachment> _rpa;
749
750 // TODO: Replace following fields with StreamCreator
751 // Defined at client side
752 StreamId _request_stream;
753 // Defined at server side
754 StreamId _response_stream;
755 // Defined at both sides
756 StreamSettings *_remote_stream_settings;
757
758 // Thrift method name, only used when thrift protocol enabled
759 std::string _thrift_method_name;
760};
761
762// Advises the RPC system that the caller desires that the RPC call be
763// canceled. If the call is canceled, the "done" callback will still be
764// called and the Controller will indicate that the call failed at that
765// time.
766void StartCancel(CallId id);
767
768// Suspend until the RPC finishes.
769void Join(CallId id);
770
771// Get a global closure for doing nothing. Used in semi-synchronous
772// RPC calls. Example:
773// stub1.method1(&cntl1, &request1, &response1, brpc::DoNothing());
774// stub2.method2(&cntl2, &request2, &response2, brpc::DoNothing());
775// ...
776// brpc::Join(cntl1.call_id());
777// brpc::Join(cntl2.call_id());
778google::protobuf::Closure* DoNothing();
779
780// Convert non-web symbols to web equivalence.
781void WebEscape(const std::string& source, std::string* output);
782
783// True if Ctrl-C is ever pressed.
784bool IsAskedToQuit();
785
786// Send Ctrl-C to current process.
787void AskToQuit();
788
789} // namespace brpc
790
791
792#endif // BRPC_CONTROLLER_H
793