1 | /* Copyright 2020 The TensorFlow Authors. All Rights Reserved. |
2 | |
3 | Licensed under the Apache License, Version 2.0 (the "License"); |
4 | you may not use this file except in compliance with the License. |
5 | You may obtain a copy of the License at |
6 | |
7 | http://www.apache.org/licenses/LICENSE-2.0 |
8 | |
9 | Unless required by applicable law or agreed to in writing, software |
10 | distributed under the License is distributed on an "AS IS" BASIS, |
11 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | See the License for the specific language governing permissions and |
13 | limitations under the License. |
14 | ==============================================================================*/ |
15 | |
16 | #ifndef TENSORFLOW_CORE_DATA_SPLIT_UTILS_H_ |
17 | #define TENSORFLOW_CORE_DATA_SPLIT_UTILS_H_ |
18 | |
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <vector>

#include "tensorflow/core/framework/dataset.h"
23 | |
24 | namespace tensorflow { |
25 | namespace data { |
26 | |
// A class which produces splits for a dataset of size N that can be indexed
// into. Each split is an index; presumably indices are produced in order
// 0, 1, ..., N-1 (implementation lives in split_utils.cc — confirm there).
class IndexSplitProvider : public SplitProvider {
 public:
  // `n` is the total number of splits (the dataset's size).
  explicit IndexSplitProvider(int64_t n);
  // Produces the next split in `*split`, or sets `*end_of_splits` to true
  // once the provider is exhausted.
  Status GetNext(Tensor* split, bool* end_of_splits) override;
  // Restarts the provider from the beginning.
  Status Reset() override;
  // Saves the provider's position to `writer`. `full_name` converts a local
  // state key into a globally unique checkpoint key.
  Status Save(std::function<std::string(std::string)> full_name,
              IteratorStateWriter* writer) override;
  // Restores the position previously written by `Save`.
  Status Restore(std::function<std::string(std::string)> full_name,
                 IteratorStateReader* reader) override;

 private:
  // Serializes access to `i_` across GetNext/Reset/Save/Restore.
  mutex mu_;
  // Next index to produce (inferred from the name and guard — see .cc).
  int64_t i_ TF_GUARDED_BY(mu_);
  // Total number of splits; immutable after construction.
  const int64_t n_;
};
44 | |
// A SplitProvider which wraps another split provider, but drops all splits
// where `index % num_shards != shard_index`, so each of the `num_shards`
// shards sees a disjoint subset of the underlying splits.
// NOTE(review): the comment previously read `index != shard_index %
// num_shards`; the skip-counting state below matches the modular form —
// confirm against the implementation in split_utils.cc.
class ShardingSplitProvider : public SplitProvider {
 public:
  // `split_provider` is the wrapped provider whose splits are filtered.
  ShardingSplitProvider(int64_t num_shards, int64_t shard_index,
                        std::shared_ptr<SplitProvider> split_provider);

  // Produces the next split belonging to this shard, or sets
  // `*end_of_splits` to true when the wrapped provider is exhausted.
  Status GetNext(Tensor* split, bool* end_of_splits) override;
  // Restarts sharded iteration (presumably also resets the wrapped
  // provider — confirm in split_utils.cc).
  Status Reset() override;
  // Saves/restores sharding progress via the iterator state writer/reader.
  // `full_name` converts a local state key into a globally unique key.
  Status Save(std::function<std::string(std::string)> full_name,
              IteratorStateWriter* writer) override;
  Status Restore(std::function<std::string(std::string)> full_name,
                 IteratorStateReader* reader) override;

 private:
  const int64_t num_shards_;
  const int64_t shard_index_;
  // Guards the mutable iteration state below.
  mutex mu_;
  std::shared_ptr<SplitProvider> split_provider_ TF_GUARDED_BY(mu_);
  // Number of upstream splits to discard before emitting the next one.
  int64_t num_to_skip_ TF_GUARDED_BY(mu_);
};
66 | |
// Returns split providers for all sources of the given dataset (presumably
// one provider per source dataset — confirm in split_utils.cc), or an error
// if they cannot be created.
StatusOr<std::vector<std::unique_ptr<SplitProvider>>> GetSplitProviders(
    const DatasetBase* dataset);
70 | |
// Gets the single split provider from the context, or returns an error if the
// context has zero or multiple split providers. The `dataset` argument is
// only used to produce a more useful error message.
StatusOr<std::shared_ptr<SplitProvider>> GetSingleSplitProvider(
    IteratorContext* ctx, const DatasetBase* dataset);
76 | |
// Creates iterator contexts for the dataset's inputs. The split providers
// in `ctx` will be divided among the inputs of `dataset`, so that each input
// gets a number of split providers that matches its number of source datasets.
// If no split providers are defined, the contexts will be the same as `ctx`.
StatusOr<std::vector<IteratorContext>> CreateInputIteratorContexts(
    IteratorContext* ctx, const DatasetBase* dataset);
83 | } // namespace data |
84 | } // namespace tensorflow |
85 | |
86 | #endif // TENSORFLOW_CORE_DATA_SPLIT_UTILS_H_ |
87 | |