1 | /* |
2 | * |
3 | * Copyright 2015 gRPC authors. |
4 | * |
5 | * Licensed under the Apache License, Version 2.0 (the "License"); |
6 | * you may not use this file except in compliance with the License. |
7 | * You may obtain a copy of the License at |
8 | * |
9 | * http://www.apache.org/licenses/LICENSE-2.0 |
10 | * |
11 | * Unless required by applicable law or agreed to in writing, software |
12 | * distributed under the License is distributed on an "AS IS" BASIS, |
13 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
14 | * See the License for the specific language governing permissions and |
15 | * limitations under the License. |
16 | * |
17 | */ |
18 | |
19 | #include <grpc/support/port_platform.h> |
20 | |
21 | #include "src/core/ext/transport/chttp2/server/chttp2_server.h" |
22 | |
23 | #include <inttypes.h> |
24 | #include <limits.h> |
25 | #include <string.h> |
26 | |
27 | #include <grpc/grpc.h> |
28 | #include <grpc/impl/codegen/grpc_types.h> |
29 | #include <grpc/support/alloc.h> |
30 | #include <grpc/support/log.h> |
31 | #include <grpc/support/string_util.h> |
32 | #include <grpc/support/sync.h> |
33 | |
34 | #include "src/core/ext/filters/http/server/http_server_filter.h" |
35 | #include "src/core/ext/transport/chttp2/transport/chttp2_transport.h" |
36 | #include "src/core/ext/transport/chttp2/transport/internal.h" |
37 | #include "src/core/lib/channel/channel_args.h" |
38 | #include "src/core/lib/channel/handshaker.h" |
39 | #include "src/core/lib/channel/handshaker_registry.h" |
40 | #include "src/core/lib/iomgr/endpoint.h" |
41 | #include "src/core/lib/iomgr/resolve_address.h" |
42 | #include "src/core/lib/iomgr/resource_quota.h" |
43 | #include "src/core/lib/iomgr/tcp_server.h" |
44 | #include "src/core/lib/slice/slice_internal.h" |
45 | #include "src/core/lib/surface/api_trace.h" |
46 | #include "src/core/lib/surface/server.h" |
47 | |
48 | typedef struct { |
49 | grpc_server* server; |
50 | grpc_tcp_server* tcp_server; |
51 | grpc_channel_args* args; |
52 | gpr_mu mu; |
53 | bool shutdown; |
54 | grpc_closure tcp_server_shutdown_complete; |
55 | grpc_closure* server_destroy_listener_done; |
56 | grpc_core::HandshakeManager* pending_handshake_mgrs; |
57 | grpc_core::RefCountedPtr<grpc_core::channelz::ListenSocketNode> |
58 | channelz_listen_socket; |
59 | } server_state; |
60 | |
61 | typedef struct { |
62 | gpr_refcount refs; |
63 | server_state* svr_state; |
64 | grpc_pollset* accepting_pollset; |
65 | grpc_tcp_server_acceptor* acceptor; |
66 | grpc_core::RefCountedPtr<grpc_core::HandshakeManager> handshake_mgr; |
67 | // State for enforcing handshake timeout on receiving HTTP/2 settings. |
68 | grpc_chttp2_transport* transport; |
69 | grpc_millis deadline; |
70 | grpc_timer timer; |
71 | grpc_closure on_timeout; |
72 | grpc_closure on_receive_settings; |
73 | grpc_pollset_set* interested_parties; |
74 | } server_connection_state; |
75 | |
76 | static void server_connection_state_unref( |
77 | server_connection_state* connection_state) { |
78 | if (gpr_unref(&connection_state->refs)) { |
79 | if (connection_state->transport != nullptr) { |
80 | GRPC_CHTTP2_UNREF_TRANSPORT(connection_state->transport, |
81 | "receive settings timeout" ); |
82 | } |
83 | grpc_pollset_set_del_pollset(connection_state->interested_parties, |
84 | connection_state->accepting_pollset); |
85 | grpc_pollset_set_destroy(connection_state->interested_parties); |
86 | gpr_free(connection_state); |
87 | } |
88 | } |
89 | |
90 | static void on_timeout(void* arg, grpc_error* error) { |
91 | server_connection_state* connection_state = |
92 | static_cast<server_connection_state*>(arg); |
93 | // Note that we may be called with GRPC_ERROR_NONE when the timer fires |
94 | // or with an error indicating that the timer system is being shut down. |
95 | if (error != GRPC_ERROR_CANCELLED) { |
96 | grpc_transport_op* op = grpc_make_transport_op(nullptr); |
97 | op->disconnect_with_error = GRPC_ERROR_CREATE_FROM_STATIC_STRING( |
98 | "Did not receive HTTP/2 settings before handshake timeout" ); |
99 | grpc_transport_perform_op(&connection_state->transport->base, op); |
100 | } |
101 | server_connection_state_unref(connection_state); |
102 | } |
103 | |
104 | static void on_receive_settings(void* arg, grpc_error* error) { |
105 | server_connection_state* connection_state = |
106 | static_cast<server_connection_state*>(arg); |
107 | if (error == GRPC_ERROR_NONE) { |
108 | grpc_timer_cancel(&connection_state->timer); |
109 | } |
110 | server_connection_state_unref(connection_state); |
111 | } |
112 | |
113 | static void on_handshake_done(void* arg, grpc_error* error) { |
114 | auto* args = static_cast<grpc_core::HandshakerArgs*>(arg); |
115 | server_connection_state* connection_state = |
116 | static_cast<server_connection_state*>(args->user_data); |
117 | gpr_mu_lock(&connection_state->svr_state->mu); |
118 | grpc_resource_user* resource_user = grpc_server_get_default_resource_user( |
119 | connection_state->svr_state->server); |
120 | if (error != GRPC_ERROR_NONE || connection_state->svr_state->shutdown) { |
121 | const char* error_str = grpc_error_string(error); |
122 | gpr_log(GPR_DEBUG, "Handshaking failed: %s" , error_str); |
123 | grpc_resource_user* resource_user = grpc_server_get_default_resource_user( |
124 | connection_state->svr_state->server); |
125 | if (resource_user != nullptr) { |
126 | grpc_resource_user_free(resource_user, GRPC_RESOURCE_QUOTA_CHANNEL_SIZE); |
127 | } |
128 | if (error == GRPC_ERROR_NONE && args->endpoint != nullptr) { |
129 | // We were shut down after handshaking completed successfully, so |
130 | // destroy the endpoint here. |
131 | // TODO(ctiller): It is currently necessary to shutdown endpoints |
132 | // before destroying them, even if we know that there are no |
133 | // pending read/write callbacks. This should be fixed, at which |
134 | // point this can be removed. |
135 | grpc_endpoint_shutdown(args->endpoint, GRPC_ERROR_NONE); |
136 | grpc_endpoint_destroy(args->endpoint); |
137 | grpc_channel_args_destroy(args->args); |
138 | grpc_slice_buffer_destroy_internal(args->read_buffer); |
139 | gpr_free(args->read_buffer); |
140 | } |
141 | } else { |
142 | // If the handshaking succeeded but there is no endpoint, then the |
143 | // handshaker may have handed off the connection to some external |
144 | // code, so we can just clean up here without creating a transport. |
145 | if (args->endpoint != nullptr) { |
146 | grpc_transport* transport = grpc_create_chttp2_transport( |
147 | args->args, args->endpoint, false, resource_user); |
148 | grpc_server_setup_transport( |
149 | connection_state->svr_state->server, transport, |
150 | connection_state->accepting_pollset, args->args, |
151 | grpc_chttp2_transport_get_socket_node(transport), resource_user); |
152 | // Use notify_on_receive_settings callback to enforce the |
153 | // handshake deadline. |
154 | connection_state->transport = |
155 | reinterpret_cast<grpc_chttp2_transport*>(transport); |
156 | gpr_ref(&connection_state->refs); |
157 | GRPC_CLOSURE_INIT(&connection_state->on_receive_settings, |
158 | on_receive_settings, connection_state, |
159 | grpc_schedule_on_exec_ctx); |
160 | grpc_chttp2_transport_start_reading( |
161 | transport, args->read_buffer, &connection_state->on_receive_settings); |
162 | grpc_channel_args_destroy(args->args); |
163 | gpr_ref(&connection_state->refs); |
164 | GRPC_CHTTP2_REF_TRANSPORT((grpc_chttp2_transport*)transport, |
165 | "receive settings timeout" ); |
166 | GRPC_CLOSURE_INIT(&connection_state->on_timeout, on_timeout, |
167 | connection_state, grpc_schedule_on_exec_ctx); |
168 | grpc_timer_init(&connection_state->timer, connection_state->deadline, |
169 | &connection_state->on_timeout); |
170 | } else { |
171 | if (resource_user != nullptr) { |
172 | grpc_resource_user_free(resource_user, |
173 | GRPC_RESOURCE_QUOTA_CHANNEL_SIZE); |
174 | } |
175 | } |
176 | } |
177 | connection_state->handshake_mgr->RemoveFromPendingMgrList( |
178 | &connection_state->svr_state->pending_handshake_mgrs); |
179 | gpr_mu_unlock(&connection_state->svr_state->mu); |
180 | connection_state->handshake_mgr.reset(); |
181 | gpr_free(connection_state->acceptor); |
182 | grpc_tcp_server_unref(connection_state->svr_state->tcp_server); |
183 | server_connection_state_unref(connection_state); |
184 | } |
185 | |
186 | static void on_accept(void* arg, grpc_endpoint* tcp, |
187 | grpc_pollset* accepting_pollset, |
188 | grpc_tcp_server_acceptor* acceptor) { |
189 | server_state* state = static_cast<server_state*>(arg); |
190 | gpr_mu_lock(&state->mu); |
191 | if (state->shutdown) { |
192 | gpr_mu_unlock(&state->mu); |
193 | grpc_endpoint_shutdown(tcp, GRPC_ERROR_NONE); |
194 | grpc_endpoint_destroy(tcp); |
195 | gpr_free(acceptor); |
196 | return; |
197 | } |
198 | grpc_resource_user* resource_user = |
199 | grpc_server_get_default_resource_user(state->server); |
200 | if (resource_user != nullptr && |
201 | !grpc_resource_user_safe_alloc(resource_user, |
202 | GRPC_RESOURCE_QUOTA_CHANNEL_SIZE)) { |
203 | gpr_log( |
204 | GPR_ERROR, |
205 | "Memory quota exhausted, rejecting the connection, no handshaking." ); |
206 | gpr_mu_unlock(&state->mu); |
207 | grpc_endpoint_shutdown(tcp, GRPC_ERROR_NONE); |
208 | grpc_endpoint_destroy(tcp); |
209 | gpr_free(acceptor); |
210 | return; |
211 | } |
212 | auto handshake_mgr = grpc_core::MakeRefCounted<grpc_core::HandshakeManager>(); |
213 | handshake_mgr->AddToPendingMgrList(&state->pending_handshake_mgrs); |
214 | grpc_tcp_server_ref(state->tcp_server); |
215 | gpr_mu_unlock(&state->mu); |
216 | server_connection_state* connection_state = |
217 | static_cast<server_connection_state*>( |
218 | gpr_zalloc(sizeof(*connection_state))); |
219 | gpr_ref_init(&connection_state->refs, 1); |
220 | connection_state->svr_state = state; |
221 | connection_state->accepting_pollset = accepting_pollset; |
222 | connection_state->acceptor = acceptor; |
223 | connection_state->handshake_mgr = handshake_mgr; |
224 | connection_state->interested_parties = grpc_pollset_set_create(); |
225 | grpc_pollset_set_add_pollset(connection_state->interested_parties, |
226 | connection_state->accepting_pollset); |
227 | grpc_core::HandshakerRegistry::AddHandshakers( |
228 | grpc_core::HANDSHAKER_SERVER, state->args, |
229 | connection_state->interested_parties, |
230 | connection_state->handshake_mgr.get()); |
231 | const grpc_arg* timeout_arg = |
232 | grpc_channel_args_find(state->args, GRPC_ARG_SERVER_HANDSHAKE_TIMEOUT_MS); |
233 | connection_state->deadline = |
234 | grpc_core::ExecCtx::Get()->Now() + |
235 | grpc_channel_arg_get_integer(timeout_arg, |
236 | {120 * GPR_MS_PER_SEC, 1, INT_MAX}); |
237 | connection_state->handshake_mgr->DoHandshake( |
238 | tcp, state->args, connection_state->deadline, acceptor, on_handshake_done, |
239 | connection_state); |
240 | } |
241 | |
242 | /* Server callback: start listening on our ports */ |
243 | static void server_start_listener(grpc_server* /*server*/, void* arg, |
244 | grpc_pollset** pollsets, |
245 | size_t pollset_count) { |
246 | server_state* state = static_cast<server_state*>(arg); |
247 | gpr_mu_lock(&state->mu); |
248 | state->shutdown = false; |
249 | gpr_mu_unlock(&state->mu); |
250 | grpc_tcp_server_start(state->tcp_server, pollsets, pollset_count, on_accept, |
251 | state); |
252 | } |
253 | |
254 | static void tcp_server_shutdown_complete(void* arg, grpc_error* error) { |
255 | server_state* state = static_cast<server_state*>(arg); |
256 | /* ensure all threads have unlocked */ |
257 | gpr_mu_lock(&state->mu); |
258 | grpc_closure* destroy_done = state->server_destroy_listener_done; |
259 | GPR_ASSERT(state->shutdown); |
260 | if (state->pending_handshake_mgrs != nullptr) { |
261 | state->pending_handshake_mgrs->ShutdownAllPending(GRPC_ERROR_REF(error)); |
262 | } |
263 | state->channelz_listen_socket.reset(); |
264 | gpr_mu_unlock(&state->mu); |
265 | // Flush queued work before destroying handshaker factory, since that |
266 | // may do a synchronous unref. |
267 | grpc_core::ExecCtx::Get()->Flush(); |
268 | if (destroy_done != nullptr) { |
269 | grpc_core::ExecCtx::Run(DEBUG_LOCATION, destroy_done, |
270 | GRPC_ERROR_REF(error)); |
271 | grpc_core::ExecCtx::Get()->Flush(); |
272 | } |
273 | grpc_channel_args_destroy(state->args); |
274 | gpr_mu_destroy(&state->mu); |
275 | gpr_free(state); |
276 | } |
277 | |
278 | /* Server callback: destroy the tcp listener (so we don't generate further |
279 | callbacks) */ |
280 | static void server_destroy_listener(grpc_server* /*server*/, void* arg, |
281 | grpc_closure* destroy_done) { |
282 | server_state* state = static_cast<server_state*>(arg); |
283 | gpr_mu_lock(&state->mu); |
284 | state->shutdown = true; |
285 | state->server_destroy_listener_done = destroy_done; |
286 | grpc_tcp_server* tcp_server = state->tcp_server; |
287 | gpr_mu_unlock(&state->mu); |
288 | grpc_tcp_server_shutdown_listeners(tcp_server); |
289 | grpc_tcp_server_unref(tcp_server); |
290 | } |
291 | |
292 | static grpc_error* chttp2_server_add_acceptor(grpc_server* server, |
293 | const char* name, |
294 | grpc_channel_args* args) { |
295 | grpc_tcp_server* tcp_server = nullptr; |
296 | grpc_error* err = GRPC_ERROR_NONE; |
297 | server_state* state = nullptr; |
298 | const grpc_arg* arg = nullptr; |
299 | grpc_core::TcpServerFdHandler** arg_val = nullptr; |
300 | state = static_cast<server_state*>(gpr_zalloc(sizeof(*state))); |
301 | GRPC_CLOSURE_INIT(&state->tcp_server_shutdown_complete, |
302 | tcp_server_shutdown_complete, state, |
303 | grpc_schedule_on_exec_ctx); |
304 | err = grpc_tcp_server_create(&state->tcp_server_shutdown_complete, args, |
305 | &tcp_server); |
306 | if (err != GRPC_ERROR_NONE) { |
307 | goto error; |
308 | } |
309 | state->server = server; |
310 | state->tcp_server = tcp_server; |
311 | state->args = args; |
312 | state->shutdown = true; |
313 | gpr_mu_init(&state->mu); |
314 | // TODO(yangg) channelz |
315 | arg = grpc_channel_args_find(args, name); |
316 | GPR_ASSERT(arg->type == GRPC_ARG_POINTER); |
317 | arg_val = static_cast<grpc_core::TcpServerFdHandler**>(arg->value.pointer.p); |
318 | *arg_val = grpc_tcp_server_create_fd_handler(tcp_server); |
319 | |
320 | grpc_server_add_listener(server, state, server_start_listener, |
321 | server_destroy_listener, /* node */ nullptr); |
322 | return err; |
323 | |
324 | /* Error path: cleanup and return */ |
325 | error: |
326 | GPR_ASSERT(err != GRPC_ERROR_NONE); |
327 | if (tcp_server) { |
328 | grpc_tcp_server_unref(tcp_server); |
329 | } else { |
330 | grpc_channel_args_destroy(args); |
331 | gpr_free(state); |
332 | } |
333 | return err; |
334 | } |
335 | |
336 | grpc_error* grpc_chttp2_server_add_port(grpc_server* server, const char* addr, |
337 | grpc_channel_args* args, |
338 | int* port_num) { |
339 | grpc_resolved_addresses* resolved = nullptr; |
340 | grpc_tcp_server* tcp_server = nullptr; |
341 | size_t i; |
342 | size_t count = 0; |
343 | int port_temp; |
344 | grpc_error* err = GRPC_ERROR_NONE; |
345 | server_state* state = nullptr; |
346 | grpc_error** errors = nullptr; |
347 | size_t naddrs = 0; |
348 | const grpc_arg* arg = nullptr; |
349 | |
350 | *port_num = -1; |
351 | |
352 | if (strncmp(addr, "external:" , 9) == 0) { |
353 | return chttp2_server_add_acceptor(server, addr, args); |
354 | } |
355 | |
356 | /* resolve address */ |
357 | err = grpc_blocking_resolve_address(addr, "https" , &resolved); |
358 | if (err != GRPC_ERROR_NONE) { |
359 | goto error; |
360 | } |
361 | state = static_cast<server_state*>(gpr_zalloc(sizeof(*state))); |
362 | GRPC_CLOSURE_INIT(&state->tcp_server_shutdown_complete, |
363 | tcp_server_shutdown_complete, state, |
364 | grpc_schedule_on_exec_ctx); |
365 | err = grpc_tcp_server_create(&state->tcp_server_shutdown_complete, args, |
366 | &tcp_server); |
367 | if (err != GRPC_ERROR_NONE) { |
368 | goto error; |
369 | } |
370 | |
371 | state->server = server; |
372 | state->tcp_server = tcp_server; |
373 | state->args = args; |
374 | state->shutdown = true; |
375 | gpr_mu_init(&state->mu); |
376 | |
377 | naddrs = resolved->naddrs; |
378 | errors = static_cast<grpc_error**>(gpr_malloc(sizeof(*errors) * naddrs)); |
379 | for (i = 0; i < naddrs; i++) { |
380 | errors[i] = |
381 | grpc_tcp_server_add_port(tcp_server, &resolved->addrs[i], &port_temp); |
382 | if (errors[i] == GRPC_ERROR_NONE) { |
383 | if (*port_num == -1) { |
384 | *port_num = port_temp; |
385 | } else { |
386 | GPR_ASSERT(*port_num == port_temp); |
387 | } |
388 | count++; |
389 | } |
390 | } |
391 | if (count == 0) { |
392 | char* msg; |
393 | gpr_asprintf(&msg, "No address added out of total %" PRIuPTR " resolved" , |
394 | naddrs); |
395 | err = GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(msg, errors, naddrs); |
396 | gpr_free(msg); |
397 | goto error; |
398 | } else if (count != naddrs) { |
399 | char* msg; |
400 | gpr_asprintf(&msg, |
401 | "Only %" PRIuPTR " addresses added out of total %" PRIuPTR |
402 | " resolved" , |
403 | count, naddrs); |
404 | err = GRPC_ERROR_CREATE_REFERENCING_FROM_COPIED_STRING(msg, errors, naddrs); |
405 | gpr_free(msg); |
406 | |
407 | const char* warning_message = grpc_error_string(err); |
408 | gpr_log(GPR_INFO, "WARNING: %s" , warning_message); |
409 | |
410 | /* we managed to bind some addresses: continue */ |
411 | } |
412 | grpc_resolved_addresses_destroy(resolved); |
413 | |
414 | arg = grpc_channel_args_find(args, GRPC_ARG_ENABLE_CHANNELZ); |
415 | if (grpc_channel_arg_get_bool(arg, GRPC_ENABLE_CHANNELZ_DEFAULT)) { |
416 | char* socket_name = nullptr; |
417 | gpr_asprintf(&socket_name, "chttp2 listener %s" , addr); |
418 | state->channelz_listen_socket = |
419 | grpc_core::MakeRefCounted<grpc_core::channelz::ListenSocketNode>( |
420 | addr, socket_name); |
421 | // TODO(veblush): Remove this once gpr_asprintf is replaced by |
422 | // absl::StrFormat |
423 | gpr_free(socket_name); |
424 | } |
425 | |
426 | /* Register with the server only upon success */ |
427 | grpc_server_add_listener(server, state, server_start_listener, |
428 | server_destroy_listener, |
429 | state->channelz_listen_socket); |
430 | goto done; |
431 | |
432 | /* Error path: cleanup and return */ |
433 | error: |
434 | GPR_ASSERT(err != GRPC_ERROR_NONE); |
435 | if (resolved) { |
436 | grpc_resolved_addresses_destroy(resolved); |
437 | } |
438 | if (tcp_server) { |
439 | grpc_tcp_server_unref(tcp_server); |
440 | } else { |
441 | grpc_channel_args_destroy(args); |
442 | gpr_free(state); |
443 | } |
444 | *port_num = 0; |
445 | |
446 | done: |
447 | if (errors != nullptr) { |
448 | for (i = 0; i < naddrs; i++) { |
449 | GRPC_ERROR_UNREF(errors[i]); |
450 | } |
451 | gpr_free(errors); |
452 | } |
453 | return err; |
454 | } |
455 | |