1/* Copyright 2018 The TensorFlow Authors. All Rights Reserved.
2
3Licensed under the Apache License, Version 2.0 (the "License");
4you may not use this file except in compliance with the License.
5You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9Unless required by applicable law or agreed to in writing, software
10distributed under the License is distributed on an "AS IS" BASIS,
11WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12See the License for the specific language governing permissions and
13limitations under the License.
14==============================================================================*/
15
16#include "tensorflow/tsl/platform/file_system_helper.h"
17
18#include <deque>
19#include <string>
20#include <vector>
21
22#include "tensorflow/tsl/platform/cpu_info.h"
23#include "tensorflow/tsl/platform/env.h"
24#include "tensorflow/tsl/platform/errors.h"
25#include "tensorflow/tsl/platform/file_system.h"
26#include "tensorflow/tsl/platform/mutex.h"
27#include "tensorflow/tsl/platform/path.h"
28#include "tensorflow/tsl/platform/platform.h"
29#include "tensorflow/tsl/platform/status.h"
30#include "tensorflow/tsl/platform/str_util.h"
31#include "tensorflow/tsl/platform/threadpool.h"
32
33namespace tsl {
34namespace internal {
35
36namespace {
37
38const int kNumThreads = port::NumSchedulableCPUs();
39
40// Run a function in parallel using a ThreadPool, but skip the ThreadPool
41// on the iOS platform due to its problems with more than a few threads.
42void ForEach(int first, int last, const std::function<void(int)>& f) {
43#if TARGET_OS_IPHONE
44 for (int i = first; i < last; i++) {
45 f(i);
46 }
47#else
48 int num_threads = std::min(kNumThreads, last - first);
49 thread::ThreadPool threads(Env::Default(), "ForEach", num_threads);
50 for (int i = first; i < last; i++) {
51 threads.Schedule([f, i] { f(i); });
52 }
53#endif
54}
55
// The characters that introduce a globbing construct: `*`, `?`, `[` and the
// escape character `\`.
static const char kGlobbingChars[] = "*?[\\";

// Returns true when `pattern` contains at least one character from
// `kGlobbingChars`, i.e. when it is more than a plain literal path.
static inline bool IsGlobbingPattern(const std::string& pattern) {
  const std::string::size_type first_glob =
      pattern.find_first_of(kGlobbingChars);
  return first_glob != std::string::npos;
}
62
63// Make sure that the first entry in `dirs` during glob expansion does not
64// contain a glob pattern. This is to prevent a corner-case bug where
65// `<pattern>` would be treated differently than `./<pattern>`.
66static std::string PatchPattern(const std::string& pattern) {
67 const std::string fixed_prefix =
68 pattern.substr(0, pattern.find_first_of(kGlobbingChars));
69
70 // Patching is needed when there is no directory part in `prefix`
71 if (io::Dirname(fixed_prefix).empty()) {
72 return io::JoinPath(".", pattern);
73 }
74
75 // No patching needed
76 return pattern;
77}
78
79static std::vector<std::string> AllDirectoryPrefixes(const std::string& d) {
80 std::vector<std::string> dirs;
81 const std::string patched = PatchPattern(d);
82 StringPiece dir(patched);
83
84 // If the pattern ends with a `/` (or `\\` on Windows), we need to strip it
85 // otherwise we would have one additional matching step and the result set
86 // would be empty.
87 bool is_directory = d[d.size() - 1] == '/';
88#ifdef PLATFORM_WINDOWS
89 is_directory = is_directory || (d[d.size() - 1] == '\\');
90#endif
91 if (is_directory) {
92 dir = io::Dirname(dir);
93 }
94
95 while (!dir.empty()) {
96 dirs.emplace_back(dir);
97 StringPiece new_dir(io::Dirname(dir));
98 // io::Dirname("/") returns "/" so we need to break the loop.
99 // On Windows, io::Dirname("C:\\") would return "C:\\", so we check for
100 // identity of the result instead of checking for dir[0] == `/`.
101 if (dir == new_dir) break;
102 dir = new_dir;
103 }
104
105 // Order the array from parent to ancestor (reverse order).
106 std::reverse(dirs.begin(), dirs.end());
107
108 return dirs;
109}
110
111static inline int GetFirstGlobbingEntry(const std::vector<std::string>& dirs) {
112 int i = 0;
113 for (const auto& d : dirs) {
114 if (IsGlobbingPattern(d)) {
115 break;
116 }
117 i++;
118 }
119 return i;
120}
121
122} // namespace
123
124Status GetMatchingPaths(FileSystem* fs, Env* env, const string& pattern,
125 std::vector<string>* results) {
126 // Check that `fs`, `env` and `results` are non-null.
127 if (fs == nullptr || env == nullptr || results == nullptr) {
128 return Status(tsl::error::INVALID_ARGUMENT,
129 "Filesystem calls GetMatchingPaths with nullptr arguments");
130 }
131
132 // By design, we don't match anything on empty pattern
133 results->clear();
134 if (pattern.empty()) {
135 return OkStatus();
136 }
137
138 // The pattern can contain globbing characters at multiple levels, e.g.:
139 //
140 // foo/ba?/baz/f*r
141 //
142 // To match the full pattern, we must match every prefix subpattern and then
143 // operate on the children for each match. Thus, we separate all subpatterns
144 // in the `dirs` vector below.
145 std::vector<std::string> dirs = AllDirectoryPrefixes(pattern);
146
147 // We can have patterns that have several parents where no globbing is being
148 // done, for example, `foo/bar/baz/*`. We don't need to expand the directories
149 // which don't contain the globbing characters.
150 int matching_index = GetFirstGlobbingEntry(dirs);
151
152 // If we don't have globbing characters in the pattern then it specifies a
153 // path in the filesystem. We add it to the result set if it exists.
154 if (matching_index == dirs.size()) {
155 if (fs->FileExists(pattern).ok()) {
156 results->emplace_back(pattern);
157 }
158 return OkStatus();
159 }
160
161 // To expand the globbing, we do a BFS from `dirs[matching_index-1]`.
162 // At every step, we work on a pair `{dir, ix}` such that `dir` is a real
163 // directory, `ix < dirs.size() - 1` and `dirs[ix+1]` is a globbing pattern.
164 // To expand the pattern, we select from all the children of `dir` only those
165 // that match against `dirs[ix+1]`.
166 // If there are more entries in `dirs` after `dirs[ix+1]` this mean we have
167 // more patterns to match. So, we add to the queue only those children that
168 // are also directories, paired with `ix+1`.
169 // If there are no more entries in `dirs`, we return all children as part of
170 // the answer.
171 // Since we can get into a combinatorial explosion issue (e.g., pattern
172 // `/*/*/*`), we process the queue in parallel. Each parallel processing takes
173 // elements from `expand_queue` and adds them to `next_expand_queue`, after
174 // which we swap these two queues (similar to double buffering algorithms).
175 // PRECONDITION: `IsGlobbingPattern(dirs[0]) == false`
176 // PRECONDITION: `matching_index > 0`
177 // INVARIANT: If `{d, ix}` is in queue, then `d` and `dirs[ix]` are at the
178 // same level in the filesystem tree.
179 // INVARIANT: If `{d, _}` is in queue, then `IsGlobbingPattern(d) == false`.
180 // INVARIANT: If `{d, _}` is in queue, then `d` is a real directory.
181 // INVARIANT: If `{_, ix}` is in queue, then `ix < dirs.size() - 1`.
182 // INVARIANT: If `{_, ix}` is in queue, `IsGlobbingPattern(dirs[ix + 1])`.
183 std::deque<std::pair<string, int>> expand_queue;
184 std::deque<std::pair<string, int>> next_expand_queue;
185 expand_queue.emplace_back(dirs[matching_index - 1], matching_index - 1);
186
187 // Adding to `result` or `new_expand_queue` need to be protected by mutexes
188 // since there are multiple threads writing to these.
189 mutex result_mutex;
190 mutex queue_mutex;
191
192 while (!expand_queue.empty()) {
193 next_expand_queue.clear();
194
195 // The work item for every item in `expand_queue`.
196 // pattern, we process them in parallel.
197 auto handle_level = [&fs, &results, &dirs, &expand_queue,
198 &next_expand_queue, &result_mutex,
199 &queue_mutex](int i) {
200 // See invariants above, all of these are valid accesses.
201 const auto& queue_item = expand_queue.at(i);
202 const std::string& parent = queue_item.first;
203 const int index = queue_item.second + 1;
204 const std::string& match_pattern = dirs[index];
205
206 // Get all children of `parent`. If this fails, return early.
207 std::vector<std::string> children;
208 Status s = fs->GetChildren(parent, &children);
209 if (s.code() == tsl::error::PERMISSION_DENIED) {
210 return;
211 }
212
213 // Also return early if we don't have any children
214 if (children.empty()) {
215 return;
216 }
217
218 // Since we can get extremely many children here and on some filesystems
219 // `IsDirectory` is expensive, we process the children in parallel.
220 // We also check that children match the pattern in parallel, for speedup.
221 // We store the status of the match and `IsDirectory` in
222 // `children_status` array, one element for each children.
223 std::vector<Status> children_status(children.size());
224 auto handle_children = [&fs, &match_pattern, &parent, &children,
225 &children_status](int j) {
226 const std::string path = io::JoinPath(parent, children[j]);
227 if (!fs->Match(path, match_pattern)) {
228 children_status[j] =
229 Status(tsl::error::CANCELLED, "Operation not needed");
230 } else {
231 children_status[j] = fs->IsDirectory(path);
232 }
233 };
234 ForEach(0, children.size(), handle_children);
235
236 // At this point, pairing `children` with `children_status` will tell us
237 // if a children:
238 // * does not match the pattern
239 // * matches the pattern and is a directory
240 // * matches the pattern and is not a directory
241 // We fully ignore the first case.
242 // If we matched the last pattern (`index == dirs.size() - 1`) then all
243 // remaining children get added to the result.
244 // Otherwise, only the directories get added to the next queue.
245 for (size_t j = 0; j < children.size(); j++) {
246 if (children_status[j].code() == tsl::error::CANCELLED) {
247 continue;
248 }
249
250 const std::string path = io::JoinPath(parent, children[j]);
251 if (index == dirs.size() - 1) {
252 mutex_lock l(result_mutex);
253 results->emplace_back(path);
254 } else if (children_status[j].ok()) {
255 mutex_lock l(queue_mutex);
256 next_expand_queue.emplace_back(path, index);
257 }
258 }
259 };
260 ForEach(0, expand_queue.size(), handle_level);
261
262 // After evaluating one level, swap the "buffers"
263 std::swap(expand_queue, next_expand_queue);
264 }
265
266 return OkStatus();
267}
268
269StatusOr<bool> FileExists(Env* env, const string& fname) {
270 Status status = env->FileExists(fname);
271 if (errors::IsNotFound(status)) {
272 return false;
273 }
274 TF_RETURN_IF_ERROR(status);
275 return true;
276}
277
278} // namespace internal
279} // namespace tsl
280