1/*
2 *
3 * Copyright 2015 gRPC authors.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 */
18
19#ifndef GRPCPP_SERVER_IMPL_H
20#define GRPCPP_SERVER_IMPL_H
21
22#include <list>
23#include <memory>
24#include <vector>
25
26#include <grpc/impl/codegen/port_platform.h>
27
28#include <grpc/compression.h>
29#include <grpc/support/atm.h>
30#include <grpcpp/channel_impl.h>
31#include <grpcpp/completion_queue_impl.h>
32#include <grpcpp/health_check_service_interface.h>
33#include <grpcpp/impl/call.h>
34#include <grpcpp/impl/codegen/client_interceptor.h>
35#include <grpcpp/impl/codegen/completion_queue_impl.h>
36#include <grpcpp/impl/codegen/grpc_library.h>
37#include <grpcpp/impl/codegen/server_interface.h>
38#include <grpcpp/impl/rpc_service_method.h>
39#include <grpcpp/security/server_credentials.h>
40#include <grpcpp/support/channel_arguments_impl.h>
41#include <grpcpp/support/config.h>
42#include <grpcpp/support/status.h>
43
44struct grpc_server;
45
46namespace grpc {
47class AsyncGenericService;
48
49namespace internal {
50class ExternalConnectionAcceptorImpl;
51} // namespace internal
52
53} // namespace grpc
54
55namespace grpc_impl {
56class HealthCheckServiceInterface;
57class ServerContext;
58class ServerInitializer;
59
60/// Represents a gRPC server.
61///
62/// Use a \a grpc::ServerBuilder to create, configure, and start
63/// \a Server instances.
64class Server : public grpc::ServerInterface, private grpc::GrpcLibraryCodegen {
65 public:
66 ~Server();
67
68 /// Block until the server shuts down.
69 ///
70 /// \warning The server must be either shutting down or some other thread must
71 /// call \a Shutdown for this function to ever return.
72 void Wait() override;
73
74 /// Global callbacks are a set of hooks that are called when server
75 /// events occur. \a SetGlobalCallbacks method is used to register
76 /// the hooks with gRPC. Note that
77 /// the \a GlobalCallbacks instance will be shared among all
78 /// \a Server instances in an application and can be set exactly
79 /// once per application.
80 class GlobalCallbacks {
81 public:
82 virtual ~GlobalCallbacks() {}
83 /// Called before server is created.
84 virtual void UpdateArguments(ChannelArguments* /*args*/) {}
85 /// Called before application callback for each synchronous server request
86 virtual void PreSynchronousRequest(grpc_impl::ServerContext* context) = 0;
87 /// Called after application callback for each synchronous server request
88 virtual void PostSynchronousRequest(grpc_impl::ServerContext* context) = 0;
89 /// Called before server is started.
90 virtual void PreServerStart(Server* /*server*/) {}
91 /// Called after a server port is added.
92 virtual void AddPort(Server* /*server*/, const grpc::string& /*addr*/,
93 grpc::ServerCredentials* /*creds*/, int /*port*/) {}
94 };
95 /// Set the global callback object. Can only be called once per application.
96 /// Does not take ownership of callbacks, and expects the pointed to object
97 /// to be alive until all server objects in the process have been destroyed.
98 /// The same \a GlobalCallbacks object will be used throughout the
99 /// application and is shared among all \a Server objects.
100 static void SetGlobalCallbacks(GlobalCallbacks* callbacks);
101
102 /// Returns a \em raw pointer to the underlying \a grpc_server instance.
103 /// EXPERIMENTAL: for internal/test use only
104 grpc_server* c_server();
105
106 /// Returns the health check service.
107 grpc::HealthCheckServiceInterface* GetHealthCheckService() const {
108 return health_check_service_.get();
109 }
110
111 /// Establish a channel for in-process communication
112 std::shared_ptr<Channel> InProcessChannel(const ChannelArguments& args);
113
114 /// NOTE: class experimental_type is not part of the public API of this class.
115 /// TODO(yashykt): Integrate into public API when this is no longer
116 /// experimental.
117 class experimental_type {
118 public:
119 explicit experimental_type(Server* server) : server_(server) {}
120
121 /// Establish a channel for in-process communication with client
122 /// interceptors
123 std::shared_ptr<Channel> InProcessChannelWithInterceptors(
124 const ChannelArguments& args,
125 std::vector<std::unique_ptr<
126 grpc::experimental::ClientInterceptorFactoryInterface>>
127 interceptor_creators);
128
129 private:
130 Server* server_;
131 };
132
133 /// NOTE: The function experimental() is not stable public API. It is a view
134 /// to the experimental components of this class. It may be changed or removed
135 /// at any time.
136 experimental_type experimental() { return experimental_type(this); }
137
138 protected:
139 /// Register a service. This call does not take ownership of the service.
140 /// The service must exist for the lifetime of the Server instance.
141 bool RegisterService(const grpc::string* host,
142 grpc::Service* service) override;
143
144 /// Try binding the server to the given \a addr endpoint
145 /// (port, and optionally including IP address to bind to).
146 ///
147 /// It can be invoked multiple times. Should be used before
148 /// starting the server.
149 ///
150 /// \param addr The address to try to bind to the server (eg, localhost:1234,
151 /// 192.168.1.1:31416, [::1]:27182, etc.).
152 /// \param creds The credentials associated with the server.
153 ///
154 /// \return bound port number on success, 0 on failure.
155 ///
156 /// \warning It is an error to call this method on an already started server.
157 int AddListeningPort(const grpc::string& addr,
158 grpc::ServerCredentials* creds) override;
159
160 /// NOTE: This is *NOT* a public API. The server constructors are supposed to
161 /// be used by \a ServerBuilder class only. The constructor will be made
162 /// 'private' very soon.
163 ///
164 /// Server constructors. To be used by \a ServerBuilder only.
165 ///
166 /// \param args The channel args
167 ///
168 /// \param sync_server_cqs The completion queues to use if the server is a
169 /// synchronous server (or a hybrid server). The server polls for new RPCs on
170 /// these queues
171 ///
172 /// \param min_pollers The minimum number of polling threads per server
173 /// completion queue (in param sync_server_cqs) to use for listening to
174 /// incoming requests (used only in case of sync server)
175 ///
176 /// \param max_pollers The maximum number of polling threads per server
177 /// completion queue (in param sync_server_cqs) to use for listening to
178 /// incoming requests (used only in case of sync server)
179 ///
180 /// \param sync_cq_timeout_msec The timeout to use when calling AsyncNext() on
181 /// server completion queues passed via sync_server_cqs param.
182 Server(ChannelArguments* args,
183 std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
184 sync_server_cqs,
185 int min_pollers, int max_pollers, int sync_cq_timeout_msec,
186 std::vector<
187 std::shared_ptr<grpc::internal::ExternalConnectionAcceptorImpl>>
188 acceptors,
189 grpc_resource_quota* server_rq = nullptr,
190 std::vector<std::unique_ptr<
191 grpc::experimental::ServerInterceptorFactoryInterface>>
192 interceptor_creators = std::vector<std::unique_ptr<
193 grpc::experimental::ServerInterceptorFactoryInterface>>());
194
195 /// Start the server.
196 ///
197 /// \param cqs Completion queues for handling asynchronous services. The
198 /// caller is required to keep all completion queues live until the server is
199 /// destroyed.
200 /// \param num_cqs How many completion queues does \a cqs hold.
201 void Start(ServerCompletionQueue** cqs, size_t num_cqs) override;
202
203 grpc_server* server() override { return server_; }
204
205 protected:
206 /// NOTE: This method is not part of the public API for this class.
207 void set_health_check_service(
208 std::unique_ptr<grpc::HealthCheckServiceInterface> service) {
209 health_check_service_ = std::move(service);
210 }
211
212 /// NOTE: This method is not part of the public API for this class.
213 bool health_check_service_disabled() const {
214 return health_check_service_disabled_;
215 }
216
217 private:
218 std::vector<
219 std::unique_ptr<grpc::experimental::ServerInterceptorFactoryInterface>>*
220 interceptor_creators() override {
221 return &interceptor_creators_;
222 }
223
224 friend class grpc::AsyncGenericService;
225 friend class grpc_impl::ServerBuilder;
226 friend class grpc_impl::ServerInitializer;
227
228 class SyncRequest;
229 class CallbackRequestBase;
230 template <class ServerContextType>
231 class CallbackRequest;
232 class UnimplementedAsyncRequest;
233 class UnimplementedAsyncResponse;
234
235 /// SyncRequestThreadManager is an implementation of ThreadManager. This class
236 /// is responsible for polling for incoming RPCs and calling the RPC handlers.
237 /// This is only used in case of a Sync server (i.e a server exposing a sync
238 /// interface)
239 class SyncRequestThreadManager;
240
241 /// Register a generic service. This call does not take ownership of the
242 /// service. The service must exist for the lifetime of the Server instance.
243 void RegisterAsyncGenericService(grpc::AsyncGenericService* service) override;
244
245#ifdef GRPC_CALLBACK_API_NONEXPERIMENTAL
246 /// Register a callback-based generic service. This call does not take
247 /// ownership of theservice. The service must exist for the lifetime of the
248 /// Server instance.
249 void RegisterCallbackGenericService(
250 grpc::CallbackGenericService* service) override;
251#else
252 /// NOTE: class experimental_registration_type is not part of the public API
253 /// of this class
254 /// TODO(vjpai): Move these contents to the public API of Server when
255 /// they are no longer experimental
256 class experimental_registration_type final
257 : public experimental_registration_interface {
258 public:
259 explicit experimental_registration_type(Server* server) : server_(server) {}
260 void RegisterCallbackGenericService(
261 grpc::experimental::CallbackGenericService* service) override {
262 server_->RegisterCallbackGenericService(service);
263 }
264
265 private:
266 Server* server_;
267 };
268
269 /// TODO(vjpai): Mark this override when experimental type above is deleted
270 void RegisterCallbackGenericService(
271 grpc::experimental::CallbackGenericService* service);
272
273 /// NOTE: The function experimental_registration() is not stable public API.
274 /// It is a view to the experimental components of this class. It may be
275 /// changed or removed at any time.
276 experimental_registration_interface* experimental_registration() override {
277 return &experimental_registration_;
278 }
279#endif
280
281 void PerformOpsOnCall(grpc::internal::CallOpSetInterface* ops,
282 grpc::internal::Call* call) override;
283
284 void ShutdownInternal(gpr_timespec deadline) override;
285
286 int max_receive_message_size() const override {
287 return max_receive_message_size_;
288 }
289
290 CompletionQueue* CallbackCQ() override;
291
292 grpc_impl::ServerInitializer* initializer();
293
294 std::vector<std::shared_ptr<grpc::internal::ExternalConnectionAcceptorImpl>>
295 acceptors_;
296
297 // A vector of interceptor factory objects.
298 // This should be destroyed after health_check_service_ and this requirement
299 // is satisfied by declaring interceptor_creators_ before
300 // health_check_service_. (C++ mandates that member objects be destroyed in
301 // the reverse order of initialization.)
302 std::vector<
303 std::unique_ptr<grpc::experimental::ServerInterceptorFactoryInterface>>
304 interceptor_creators_;
305
306 int max_receive_message_size_;
307
308 /// The following completion queues are ONLY used in case of Sync API
309 /// i.e. if the server has any services with sync methods. The server uses
310 /// these completion queues to poll for new RPCs
311 std::shared_ptr<std::vector<std::unique_ptr<ServerCompletionQueue>>>
312 sync_server_cqs_;
313
314 /// List of \a ThreadManager instances (one for each cq in
315 /// the \a sync_server_cqs)
316 std::vector<std::unique_ptr<SyncRequestThreadManager>> sync_req_mgrs_;
317
318 // Outstanding unmatched callback requests, indexed by method.
319 // NOTE: Using a gpr_atm rather than atomic_int because atomic_int isn't
320 // copyable or movable and thus will cause compilation errors. We
321 // actually only want to extend the vector before the threaded use
322 // starts, but this is still a limitation.
323 std::vector<gpr_atm> callback_unmatched_reqs_count_;
324
325 // List of callback requests to start when server actually starts.
326 std::list<CallbackRequestBase*> callback_reqs_to_start_;
327
328#ifndef GRPC_CALLBACK_API_NONEXPERIMENTAL
329 // For registering experimental callback generic service; remove when that
330 // method longer experimental
331 experimental_registration_type experimental_registration_{this};
332#endif
333
334 // Server status
335 grpc::internal::Mutex mu_;
336 bool started_;
337 bool shutdown_;
338 bool shutdown_notified_; // Was notify called on the shutdown_cv_
339
340 grpc::internal::CondVar shutdown_cv_;
341
342 // It is ok (but not required) to nest callback_reqs_mu_ under mu_ .
343 // Incrementing callback_reqs_outstanding_ is ok without a lock but it must be
344 // decremented under the lock in case it is the last request and enables the
345 // server shutdown. The increment is performance-critical since it happens
346 // during periods of increasing load; the decrement happens only when memory
347 // is maxed out, during server shutdown, or (possibly in a future version)
348 // during decreasing load, so it is less performance-critical.
349 grpc::internal::Mutex callback_reqs_mu_;
350 grpc::internal::CondVar callback_reqs_done_cv_;
351 std::atomic<intptr_t> callback_reqs_outstanding_{0};
352
353 std::shared_ptr<GlobalCallbacks> global_callbacks_;
354
355 std::vector<grpc::string> services_;
356 bool has_async_generic_service_{false};
357 bool has_callback_generic_service_{false};
358
359 // Pointer to the wrapped grpc_server.
360 grpc_server* server_;
361
362 std::unique_ptr<grpc_impl::ServerInitializer> server_initializer_;
363
364 std::unique_ptr<grpc::HealthCheckServiceInterface> health_check_service_;
365 bool health_check_service_disabled_;
366
367 // When appropriate, use a default callback generic service to handle
368 // unimplemented methods
369#ifdef GRPC_CALLBACK_API_NONEXPERIMENTAL
370 std::unique_ptr<grpc::CallbackGenericService> unimplemented_service_;
371#else
372 std::unique_ptr<grpc::experimental::CallbackGenericService>
373 unimplemented_service_;
374#endif
375
376 // A special handler for resource exhausted in sync case
377 std::unique_ptr<grpc::internal::MethodHandler> resource_exhausted_handler_;
378
379 // Handler for callback generic service, if any
380 std::unique_ptr<grpc::internal::MethodHandler> generic_handler_;
381
382 // callback_cq_ references the callbackable completion queue associated
383 // with this server (if any). It is set on the first call to CallbackCQ().
384 // It is _not owned_ by the server; ownership belongs with its internal
385 // shutdown callback tag (invoked when the CQ is fully shutdown).
386 // It is protected by mu_
387 CompletionQueue* callback_cq_ = nullptr;
388};
389
390} // namespace grpc_impl
391
392#endif // GRPCPP_SERVER_IMPL_H
393