1 | /* Copyright 2019 The TensorFlow Authors. All Rights Reserved. |
2 | |
3 | Licensed under the Apache License, Version 2.0 (the "License"); |
4 | you may not use this file except in compliance with the License. |
5 | You may obtain a copy of the License at |
6 | |
7 | http://www.apache.org/licenses/LICENSE-2.0 |
8 | |
9 | Unless required by applicable law or agreed to in writing, software |
10 | distributed under the License is distributed on an "AS IS" BASIS, |
11 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | See the License for the specific language governing permissions and |
13 | limitations under the License. |
14 | ==============================================================================*/ |
15 | #include "tensorflow/core/kernels/collective_nccl_reducer.h" |
16 | |
17 | #if GOOGLE_CUDA || TENSORFLOW_USE_ROCM |
18 | |
19 | #include "tensorflow/core/common_runtime/collective_util.h" |
20 | #include "tensorflow/core/nccl/nccl_manager.h" |
21 | #include "tensorflow/core/platform/tracing.h" |
22 | #include "tensorflow/core/profiler/lib/traceme.h" |
23 | |
24 | namespace tensorflow { |
25 | |
26 | void NcclReducer::Run(StatusCallback done) { |
27 | Tensor group_size; |
28 | std::unique_ptr<Notification> group_size_ready; |
29 | Status group_size_status; |
30 | std::unique_ptr<Notification> nccl_done; |
31 | if (col_params_->final_op) { |
32 | group_size_ready = std::make_unique<Notification>(); |
33 | // Create an on-device scalar value from group_size_. |
34 | // TODO(ayushd, tucker): avoid this copy by either reusing across |
35 | // invocations or providing the scalar to the kernel in host memory. |
36 | Tensor group_size_val; |
37 | switch (col_ctx_->output->dtype()) { |
38 | case DT_HALF: |
39 | group_size_val = |
40 | Tensor(static_cast<Eigen::half>(col_params_->group.group_size)); |
41 | break; |
42 | case DT_FLOAT: |
43 | group_size_val = |
44 | Tensor(static_cast<float>(col_params_->group.group_size)); |
45 | break; |
46 | case DT_DOUBLE: |
47 | group_size_val = |
48 | Tensor(static_cast<double>(col_params_->group.group_size)); |
49 | break; |
50 | case DT_INT32: |
51 | group_size_val = |
52 | Tensor(static_cast<int32>(col_params_->group.group_size)); |
53 | break; |
54 | case DT_INT64: |
55 | group_size_val = |
56 | Tensor(static_cast<int64_t>(col_params_->group.group_size)); |
57 | break; |
58 | default: |
59 | done(errors::Internal("Unsupported type " , |
60 | DataTypeString(col_ctx_->output->dtype()))); |
61 | return; |
62 | } |
63 | group_size = Tensor( |
64 | col_ctx_->device->GetAllocator(col_ctx_->op_ctx->input_alloc_attr(0)), |
65 | col_ctx_->output->dtype(), TensorShape({})); |
66 | DeviceContext* op_dev_ctx = col_ctx_->op_ctx->op_device_context(); |
67 | // Enqueue copy on gpu stream. |
68 | Notification* copy_note = group_size_ready.get(); |
69 | op_dev_ctx->CopyCPUTensorToDevice( |
70 | &group_size_val, col_ctx_->device, &group_size, |
71 | [copy_note, &group_size_status](const Status& s) { |
72 | group_size_status = s; |
73 | copy_note->Notify(); |
74 | }); |
75 | nccl_done = std::make_unique<Notification>(); |
76 | } |
77 | |
78 | Status nccl_status; |
79 | // If no final_op, then the NCCL callback is just `done`. Otherwise we notify |
80 | // `nccl_done` so that we can then perform `final_op`. |
81 | StatusCallback done_callback; |
82 | if (col_params_->final_op) { |
83 | Notification* nccl_note = nccl_done.get(); |
84 | done_callback = [nccl_note, &nccl_status](const Status& s) { |
85 | nccl_status = s; |
86 | nccl_note->Notify(); |
87 | }; |
88 | } else { |
89 | done_callback = std::move(done); |
90 | } |
91 | // Hold a ref to col_params for the rest of this function. |
92 | col_params_->Ref(); |
93 | core::ScopedUnref unref(col_params_); |
94 | col_ctx_->nccl_communicator->Enqueue(col_ctx_, std::move(done_callback)); |
95 | |
96 | // If no final_op, then this OpKernel is non-blocking. |
97 | if (!col_params_->final_op) { |
98 | return; |
99 | } |
100 | |
101 | // Wait for nccl op and group_size copy to succeed, then do final_op. This |
102 | // kernel needs to wait for both notifications because they execute on |
103 | // different GPU streams with no ordering guarantees between them. |
104 | // TODO(b/80529858): make this entirely non-blocking by getting rid of the |
105 | // waits below and calling final op from the nccl kernel's DoneCallback. |
106 | { |
107 | profiler::TraceMe activity("Nccl" , profiler::TraceMeLevel::kInfo); |
108 | nccl_done->WaitForNotification(); |
109 | } |
110 | { |
111 | profiler::TraceMe activity("GroupSizeCopy" , profiler::TraceMeLevel::kInfo); |
112 | group_size_ready->WaitForNotification(); |
113 | } |
114 | Status final_status = |
115 | group_size_status.ok() ? nccl_status : group_size_status; |
116 | if (final_status.ok()) { |
117 | final_status = collective_util::ComputeBinOp( |
118 | col_ctx_->op_ctx, col_ctx_->op_params, col_ctx_->device, |
119 | col_params_->final_op, col_ctx_->output, &group_size); |
120 | } |
121 | done(final_status); |
122 | } |
123 | |
124 | REGISTER_COLLECTIVE(NcclReduce, NcclReducer); |
125 | |
126 | } // namespace tensorflow |
127 | |
128 | #endif // GOOGLE_CUDA || TENSORFLOW_USE_ROCM |
129 | |