1 | /* Copyright 2015 The TensorFlow Authors. All Rights Reserved. |
2 | |
3 | Licensed under the Apache License, Version 2.0 (the "License"); |
4 | you may not use this file except in compliance with the License. |
5 | You may obtain a copy of the License at |
6 | |
7 | http://www.apache.org/licenses/LICENSE-2.0 |
8 | |
9 | Unless required by applicable law or agreed to in writing, software |
10 | distributed under the License is distributed on an "AS IS" BASIS, |
11 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | See the License for the specific language governing permissions and |
13 | limitations under the License. |
14 | ==============================================================================*/ |
15 | |
16 | // See docs in ../ops/parse_ops.cc. |
17 | |
18 | #include <algorithm> |
19 | #include "tensorflow/core/framework/op_kernel.h" |
20 | #include "tensorflow/core/framework/tensor.h" |
21 | #include "tensorflow/core/framework/tensor_shape.h" |
22 | #include "tensorflow/core/framework/types.h" |
23 | #include "tensorflow/core/lib/core/errors.h" |
24 | #include "tensorflow/core/lib/io/zlib_compression_options.h" |
25 | #include "tensorflow/core/lib/io/zlib_inputstream.h" |
26 | |
27 | namespace tensorflow { |
28 | namespace { |
29 | // Wrap memory buffer into InputStreamInterface |
30 | class MemoryInputStream : public io::InputStreamInterface { |
31 | public: |
32 | explicit MemoryInputStream(const char* buffer, size_t length) |
33 | : buf_(buffer), len_(length), pos_(0) {} |
34 | |
35 | ~MemoryInputStream() override {} |
36 | |
37 | Status ReadNBytes(int64_t bytes_to_read, tstring* result) override { |
38 | result->clear(); |
39 | if (bytes_to_read < 0) { |
40 | return errors::InvalidArgument("Can't read a negative number of bytes: " , |
41 | bytes_to_read); |
42 | } |
43 | int64_t bytes = bytes_to_read; |
44 | Status s = OkStatus(); |
45 | if (pos_ + bytes_to_read > len_) { |
46 | bytes = len_ - pos_; |
47 | s = errors::OutOfRange("reached end of file" ); |
48 | } |
49 | if (bytes > 0) { |
50 | result->resize(bytes); |
51 | memcpy(&(*result)[0], &buf_[pos_], bytes); |
52 | pos_ += bytes; |
53 | } |
54 | return s; |
55 | } |
56 | |
57 | int64_t Tell() const override { return pos_; } |
58 | |
59 | Status Reset() override { |
60 | pos_ = 0; |
61 | return OkStatus(); |
62 | } |
63 | |
64 | private: |
65 | const char* buf_; // Not owned. |
66 | int64_t len_; |
67 | int64_t pos_ = 0; // Tracks where we are in the file. |
68 | }; |
69 | } // namespace |
70 | |
71 | class DecodeCompressedOp : public OpKernel { |
72 | public: |
73 | explicit DecodeCompressedOp(OpKernelConstruction* context) |
74 | : OpKernel(context) { |
75 | OP_REQUIRES_OK(context, |
76 | context->GetAttr("compression_type" , &compression_type_)); |
77 | OP_REQUIRES(context, |
78 | (compression_type_.empty() || compression_type_ == "ZLIB" || |
79 | compression_type_ == "GZIP" ), |
80 | errors::InvalidArgument( |
81 | "Only ZLIB, GZIP or NONE are supported compressions" )); |
82 | } |
83 | |
84 | void Compute(OpKernelContext* context) override { |
85 | const Tensor* bytes_tensor; |
86 | OP_REQUIRES_OK(context, context->input("bytes" , &bytes_tensor)); |
87 | const auto& bytes_flat = bytes_tensor->flat<tstring>(); |
88 | |
89 | Tensor* output_tensor = nullptr; |
90 | OP_REQUIRES_OK(context, |
91 | context->allocate_output("output" , bytes_tensor->shape(), |
92 | &output_tensor)); |
93 | auto output_flat = output_tensor->flat<tstring>(); |
94 | if (compression_type_.empty()) { |
95 | for (int64_t i = 0; i < bytes_flat.size(); i++) { |
96 | output_flat(i) = bytes_flat(i); |
97 | } |
98 | } else { |
99 | const io::ZlibCompressionOptions zlib_options = |
100 | compression_type_ == "ZLIB" ? io::ZlibCompressionOptions::DEFAULT() |
101 | : io::ZlibCompressionOptions::GZIP(); |
102 | for (int64_t i = 0; i < bytes_flat.size(); i++) { |
103 | std::unique_ptr<MemoryInputStream> input_stream( |
104 | new MemoryInputStream(bytes_flat(i).data(), bytes_flat(i).size())); |
105 | std::unique_ptr<io::ZlibInputStream> zlib_stream( |
106 | new io::ZlibInputStream( |
107 | input_stream.get(), static_cast<size_t>(kBufferSize), |
108 | static_cast<size_t>(kBufferSize), zlib_options)); |
109 | tstring output_string; |
110 | Status s = zlib_stream->ReadNBytes(INT_MAX, &output_string); |
111 | OP_REQUIRES(context, (s.ok() || errors::IsOutOfRange(s)), s); |
112 | output_flat(i) = std::move(output_string); |
113 | } |
114 | } |
115 | } |
116 | |
117 | private: |
118 | enum { kBufferSize = 256 << 10 /* 256 kB */ }; |
119 | string compression_type_; |
120 | }; |
121 | |
// Registers DecodeCompressedOp as the CPU kernel for the "DecodeCompressed" op.
REGISTER_KERNEL_BUILDER(Name("DecodeCompressed" ).Device(DEVICE_CPU),
DecodeCompressedOp)
124 | |
125 | } // namespace tensorflow |
126 | |