1 | /* Copyright 2018 The TensorFlow Authors. All Rights Reserved. |
2 | |
3 | Licensed under the Apache License, Version 2.0 (the "License"); |
4 | you may not use this file except in compliance with the License. |
5 | You may obtain a copy of the License at |
6 | |
7 | http://www.apache.org/licenses/LICENSE-2.0 |
8 | |
9 | Unless required by applicable law or agreed to in writing, software |
10 | distributed under the License is distributed on an "AS IS" BASIS, |
11 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | See the License for the specific language governing permissions and |
13 | limitations under the License. |
14 | ==============================================================================*/ |
15 | #ifndef TENSORFLOW_CORE_DISTRIBUTED_RUNTIME_COLLECTIVE_RMA_DISTRIBUTED_H_ |
16 | #define TENSORFLOW_CORE_DISTRIBUTED_RUNTIME_COLLECTIVE_RMA_DISTRIBUTED_H_ |
17 | |
18 | #include "tensorflow/core/common_runtime/collective_rma_local.h" |
19 | #include "tensorflow/core/framework/cancellation.h" |
20 | #include "tensorflow/core/framework/tensor.h" |
21 | #include "tensorflow/core/platform/unbounded_work_queue.h" |
22 | |
23 | namespace tensorflow { |
24 | class WorkerCacheInterface; |
25 | |
26 | // Extend CollectiveRemoteAccessLocal with access to remote peers. |
27 | class CollectiveRemoteAccessDistributed : public CollectiveRemoteAccessLocal { |
28 | public: |
29 | CollectiveRemoteAccessDistributed( |
30 | const DeviceMgr* dev_mgr, DeviceResolverInterface* dev_resolver, |
31 | std::shared_ptr<UnboundedWorkQueue> work_queue, |
32 | WorkerCacheInterface* worker_cache, int64_t step_id, string task_name) |
33 | : CollectiveRemoteAccessLocal(dev_mgr, dev_resolver, step_id), |
34 | worker_cache_(worker_cache), |
35 | work_queue_(std::move(work_queue)), |
36 | task_name_(std::move(task_name)) {} |
37 | |
38 | ~CollectiveRemoteAccessDistributed() override {} |
39 | |
40 | void RecvFromPeer(const string& peer_device, const string& peer_task, |
41 | bool peer_is_local, const string& key, Device* to_device, |
42 | DeviceContext* to_device_ctx, |
43 | const AllocatorAttributes& to_alloc_attr, Tensor* to_tensor, |
44 | const DeviceLocality& client_locality, |
45 | int dev_to_dev_stream_index, |
46 | CancellationManager* cancellation_manager, |
47 | const StatusCallback& done) override; |
48 | |
49 | void CheckPeerHealth(const string& peer_task, int64_t timeout_in_ms, |
50 | const StatusCallback& done) override; |
51 | |
52 | void StartAbort(const Status& s) override; |
53 | |
54 | protected: |
55 | WorkerCacheInterface* worker_cache_; // Not owned |
56 | // Ownership of `work_queue_` is shared between `this` and |
57 | // `CollectiveExecutorMgr`. |
58 | std::shared_ptr<UnboundedWorkQueue> work_queue_; |
59 | CancellationManager abortion_cancel_mgr_; |
60 | string task_name_; |
61 | }; |
62 | |
63 | } // namespace tensorflow |
64 | #endif // TENSORFLOW_CORE_DISTRIBUTED_RUNTIME_COLLECTIVE_RMA_DISTRIBUTED_H_ |
65 | |