1 | /* Copyright 2018 The TensorFlow Authors. All Rights Reserved. |
2 | |
3 | Licensed under the Apache License, Version 2.0 (the "License"); |
4 | you may not use this file except in compliance with the License. |
5 | You may obtain a copy of the License at |
6 | |
7 | http://www.apache.org/licenses/LICENSE-2.0 |
8 | |
9 | Unless required by applicable law or agreed to in writing, software |
10 | distributed under the License is distributed on an "AS IS" BASIS, |
11 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | See the License for the specific language governing permissions and |
13 | limitations under the License. |
14 | ==============================================================================*/ |
15 | #ifndef TENSORFLOW_CORE_COMMON_RUNTIME_COLLECTIVE_EXECUTOR_MGR_H_ |
16 | #define TENSORFLOW_CORE_COMMON_RUNTIME_COLLECTIVE_EXECUTOR_MGR_H_ |
17 | |
18 | #include "tensorflow/core/framework/collective.h" |
19 | #include "tensorflow/core/lib/gtl/flatmap.h" |
20 | #include "tensorflow/core/platform/unbounded_work_queue.h" |
21 | |
22 | namespace tensorflow { |
23 | class ConfigProto; |
24 | class DeviceMgr; |
25 | |
// Single-host implementation of CollectiveExecutorMgrInterface.
//
// Maintains a table of per-step CollectiveExecutor instances keyed by
// step_id, and owns the device/param resolvers and NCCL communicator used
// by those executors. Step-id sequencing is not supported in this local
// implementation (see NextStepId/RetireStepId below); distributed
// subclasses are expected to override the sequencing methods and Create().
class CollectiveExecutorMgr : public CollectiveExecutorMgrInterface {
 public:
  // Does not take ownership of `dev_mgr`; takes ownership of the three
  // resolver/communicator components.
  CollectiveExecutorMgr(
      const ConfigProto& config, const DeviceMgr* dev_mgr,
      std::unique_ptr<DeviceResolverInterface> dev_resolver,
      std::unique_ptr<ParamResolverInterface> param_resolver,
      std::unique_ptr<NcclCommunicatorInterface> nccl_communicator);

  virtual ~CollectiveExecutorMgr();

  // Returns the CollectiveExecutor for `step_id`, creating it via Create()
  // on first use. (Presumably returns a new reference the caller must
  // Unref — TODO confirm against the .cc implementation.)
  CollectiveExecutor* FindOrCreate(int64_t step_id) override;

  // Removes (and releases) the executor associated with `step_id`, if any.
  void Cleanup(int64_t step_id) override;

  // Accessors for the owned components; pointers remain owned by `this`.
  ParamResolverInterface* GetParamResolver() const override {
    return param_resolver_.get();
  }

  DeviceResolverInterface* GetDeviceResolver() const override {
    return dev_resolver_.get();
  }

  NcclCommunicatorInterface* GetNcclCommunicator() const override {
    return nccl_communicator_.get();
  }

  void GetStepSequenceAsync(const GetStepSequenceRequest* request,
                            GetStepSequenceResponse* response,
                            const StatusCallback& done) override;

  void RefreshStepIdSequenceAsync(int64_t graph_key,
                                  const StatusCallback& done) override;

  // Step-id sequencing is unsupported locally: always returns kInvalidId.
  int64_t NextStepId(int64_t graph_key) override {
    return CollectiveExecutor::kInvalidId;
  }

  // No-op locally, for the same reason as NextStepId.
  void RetireStepId(int64_t graph_key, int64_t step_id) override {}

 protected:
  // Called by FindOrCreate when table entry does not yet exist.
  virtual CollectiveExecutor* Create(int64_t step_id);

  const DeviceMgr* dev_mgr_;  // Not owned.
  std::unique_ptr<DeviceResolverInterface> dev_resolver_;
  std::unique_ptr<ParamResolverInterface> param_resolver_;
  string gpu_ring_order_;
  std::unique_ptr<NcclCommunicatorInterface> nccl_communicator_;
  // Unbounded work queue for scheduling potentially-blocking work during
  // collective op execution. Ownership is shared between `this` and
  // `CollectiveRemoteAccessLocal`.
  std::shared_ptr<UnboundedWorkQueue> work_queue_;

 private:
  mutex exec_mu_;
  // Map from step_id to CollectiveExecutor, guarded by exec_mu_.
  gtl::FlatMap<int64_t, CollectiveExecutor*> executor_table_
      TF_GUARDED_BY(exec_mu_);
};
85 | |
// Creates a local CollectiveExecutorMgr with production implementations of
// each component. Cases that need to inject other implementations of these
// components should call the CollectiveExecutorMgr constructor directly.
// This only supports a single host. For the distributed use case, use
// CreateProdRpcCollectiveExecutorMgr() instead.
std::unique_ptr<CollectiveExecutorMgr> CreateProdLocalCollectiveExecutorMgr(
    const ConfigProto& config, const DeviceMgr* device_mgr,
    std::unique_ptr<NcclCommunicatorInterface> nccl_communicator);
94 | |
95 | } // namespace tensorflow |
96 | #endif // TENSORFLOW_CORE_COMMON_RUNTIME_COLLECTIVE_EXECUTOR_MGR_H_ |
97 | |