1/* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
2
3Licensed under the Apache License, Version 2.0 (the "License");
4you may not use this file except in compliance with the License.
5You may obtain a copy of the License at
6
7 http://www.apache.org/licenses/LICENSE-2.0
8
9Unless required by applicable law or agreed to in writing, software
10distributed under the License is distributed on an "AS IS" BASIS,
11WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12See the License for the specific language governing permissions and
13limitations under the License.
14==============================================================================*/
15
16#include "tensorflow/core/lib/io/buffered_inputstream.h"
17
18#include "tensorflow/core/lib/io/random_inputstream.h"
19
20namespace tensorflow {
21namespace io {
22
23BufferedInputStream::BufferedInputStream(InputStreamInterface* input_stream,
24 size_t buffer_size,
25 bool owns_input_stream)
26 : input_stream_(input_stream),
27 size_(buffer_size),
28 owns_input_stream_(owns_input_stream) {
29 buf_.reserve(size_);
30}
31
32BufferedInputStream::BufferedInputStream(RandomAccessFile* file,
33 size_t buffer_size)
34 : BufferedInputStream(new RandomAccessInputStream(file), buffer_size,
35 true) {}
36
37BufferedInputStream::~BufferedInputStream() {
38 if (owns_input_stream_) {
39 delete input_stream_;
40 }
41}
42
43Status BufferedInputStream::FillBuffer() {
44 if (!file_status_.ok()) {
45 pos_ = 0;
46 limit_ = 0;
47 return file_status_;
48 }
49 Status s = input_stream_->ReadNBytes(size_, &buf_);
50 pos_ = 0;
51 limit_ = buf_.size();
52 if (buf_.empty()) {
53 DCHECK(!s.ok());
54 file_status_ = s;
55 }
56 return s;
57}
58
59Status BufferedInputStream::ReadLineHelper(string* result, bool include_eol) {
60 result->clear();
61 Status s;
62 while (true) {
63 if (pos_ == limit_) {
64 // Get more data into buffer
65 s = FillBuffer();
66 if (limit_ == 0) {
67 break;
68 }
69 }
70 char c = buf_[pos_++];
71 if (c == '\n') {
72 if (include_eol) {
73 *result += c;
74 }
75 return Status::OK();
76 }
77 // We don't append '\r' to *result
78 if (c != '\r') {
79 *result += c;
80 }
81 }
82 if (errors::IsOutOfRange(s) && !result->empty()) {
83 return Status::OK();
84 }
85 return s;
86}
87
88Status BufferedInputStream::ReadNBytes(int64 bytes_to_read, string* result) {
89 if (bytes_to_read < 0) {
90 return errors::InvalidArgument("Can't read a negative number of bytes: ",
91 bytes_to_read);
92 }
93 result->clear();
94 if (!file_status_.ok() && bytes_to_read > 0) {
95 return file_status_;
96 }
97 result->reserve(bytes_to_read);
98
99 Status s;
100 while (result->size() < static_cast<size_t>(bytes_to_read)) {
101 // Check whether the buffer is fully read or not.
102 if (pos_ == limit_) {
103 s = FillBuffer();
104 // If we didn't read any bytes, we're at the end of the file; break out.
105 if (limit_ == 0) {
106 DCHECK(!s.ok());
107 file_status_ = s;
108 break;
109 }
110 }
111 const int64 bytes_to_copy =
112 std::min<int64>(limit_ - pos_, bytes_to_read - result->size());
113 result->insert(result->size(), buf_, pos_, bytes_to_copy);
114 pos_ += bytes_to_copy;
115 }
116 // Filling the buffer might lead to a situation when we go past the end of
117 // the file leading to an OutOfRange() status return. But we might have
118 // obtained enough data to satisfy the function call. Returning OK then.
119 if (errors::IsOutOfRange(s) &&
120 (result->size() == static_cast<size_t>(bytes_to_read))) {
121 return Status::OK();
122 }
123 return s;
124}
125
126Status BufferedInputStream::SkipNBytes(int64 bytes_to_skip) {
127 if (bytes_to_skip < 0) {
128 return errors::InvalidArgument("Can only skip forward, not ",
129 bytes_to_skip);
130 }
131 if (pos_ + bytes_to_skip < limit_) {
132 // If we aren't skipping too much, then we can just move pos_;
133 pos_ += bytes_to_skip;
134 } else {
135 // Otherwise, we already have read limit_ - pos_, so skip the rest. At this
136 // point we need to get fresh data into the buffer, so reset pos_ and
137 // limit_.
138 Status s = input_stream_->SkipNBytes(bytes_to_skip - (limit_ - pos_));
139 pos_ = 0;
140 limit_ = 0;
141 if (errors::IsOutOfRange(s)) {
142 file_status_ = s;
143 }
144 return s;
145 }
146 return Status::OK();
147}
148
149int64 BufferedInputStream::Tell() const {
150 return input_stream_->Tell() - (limit_ - pos_);
151}
152
153Status BufferedInputStream::Seek(int64 position) {
154 if (position < 0) {
155 return errors::InvalidArgument("Seeking to a negative position: ",
156 position);
157 }
158
159 // Position of the buffer within file.
160 const int64 bufpos = Tell();
161 if (position < bufpos) {
162 // Reset input stream and skip 'position' bytes.
163 TF_RETURN_IF_ERROR(Reset());
164 return SkipNBytes(position);
165 }
166
167 return SkipNBytes(position - bufpos);
168}
169
170Status BufferedInputStream::ReadAll(string* result) {
171 result->clear();
172 Status status;
173 while (status.ok()) {
174 status = FillBuffer();
175 if (limit_ == 0) {
176 break;
177 }
178 result->append(buf_);
179 pos_ = limit_;
180 }
181
182 if (errors::IsOutOfRange(status)) {
183 file_status_ = status;
184 return Status::OK();
185 }
186 return status;
187}
188
189Status BufferedInputStream::Reset() {
190 TF_RETURN_IF_ERROR(input_stream_->Reset());
191 pos_ = 0;
192 limit_ = 0;
193 file_status_ = Status::OK();
194 return Status::OK();
195}
196
197Status BufferedInputStream::ReadLine(string* result) {
198 return ReadLineHelper(result, false);
199}
200
201string BufferedInputStream::ReadLineAsString() {
202 string result;
203 ReadLineHelper(&result, true).IgnoreError();
204 return result;
205}
206
207} // namespace io
208} // namespace tensorflow
209