1 | /* Copyright 2016 The TensorFlow Authors. All Rights Reserved. |
2 | |
3 | Licensed under the Apache License, Version 2.0 (the "License"); |
4 | you may not use this file except in compliance with the License. |
5 | You may obtain a copy of the License at |
6 | |
7 | http://www.apache.org/licenses/LICENSE-2.0 |
8 | |
9 | Unless required by applicable law or agreed to in writing, software |
10 | distributed under the License is distributed on an "AS IS" BASIS, |
11 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | See the License for the specific language governing permissions and |
13 | limitations under the License. |
14 | ==============================================================================*/ |
15 | |
16 | #include "tensorflow/core/lib/io/buffered_inputstream.h" |
17 | |
18 | #include "tensorflow/core/lib/io/random_inputstream.h" |
19 | |
20 | namespace tensorflow { |
21 | namespace io { |
22 | |
23 | BufferedInputStream::BufferedInputStream(InputStreamInterface* input_stream, |
24 | size_t buffer_size, |
25 | bool owns_input_stream) |
26 | : input_stream_(input_stream), |
27 | size_(buffer_size), |
28 | owns_input_stream_(owns_input_stream) { |
29 | buf_.reserve(size_); |
30 | } |
31 | |
32 | BufferedInputStream::BufferedInputStream(RandomAccessFile* file, |
33 | size_t buffer_size) |
34 | : BufferedInputStream(new RandomAccessInputStream(file), buffer_size, |
35 | true) {} |
36 | |
37 | BufferedInputStream::~BufferedInputStream() { |
38 | if (owns_input_stream_) { |
39 | delete input_stream_; |
40 | } |
41 | } |
42 | |
43 | Status BufferedInputStream::FillBuffer() { |
44 | if (!file_status_.ok()) { |
45 | pos_ = 0; |
46 | limit_ = 0; |
47 | return file_status_; |
48 | } |
49 | Status s = input_stream_->ReadNBytes(size_, &buf_); |
50 | pos_ = 0; |
51 | limit_ = buf_.size(); |
52 | if (buf_.empty()) { |
53 | DCHECK(!s.ok()); |
54 | file_status_ = s; |
55 | } |
56 | return s; |
57 | } |
58 | |
59 | Status BufferedInputStream::ReadLineHelper(string* result, bool include_eol) { |
60 | result->clear(); |
61 | Status s; |
62 | while (true) { |
63 | if (pos_ == limit_) { |
64 | // Get more data into buffer |
65 | s = FillBuffer(); |
66 | if (limit_ == 0) { |
67 | break; |
68 | } |
69 | } |
70 | char c = buf_[pos_++]; |
71 | if (c == '\n') { |
72 | if (include_eol) { |
73 | *result += c; |
74 | } |
75 | return Status::OK(); |
76 | } |
77 | // We don't append '\r' to *result |
78 | if (c != '\r') { |
79 | *result += c; |
80 | } |
81 | } |
82 | if (errors::IsOutOfRange(s) && !result->empty()) { |
83 | return Status::OK(); |
84 | } |
85 | return s; |
86 | } |
87 | |
88 | Status BufferedInputStream::ReadNBytes(int64 bytes_to_read, string* result) { |
89 | if (bytes_to_read < 0) { |
90 | return errors::InvalidArgument("Can't read a negative number of bytes: " , |
91 | bytes_to_read); |
92 | } |
93 | result->clear(); |
94 | if (!file_status_.ok() && bytes_to_read > 0) { |
95 | return file_status_; |
96 | } |
97 | result->reserve(bytes_to_read); |
98 | |
99 | Status s; |
100 | while (result->size() < static_cast<size_t>(bytes_to_read)) { |
101 | // Check whether the buffer is fully read or not. |
102 | if (pos_ == limit_) { |
103 | s = FillBuffer(); |
104 | // If we didn't read any bytes, we're at the end of the file; break out. |
105 | if (limit_ == 0) { |
106 | DCHECK(!s.ok()); |
107 | file_status_ = s; |
108 | break; |
109 | } |
110 | } |
111 | const int64 bytes_to_copy = |
112 | std::min<int64>(limit_ - pos_, bytes_to_read - result->size()); |
113 | result->insert(result->size(), buf_, pos_, bytes_to_copy); |
114 | pos_ += bytes_to_copy; |
115 | } |
116 | // Filling the buffer might lead to a situation when we go past the end of |
117 | // the file leading to an OutOfRange() status return. But we might have |
118 | // obtained enough data to satisfy the function call. Returning OK then. |
119 | if (errors::IsOutOfRange(s) && |
120 | (result->size() == static_cast<size_t>(bytes_to_read))) { |
121 | return Status::OK(); |
122 | } |
123 | return s; |
124 | } |
125 | |
126 | Status BufferedInputStream::SkipNBytes(int64 bytes_to_skip) { |
127 | if (bytes_to_skip < 0) { |
128 | return errors::InvalidArgument("Can only skip forward, not " , |
129 | bytes_to_skip); |
130 | } |
131 | if (pos_ + bytes_to_skip < limit_) { |
132 | // If we aren't skipping too much, then we can just move pos_; |
133 | pos_ += bytes_to_skip; |
134 | } else { |
135 | // Otherwise, we already have read limit_ - pos_, so skip the rest. At this |
136 | // point we need to get fresh data into the buffer, so reset pos_ and |
137 | // limit_. |
138 | Status s = input_stream_->SkipNBytes(bytes_to_skip - (limit_ - pos_)); |
139 | pos_ = 0; |
140 | limit_ = 0; |
141 | if (errors::IsOutOfRange(s)) { |
142 | file_status_ = s; |
143 | } |
144 | return s; |
145 | } |
146 | return Status::OK(); |
147 | } |
148 | |
149 | int64 BufferedInputStream::Tell() const { |
150 | return input_stream_->Tell() - (limit_ - pos_); |
151 | } |
152 | |
153 | Status BufferedInputStream::Seek(int64 position) { |
154 | if (position < 0) { |
155 | return errors::InvalidArgument("Seeking to a negative position: " , |
156 | position); |
157 | } |
158 | |
159 | // Position of the buffer within file. |
160 | const int64 bufpos = Tell(); |
161 | if (position < bufpos) { |
162 | // Reset input stream and skip 'position' bytes. |
163 | TF_RETURN_IF_ERROR(Reset()); |
164 | return SkipNBytes(position); |
165 | } |
166 | |
167 | return SkipNBytes(position - bufpos); |
168 | } |
169 | |
170 | Status BufferedInputStream::ReadAll(string* result) { |
171 | result->clear(); |
172 | Status status; |
173 | while (status.ok()) { |
174 | status = FillBuffer(); |
175 | if (limit_ == 0) { |
176 | break; |
177 | } |
178 | result->append(buf_); |
179 | pos_ = limit_; |
180 | } |
181 | |
182 | if (errors::IsOutOfRange(status)) { |
183 | file_status_ = status; |
184 | return Status::OK(); |
185 | } |
186 | return status; |
187 | } |
188 | |
189 | Status BufferedInputStream::Reset() { |
190 | TF_RETURN_IF_ERROR(input_stream_->Reset()); |
191 | pos_ = 0; |
192 | limit_ = 0; |
193 | file_status_ = Status::OK(); |
194 | return Status::OK(); |
195 | } |
196 | |
197 | Status BufferedInputStream::ReadLine(string* result) { |
198 | return ReadLineHelper(result, false); |
199 | } |
200 | |
201 | string BufferedInputStream::ReadLineAsString() { |
202 | string result; |
203 | ReadLineHelper(&result, true).IgnoreError(); |
204 | return result; |
205 | } |
206 | |
207 | } // namespace io |
208 | } // namespace tensorflow |
209 | |