1 | /* Copyright 2015 The TensorFlow Authors. All Rights Reserved. |
2 | |
3 | Licensed under the Apache License, Version 2.0 (the "License"); |
4 | you may not use this file except in compliance with the License. |
5 | You may obtain a copy of the License at |
6 | |
7 | http://www.apache.org/licenses/LICENSE-2.0 |
8 | |
9 | Unless required by applicable law or agreed to in writing, software |
10 | distributed under the License is distributed on an "AS IS" BASIS, |
11 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | See the License for the specific language governing permissions and |
13 | limitations under the License. |
14 | ==============================================================================*/ |
15 | |
16 | #ifndef TENSORFLOW_CORE_KERNELS_QUEUE_OP_H_ |
17 | #define TENSORFLOW_CORE_KERNELS_QUEUE_OP_H_ |
18 | |
19 | #include <deque> |
20 | |
21 | #include "tensorflow/core/framework/op_kernel.h" |
22 | #include "tensorflow/core/framework/queue_interface.h" |
23 | #include "tensorflow/core/framework/resource_op_kernel.h" |
24 | #include "tensorflow/core/framework/tensor.h" |
25 | #include "tensorflow/core/framework/types.h" |
26 | #include "tensorflow/core/kernels/queue_base.h" |
27 | #include "tensorflow/core/lib/core/errors.h" |
28 | #include "tensorflow/core/platform/macros.h" |
29 | #include "tensorflow/core/platform/types.h" |
30 | |
31 | namespace tensorflow { |
32 | |
33 | // Defines a QueueOp, an abstract class for Queue construction ops. |
34 | class QueueOp : public ResourceOpKernel<QueueInterface> { |
35 | public: |
36 | QueueOp(OpKernelConstruction* context); |
37 | |
38 | void Compute(OpKernelContext* context) override; |
39 | |
40 | protected: |
41 | // Variables accessible by subclasses |
42 | int32 capacity_; |
43 | DataTypeVector component_types_; |
44 | |
45 | private: |
46 | Status VerifyResource(QueueInterface* queue) override; |
47 | }; |
48 | |
49 | class TypedQueueOp : public QueueOp { |
50 | public: |
51 | using QueueOp::QueueOp; |
52 | |
53 | protected: |
54 | template <typename TypedQueue> |
55 | Status CreateTypedQueue(TypedQueue* queue, QueueInterface** ret) { |
56 | if (queue == nullptr) { |
57 | return errors::ResourceExhausted("Failed to allocate queue." ); |
58 | } |
59 | *ret = queue; |
60 | return queue->Initialize(); |
61 | } |
62 | }; |
63 | |
64 | // Queue manipulator kernels |
65 | |
66 | class QueueOpKernel : public AsyncOpKernel { |
67 | public: |
68 | explicit QueueOpKernel(OpKernelConstruction* context); |
69 | |
70 | void ComputeAsync(OpKernelContext* ctx, DoneCallback callback) final; |
71 | |
72 | protected: |
73 | virtual void ComputeAsync(OpKernelContext* ctx, QueueInterface* queue, |
74 | DoneCallback callback) = 0; |
75 | }; |
76 | |
77 | class QueueAccessOpKernel : public QueueOpKernel { |
78 | public: |
79 | explicit QueueAccessOpKernel(OpKernelConstruction* context); |
80 | |
81 | protected: |
82 | int64_t timeout_; |
83 | }; |
84 | |
85 | // Defines an EnqueueOp, the execution of which enqueues a tuple of |
86 | // tensors in the given Queue. |
87 | // |
88 | // The op has 1 + k inputs, where k is the number of components in the |
89 | // tuples stored in the given Queue: |
90 | // - Input 0: queue handle. |
91 | // - Input 1: 0th element of the tuple. |
92 | // - ... |
93 | // - Input (1+k): kth element of the tuple. |
94 | class EnqueueOp : public QueueAccessOpKernel { |
95 | public: |
96 | explicit EnqueueOp(OpKernelConstruction* context); |
97 | |
98 | protected: |
99 | void ComputeAsync(OpKernelContext* ctx, QueueInterface* queue, |
100 | DoneCallback callback) override; |
101 | |
102 | private: |
103 | TF_DISALLOW_COPY_AND_ASSIGN(EnqueueOp); |
104 | }; |
105 | |
106 | // Defines an EnqueueManyOp, the execution of which slices each |
107 | // component of a tuple of tensors along the 0th dimension, and |
108 | // enqueues tuples of slices in the given Queue. |
109 | // |
110 | // The op has 1 + k inputs, where k is the number of components in the |
111 | // tuples stored in the given Queue: |
112 | // - Input 0: queue handle. |
113 | // - Input 1: 0th element of the tuple. |
114 | // - ... |
115 | // - Input (1+k): kth element of the tuple. |
116 | // |
117 | // N.B. All tuple components must have the same size in the 0th |
118 | // dimension. |
119 | class EnqueueManyOp : public QueueAccessOpKernel { |
120 | public: |
121 | explicit EnqueueManyOp(OpKernelConstruction* context); |
122 | |
123 | protected: |
124 | void ComputeAsync(OpKernelContext* ctx, QueueInterface* queue, |
125 | DoneCallback callback) override; |
126 | |
127 | ~EnqueueManyOp() override; |
128 | |
129 | private: |
130 | TF_DISALLOW_COPY_AND_ASSIGN(EnqueueManyOp); |
131 | }; |
132 | |
133 | // Defines a DequeueOp, the execution of which dequeues a tuple of |
134 | // tensors from the given Queue. |
135 | // |
136 | // The op has one input, which is the handle of the appropriate |
137 | // Queue. The op has k outputs, where k is the number of components in |
138 | // the tuples stored in the given Queue, and output i is the ith |
139 | // component of the dequeued tuple. |
140 | class DequeueOp : public QueueAccessOpKernel { |
141 | public: |
142 | explicit DequeueOp(OpKernelConstruction* context); |
143 | |
144 | protected: |
145 | void ComputeAsync(OpKernelContext* ctx, QueueInterface* queue, |
146 | DoneCallback callback) override; |
147 | |
148 | ~DequeueOp() override; |
149 | |
150 | private: |
151 | TF_DISALLOW_COPY_AND_ASSIGN(DequeueOp); |
152 | }; |
153 | |
154 | // Defines a DequeueManyOp, the execution of which concatenates the |
155 | // requested number of elements from the given Queue along the 0th |
156 | // dimension, and emits the result as a single tuple of tensors. |
157 | // |
158 | // The op has two inputs: |
159 | // - Input 0: the handle to a queue. |
160 | // - Input 1: the number of elements to dequeue. |
161 | // |
162 | // The op has k outputs, where k is the number of components in the |
163 | // tuples stored in the given Queue, and output i is the ith component |
164 | // of the dequeued tuple. |
165 | class DequeueManyOp : public QueueAccessOpKernel { |
166 | public: |
167 | explicit DequeueManyOp(OpKernelConstruction* context); |
168 | |
169 | protected: |
170 | void ComputeAsync(OpKernelContext* ctx, QueueInterface* queue, |
171 | DoneCallback callback) override; |
172 | |
173 | ~DequeueManyOp() override; |
174 | |
175 | private: |
176 | TF_DISALLOW_COPY_AND_ASSIGN(DequeueManyOp); |
177 | }; |
178 | |
179 | // Defines a DequeueUpToOp, the execution of which concatenates the |
180 | // requested number of elements from the given Queue along the 0th |
181 | // dimension, and emits the result as a single tuple of tensors. |
182 | // |
183 | // The difference between this op and DequeueMany is the handling when |
184 | // the Queue is closed. While the DequeueMany op will return if there |
185 | // an error when there are less than num_elements elements left in the |
186 | // closed queue, this op will return between 1 and |
187 | // min(num_elements, elements_remaining_in_queue), and will not block. |
188 | // If there are no elements left, then the standard DequeueMany error |
189 | // is returned. |
190 | // |
191 | // This op only works if the underlying Queue implementation accepts |
192 | // the allow_small_batch = true parameter to TryDequeueMany. |
193 | // If it does not, an errors::Unimplemented exception is returned. |
194 | // |
195 | // The op has two inputs: |
196 | // - Input 0: the handle to a queue. |
197 | // - Input 1: the number of elements to dequeue. |
198 | // |
199 | // The op has k outputs, where k is the number of components in the |
200 | // tuples stored in the given Queue, and output i is the ith component |
201 | // of the dequeued tuple. |
202 | // |
203 | // The op has one attribute: allow_small_batch. If the Queue supports |
204 | // it, setting this to true causes the queue to return smaller |
205 | // (possibly zero length) batches when it is closed, up to however |
206 | // many elements are available when the op executes. In this case, |
207 | // the Queue does not block when closed. |
208 | class DequeueUpToOp : public QueueAccessOpKernel { |
209 | public: |
210 | explicit DequeueUpToOp(OpKernelConstruction* context); |
211 | |
212 | protected: |
213 | void ComputeAsync(OpKernelContext* ctx, QueueInterface* queue, |
214 | DoneCallback callback) override; |
215 | |
216 | ~DequeueUpToOp() override; |
217 | |
218 | private: |
219 | TF_DISALLOW_COPY_AND_ASSIGN(DequeueUpToOp); |
220 | }; |
221 | |
222 | // Defines a QueueCloseOp, which closes the given Queue. Closing a |
223 | // Queue signals that no more elements will be enqueued in it. |
224 | // |
225 | // The op has one input, which is the handle of the appropriate Queue. |
226 | class QueueCloseOp : public QueueOpKernel { |
227 | public: |
228 | explicit QueueCloseOp(OpKernelConstruction* context); |
229 | |
230 | protected: |
231 | void ComputeAsync(OpKernelContext* ctx, QueueInterface* queue, |
232 | DoneCallback callback) override; |
233 | |
234 | private: |
235 | bool cancel_pending_enqueues_; |
236 | TF_DISALLOW_COPY_AND_ASSIGN(QueueCloseOp); |
237 | }; |
238 | |
239 | // Defines a QueueSizeOp, which computes the number of elements in the |
240 | // given Queue, and emits it as an output tensor. |
241 | // |
242 | // The op has one input, which is the handle of the appropriate Queue; |
243 | // and one output, which is a single-element tensor containing the current |
244 | // size of that Queue. |
245 | class QueueSizeOp : public QueueOpKernel { |
246 | public: |
247 | explicit QueueSizeOp(OpKernelConstruction* context); |
248 | |
249 | protected: |
250 | void ComputeAsync(OpKernelContext* ctx, QueueInterface* queue, |
251 | DoneCallback callback) override; |
252 | |
253 | private: |
254 | TF_DISALLOW_COPY_AND_ASSIGN(QueueSizeOp); |
255 | }; |
256 | |
257 | class QueueIsClosedOp : public QueueOpKernel { |
258 | public: |
259 | explicit QueueIsClosedOp(OpKernelConstruction* context); |
260 | |
261 | protected: |
262 | void ComputeAsync(OpKernelContext* ctx, QueueInterface* queue, |
263 | DoneCallback callback) override; |
264 | |
265 | private: |
266 | TF_DISALLOW_COPY_AND_ASSIGN(QueueIsClosedOp); |
267 | }; |
268 | |
269 | } // namespace tensorflow |
270 | |
271 | #endif // TENSORFLOW_CORE_KERNELS_QUEUE_OP_H_ |
272 | |