1 | /* Copyright 2016 The TensorFlow Authors. All Rights Reserved. |
2 | |
3 | Licensed under the Apache License, Version 2.0 (the "License"); |
4 | you may not use this file except in compliance with the License. |
5 | You may obtain a copy of the License at |
6 | |
7 | http://www.apache.org/licenses/LICENSE-2.0 |
8 | |
9 | Unless required by applicable law or agreed to in writing, software |
10 | distributed under the License is distributed on an "AS IS" BASIS, |
11 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | See the License for the specific language governing permissions and |
13 | limitations under the License. |
14 | ==============================================================================*/ |
15 | |
16 | #include "tensorflow/core/distributed_runtime/remote_device.h" |
17 | |
18 | #include <stdlib.h> |
19 | |
20 | #include <vector> |
21 | |
22 | #include "tensorflow/core/common_runtime/device.h" |
23 | #include "tensorflow/core/common_runtime/process_util.h" |
24 | #include "tensorflow/core/common_runtime/renamed_device.h" |
25 | #include "tensorflow/core/distributed_runtime/worker_cache.h" |
26 | #include "tensorflow/core/distributed_runtime/worker_interface.h" |
27 | #include "tensorflow/core/lib/core/errors.h" |
28 | #include "tensorflow/core/lib/gtl/cleanup.h" |
29 | #include "tensorflow/core/platform/logging.h" |
30 | #include "tensorflow/core/platform/macros.h" |
31 | #include "tensorflow/core/protobuf/worker.pb.h" |
32 | #include "tensorflow/core/util/device_name_utils.h" |
33 | |
34 | namespace tensorflow { |
35 | |
36 | class RemoteDevice : public Device { |
37 | public: |
38 | RemoteDevice(Env* env, const DeviceAttributes& da) |
39 | : Device(env, da), |
40 | local_dev_name_(DeviceNameUtils::LocalName(da.name())) {} |
41 | |
42 | Status Sync() override { return OkStatus(); } |
43 | Allocator* GetAllocator(AllocatorAttributes attr) override { return nullptr; } |
44 | |
45 | ResourceMgr* resource_manager() override { |
46 | LOG(FATAL) << "Accessing the resource manager of a remote device is not " |
47 | << "supported." ; |
48 | std::abort(); |
49 | } |
50 | |
51 | bool IsLocal() const override { return false; } |
52 | |
53 | bool IsRemoteCallAllowed() const override { return true; } |
54 | |
55 | private: |
56 | const string local_dev_name_; |
57 | |
58 | TF_DISALLOW_COPY_AND_ASSIGN(RemoteDevice); |
59 | }; |
60 | |
61 | void AsRemoteDevices( |
62 | Env* env, |
63 | const protobuf::RepeatedPtrField<DeviceAttributes>& device_attributes, |
64 | LookupLocalDevice lookup_local_device, |
65 | std::vector<std::unique_ptr<Device>>* remote_devices) { |
66 | for (const auto& da : device_attributes) { |
67 | Device* local_device; |
68 | if (lookup_local_device != nullptr && |
69 | lookup_local_device(da.name(), &local_device).ok()) { |
70 | remote_devices->emplace_back(RenamedDevice::NewRenamedDevice( |
71 | local_device->name(), local_device, false, false)); |
72 | } else { |
73 | auto d = new RemoteDevice(env, da); |
74 | remote_devices->emplace_back(d); |
75 | } |
76 | } |
77 | } |
78 | |
79 | void NewRemoteDevices(Env* env, WorkerCacheInterface* worker_cache, |
80 | const string& worker_name, NewRemoteDevicesDone done) { |
81 | WorkerInterface* wi = worker_cache->GetOrCreateWorker(worker_name); |
82 | if (wi == nullptr) { |
83 | std::vector<Device*> empty; |
84 | done(errors::NotFound("Device " , worker_name, " is not found." ), &empty); |
85 | return; |
86 | } |
87 | struct Call { |
88 | GetStatusRequest req; |
89 | GetStatusResponse resp; |
90 | }; |
91 | Call* call = new Call; |
92 | auto cb = [env, worker_cache, worker_name, done, wi, |
93 | call](const Status& status) { |
94 | Status s = status; |
95 | std::vector<Device*> remote_devices; |
96 | auto cleanup = gtl::MakeCleanup( |
97 | [&worker_cache, &worker_name, &wi, &done, &remote_devices, &s, call] { |
98 | worker_cache->ReleaseWorker(worker_name, wi); |
99 | done(s, &remote_devices); |
100 | delete call; |
101 | }); |
102 | if (s.ok()) { |
103 | DeviceNameUtils::ParsedName worker_name_parsed; |
104 | if (!DeviceNameUtils::ParseFullName(worker_name, &worker_name_parsed) || |
105 | !worker_name_parsed.has_job || !worker_name_parsed.has_replica || |
106 | !worker_name_parsed.has_task) { |
107 | s = errors::InvalidArgument("Could not parse worker name: " , |
108 | worker_name); |
109 | LOG(WARNING) << s; |
110 | return; |
111 | } |
112 | remote_devices.reserve(call->resp.device_attributes_size()); |
113 | for (const DeviceAttributes& da : call->resp.device_attributes()) { |
114 | DeviceNameUtils::ParsedName device_name_parsed; |
115 | CHECK(DeviceNameUtils::ParseFullName(da.name(), &device_name_parsed)) |
116 | << "Device attribute name '" << da.name() << "' could not be " |
117 | << "parsed. Device Attribute: " << da.DebugString(); |
118 | // Preserve the exact name, if possible. |
119 | // TODO(b/37868888): Simplify when legacy device name formats removed. |
120 | if (device_name_parsed.job == worker_name_parsed.job && |
121 | device_name_parsed.replica == worker_name_parsed.replica && |
122 | device_name_parsed.task == worker_name_parsed.task) { |
123 | auto d = new RemoteDevice(env, da); |
124 | remote_devices.push_back(d); |
125 | } else { |
126 | DeviceAttributes da_rewritten = da; |
127 | da_rewritten.set_name(DeviceNameUtils::FullName( |
128 | worker_name_parsed.job, worker_name_parsed.replica, |
129 | worker_name_parsed.task, device_name_parsed.type, |
130 | device_name_parsed.id)); |
131 | auto d = new RemoteDevice(env, da_rewritten); |
132 | |
133 | // Experimental: Skipping over adding any TPU-type devices that aren't |
134 | // on the job called "worker" (but still adds the CPUs of other jobs). |
135 | if (getenv("TPU_NO_POPULATE_DEVICE_LIST_FROM_CLUSTER_SPEC" ) != |
136 | nullptr) { |
137 | if (worker_name_parsed.job == "worker" || |
138 | device_name_parsed.type.find("TPU" ) == std::string::npos) { |
139 | remote_devices.push_back(d); |
140 | } |
141 | } else { |
142 | remote_devices.push_back(d); |
143 | } |
144 | } |
145 | } |
146 | } |
147 | }; |
148 | wi->GetStatusAsync(/*opts=*/nullptr, &call->req, &call->resp, |
149 | /*fail_fast=*/false, cb); |
150 | } |
151 | |
152 | std::unique_ptr<Device> NewRemoteDevice(Env* env, |
153 | DeviceAttributes device_attribute) { |
154 | return std::make_unique<RemoteDevice>(env, device_attribute); |
155 | } |
156 | |
157 | } // namespace tensorflow |
158 | |