1 | /* Copyright 2018 The TensorFlow Authors. All Rights Reserved. |
2 | |
3 | Licensed under the Apache License, Version 2.0 (the "License"); |
4 | you may not use this file except in compliance with the License. |
5 | You may obtain a copy of the License at |
6 | |
7 | http://www.apache.org/licenses/LICENSE-2.0 |
8 | |
9 | Unless required by applicable law or agreed to in writing, software |
10 | distributed under the License is distributed on an "AS IS" BASIS, |
11 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | See the License for the specific language governing permissions and |
13 | limitations under the License. |
14 | ==============================================================================*/ |
15 | |
#include "tensorflow/tsl/platform/file_system_helper.h"

#include <algorithm>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

#include "tensorflow/tsl/platform/cpu_info.h"
#include "tensorflow/tsl/platform/env.h"
#include "tensorflow/tsl/platform/errors.h"
#include "tensorflow/tsl/platform/file_system.h"
#include "tensorflow/tsl/platform/mutex.h"
#include "tensorflow/tsl/platform/path.h"
#include "tensorflow/tsl/platform/platform.h"
#include "tensorflow/tsl/platform/status.h"
#include "tensorflow/tsl/platform/str_util.h"
#include "tensorflow/tsl/platform/threadpool.h"
32 | |
33 | namespace tsl { |
34 | namespace internal { |
35 | |
36 | namespace { |
37 | |
38 | const int kNumThreads = port::NumSchedulableCPUs(); |
39 | |
40 | // Run a function in parallel using a ThreadPool, but skip the ThreadPool |
41 | // on the iOS platform due to its problems with more than a few threads. |
42 | void ForEach(int first, int last, const std::function<void(int)>& f) { |
43 | #if TARGET_OS_IPHONE |
44 | for (int i = first; i < last; i++) { |
45 | f(i); |
46 | } |
47 | #else |
48 | int num_threads = std::min(kNumThreads, last - first); |
49 | thread::ThreadPool threads(Env::Default(), "ForEach" , num_threads); |
50 | for (int i = first; i < last; i++) { |
51 | threads.Schedule([f, i] { f(i); }); |
52 | } |
53 | #endif |
54 | } |
55 | |
// Characters that turn a path component into a glob pattern: `*`, `?`, `[`
// and the backslash. A component containing any of them must be expanded
// rather than used literally.
constexpr char kGlobbingChars[] = "*?[\\";
58 | |
59 | static inline bool IsGlobbingPattern(const std::string& pattern) { |
60 | return (pattern.find_first_of(kGlobbingChars) != std::string::npos); |
61 | } |
62 | |
63 | // Make sure that the first entry in `dirs` during glob expansion does not |
64 | // contain a glob pattern. This is to prevent a corner-case bug where |
65 | // `<pattern>` would be treated differently than `./<pattern>`. |
66 | static std::string PatchPattern(const std::string& pattern) { |
67 | const std::string fixed_prefix = |
68 | pattern.substr(0, pattern.find_first_of(kGlobbingChars)); |
69 | |
70 | // Patching is needed when there is no directory part in `prefix` |
71 | if (io::Dirname(fixed_prefix).empty()) { |
72 | return io::JoinPath("." , pattern); |
73 | } |
74 | |
75 | // No patching needed |
76 | return pattern; |
77 | } |
78 | |
79 | static std::vector<std::string> AllDirectoryPrefixes(const std::string& d) { |
80 | std::vector<std::string> dirs; |
81 | const std::string patched = PatchPattern(d); |
82 | StringPiece dir(patched); |
83 | |
84 | // If the pattern ends with a `/` (or `\\` on Windows), we need to strip it |
85 | // otherwise we would have one additional matching step and the result set |
86 | // would be empty. |
87 | bool is_directory = d[d.size() - 1] == '/'; |
88 | #ifdef PLATFORM_WINDOWS |
89 | is_directory = is_directory || (d[d.size() - 1] == '\\'); |
90 | #endif |
91 | if (is_directory) { |
92 | dir = io::Dirname(dir); |
93 | } |
94 | |
95 | while (!dir.empty()) { |
96 | dirs.emplace_back(dir); |
97 | StringPiece new_dir(io::Dirname(dir)); |
98 | // io::Dirname("/") returns "/" so we need to break the loop. |
99 | // On Windows, io::Dirname("C:\\") would return "C:\\", so we check for |
100 | // identity of the result instead of checking for dir[0] == `/`. |
101 | if (dir == new_dir) break; |
102 | dir = new_dir; |
103 | } |
104 | |
105 | // Order the array from parent to ancestor (reverse order). |
106 | std::reverse(dirs.begin(), dirs.end()); |
107 | |
108 | return dirs; |
109 | } |
110 | |
111 | static inline int GetFirstGlobbingEntry(const std::vector<std::string>& dirs) { |
112 | int i = 0; |
113 | for (const auto& d : dirs) { |
114 | if (IsGlobbingPattern(d)) { |
115 | break; |
116 | } |
117 | i++; |
118 | } |
119 | return i; |
120 | } |
121 | |
122 | } // namespace |
123 | |
124 | Status GetMatchingPaths(FileSystem* fs, Env* env, const string& pattern, |
125 | std::vector<string>* results) { |
126 | // Check that `fs`, `env` and `results` are non-null. |
127 | if (fs == nullptr || env == nullptr || results == nullptr) { |
128 | return Status(tsl::error::INVALID_ARGUMENT, |
129 | "Filesystem calls GetMatchingPaths with nullptr arguments" ); |
130 | } |
131 | |
132 | // By design, we don't match anything on empty pattern |
133 | results->clear(); |
134 | if (pattern.empty()) { |
135 | return OkStatus(); |
136 | } |
137 | |
138 | // The pattern can contain globbing characters at multiple levels, e.g.: |
139 | // |
140 | // foo/ba?/baz/f*r |
141 | // |
142 | // To match the full pattern, we must match every prefix subpattern and then |
143 | // operate on the children for each match. Thus, we separate all subpatterns |
144 | // in the `dirs` vector below. |
145 | std::vector<std::string> dirs = AllDirectoryPrefixes(pattern); |
146 | |
147 | // We can have patterns that have several parents where no globbing is being |
148 | // done, for example, `foo/bar/baz/*`. We don't need to expand the directories |
149 | // which don't contain the globbing characters. |
150 | int matching_index = GetFirstGlobbingEntry(dirs); |
151 | |
152 | // If we don't have globbing characters in the pattern then it specifies a |
153 | // path in the filesystem. We add it to the result set if it exists. |
154 | if (matching_index == dirs.size()) { |
155 | if (fs->FileExists(pattern).ok()) { |
156 | results->emplace_back(pattern); |
157 | } |
158 | return OkStatus(); |
159 | } |
160 | |
161 | // To expand the globbing, we do a BFS from `dirs[matching_index-1]`. |
162 | // At every step, we work on a pair `{dir, ix}` such that `dir` is a real |
163 | // directory, `ix < dirs.size() - 1` and `dirs[ix+1]` is a globbing pattern. |
164 | // To expand the pattern, we select from all the children of `dir` only those |
165 | // that match against `dirs[ix+1]`. |
166 | // If there are more entries in `dirs` after `dirs[ix+1]` this mean we have |
167 | // more patterns to match. So, we add to the queue only those children that |
168 | // are also directories, paired with `ix+1`. |
169 | // If there are no more entries in `dirs`, we return all children as part of |
170 | // the answer. |
171 | // Since we can get into a combinatorial explosion issue (e.g., pattern |
172 | // `/*/*/*`), we process the queue in parallel. Each parallel processing takes |
173 | // elements from `expand_queue` and adds them to `next_expand_queue`, after |
174 | // which we swap these two queues (similar to double buffering algorithms). |
175 | // PRECONDITION: `IsGlobbingPattern(dirs[0]) == false` |
176 | // PRECONDITION: `matching_index > 0` |
177 | // INVARIANT: If `{d, ix}` is in queue, then `d` and `dirs[ix]` are at the |
178 | // same level in the filesystem tree. |
179 | // INVARIANT: If `{d, _}` is in queue, then `IsGlobbingPattern(d) == false`. |
180 | // INVARIANT: If `{d, _}` is in queue, then `d` is a real directory. |
181 | // INVARIANT: If `{_, ix}` is in queue, then `ix < dirs.size() - 1`. |
182 | // INVARIANT: If `{_, ix}` is in queue, `IsGlobbingPattern(dirs[ix + 1])`. |
183 | std::deque<std::pair<string, int>> expand_queue; |
184 | std::deque<std::pair<string, int>> next_expand_queue; |
185 | expand_queue.emplace_back(dirs[matching_index - 1], matching_index - 1); |
186 | |
187 | // Adding to `result` or `new_expand_queue` need to be protected by mutexes |
188 | // since there are multiple threads writing to these. |
189 | mutex result_mutex; |
190 | mutex queue_mutex; |
191 | |
192 | while (!expand_queue.empty()) { |
193 | next_expand_queue.clear(); |
194 | |
195 | // The work item for every item in `expand_queue`. |
196 | // pattern, we process them in parallel. |
197 | auto handle_level = [&fs, &results, &dirs, &expand_queue, |
198 | &next_expand_queue, &result_mutex, |
199 | &queue_mutex](int i) { |
200 | // See invariants above, all of these are valid accesses. |
201 | const auto& queue_item = expand_queue.at(i); |
202 | const std::string& parent = queue_item.first; |
203 | const int index = queue_item.second + 1; |
204 | const std::string& match_pattern = dirs[index]; |
205 | |
206 | // Get all children of `parent`. If this fails, return early. |
207 | std::vector<std::string> children; |
208 | Status s = fs->GetChildren(parent, &children); |
209 | if (s.code() == tsl::error::PERMISSION_DENIED) { |
210 | return; |
211 | } |
212 | |
213 | // Also return early if we don't have any children |
214 | if (children.empty()) { |
215 | return; |
216 | } |
217 | |
218 | // Since we can get extremely many children here and on some filesystems |
219 | // `IsDirectory` is expensive, we process the children in parallel. |
220 | // We also check that children match the pattern in parallel, for speedup. |
221 | // We store the status of the match and `IsDirectory` in |
222 | // `children_status` array, one element for each children. |
223 | std::vector<Status> children_status(children.size()); |
224 | auto handle_children = [&fs, &match_pattern, &parent, &children, |
225 | &children_status](int j) { |
226 | const std::string path = io::JoinPath(parent, children[j]); |
227 | if (!fs->Match(path, match_pattern)) { |
228 | children_status[j] = |
229 | Status(tsl::error::CANCELLED, "Operation not needed" ); |
230 | } else { |
231 | children_status[j] = fs->IsDirectory(path); |
232 | } |
233 | }; |
234 | ForEach(0, children.size(), handle_children); |
235 | |
236 | // At this point, pairing `children` with `children_status` will tell us |
237 | // if a children: |
238 | // * does not match the pattern |
239 | // * matches the pattern and is a directory |
240 | // * matches the pattern and is not a directory |
241 | // We fully ignore the first case. |
242 | // If we matched the last pattern (`index == dirs.size() - 1`) then all |
243 | // remaining children get added to the result. |
244 | // Otherwise, only the directories get added to the next queue. |
245 | for (size_t j = 0; j < children.size(); j++) { |
246 | if (children_status[j].code() == tsl::error::CANCELLED) { |
247 | continue; |
248 | } |
249 | |
250 | const std::string path = io::JoinPath(parent, children[j]); |
251 | if (index == dirs.size() - 1) { |
252 | mutex_lock l(result_mutex); |
253 | results->emplace_back(path); |
254 | } else if (children_status[j].ok()) { |
255 | mutex_lock l(queue_mutex); |
256 | next_expand_queue.emplace_back(path, index); |
257 | } |
258 | } |
259 | }; |
260 | ForEach(0, expand_queue.size(), handle_level); |
261 | |
262 | // After evaluating one level, swap the "buffers" |
263 | std::swap(expand_queue, next_expand_queue); |
264 | } |
265 | |
266 | return OkStatus(); |
267 | } |
268 | |
269 | StatusOr<bool> FileExists(Env* env, const string& fname) { |
270 | Status status = env->FileExists(fname); |
271 | if (errors::IsNotFound(status)) { |
272 | return false; |
273 | } |
274 | TF_RETURN_IF_ERROR(status); |
275 | return true; |
276 | } |
277 | |
278 | } // namespace internal |
279 | } // namespace tsl |
280 | |