1 | /* Copyright 2020 The TensorFlow Authors. All Rights Reserved. |
2 | |
3 | Licensed under the Apache License, Version 2.0 (the "License"); |
4 | you may not use this file except in compliance with the License. |
5 | You may obtain a copy of the License at |
6 | |
7 | http://www.apache.org/licenses/LICENSE-2.0 |
8 | |
9 | Unless required by applicable law or agreed to in writing, software |
10 | distributed under the License is distributed on an "AS IS" BASIS, |
11 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | See the License for the specific language governing permissions and |
13 | limitations under the License. |
14 | ==============================================================================*/ |
15 | #ifndef TENSORFLOW_CORE_COMMON_RUNTIME_ALL_TO_ALL_H_ |
16 | #define TENSORFLOW_CORE_COMMON_RUNTIME_ALL_TO_ALL_H_ |
17 | |
18 | #include <deque> |
19 | #include <memory> |
20 | #include <string> |
21 | #include <vector> |
22 | |
23 | #include "tensorflow/core/common_runtime/base_collective_executor.h" |
24 | #include "tensorflow/core/framework/collective.h" |
25 | #include "tensorflow/core/framework/device.h" |
26 | |
27 | namespace tensorflow { |
28 | |
29 | // Implementation of collective all-to-all. |
30 | class AllToAll : public CollectiveImplementationInterface { |
31 | public: |
32 | AllToAll(); |
33 | |
34 | void Run(StatusCallback done) override; |
35 | |
36 | Status InitializeCollectiveParams(CollectiveParams* col_params) override { |
37 | return OkStatus(); |
38 | } |
39 | |
40 | // Initializes members of CollectiveContext not yet initialized, i.e. device |
41 | // and device_locality. Also saves the CollectiveContext in this object. |
42 | Status InitializeCollectiveContext( |
43 | std::shared_ptr<CollectiveContext> col_ctx) override; |
44 | |
45 | private: |
46 | std::shared_ptr<CollectiveContext> col_ctx_; |
47 | const CollectiveParams* col_params_; // Not owned |
48 | std::vector<Tensor> input_chunks_; |
49 | Tensor output_buffer_; |
50 | std::vector<Tensor> output_chunks_; |
51 | StatusCallback done_; |
52 | mutex mu_; |
53 | Status status_ TF_GUARDED_BY(mu_); |
54 | int counter_ TF_GUARDED_BY(mu_); |
55 | |
56 | void DispatchSend(int src_rank, int target_rank, const Tensor* tensor, |
57 | const StatusCallback& done); |
58 | |
59 | void DispatchRecv(int src_rank, int target_rank, Tensor* tensor, |
60 | const StatusCallback& done); |
61 | |
62 | // Atomically increments counter_ by one for sending, one for receiving. |
63 | // Invokes done when counter_ reaches 2. |
64 | // The purpose of checking counter_ is to ensure that done_ is called once. |
65 | StatusCallback CheckCounterAndCallDone(); |
66 | }; |
67 | |
68 | } // namespace tensorflow |
69 | #endif // TENSORFLOW_CORE_COMMON_RUNTIME_ALL_TO_ALL_H_ |
70 | |