1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18
19#ifndef BRPC_PROTOCOL_H
20#define BRPC_PROTOCOL_H
21
22// To brpc developers: This is a header included by user, don't depend
23// on internal structures, use opaque pointers instead.
24
25#include <vector> // std::vector
26#include <stdint.h> // uint64_t
27#include <gflags/gflags_declare.h> // DECLARE_xxx
28#include "butil/endpoint.h" // butil::EndPoint
29#include "butil/iobuf.h"
30#include "butil/logging.h"
31#include "brpc/options.pb.h" // ProtocolType
32#include "brpc/socket_id.h" // SocketId
33#include "brpc/parse_result.h" // ParseResult
34#include "brpc/adaptive_connection_type.h"
35#include "brpc/adaptive_protocol_type.h"
36
37namespace google {
38namespace protobuf {
39class Message;
40class MethodDescriptor;
41} // namespace protobuf
42} // namespace google
43
44namespace butil {
45class IOBuf;
46}
47
48
49namespace brpc {
50class Socket;
51class SocketMessage;
52class Controller;
53class Authenticator;
54class InputMessageBase;
55
56DECLARE_uint64(max_body_size);
57DECLARE_bool(log_error_text);
58
59// 3 steps to add a new Protocol:
60// Step1: Add a new ProtocolType in src/brpc/options.proto
61// as identifier of the Protocol.
62// Step2: Implement callbacks of struct `Protocol' in policy/ directory.
63// Step3: Register the protocol in global.cpp using `RegisterProtocol'
64
65struct Protocol {
66 // [Required by both client and server]
67 // The callback to cut a message from `source'.
68 // Returned message will be passed to process_request and process_response
69 // later and Destroy()-ed by InputMessenger.
70 // Returns:
71 // MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA):
72 // `source' does not form a complete message yet.
73 // MakeParseError(PARSE_ERROR_TRY_OTHERS).
74 // `source' does not fit the protocol, the data should be tried by
75 // other protocols. If the data is definitely corrupted (e.g. magic
76 // header matches but other fields are wrong), pop corrupted part
77 // from `source' before returning.
78 // MakeMessage(InputMessageBase*):
79 // The message is parsed successfully and cut from `source'.
80 typedef ParseResult (*Parse)(butil::IOBuf* source, Socket *socket,
81 bool read_eof, const void *arg);
82 Parse parse;
83
84 // [Required by client]
85 // The callback to serialize `request' into `request_buf' which will be
86 // packed into message by pack_request later. Called once for each RPC.
87 // `cntl' provides additional data needed by some protocol (say HTTP).
88 // Call cntl->SetFailed() on error.
89 typedef void (*SerializeRequest)(
90 butil::IOBuf* request_buf,
91 Controller* cntl,
92 const google::protobuf::Message* request);
93 SerializeRequest serialize_request;
94
95 // [Required by client]
96 // The callback to pack `request_buf' into `iobuf_out' or `user_message_out'
97 // Called before sending each request (including retries).
98 // Remember to pack authentication information when `auth' is not NULL.
99 // Call cntl->SetFailed() on error.
100 typedef void (*PackRequest)(
101 butil::IOBuf* iobuf_out,
102 SocketMessage** user_message_out,
103 uint64_t correlation_id,
104 const google::protobuf::MethodDescriptor* method,
105 Controller* controller,
106 const butil::IOBuf& request_buf,
107 const Authenticator* auth);
108 PackRequest pack_request;
109
110 // [Required by server]
111 // The callback to handle request `msg' created by a successful parse().
112 // `msg' must be Destroy()-ed when the processing is done. To make sure
113 // Destroy() is always called, consider using DestroyingPtr<> defined in
114 // destroyable.h
115 // May be called in a different thread from parse().
116 typedef void (*ProcessRequest)(InputMessageBase* msg);
117 ProcessRequest process_request;
118
119 // [Required by client]
120 // The callback to handle response `msg' created by a successful parse().
121 // `msg' must be Destroy()-ed when the processing is done. To make sure
122 // Destroy() is always called, consider using DestroyingPtr<> defined in
123 // destroyable.h
124 // May be called in a different thread from parse().
125 typedef void (*ProcessResponse)(InputMessageBase* msg);
126 ProcessResponse process_response;
127
128 // [Required by authenticating server]
129 // The callback to verify authentication of this socket. Only called
130 // on the first message that a socket receives. Can be NULL when
131 // authentication is not needed or this is the client side.
132 // Returns true on successful authentication.
133 typedef bool (*Verify)(const InputMessageBase* msg);
134 Verify verify;
135
136 // [Optional]
137 // Convert `server_addr_and_port'(a parameter to Channel) to butil::EndPoint.
138 typedef bool (*ParseServerAddress)(butil::EndPoint* out,
139 const char* server_addr_and_port);
140 ParseServerAddress parse_server_address;
141
142 // [Optional] Customize method name.
143 typedef const std::string& (*GetMethodName)(
144 const google::protobuf::MethodDescriptor* method,
145 const Controller*);
146 GetMethodName get_method_name;
147
148 // Bitwise-or of supported ConnectionType
149 ConnectionType supported_connection_type;
150
151 // Name of this protocol, must be string constant.
152 const char* name;
153
154 // True if this protocol is supported at client-side.
155 bool support_client() const {
156 return serialize_request && pack_request && process_response;
157 }
158 // True if this protocol is supported at server-side.
159 bool support_server() const { return process_request; }
160};
161
162const ConnectionType CONNECTION_TYPE_POOLED_AND_SHORT =
163 (ConnectionType)((int)CONNECTION_TYPE_POOLED |
164 (int)CONNECTION_TYPE_SHORT);
165
166const ConnectionType CONNECTION_TYPE_ALL =
167 (ConnectionType)((int)CONNECTION_TYPE_SINGLE |
168 (int)CONNECTION_TYPE_POOLED |
169 (int)CONNECTION_TYPE_SHORT);
170
171// [thread-safe]
172// Register `protocol' using key=`type'.
173// Returns 0 on success, -1 otherwise
174int RegisterProtocol(ProtocolType type, const Protocol& protocol);
175
176// [thread-safe]
177// Find the protocol registered with key=`type'.
178// Returns NULL on not found.
179const Protocol* FindProtocol(ProtocolType type);
180
181// [thread-safe]
182// List all registered protocols into `vec'.
183void ListProtocols(std::vector<Protocol>* vec);
184void ListProtocols(std::vector<std::pair<ProtocolType, Protocol> >* vec);
185
186// The common serialize_request implementation used by many protocols.
187void SerializeRequestDefault(butil::IOBuf* buf,
188 Controller* cntl,
189 const google::protobuf::Message* request);
190
191// Replacements for msg->ParseFromXXX() to make the bytes limit in pb
192// consistent with -max_body_size
193bool ParsePbFromZeroCopyStream(google::protobuf::Message* msg,
194 google::protobuf::io::ZeroCopyInputStream* input);
195bool ParsePbFromIOBuf(google::protobuf::Message* msg, const butil::IOBuf& buf);
196bool ParsePbFromArray(google::protobuf::Message* msg, const void* data, size_t size);
197bool ParsePbFromString(google::protobuf::Message* msg, const std::string& str);
198
199// Deleter for unique_ptr to print error_text of the controller when
200// -log_error_text is on, then delete the controller if `delete_cntl' is true
201class LogErrorTextAndDelete {
202public:
203 explicit LogErrorTextAndDelete(bool delete_cntl = true)
204 : _delete_cntl(delete_cntl) {}
205 void operator()(Controller* c) const;
206private:
207 bool _delete_cntl;
208};
209
210// Utility to build a temporary array.
211// Example:
212// TemporaryArrayBuilder<Foo, 5> b;
213// b.push() = Foo1;
214// b.push() = Foo2;
215// UseArray(b.raw_array(), b.size());
216template <typename T, size_t N>
217class TemporaryArrayBuilder {
218public:
219 TemporaryArrayBuilder() : _size(0) {}
220 T& push() {
221 if (_size < N) {
222 return _arr[_size++];
223 } else {
224 CHECK(false) << "push to a full array, cap=" << N;
225 static T dummy;
226 return dummy;
227 }
228 }
229 T& operator[](size_t i) { return _arr[i]; }
230 size_t size() const { return _size; }
231 T* raw_array() { return _arr; }
232private:
233 size_t _size;
234 T _arr[N];
235};
236
237} // namespace brpc
238
239
240#endif // BRPC_PROTOCOL_H
241