1/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2
3Licensed under the Apache License, Version 2.0 (the "License");
4you may not use this file except in compliance with the License.
5You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9Unless required by applicable law or agreed to in writing, software
10distributed under the License is distributed on an "AS IS" BASIS,
11WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12See the License for the specific language governing permissions and
13limitations under the License.
14==============================================================================*/
15
16#ifndef TENSORFLOW_CORE_PLATFORM_RETRYING_FILE_SYSTEM_H_
17#define TENSORFLOW_CORE_PLATFORM_RETRYING_FILE_SYSTEM_H_
18
19#include <functional>
20#include <string>
21#include <vector>
22
23#include "tensorflow/core/lib/random/random.h"
24#include "tensorflow/core/platform/env.h"
25#include "tensorflow/core/platform/errors.h"
26#include "tensorflow/core/platform/file_system.h"
27#include "tensorflow/core/platform/retrying_utils.h"
28#include "tensorflow/core/platform/status.h"
29
30namespace tensorflow {
31
32/// A wrapper to add retry logic to another file system.
33template <typename Underlying>
34class RetryingFileSystem : public FileSystem {
35 public:
36 RetryingFileSystem(std::unique_ptr<Underlying> base_file_system,
37 const RetryConfig& retry_config)
38 : base_file_system_(std::move(base_file_system)),
39 retry_config_(retry_config) {}
40
41 TF_USE_FILESYSTEM_METHODS_WITH_NO_TRANSACTION_SUPPORT;
42
43 Status NewRandomAccessFile(
44 const string& filename, TransactionToken* token,
45 std::unique_ptr<RandomAccessFile>* result) override;
46
47 Status NewWritableFile(const string& filename, TransactionToken* token,
48 std::unique_ptr<WritableFile>* result) override;
49
50 Status NewAppendableFile(const string& filename, TransactionToken* token,
51 std::unique_ptr<WritableFile>* result) override;
52
53 Status NewReadOnlyMemoryRegionFromFile(
54 const string& filename, TransactionToken* token,
55 std::unique_ptr<ReadOnlyMemoryRegion>* result) override;
56
57 Status FileExists(const string& fname, TransactionToken* token) override {
58 return RetryingUtils::CallWithRetries(
59 [this, &fname, token]() {
60 return base_file_system_->FileExists(fname, token);
61 },
62 retry_config_);
63 }
64
65 Status GetChildren(const string& dir, TransactionToken* token,
66 std::vector<string>* result) override {
67 return RetryingUtils::CallWithRetries(
68 [this, &dir, result, token]() {
69 return base_file_system_->GetChildren(dir, token, result);
70 },
71 retry_config_);
72 }
73
74 Status GetMatchingPaths(const string& pattern, TransactionToken* token,
75 std::vector<string>* result) override {
76 return RetryingUtils::CallWithRetries(
77 [this, &pattern, result, token]() {
78 return base_file_system_->GetMatchingPaths(pattern, token, result);
79 },
80 retry_config_);
81 }
82
83 Status Stat(const string& fname, TransactionToken* token,
84 FileStatistics* stat) override {
85 return RetryingUtils::CallWithRetries(
86 [this, &fname, stat, token]() {
87 return base_file_system_->Stat(fname, token, stat);
88 },
89 retry_config_);
90 }
91
92 Status DeleteFile(const string& fname, TransactionToken* token) override {
93 return RetryingUtils::DeleteWithRetries(
94 [this, &fname, token]() {
95 return base_file_system_->DeleteFile(fname, token);
96 },
97 retry_config_);
98 }
99
100 Status CreateDir(const string& dirname, TransactionToken* token) override {
101 return RetryingUtils::CallWithRetries(
102 [this, &dirname, token]() {
103 return base_file_system_->CreateDir(dirname, token);
104 },
105 retry_config_);
106 }
107
108 Status DeleteDir(const string& dirname, TransactionToken* token) override {
109 return RetryingUtils::DeleteWithRetries(
110 [this, &dirname, token]() {
111 return base_file_system_->DeleteDir(dirname, token);
112 },
113 retry_config_);
114 }
115
116 Status GetFileSize(const string& fname, TransactionToken* token,
117 uint64* file_size) override {
118 return RetryingUtils::CallWithRetries(
119 [this, &fname, file_size, token]() {
120 return base_file_system_->GetFileSize(fname, token, file_size);
121 },
122 retry_config_);
123 }
124
125 Status RenameFile(const string& src, const string& target,
126 TransactionToken* token) override {
127 return RetryingUtils::CallWithRetries(
128 [this, &src, &target, token]() {
129 return base_file_system_->RenameFile(src, target, token);
130 },
131 retry_config_);
132 }
133
134 Status IsDirectory(const string& dirname, TransactionToken* token) override {
135 return RetryingUtils::CallWithRetries(
136 [this, &dirname, token]() {
137 return base_file_system_->IsDirectory(dirname, token);
138 },
139 retry_config_);
140 }
141
142 Status HasAtomicMove(const string& path, bool* has_atomic_move) override {
143 // this method does not need to be retried
144 return base_file_system_->HasAtomicMove(path, has_atomic_move);
145 }
146
147 Status DeleteRecursively(const string& dirname, TransactionToken* token,
148 int64_t* undeleted_files,
149 int64_t* undeleted_dirs) override {
150 return RetryingUtils::DeleteWithRetries(
151 [this, &dirname, token, undeleted_files, undeleted_dirs]() {
152 return base_file_system_->DeleteRecursively(
153 dirname, token, undeleted_files, undeleted_dirs);
154 },
155 retry_config_);
156 }
157
158 void FlushCaches(TransactionToken* token) override {
159 base_file_system_->FlushCaches(token);
160 }
161
162 Underlying* underlying() const { return base_file_system_.get(); }
163
164 private:
165 std::unique_ptr<Underlying> base_file_system_;
166 const RetryConfig retry_config_;
167
168 TF_DISALLOW_COPY_AND_ASSIGN(RetryingFileSystem);
169};
170
171namespace retrying_internals {
172
173class RetryingRandomAccessFile : public RandomAccessFile {
174 public:
175 RetryingRandomAccessFile(std::unique_ptr<RandomAccessFile> base_file,
176 const RetryConfig& retry_config)
177 : base_file_(std::move(base_file)), retry_config_(retry_config) {}
178
179 Status Name(StringPiece* result) const override {
180 return base_file_->Name(result);
181 }
182
183 Status Read(uint64 offset, size_t n, StringPiece* result,
184 char* scratch) const override {
185 return RetryingUtils::CallWithRetries(
186 [this, offset, n, result, scratch]() {
187 return base_file_->Read(offset, n, result, scratch);
188 },
189 retry_config_);
190 }
191
192 private:
193 std::unique_ptr<RandomAccessFile> base_file_;
194 const RetryConfig retry_config_;
195};
196
197class RetryingWritableFile : public WritableFile {
198 public:
199 RetryingWritableFile(std::unique_ptr<WritableFile> base_file,
200 const RetryConfig& retry_config)
201 : base_file_(std::move(base_file)), retry_config_(retry_config) {}
202
203 ~RetryingWritableFile() override {
204 // Makes sure the retrying version of Close() is called in the destructor.
205 Close().IgnoreError();
206 }
207
208 Status Append(StringPiece data) override {
209 return RetryingUtils::CallWithRetries(
210 [this, &data]() { return base_file_->Append(data); }, retry_config_);
211 }
212 Status Close() override {
213 return RetryingUtils::CallWithRetries(
214 [this]() { return base_file_->Close(); }, retry_config_);
215 }
216 Status Flush() override {
217 return RetryingUtils::CallWithRetries(
218 [this]() { return base_file_->Flush(); }, retry_config_);
219 }
220 Status Name(StringPiece* result) const override {
221 return base_file_->Name(result);
222 }
223 Status Sync() override {
224 return RetryingUtils::CallWithRetries(
225 [this]() { return base_file_->Sync(); }, retry_config_);
226 }
227 Status Tell(int64_t* position) override {
228 return RetryingUtils::CallWithRetries(
229 [this, &position]() { return base_file_->Tell(position); },
230 retry_config_);
231 }
232
233 private:
234 std::unique_ptr<WritableFile> base_file_;
235 const RetryConfig retry_config_;
236};
237
238} // namespace retrying_internals
239
240template <typename Underlying>
241Status RetryingFileSystem<Underlying>::NewRandomAccessFile(
242 const string& filename, TransactionToken* token,
243 std::unique_ptr<RandomAccessFile>* result) {
244 std::unique_ptr<RandomAccessFile> base_file;
245 TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
246 [this, &filename, &base_file, token]() {
247 return base_file_system_->NewRandomAccessFile(filename, token,
248 &base_file);
249 },
250 retry_config_));
251 result->reset(new retrying_internals::RetryingRandomAccessFile(
252 std::move(base_file), retry_config_));
253 return OkStatus();
254}
255
256template <typename Underlying>
257Status RetryingFileSystem<Underlying>::NewWritableFile(
258 const string& filename, TransactionToken* token,
259 std::unique_ptr<WritableFile>* result) {
260 std::unique_ptr<WritableFile> base_file;
261 TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
262 [this, &filename, &base_file, token]() {
263 return base_file_system_->NewWritableFile(filename, token, &base_file);
264 },
265 retry_config_));
266 result->reset(new retrying_internals::RetryingWritableFile(
267 std::move(base_file), retry_config_));
268 return OkStatus();
269}
270
271template <typename Underlying>
272Status RetryingFileSystem<Underlying>::NewAppendableFile(
273 const string& filename, TransactionToken* token,
274 std::unique_ptr<WritableFile>* result) {
275 std::unique_ptr<WritableFile> base_file;
276 TF_RETURN_IF_ERROR(RetryingUtils::CallWithRetries(
277 [this, &filename, &base_file, token]() {
278 return base_file_system_->NewAppendableFile(filename, token,
279 &base_file);
280 },
281 retry_config_));
282 result->reset(new retrying_internals::RetryingWritableFile(
283 std::move(base_file), retry_config_));
284 return OkStatus();
285}
286
287template <typename Underlying>
288Status RetryingFileSystem<Underlying>::NewReadOnlyMemoryRegionFromFile(
289 const string& filename, TransactionToken* token,
290 std::unique_ptr<ReadOnlyMemoryRegion>* result) {
291 return RetryingUtils::CallWithRetries(
292 [this, &filename, result, token]() {
293 return base_file_system_->NewReadOnlyMemoryRegionFromFile(
294 filename, token, result);
295 },
296 retry_config_);
297}
298
299} // namespace tensorflow
300
301#endif // TENSORFLOW_CORE_PLATFORM_RETRYING_FILE_SYSTEM_H_
302