/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

// See docs in ../ops/data_flow_ops.cc.

#include "tensorflow/core/kernels/fifo_queue.h"

#include <algorithm>
#include <deque>
#include <vector>

#include "tensorflow/core/framework/node_def.pb.h"
#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/framework/tensor_shape.h"
#include "tensorflow/core/framework/types.h"
#include "tensorflow/core/kernels/queue_base.h"
#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/types.h"
#include "tensorflow/core/util/batch_util.h"

namespace tensorflow {

FIFOQueue::FIFOQueue(int capacity, const DataTypeVector& component_dtypes,
                     const std::vector<TensorShape>& component_shapes,
                     const string& name)
    : TypedQueue(capacity, component_dtypes, component_shapes, name) {}

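// Removes the front element of every component deque and appends it to
// *tuple. Requires that mu_ is held and that the queue is non-empty.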
void FIFOQueue::DequeueLocked(OpKernelContext* ctx, Tuple* tuple) {
  DCHECK_GT(queues_[0].size(), size_t{0});
  (*tuple).reserve(num_components());
  for (int i = 0; i < num_components(); ++i) {
    (*tuple).push_back(queues_[i][0]);
    queues_[i].pop_front();
  }
}

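// Attempts to enqueue a single element. The attempt closure runs with mu_
// held: it appends the element to every component deque once there is spare
// capacity (kComplete), and otherwise returns kNoProgress so the attempt
// stays pending until space frees up, the queue is closed, or the op is
// cancelled.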
void FIFOQueue::TryEnqueue(const Tuple& tuple, OpKernelContext* ctx,
                           DoneCallback callback) {
  CancellationManager* cm = ctx->cancellation_manager();
  CancellationToken token = cm->get_cancellation_token();
  bool already_cancelled;
  {
    mutex_lock l(mu_);
    already_cancelled = !cm->RegisterCallback(
        token, [this, cm, token]() { Cancel(kEnqueue, cm, token); });
    if (!already_cancelled) {
      enqueue_attempts_.emplace_back(
          1, callback, ctx, cm, token,
          [tuple, this](Attempt* attempt) TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
            if (closed_) {
              attempt->context->SetStatus(
                  errors::Cancelled("FIFOQueue '", name_, "' is closed."));
              return kComplete;
            }
            if (queues_[0].size() < static_cast<size_t>(capacity_)) {
              for (int i = 0; i < num_components(); ++i) {
                queues_[i].push_back(tuple[i]);
              }
              return kComplete;
            } else {
              return kNoProgress;
            }
          });
    }
  }
  if (!already_cancelled) {
    FlushUnlocked();
  } else {
    ctx->SetStatus(errors::Cancelled("Enqueue operation was cancelled"));
    callback();
  }
}

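// Copies element `index` of component `component` out of the batched input
// `tuple` into a freshly allocated *out_tensor, dropping the leading batch
// dimension.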
/* static */
Status FIFOQueue::GetElementComponentFromBatch(const FIFOQueue::Tuple& tuple,
                                               int64_t index, int component,
                                               OpKernelContext* ctx,
                                               Tensor* out_tensor) {
  TensorShape element_shape(tuple[component].shape());
  element_shape.RemoveDim(0);
  TF_RETURN_IF_ERROR(
      ctx->allocate_temp(tuple[component].dtype(), element_shape, out_tensor));
  TF_RETURN_IF_ERROR(
      batch_util::CopySliceToElement(tuple[component], out_tensor, index));
  return OkStatus();
}

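// Attempts to enqueue a batch of tuple[0].dim_size(0) elements. The attempt
// closure slices one element at a time off the batch while capacity remains,
// so a single attempt may make partial progress (kProgress) and be resumed
// by later flushes until the whole batch has been enqueued.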
void FIFOQueue::TryEnqueueMany(const Tuple& tuple, OpKernelContext* ctx,
                               DoneCallback callback) {
  const int64_t batch_size = tuple[0].dim_size(0);
  if (batch_size == 0) {
    callback();
    return;
  }

  CancellationManager* cm = ctx->cancellation_manager();
  CancellationToken token = cm->get_cancellation_token();
  bool already_cancelled;
  {
    mutex_lock l(mu_);
    already_cancelled = !cm->RegisterCallback(
        token, [this, cm, token]() { Cancel(kEnqueue, cm, token); });
    if (!already_cancelled) {
      enqueue_attempts_.emplace_back(
          batch_size, callback, ctx, cm, token,
          [tuple, this](Attempt* attempt) TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
            if (closed_) {
              attempt->context->SetStatus(
                  errors::Cancelled("FIFOQueue '", name_, "' is closed."));
              return kComplete;
            }
            RunResult result = kNoProgress;
            while (queues_[0].size() < static_cast<size_t>(capacity_)) {
              result = kProgress;
              const int64_t index =
                  tuple[0].dim_size(0) - attempt->elements_requested;
              for (int i = 0; i < num_components(); ++i) {
                Tensor element;
                attempt->context->SetStatus(GetElementComponentFromBatch(
                    tuple, index, i, attempt->context, &element));
                if (!attempt->context->status().ok()) return kComplete;
                queues_[i].push_back(element);
              }
              --attempt->elements_requested;
              if (attempt->elements_requested == 0) {
                return kComplete;
              }
            }
            return result;
          });
    }
  }
  if (!already_cancelled) {
    FlushUnlocked();
  } else {
    ctx->SetStatus(errors::Cancelled("Enqueue operation was cancelled"));
    callback();
  }
}

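// Attempts to dequeue a single element. If the queue is closed and empty,
// the attempt fails with OutOfRange; otherwise it waits (kNoProgress) until
// an element becomes available.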
void FIFOQueue::TryDequeue(OpKernelContext* ctx, CallbackWithTuple callback) {
  CancellationManager* cm = ctx->cancellation_manager();
  CancellationToken token = cm->get_cancellation_token();
  bool already_cancelled;
  {
    mutex_lock l(mu_);
    already_cancelled = !cm->RegisterCallback(
        token, [this, cm, token]() { Cancel(kDequeue, cm, token); });
    if (!already_cancelled) {
      // TODO(josh11b): This makes two copies of callback, avoid this if possible.
      dequeue_attempts_.emplace_back(
          1, [callback]() { callback(Tuple()); }, ctx, cm, token,
          [callback, this](Attempt* attempt) TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
            const int64_t queue_size = queues_[0].size();
            if (closed_ && queue_size == 0) {
              attempt->context->SetStatus(errors::OutOfRange(
                  "FIFOQueue '", name_, "' is closed and has ",
                  "insufficient elements (requested ", 1, ", current size ",
                  queue_size, ")"));
              return kComplete;
            }
            if (queue_size > 0) {
              Tuple tuple;
              DequeueLocked(attempt->context, &tuple);
              attempt->done_callback = [callback, tuple]() { callback(tuple); };
              return kComplete;
            } else {
              return kNoProgress;
            }
          });
    }
  }
  if (!already_cancelled) {
    FlushUnlocked();
  } else {
    ctx->SetStatus(errors::Cancelled("Dequeue operation was cancelled"));
    callback(Tuple());
  }
}

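// Attempts to dequeue num_elements elements into a single batched tuple.
// With allow_small_batch (DequeueUpTo), a closed queue returns whatever
// elements remain; without it, a closed queue that cannot satisfy the full
// request fails with OutOfRange after restoring any partially-dequeued
// elements to the front of the queue.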
void FIFOQueue::TryDequeueMany(int num_elements, OpKernelContext* ctx,
                               bool allow_small_batch,
                               CallbackWithTuple callback) {
  if (!specified_shapes()) {
    ctx->SetStatus(errors::InvalidArgument(
        "FIFOQueue's DequeueMany and DequeueUpTo require the "
        "components to have specified shapes."));
    callback(Tuple());
    return;
  }
  if (num_elements == 0) {
    Tuple tuple;
    tuple.reserve(num_components());
    for (int i = 0; i < num_components(); ++i) {
      // TODO(josh11b,misard): Switch to allocate_output(). Problem is
      // this breaks the abstraction boundary since we don't *really*
      // know if and how the Tensors in the tuple we pass to callback
      // correspond to the outputs of *ctx. For example, the
      // ReaderRead Op uses TryDequeue() to get a filename out of a
      // queue that is used internally by the reader and is not
      // associated with any output of the ReaderRead.
      // mrry@ adds:
      // Maybe we need to pass a std::function<Tensor*(...)> (or
      // better signature) that calls the appropriate allocator
      // function in addition to ctx? (Or support a shim Allocator
      // that has an internal OpKernelContext*, and dispatches to the
      // appropriate method?)
      // misard@ adds:
      // I don't see that a std::function would help. The problem is
      // that at this point (allocation time) the system doesn't know
      // what is going to happen to the element read out of the
      // queue. As long as we keep the generality that TensorFlow Ops
      // do their own dynamic allocation in arbitrary C++ code, we
      // need to preserve robustness to allocating output Tensors with
      // the 'wrong' attributes, and fixing up with a copy. The only
      // improvement I can see here in the future would be to support
      // an optimized case where the queue 'knows' what attributes to
      // use, and plumbs them through here.
      Tensor element;
      Status status = ctx->allocate_temp(component_dtypes_[i],
                                         ManyOutShape(i, 0), &element);
      if (!status.ok()) {
        ctx->SetStatus(status);
        callback(Tuple());
        return;
      }
      tuple.emplace_back(element);
    }
    callback(tuple);
    return;
  }

  CancellationManager* cm = ctx->cancellation_manager();
  CancellationToken token = cm->get_cancellation_token();
  bool already_cancelled;
  {
    mutex_lock l(mu_);
    already_cancelled = !cm->RegisterCallback(
        token, [this, cm, token]() { Cancel(kDequeue, cm, token); });
    if (!already_cancelled) {
      // TODO(josh11b): This makes two copies of callback, avoid this if possible.
      dequeue_attempts_.emplace_back(
          num_elements, [callback]() { callback(Tuple()); }, ctx, cm, token,
          [callback, allow_small_batch,
           this](Attempt* attempt) TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
            int64_t queue_size = queues_[0].size();

            if (closed_ && queue_size < attempt->elements_requested) {
              // If we don't have enough for a full dequeue, we have
              // to reset the attempt tuple.
              if (!attempt->tuple.empty()) {
                // Restore already-dequeued elements to the front of the
                // queue.
                for (int64_t i = attempt->tuple[0].dim_size(0) -
                                 attempt->elements_requested - 1;
                     i >= 0; --i) {
                  for (int j = 0; j < num_components(); ++j) {
                    Tensor element;
                    Status s = GetElementComponentFromBatch(
                        attempt->tuple, i, j, attempt->context, &element);
                    if (!s.ok()) {
                      attempt->context->SetStatus(
                          errors::DataLoss("Failed to restore element from "
                                           "partially-dequeued batch "
                                           "to FIFOQueue: ",
                                           s.error_message()));
                    }
                    queues_[j].push_front(element);
                  }
                }
              }
              if (allow_small_batch && !queues_[0].empty()) {
                // Request all remaining elements in the queue.
                queue_size = queues_[0].size();
                attempt->tuple.clear();
                attempt->elements_requested = queue_size;
              } else {
                if (allow_small_batch) {
                  // There may be some other attempts containing
                  // values. If so, we'll yield and wait for them
                  // to add elements to the queue.
                  if (!enqueue_attempts_.empty()) return kProgress;
                }
                if (attempt->context->status().ok()) {
                  attempt->context->SetStatus(errors::OutOfRange(
                      "FIFOQueue '", name_, "' is closed and has ",
                      "insufficient elements (requested ",
                      attempt->elements_requested, ", current size ",
                      queue_size, ")"));
                }
                return kComplete;
              }
            }

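            // Dequeue elements one at a time, copying each into the batched
            // attempt tuple, until either the request is satisfied or the
            // queue drains.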
            RunResult result = kNoProgress;
            for (; queue_size > 0; --queue_size) {
              if (attempt->tuple.empty()) {
                // Only allocate tuple when we have something to dequeue
                // so we don't use excessive memory when there are many
                // blocked dequeue attempts waiting.
                attempt->tuple.reserve(num_components());
                for (int i = 0; i < num_components(); ++i) {
                  const TensorShape shape =
                      ManyOutShape(i, attempt->elements_requested);
                  Tensor element;
                  attempt->context->SetStatus(attempt->context->allocate_temp(
                      component_dtypes_[i], shape, &element));
                  if (!attempt->context->status().ok()) return kComplete;
                  attempt->tuple.emplace_back(element);
                }
              }
              result = kProgress;
              Tuple tuple;
              DequeueLocked(attempt->context, &tuple);
              const int64_t index =
                  attempt->tuple[0].dim_size(0) - attempt->elements_requested;
              for (int i = 0; i < num_components(); ++i) {
                attempt->context->SetStatus(batch_util::CopyElementToSlice(
                    std::move(tuple[i]), &attempt->tuple[i], index));
                if (!attempt->context->status().ok()) return kComplete;
              }
              tuple.clear();
              --attempt->elements_requested;
              if (attempt->elements_requested == 0) {
                tuple = attempt->tuple;
                attempt->done_callback = [callback, tuple]() {
                  callback(tuple);
                };
                return kComplete;
              }
            }
            return result;
          });
    }
  }
  if (!already_cancelled) {
    FlushUnlocked();
  } else {
    ctx->SetStatus(errors::Cancelled("Dequeue operation was cancelled"));
    callback(Tuple());
  }
}

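// Verifies that `node_def` describes a FIFOQueue (or FIFOQueueV2) whose
// capacity, component types, and shapes are compatible with this queue.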
Status FIFOQueue::MatchesNodeDef(const NodeDef& node_def) {
  if (!MatchesNodeDefOp(node_def, "FIFOQueue").ok() &&
      !MatchesNodeDefOp(node_def, "FIFOQueueV2").ok()) {
    return errors::InvalidArgument("Expected FIFOQueue, found ", node_def.op());
  }
  TF_RETURN_IF_ERROR(MatchesNodeDefCapacity(node_def, capacity_));
  TF_RETURN_IF_ERROR(MatchesNodeDefTypes(node_def));
  TF_RETURN_IF_ERROR(MatchesNodeDefShapes(node_def));
  return OkStatus();
}

// Defines a FIFOQueueOp, which produces a Queue (specifically, one
// backed by FIFOQueue) that persists across different graph
// executions and sessions. Running this op produces a single-element
// tensor of handles to Queues in the corresponding device.
FIFOQueueOp::FIFOQueueOp(OpKernelConstruction* context)
    : TypedQueueOp(context) {
  OP_REQUIRES_OK(context, context->GetAttr("shapes", &component_shapes_));
}

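// Creates the FIFOQueue resource that backs the queue handle produced by
// this op.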
Status FIFOQueueOp::CreateResource(QueueInterface** ret) {
  FIFOQueue* queue = new FIFOQueue(capacity_, component_types_,
                                   component_shapes_, cinfo_.name());
  return CreateTypedQueue(queue, ret);
}

}  // namespace tensorflow