1/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
2
3Licensed under the Apache License, Version 2.0 (the "License");
4you may not use this file except in compliance with the License.
5You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9Unless required by applicable law or agreed to in writing, software
10distributed under the License is distributed on an "AS IS" BASIS,
11WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12See the License for the specific language governing permissions and
13limitations under the License.
14==============================================================================*/
15
16#ifndef TENSORFLOW_CORE_KERNELS_QUEUE_OP_H_
17#define TENSORFLOW_CORE_KERNELS_QUEUE_OP_H_
18
19#include <deque>
20
21#include "tensorflow/core/framework/op_kernel.h"
22#include "tensorflow/core/framework/queue_interface.h"
23#include "tensorflow/core/framework/resource_op_kernel.h"
24#include "tensorflow/core/framework/tensor.h"
25#include "tensorflow/core/framework/types.h"
26#include "tensorflow/core/kernels/queue_base.h"
27#include "tensorflow/core/lib/core/errors.h"
28#include "tensorflow/core/platform/macros.h"
29#include "tensorflow/core/platform/types.h"
30
31namespace tensorflow {
32
33// Defines a QueueOp, an abstract class for Queue construction ops.
34class QueueOp : public ResourceOpKernel<QueueInterface> {
35 public:
36 QueueOp(OpKernelConstruction* context);
37
38 void Compute(OpKernelContext* context) override;
39
40 protected:
41 // Variables accessible by subclasses
42 int32 capacity_;
43 DataTypeVector component_types_;
44
45 private:
46 Status VerifyResource(QueueInterface* queue) override;
47};
48
49class TypedQueueOp : public QueueOp {
50 public:
51 using QueueOp::QueueOp;
52
53 protected:
54 template <typename TypedQueue>
55 Status CreateTypedQueue(TypedQueue* queue, QueueInterface** ret) {
56 if (queue == nullptr) {
57 return errors::ResourceExhausted("Failed to allocate queue.");
58 }
59 *ret = queue;
60 return queue->Initialize();
61 }
62};
63
64// Queue manipulator kernels
65
// Common base class for the asynchronous kernels in this header that operate
// on an existing queue.
class QueueOpKernel : public AsyncOpKernel {
 public:
  explicit QueueOpKernel(OpKernelConstruction* context);

  // AsyncOpKernel entry point. Declared final, so subclasses cannot override
  // it; they implement the queue-taking overload below instead.
  // NOTE(review): presumably this resolves the QueueInterface from the op's
  // inputs and forwards to the protected overload -- the definition is not
  // in this header, confirm there.
  void ComputeAsync(OpKernelContext* ctx, DoneCallback callback) final;

 protected:
  // Per-op work, implemented by every concrete subclass. Receives the same
  // context and completion callback plus the queue to operate on.
  virtual void ComputeAsync(OpKernelContext* ctx, QueueInterface* queue,
                            DoneCallback callback) = 0;
};
76
// Base class for the enqueue/dequeue kernels below; extends QueueOpKernel
// with a timeout value shared by those ops.
class QueueAccessOpKernel : public QueueOpKernel {
 public:
  explicit QueueAccessOpKernel(OpKernelConstruction* context);

 protected:
  // Timeout available to subclasses.
  // NOTE(review): presumably read from an op attribute by the constructor;
  // its unit and the meaning of special values are not visible in this
  // header -- confirm in the definition.
  int64_t timeout_;
};
84
85// Defines an EnqueueOp, the execution of which enqueues a tuple of
86// tensors in the given Queue.
87//
88// The op has 1 + k inputs, where k is the number of components in the
89// tuples stored in the given Queue:
90// - Input 0: queue handle.
91// - Input 1: 0th element of the tuple.
92// - ...
93// - Input (1+k): kth element of the tuple.
94class EnqueueOp : public QueueAccessOpKernel {
95 public:
96 explicit EnqueueOp(OpKernelConstruction* context);
97
98 protected:
99 void ComputeAsync(OpKernelContext* ctx, QueueInterface* queue,
100 DoneCallback callback) override;
101
102 private:
103 TF_DISALLOW_COPY_AND_ASSIGN(EnqueueOp);
104};
105
106// Defines an EnqueueManyOp, the execution of which slices each
107// component of a tuple of tensors along the 0th dimension, and
108// enqueues tuples of slices in the given Queue.
109//
110// The op has 1 + k inputs, where k is the number of components in the
111// tuples stored in the given Queue:
112// - Input 0: queue handle.
113// - Input 1: 0th element of the tuple.
114// - ...
115// - Input (1+k): kth element of the tuple.
116//
117// N.B. All tuple components must have the same size in the 0th
118// dimension.
119class EnqueueManyOp : public QueueAccessOpKernel {
120 public:
121 explicit EnqueueManyOp(OpKernelConstruction* context);
122
123 protected:
124 void ComputeAsync(OpKernelContext* ctx, QueueInterface* queue,
125 DoneCallback callback) override;
126
127 ~EnqueueManyOp() override;
128
129 private:
130 TF_DISALLOW_COPY_AND_ASSIGN(EnqueueManyOp);
131};
132
133// Defines a DequeueOp, the execution of which dequeues a tuple of
134// tensors from the given Queue.
135//
136// The op has one input, which is the handle of the appropriate
137// Queue. The op has k outputs, where k is the number of components in
138// the tuples stored in the given Queue, and output i is the ith
139// component of the dequeued tuple.
140class DequeueOp : public QueueAccessOpKernel {
141 public:
142 explicit DequeueOp(OpKernelConstruction* context);
143
144 protected:
145 void ComputeAsync(OpKernelContext* ctx, QueueInterface* queue,
146 DoneCallback callback) override;
147
148 ~DequeueOp() override;
149
150 private:
151 TF_DISALLOW_COPY_AND_ASSIGN(DequeueOp);
152};
153
154// Defines a DequeueManyOp, the execution of which concatenates the
155// requested number of elements from the given Queue along the 0th
156// dimension, and emits the result as a single tuple of tensors.
157//
158// The op has two inputs:
159// - Input 0: the handle to a queue.
160// - Input 1: the number of elements to dequeue.
161//
162// The op has k outputs, where k is the number of components in the
163// tuples stored in the given Queue, and output i is the ith component
164// of the dequeued tuple.
165class DequeueManyOp : public QueueAccessOpKernel {
166 public:
167 explicit DequeueManyOp(OpKernelConstruction* context);
168
169 protected:
170 void ComputeAsync(OpKernelContext* ctx, QueueInterface* queue,
171 DoneCallback callback) override;
172
173 ~DequeueManyOp() override;
174
175 private:
176 TF_DISALLOW_COPY_AND_ASSIGN(DequeueManyOp);
177};
178
179// Defines a DequeueUpToOp, the execution of which concatenates the
180// requested number of elements from the given Queue along the 0th
181// dimension, and emits the result as a single tuple of tensors.
182//
183// The difference between this op and DequeueMany is the handling when
184// the Queue is closed. While the DequeueMany op will return if there
185// an error when there are less than num_elements elements left in the
186// closed queue, this op will return between 1 and
187// min(num_elements, elements_remaining_in_queue), and will not block.
188// If there are no elements left, then the standard DequeueMany error
189// is returned.
190//
191// This op only works if the underlying Queue implementation accepts
192// the allow_small_batch = true parameter to TryDequeueMany.
193// If it does not, an errors::Unimplemented exception is returned.
194//
195// The op has two inputs:
196// - Input 0: the handle to a queue.
197// - Input 1: the number of elements to dequeue.
198//
199// The op has k outputs, where k is the number of components in the
200// tuples stored in the given Queue, and output i is the ith component
201// of the dequeued tuple.
202//
203// The op has one attribute: allow_small_batch. If the Queue supports
204// it, setting this to true causes the queue to return smaller
205// (possibly zero length) batches when it is closed, up to however
206// many elements are available when the op executes. In this case,
207// the Queue does not block when closed.
208class DequeueUpToOp : public QueueAccessOpKernel {
209 public:
210 explicit DequeueUpToOp(OpKernelConstruction* context);
211
212 protected:
213 void ComputeAsync(OpKernelContext* ctx, QueueInterface* queue,
214 DoneCallback callback) override;
215
216 ~DequeueUpToOp() override;
217
218 private:
219 TF_DISALLOW_COPY_AND_ASSIGN(DequeueUpToOp);
220};
221
222// Defines a QueueCloseOp, which closes the given Queue. Closing a
223// Queue signals that no more elements will be enqueued in it.
224//
225// The op has one input, which is the handle of the appropriate Queue.
226class QueueCloseOp : public QueueOpKernel {
227 public:
228 explicit QueueCloseOp(OpKernelConstruction* context);
229
230 protected:
231 void ComputeAsync(OpKernelContext* ctx, QueueInterface* queue,
232 DoneCallback callback) override;
233
234 private:
235 bool cancel_pending_enqueues_;
236 TF_DISALLOW_COPY_AND_ASSIGN(QueueCloseOp);
237};
238
239// Defines a QueueSizeOp, which computes the number of elements in the
240// given Queue, and emits it as an output tensor.
241//
242// The op has one input, which is the handle of the appropriate Queue;
243// and one output, which is a single-element tensor containing the current
244// size of that Queue.
245class QueueSizeOp : public QueueOpKernel {
246 public:
247 explicit QueueSizeOp(OpKernelConstruction* context);
248
249 protected:
250 void ComputeAsync(OpKernelContext* ctx, QueueInterface* queue,
251 DoneCallback callback) override;
252
253 private:
254 TF_DISALLOW_COPY_AND_ASSIGN(QueueSizeOp);
255};
256
257class QueueIsClosedOp : public QueueOpKernel {
258 public:
259 explicit QueueIsClosedOp(OpKernelConstruction* context);
260
261 protected:
262 void ComputeAsync(OpKernelContext* ctx, QueueInterface* queue,
263 DoneCallback callback) override;
264
265 private:
266 TF_DISALLOW_COPY_AND_ASSIGN(QueueIsClosedOp);
267};
268
269} // namespace tensorflow
270
271#endif // TENSORFLOW_CORE_KERNELS_QUEUE_OP_H_
272