1 | /* Copyright 2016 The TensorFlow Authors. All Rights Reserved. |
2 | |
3 | Licensed under the Apache License, Version 2.0 (the "License"); |
4 | you may not use this file except in compliance with the License. |
5 | You may obtain a copy of the License at |
6 | |
7 | http://www.apache.org/licenses/LICENSE-2.0 |
8 | |
9 | Unless required by applicable law or agreed to in writing, software |
10 | distributed under the License is distributed on an "AS IS" BASIS, |
11 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | See the License for the specific language governing permissions and |
13 | limitations under the License. |
14 | ==============================================================================*/ |
15 | |
16 | #ifndef TENSORFLOW_CORE_PLATFORM_RETRYING_FILE_SYSTEM_H_ |
17 | #define TENSORFLOW_CORE_PLATFORM_RETRYING_FILE_SYSTEM_H_ |
18 | |
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "tensorflow/core/lib/random/random.h"
#include "tensorflow/core/platform/env.h"
#include "tensorflow/core/platform/errors.h"
#include "tensorflow/core/platform/file_system.h"
#include "tensorflow/core/platform/retrying_utils.h"
#include "tensorflow/core/platform/status.h"
29 | |
30 | namespace tensorflow { |
31 | |
32 | /// A wrapper to add retry logic to another file system. |
33 | template <typename Underlying> |
34 | class RetryingFileSystem : public FileSystem { |
35 | public: |
36 | RetryingFileSystem(std::unique_ptr<Underlying> base_file_system, |
37 | const RetryConfig& retry_config) |
38 | : base_file_system_(std::move(base_file_system)), |
39 | retry_config_(retry_config) {} |
40 | |
41 | TF_USE_FILESYSTEM_METHODS_WITH_NO_TRANSACTION_SUPPORT; |
42 | |
43 | Status NewRandomAccessFile( |
44 | const string& filename, TransactionToken* token, |
45 | std::unique_ptr<RandomAccessFile>* result) override; |
46 | |
47 | Status NewWritableFile(const string& filename, TransactionToken* token, |
48 | std::unique_ptr<WritableFile>* result) override; |
49 | |
50 | Status NewAppendableFile(const string& filename, TransactionToken* token, |
51 | std::unique_ptr<WritableFile>* result) override; |
52 | |
53 | Status NewReadOnlyMemoryRegionFromFile( |
54 | const string& filename, TransactionToken* token, |
55 | std::unique_ptr<ReadOnlyMemoryRegion>* result) override; |
56 | |
57 | Status FileExists(const string& fname, TransactionToken* token) override { |
58 | return RetryingUtils::CallWithRetries( |
59 | [this, &fname, token]() { |
60 | return base_file_system_->FileExists(fname, token); |
61 | }, |
62 | retry_config_); |
63 | } |
64 | |
65 | Status GetChildren(const string& dir, TransactionToken* token, |
66 | std::vector<string>* result) override { |
67 | return RetryingUtils::CallWithRetries( |
68 | [this, &dir, result, token]() { |
69 | return base_file_system_->GetChildren(dir, token, result); |
70 | }, |
71 | retry_config_); |
72 | } |
73 | |
74 | Status GetMatchingPaths(const string& pattern, TransactionToken* token, |
75 | std::vector<string>* result) override { |
76 | return RetryingUtils::CallWithRetries( |
77 | [this, &pattern, result, token]() { |
78 | return base_file_system_->GetMatchingPaths(pattern, token, result); |
79 | }, |
80 | retry_config_); |
81 | } |
82 | |
83 | Status Stat(const string& fname, TransactionToken* token, |
84 | FileStatistics* stat) override { |
85 | return RetryingUtils::CallWithRetries( |
86 | [this, &fname, stat, token]() { |
87 | return base_file_system_->Stat(fname, token, stat); |
88 | }, |
89 | retry_config_); |
90 | } |
91 | |
92 | Status DeleteFile(const string& fname, TransactionToken* token) override { |
93 | return RetryingUtils::DeleteWithRetries( |
94 | [this, &fname, token]() { |
95 | return base_file_system_->DeleteFile(fname, token); |
96 | }, |
97 | retry_config_); |
98 | } |
99 | |
100 | Status CreateDir(const string& dirname, TransactionToken* token) override { |
101 | return RetryingUtils::CallWithRetries( |
102 | [this, &dirname, token]() { |
103 | return base_file_system_->CreateDir(dirname, token); |
104 | }, |
105 | retry_config_); |
106 | } |
107 | |
108 | Status DeleteDir(const string& dirname, TransactionToken* token) override { |
109 | return RetryingUtils::DeleteWithRetries( |
110 | [this, &dirname, token]() { |
111 | return base_file_system_->DeleteDir(dirname, token); |
112 | }, |
113 | retry_config_); |
114 | } |
115 | |
116 | Status GetFileSize(const string& fname, TransactionToken* token, |
117 | uint64* file_size) override { |
118 | return RetryingUtils::CallWithRetries( |
119 | [this, &fname, file_size, token]() { |
120 | return base_file_system_->GetFileSize(fname, token, file_size); |
121 | }, |
122 | retry_config_); |
123 | } |
124 | |
125 | Status RenameFile(const string& src, const string& target, |
126 | TransactionToken* token) override { |
127 | return RetryingUtils::CallWithRetries( |
128 | [this, &src, &target, token]() { |
129 | return base_file_system_->RenameFile(src, target, token); |
130 | }, |
131 | retry_config_); |
132 | } |
133 | |
134 | Status IsDirectory(const string& dirname, TransactionToken* token) override { |
135 | return RetryingUtils::CallWithRetries( |
136 | [this, &dirname, token]() { |
137 | return base_file_system_->IsDirectory(dirname, token); |
138 | }, |
139 | retry_config_); |
140 | } |
141 | |
142 | Status HasAtomicMove(const string& path, bool* has_atomic_move) override { |
143 | // this method does not need to be retried |
144 | return base_file_system_->HasAtomicMove(path, has_atomic_move); |
145 | } |
146 | |
147 | Status DeleteRecursively(const string& dirname, TransactionToken* token, |
148 | int64_t* undeleted_files, |
149 | int64_t* undeleted_dirs) override { |
150 | return RetryingUtils::DeleteWithRetries( |
151 | [this, &dirname, token, undeleted_files, undeleted_dirs]() { |
152 | return base_file_system_->DeleteRecursively( |
153 | dirname, token, undeleted_files, undeleted_dirs); |
154 | }, |
155 | retry_config_); |
156 | } |
157 | |
158 | void FlushCaches(TransactionToken* token) override { |
159 | base_file_system_->FlushCaches(token); |
160 | } |
161 | |
162 | Underlying* underlying() const { return base_file_system_.get(); } |
163 | |
164 | private: |
165 | std::unique_ptr<Underlying> base_file_system_; |
166 | const RetryConfig retry_config_; |
167 | |
168 | TF_DISALLOW_COPY_AND_ASSIGN(RetryingFileSystem); |
169 | }; |
170 | |
171 | namespace retrying_internals { |
172 | |
173 | class RetryingRandomAccessFile : public RandomAccessFile { |
174 | public: |
175 | RetryingRandomAccessFile(std::unique_ptr<RandomAccessFile> base_file, |
176 | const RetryConfig& retry_config) |
177 | : base_file_(std::move(base_file)), retry_config_(retry_config) {} |
178 | |
179 | Status Name(StringPiece* result) const override { |
180 | return base_file_->Name(result); |
181 | } |
182 | |
183 | Status Read(uint64 offset, size_t n, StringPiece* result, |
184 | char* scratch) const override { |
185 | return RetryingUtils::CallWithRetries( |
186 | [this, offset, n, result, scratch]() { |
187 | return base_file_->Read(offset, n, result, scratch); |
188 | }, |
189 | retry_config_); |
190 | } |
191 | |
192 | private: |
193 | std::unique_ptr<RandomAccessFile> base_file_; |
194 | const RetryConfig retry_config_; |
195 | }; |
196 | |
197 | class RetryingWritableFile : public WritableFile { |
198 | public: |
199 | RetryingWritableFile(std::unique_ptr<WritableFile> base_file, |
200 | const RetryConfig& retry_config) |
201 | : base_file_(std::move(base_file)), retry_config_(retry_config) {} |
202 | |
203 | ~RetryingWritableFile() override { |
204 | // Makes sure the retrying version of Close() is called in the destructor. |
205 | Close().IgnoreError(); |
206 | } |
207 | |
208 | Status Append(StringPiece data) override { |
209 | return RetryingUtils::CallWithRetries( |
210 | [this, &data]() { return base_file_->Append(data); }, retry_config_); |
211 | } |
212 | Status Close() override { |
213 | return RetryingUtils::CallWithRetries( |
214 | [this]() { return base_file_->Close(); }, retry_config_); |
215 | } |
216 | Status Flush() override { |
217 | return RetryingUtils::CallWithRetries( |
218 | [this]() { return base_file_->Flush(); }, retry_config_); |
219 | } |
220 | Status Name(StringPiece* result) const override { |
221 | return base_file_->Name(result); |
222 | } |
223 | Status Sync() override { |
224 | return RetryingUtils::CallWithRetries( |
225 | [this]() { return base_file_->Sync(); }, retry_config_); |
226 | } |
227 | Status Tell(int64_t* position) override { |
228 | return RetryingUtils::CallWithRetries( |
229 | [this, &position]() { return base_file_->Tell(position); }, |
230 | retry_config_); |
231 | } |
232 | |
233 | private: |
234 | std::unique_ptr<WritableFile> base_file_; |
235 | const RetryConfig retry_config_; |
236 | }; |
237 | |
238 | } // namespace retrying_internals |
239 | |
240 | template <typename Underlying> |
241 | Status RetryingFileSystem<Underlying>::NewRandomAccessFile( |
242 | const string& filename, TransactionToken* token, |
243 | std::unique_ptr<RandomAccessFile>* result) { |
244 | std::unique_ptr<RandomAccessFile> base_file; |
245 | TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries( |
246 | [this, &filename, &base_file, token]() { |
247 | return base_file_system_->NewRandomAccessFile(filename, token, |
248 | &base_file); |
249 | }, |
250 | retry_config_)); |
251 | result->reset(new retrying_internals::RetryingRandomAccessFile( |
252 | std::move(base_file), retry_config_)); |
253 | return OkStatus(); |
254 | } |
255 | |
256 | template <typename Underlying> |
257 | Status RetryingFileSystem<Underlying>::NewWritableFile( |
258 | const string& filename, TransactionToken* token, |
259 | std::unique_ptr<WritableFile>* result) { |
260 | std::unique_ptr<WritableFile> base_file; |
261 | TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries( |
262 | [this, &filename, &base_file, token]() { |
263 | return base_file_system_->NewWritableFile(filename, token, &base_file); |
264 | }, |
265 | retry_config_)); |
266 | result->reset(new retrying_internals::RetryingWritableFile( |
267 | std::move(base_file), retry_config_)); |
268 | return OkStatus(); |
269 | } |
270 | |
271 | template <typename Underlying> |
272 | Status RetryingFileSystem<Underlying>::NewAppendableFile( |
273 | const string& filename, TransactionToken* token, |
274 | std::unique_ptr<WritableFile>* result) { |
275 | std::unique_ptr<WritableFile> base_file; |
276 | TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries( |
277 | [this, &filename, &base_file, token]() { |
278 | return base_file_system_->NewAppendableFile(filename, token, |
279 | &base_file); |
280 | }, |
281 | retry_config_)); |
282 | result->reset(new retrying_internals::RetryingWritableFile( |
283 | std::move(base_file), retry_config_)); |
284 | return OkStatus(); |
285 | } |
286 | |
287 | template <typename Underlying> |
288 | Status RetryingFileSystem<Underlying>::NewReadOnlyMemoryRegionFromFile( |
289 | const string& filename, TransactionToken* token, |
290 | std::unique_ptr<ReadOnlyMemoryRegion>* result) { |
291 | return RetryingUtils::CallWithRetries( |
292 | [this, &filename, result, token]() { |
293 | return base_file_system_->NewReadOnlyMemoryRegionFromFile( |
294 | filename, token, result); |
295 | }, |
296 | retry_config_); |
297 | } |
298 | |
299 | } // namespace tensorflow |
300 | |
301 | #endif // TENSORFLOW_CORE_PLATFORM_RETRYING_FILE_SYSTEM_H_ |
302 | |