/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

// See docs in ../ops/data_flow_ops.cc.

#include "tensorflow/core/kernels/fifo_queue.h"

#include <algorithm>
#include <deque>
#include <vector>

#include "tensorflow/core/framework/node_def.pb.h"
#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/framework/tensor_shape.h"
#include "tensorflow/core/framework/types.h"
#include "tensorflow/core/kernels/queue_base.h"
#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/platform/logging.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/types.h"
#include "tensorflow/core/util/batch_util.h"

namespace tensorflow {

FIFOQueue::FIFOQueue(int capacity, const DataTypeVector& component_dtypes,
                     const std::vector<TensorShape>& component_shapes,
                     const string& name)
    : TypedQueue(capacity, component_dtypes, component_shapes, name) {}

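// Pops the front element from every component queue and appends its tensors
// to *tuple. Callers must hold mu_ and ensure the queue is non-empty.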
void FIFOQueue::DequeueLocked(OpKernelContext* ctx, Tuple* tuple) {
  DCHECK_GT(queues_[0].size(), size_t{0});
  (*tuple).reserve(num_components());
  for (int i = 0; i < num_components(); ++i) {
    (*tuple).push_back(queues_[i][0]);
    queues_[i].pop_front();
  }
}

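// Attempts to enqueue a single element. Registers a cancellation callback and
// records an enqueue attempt that completes once space is available, or fails
// with Cancelled if the queue has been closed.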
void FIFOQueue::TryEnqueue(const Tuple& tuple, OpKernelContext* ctx,
                           DoneCallback callback) {
  CancellationManager* cm = ctx->cancellation_manager();
  CancellationToken token = cm->get_cancellation_token();
  bool already_cancelled;
  {
    mutex_lock l(mu_);
    already_cancelled = !cm->RegisterCallback(
        token, [this, cm, token]() { Cancel(kEnqueue, cm, token); });
    if (!already_cancelled) {
      enqueue_attempts_.emplace_back(
          1, callback, ctx, cm, token,
          [tuple, this](Attempt* attempt) TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
            if (closed_) {
              attempt->context->SetStatus(
                  errors::Cancelled("FIFOQueue '", name_, "' is closed."));
              return kComplete;
            }
            if (queues_[0].size() < static_cast<size_t>(capacity_)) {
              for (int i = 0; i < num_components(); ++i) {
                queues_[i].push_back(tuple[i]);
              }
              return kComplete;
            } else {
              return kNoProgress;
            }
          });
    }
  }
  if (!already_cancelled) {
    FlushUnlocked();
  } else {
    ctx->SetStatus(errors::Cancelled("Enqueue operation was cancelled"));
    callback();
  }
}

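// Copies the element at `index` of component `component` out of a batched
// tuple into a newly allocated *out_tensor.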
/* static */
Status FIFOQueue::GetElementComponentFromBatch(const FIFOQueue::Tuple& tuple,
                                               int64_t index, int component,
                                               OpKernelContext* ctx,
                                               Tensor* out_tensor) {
  TensorShape element_shape(tuple[component].shape());
  element_shape.RemoveDim(0);
  TF_RETURN_IF_ERROR(
      ctx->allocate_temp(tuple[component].dtype(), element_shape, out_tensor));
  TF_RETURN_IF_ERROR(
      batch_util::CopySliceToElement(tuple[component], out_tensor, index));
  return OkStatus();
}

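// Attempts to enqueue a batch of elements by slicing `tuple` along its first
// dimension; elements are inserted one at a time as capacity becomes
// available.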
void FIFOQueue::TryEnqueueMany(const Tuple& tuple, OpKernelContext* ctx,
                               DoneCallback callback) {
  const int64_t batch_size = tuple[0].dim_size(0);
  if (batch_size == 0) {
    callback();
    return;
  }

  CancellationManager* cm = ctx->cancellation_manager();
  CancellationToken token = cm->get_cancellation_token();
  bool already_cancelled;
  {
    mutex_lock l(mu_);
    already_cancelled = !cm->RegisterCallback(
        token, [this, cm, token]() { Cancel(kEnqueue, cm, token); });
    if (!already_cancelled) {
      enqueue_attempts_.emplace_back(
          batch_size, callback, ctx, cm, token,
          [tuple, this](Attempt* attempt) TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
            if (closed_) {
              attempt->context->SetStatus(
                  errors::Cancelled("FIFOQueue '", name_, "' is closed."));
              return kComplete;
            }
            RunResult result = kNoProgress;
            while (queues_[0].size() < static_cast<size_t>(capacity_)) {
              result = kProgress;
              const int64_t index =
                  tuple[0].dim_size(0) - attempt->elements_requested;
              for (int i = 0; i < num_components(); ++i) {
                Tensor element;
                attempt->context->SetStatus(GetElementComponentFromBatch(
                    tuple, index, i, attempt->context, &element));
                if (!attempt->context->status().ok()) return kComplete;
                queues_[i].push_back(element);
              }
              --attempt->elements_requested;
              if (attempt->elements_requested == 0) {
                return kComplete;
              }
            }
            return result;
          });
    }
  }
  if (!already_cancelled) {
    FlushUnlocked();
  } else {
    ctx->SetStatus(errors::Cancelled("Enqueue operation was cancelled"));
    callback();
  }
}

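// Attempts to dequeue a single element, invoking `callback` with the dequeued
// tuple on success, or with an empty tuple if the attempt is cancelled or the
// queue is closed and empty.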
void FIFOQueue::TryDequeue(OpKernelContext* ctx, CallbackWithTuple callback) {
  CancellationManager* cm = ctx->cancellation_manager();
  CancellationToken token = cm->get_cancellation_token();
  bool already_cancelled;
  {
    mutex_lock l(mu_);
    already_cancelled = !cm->RegisterCallback(
        token, [this, cm, token]() { Cancel(kDequeue, cm, token); });
    if (!already_cancelled) {
      // TODO(josh11b): This makes two copies of callback, avoid this if
      // possible.
      dequeue_attempts_.emplace_back(
          1, [callback]() { callback(Tuple()); }, ctx, cm, token,
          [callback, this](Attempt* attempt) TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
            const int64_t queue_size = queues_[0].size();
            if (closed_ && queue_size == 0) {
              attempt->context->SetStatus(errors::OutOfRange(
                  "FIFOQueue '", name_, "' is closed and has ",
                  "insufficient elements (requested ", 1, ", current size ",
                  queue_size, ")"));
              return kComplete;
            }
            if (queue_size > 0) {
              Tuple tuple;
              DequeueLocked(attempt->context, &tuple);
              attempt->done_callback = [callback, tuple]() { callback(tuple); };
              return kComplete;
            } else {
              return kNoProgress;
            }
          });
    }
  }
  if (!already_cancelled) {
    FlushUnlocked();
  } else {
    ctx->SetStatus(errors::Cancelled("Dequeue operation was cancelled"));
    callback(Tuple());
  }
}

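// Attempts to dequeue `num_elements` elements into one batched tuple.
// Requires fully specified component shapes. If `allow_small_batch` is true
// and the queue is closed, whatever elements remain are returned instead of
// failing with OutOfRange.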
void FIFOQueue::TryDequeueMany(int num_elements, OpKernelContext* ctx,
                               bool allow_small_batch,
                               CallbackWithTuple callback) {
  if (!specified_shapes()) {
    ctx->SetStatus(errors::InvalidArgument(
        "FIFOQueue's DequeueMany and DequeueUpTo require the "
        "components to have specified shapes."));
    callback(Tuple());
    return;
  }
  if (num_elements == 0) {
    Tuple tuple;
    tuple.reserve(num_components());
    for (int i = 0; i < num_components(); ++i) {
      // TODO(josh11b,misard): Switch to allocate_output(). Problem is
      // this breaks the abstraction boundary since we don't *really*
      // know if and how the Tensors in the tuple we pass to callback
      // correspond to the outputs of *ctx. For example, the
      // ReaderRead Op uses TryDequeue() to get a filename out of a
      // queue that is used internally by the reader and is not
      // associated with any output of the ReaderRead.
      // mrry@ adds:
      // Maybe we need to pass a std::function<Tensor*(...)> (or
      // better signature) that calls the appropriate allocator
      // function in addition to ctx? (Or support a shim Allocator
      // that has an internal OpKernelContext*, and dispatches to the
      // appropriate method?)
      // misard@ adds:
      // I don't see that a std::function would help. The problem is
      // that at this point (allocation time) the system doesn't know
      // what is going to happen to the element read out of the
      // queue. As long as we keep the generality that TensorFlow Ops
      // do their own dynamic allocation in arbitrary C++ code, we
      // need to preserve robustness to allocating output Tensors with
      // the 'wrong' attributes, and fixing up with a copy. The only
      // improvement I can see here in the future would be to support
      // an optimized case where the queue 'knows' what attributes to
      // use, and plumbs them through here.
      Tensor element;
      Status status = ctx->allocate_temp(component_dtypes_[i],
                                         ManyOutShape(i, 0), &element);
      if (!status.ok()) {
        ctx->SetStatus(status);
        callback(Tuple());
        return;
      }
      tuple.emplace_back(element);
    }
    callback(tuple);
    return;
  }

  CancellationManager* cm = ctx->cancellation_manager();
  CancellationToken token = cm->get_cancellation_token();
  bool already_cancelled;
  {
    mutex_lock l(mu_);
    already_cancelled = !cm->RegisterCallback(
        token, [this, cm, token]() { Cancel(kDequeue, cm, token); });
    if (!already_cancelled) {
      // TODO(josh11b): This makes two copies of callback, avoid this if
      // possible.
      dequeue_attempts_.emplace_back(
          num_elements, [callback]() { callback(Tuple()); }, ctx, cm, token,
          [callback, allow_small_batch,
           this](Attempt* attempt) TF_EXCLUSIVE_LOCKS_REQUIRED(mu_) {
            int64_t queue_size = queues_[0].size();

            if (closed_ && queue_size < attempt->elements_requested) {
              // If we don't have enough for a full dequeue, we have
              // to reset the attempt tuple.
              if (!attempt->tuple.empty()) {
                // Restore already-dequeued elements to the front of the
                // queue.
                for (int64_t i = attempt->tuple[0].dim_size(0) -
                                 attempt->elements_requested - 1;
                     i >= 0; --i) {
                  for (int j = 0; j < num_components(); ++j) {
                    Tensor element;
                    Status s = GetElementComponentFromBatch(
                        attempt->tuple, i, j, attempt->context, &element);
                    if (!s.ok()) {
                      attempt->context->SetStatus(
                          errors::DataLoss("Failed to restore element from "
                                           "partially-dequeued batch "
                                           "to FIFOQueue: ",
                                           s.error_message()));
                    }
                    queues_[j].push_front(element);
                  }
                }
              }
              if (allow_small_batch && !queues_[0].empty()) {
                // Request all remaining elements in the queue.
                queue_size = queues_[0].size();
                attempt->tuple.clear();
                attempt->elements_requested = queue_size;
              } else {
                if (allow_small_batch) {
                  // There may be some other attempts containing
                  // values. If so, we'll yield and wait for them
                  // to add elements to the queue.
                  if (!enqueue_attempts_.empty()) return kProgress;
                }
                if (attempt->context->status().ok()) {
                  attempt->context->SetStatus(errors::OutOfRange(
                      "FIFOQueue '", name_, "' is closed and has ",
                      "insufficient elements (requested ",
                      attempt->elements_requested, ", current size ",
                      queue_size, ")"));
                }
                return kComplete;
              }
            }

            RunResult result = kNoProgress;
            for (; queue_size > 0; --queue_size) {
              if (attempt->tuple.empty()) {
                // Only allocate tuple when we have something to dequeue
                // so we don't use excessive memory when there are many
                // blocked dequeue attempts waiting.
                attempt->tuple.reserve(num_components());
                for (int i = 0; i < num_components(); ++i) {
                  const TensorShape shape =
                      ManyOutShape(i, attempt->elements_requested);
                  Tensor element;
                  attempt->context->SetStatus(attempt->context->allocate_temp(
                      component_dtypes_[i], shape, &element));
                  if (!attempt->context->status().ok()) return kComplete;
                  attempt->tuple.emplace_back(element);
                }
              }
              result = kProgress;
              Tuple tuple;
              DequeueLocked(attempt->context, &tuple);
              const int64_t index =
                  attempt->tuple[0].dim_size(0) - attempt->elements_requested;
              for (int i = 0; i < num_components(); ++i) {
                attempt->context->SetStatus(batch_util::CopyElementToSlice(
                    std::move(tuple[i]), &attempt->tuple[i], index));
                if (!attempt->context->status().ok()) return kComplete;
              }
              tuple.clear();
              --attempt->elements_requested;
              if (attempt->elements_requested == 0) {
                tuple = attempt->tuple;
                attempt->done_callback = [callback, tuple]() {
                  callback(tuple);
                };
                return kComplete;
              }
            }
            return result;
          });
    }
  }
  if (!already_cancelled) {
    FlushUnlocked();
  } else {
    ctx->SetStatus(errors::Cancelled("Dequeue operation was cancelled"));
    callback(Tuple());
  }
}

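// Checks that `node_def` describes a FIFOQueue (or FIFOQueueV2) op whose
// capacity, component types, and shapes match this queue instance.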
Status FIFOQueue::MatchesNodeDef(const NodeDef& node_def) {
  if (!MatchesNodeDefOp(node_def, "FIFOQueue").ok() &&
      !MatchesNodeDefOp(node_def, "FIFOQueueV2").ok()) {
    return errors::InvalidArgument("Expected FIFOQueue, found ", node_def.op());
  }
  TF_RETURN_IF_ERROR(MatchesNodeDefCapacity(node_def, capacity_));
  TF_RETURN_IF_ERROR(MatchesNodeDefTypes(node_def));
  TF_RETURN_IF_ERROR(MatchesNodeDefShapes(node_def));
  return OkStatus();
}

// Defines a FIFOQueueOp, which produces a Queue (specifically, one
// backed by FIFOQueue) that persists across different graph
// executions, and sessions. Running this op produces a single-element
// tensor of handles to Queues in the corresponding device.
FIFOQueueOp::FIFOQueueOp(OpKernelConstruction* context)
    : TypedQueueOp(context) {
  OP_REQUIRES_OK(context, context->GetAttr("shapes", &component_shapes_));
}

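// Creates the FIFOQueue resource backing this op, using the capacity, dtypes,
// and shapes captured at construction time.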
Status FIFOQueueOp::CreateResource(QueueInterface** ret) {
  FIFOQueue* queue = new FIFOQueue(capacity_, component_types_,
                                   component_shapes_, cinfo_.name());
  return CreateTypedQueue(queue, ret);
}

}  // namespace tensorflow