NextableInputStream: more options
[nikiroo-utils.git] / src / be / nikiroo / utils / NextableInputStream.java
1 package be.nikiroo.utils;
2
3 import java.io.IOException;
4 import java.io.InputStream;
5
6 /**
7 * This {@link InputStream} can be separated into sub-streams (you can process
8 * it as a normal {@link InputStream} but, when it is spent, you can call
9 * {@link NextableInputStream#next()} on it to unlock new data).
10 * <p>
11 * The separation in sub-streams is done via {@link NextableInputStreamStep}.
12 *
13 * @author niki
14 */
15 public class NextableInputStream extends InputStream {
16 private NextableInputStreamStep step;
17 private boolean started;
18 private boolean stopped;
19 private boolean closed;
20
21 private InputStream in;
22 private int openCounter;
23 private boolean eof;
24 private int pos;
25 private int len;
26 private byte[] buffer;
27
28 private long bytesRead;
29
30 /**
31 * Create a new {@link NextableInputStream} that wraps the given
32 * {@link InputStream}.
33 *
34 * @param in
35 * the {@link InputStream} to wrap
36 * @param step
37 * how to separate it into sub-streams (can be NULL, but in that
38 * case it will behave as a normal {@link InputStream})
39 */
40 public NextableInputStream(InputStream in, NextableInputStreamStep step) {
41 this.in = in;
42 this.step = step;
43
44 this.buffer = new byte[4096];
45 this.pos = 0;
46 this.len = 0;
47 }
48
49 /**
50 * Create a new {@link NextableInputStream} that wraps the given bytes array
51 * as a data source.
52 *
53 * @param in
54 * the array to wrap, cannot be NULL
55 * @param step
56 * how to separate it into sub-streams (can be NULL, but in that
57 * case it will behave as a normal {@link InputStream})
58 */
59 public NextableInputStream(byte[] in, NextableInputStreamStep step) {
60 this(in, step, 0, in.length);
61 }
62
63 /**
64 * Create a new {@link NextableInputStream} that wraps the given bytes array
65 * as a data source.
66 *
67 * @param in
68 * the array to wrap, cannot be NULL
69 * @param step
70 * how to separate it into sub-streams (can be NULL, but in that
71 * case it will behave as a normal {@link InputStream})
72 * @param offset
73 * the offset to start the reading at
74 * @param length
75 * the number of bytes to take into account in the array,
76 * starting from the offset
77 *
78 * @throws NullPointerException
79 * if the array is NULL
80 * @throws IndexOutOfBoundsException
81 * if the offset and length do not correspond to the given array
82 */
83 public NextableInputStream(byte[] in, NextableInputStreamStep step,
84 int offset, int length) {
85 if (in == null) {
86 throw new NullPointerException();
87 } else if (offset < 0 || length < 0 || length > in.length - offset) {
88 throw new IndexOutOfBoundsException();
89 }
90
91 this.in = null;
92 this.step = step;
93
94 this.buffer = in;
95 this.pos = offset;
96 this.len = length;
97
98 checkBuffer(true);
99 }
100
101 /**
102 * Return this very same {@link NextableInputStream}, but keep a counter of
103 * how many streams were open this way. When calling
104 * {@link NextableInputStream#close()}, decrease this counter if it is not
105 * already zero instead of actually closing the stream.
106 * <p>
107 * You are now responsible for it &mdash; you <b>must</b> close it.
108 * <p>
109 * This method allows you to use a wrapping stream around this one and still
110 * close the wrapping stream.
111 *
112 * @return the same stream, but you are now responsible for closing it
113 *
114 * @throws IOException
115 * in case of I/O error or if the stream is closed
116 */
117 public synchronized InputStream open() throws IOException {
118 checkClose();
119 openCounter++;
120 return this;
121 }
122
123 /**
124 * Unblock the processing of the next sub-stream.
125 * <p>
126 * It can only be called when the "current" stream is spent (i.e., you must
127 * first process the stream until it is spent).
128 * <p>
129 * We consider that when the under-laying {@link InputStream} is also spent,
130 * we cannot have a next sub-stream (it will thus return FALSE).
131 * <p>
132 * {@link IOException}s can happen when we have no data available in the
133 * buffer; in that case, we fetch more data to know if we can have a next
134 * sub-stream or not.
135 *
136 * @return TRUE if we unblocked the next sub-stream, FALSE if not
137 *
138 * @throws IOException
139 * in case of I/O error or if the stream is closed
140 */
141 public boolean next() throws IOException {
142 return next(false);
143 }
144
145 /**
146 * Unblock the next sub-stream as would have done
147 * {@link NextableInputStream#next()}, but disable the sub-stream systems.
148 * <p>
149 * That is, the next stream, if any, will be the last one and will not be
150 * subject to the {@link NextableInputStreamStep}.
151 *
152 * @return TRUE if we unblocked the next sub-stream, FALSE if not
153 *
154 * @throws IOException
155 * in case of I/O error or if the stream is closed
156 */
157 public boolean nextAll() throws IOException {
158 return next(true);
159 }
160
161 public boolean startWith() {
162 // TODO
163 return false;
164 }
165
166 public boolean startWiths() {
167 // TODO
168 return false;
169 }
170
171 /**
172 * The number of bytes read from the under-laying {@link InputStream}.
173 *
174 * @return the number of bytes
175 */
176 public long getBytesRead() {
177 return bytesRead;
178 }
179
180 /**
181 * Check if this stream is totally spent (no more data to read or to
182 * process).
183 *
184 * @return TRUE if it is
185 */
186 public boolean eof() {
187 return closed || (len < 0 && !hasMoreData());
188 }
189
190 @Override
191 public int read() throws IOException {
192 checkClose();
193
194 preRead();
195 if (eof) {
196 return -1;
197 }
198
199 return buffer[pos++];
200 }
201
202 @Override
203 public int read(byte[] b) throws IOException {
204 return read(b, 0, b.length);
205 }
206
207 @Override
208 public int read(byte[] b, int boff, int blen) throws IOException {
209 checkClose();
210
211 if (b == null) {
212 throw new NullPointerException();
213 } else if (boff < 0 || blen < 0 || blen > b.length - boff) {
214 throw new IndexOutOfBoundsException();
215 } else if (blen == 0) {
216 return 0;
217 }
218
219 int done = 0;
220 while (hasMoreData() && done < blen) {
221 preRead();
222 if (hasMoreData()) {
223 for (int i = pos; i < blen && i < len; i++) {
224 b[boff + done] = buffer[i];
225 pos++;
226 done++;
227 }
228 }
229 }
230
231 return done > 0 ? done : -1;
232 }
233
234 @Override
235 public long skip(long n) throws IOException {
236 // TODO Auto-generated method stub
237 return super.skip(n);
238 }
239
240 @Override
241 public int available() {
242 if (closed) {
243 return 0;
244 }
245
246 return Math.max(0, len - pos);
247 }
248
249 /**
250 * Closes this stream and releases any system resources associated with the
251 * stream.
252 * <p>
253 * Including the under-laying {@link InputStream}.
254 * <p>
255 * <b>Note:</b> if you called the {@link NextableInputStream#open()} method
256 * prior to this one, it will just decrease the internal count of how many
257 * open streams it held and do nothing else. The stream will actually be
258 * closed when you have called {@link NextableInputStream#close()} once more
259 * than {@link NextableInputStream#open()}.
260 *
261 * @exception IOException
262 * in case of I/O error
263 */
264 @Override
265 public synchronized void close() throws IOException {
266 close(true);
267 }
268
269 /**
270 * Closes this stream and releases any system resources associated with the
271 * stream.
272 * <p>
273 * Including the under-laying {@link InputStream} if
274 * <tt>incudingSubStream</tt> is true.
275 * <p>
276 * You can call this method multiple times, it will not cause an
277 * {@link IOException} for subsequent calls.
278 * <p>
279 * <b>Note:</b> if you called the {@link NextableInputStream#open()} method
280 * prior to this one, it will just decrease the internal count of how many
281 * open streams it held and do nothing else. The stream will actually be
282 * closed when you have called {@link NextableInputStream#close()} once more
283 * than {@link NextableInputStream#open()}.
284 *
285 * @exception IOException
286 * in case of I/O error
287 */
288 public synchronized void close(boolean includingSubStream)
289 throws IOException {
290 if (!closed) {
291 if (openCounter > 0) {
292 openCounter--;
293 } else {
294 closed = true;
295 if (includingSubStream && in != null) {
296 in.close();
297 }
298 }
299 }
300 }
301
302 /**
303 * Check if we still have some data in the buffer and, if not, fetch some.
304 *
305 * @return TRUE if we fetched some data, FALSE if there are still some in
306 * the buffer
307 *
308 * @throws IOException
309 * in case of I/O error
310 */
311 private boolean preRead() throws IOException {
312 boolean hasRead = false;
313 if (!eof && in != null && pos >= len && !stopped) {
314 pos = 0;
315 len = in.read(buffer);
316 if (len > 0) {
317 bytesRead += len;
318 }
319
320 checkBuffer(true);
321 hasRead = true;
322 }
323
324 if (pos >= len) {
325 eof = true;
326 }
327
328 return hasRead;
329 }
330
331 /**
332 * We have more data available in the buffer or we can fetch more.
333 *
334 * @return TRUE if it is the case, FALSE if not
335 */
336 private boolean hasMoreData() {
337 return !closed && started && !(eof && pos >= len);
338 }
339
340 /**
341 * Check that the buffer didn't overshot to the next item, and fix
342 * {@link NextableInputStream#len} if needed.
343 * <p>
344 * If {@link NextableInputStream#len} is fixed,
345 * {@link NextableInputStream#eof} and {@link NextableInputStream#stopped}
346 * are set to TRUE.
347 *
348 * @param newBuffer
349 * we changed the buffer, we need to clear some information in
350 * the {@link NextableInputStreamStep}
351 */
352 private void checkBuffer(boolean newBuffer) {
353 if (step != null && len > 0) {
354 if (newBuffer) {
355 step.clearBuffer();
356 }
357
358 int stopAt = step.stop(buffer, pos, len);
359 if (stopAt >= 0) {
360 len = stopAt;
361 eof = true;
362 stopped = true;
363 }
364 }
365 }
366
367 /**
368 * The implementation of {@link NextableInputStream#next()} and
369 * {@link NextableInputStream#nextAll()}.
370 *
371 * @param all
372 * TRUE for {@link NextableInputStream#nextAll()}, FALSE for
373 * {@link NextableInputStream#next()}
374 *
375 * @return TRUE if we unblocked the next sub-stream, FALSE if not
376 *
377 * @throws IOException
378 * in case of I/O error or if the stream is closed
379 */
380 private boolean next(boolean all) throws IOException {
381 checkClose();
382
383 if (!started) {
384 // First call before being allowed to read
385 started = true;
386
387 if (all) {
388 step = null;
389 }
390
391 return true;
392 }
393
394 if (step != null && !hasMoreData() && stopped) {
395 len = step.getResumeLen();
396 pos += step.getResumeSkip();
397 eof = false;
398
399 if (all) {
400 step = null;
401 }
402
403 if (!preRead()) {
404 checkBuffer(false);
405 }
406
407 // consider that if EOF, there is no next
408 return hasMoreData();
409 }
410
411 return false;
412 }
413
414 /**
415 * Check that the stream was not closed, and throw an {@link IOException} if
416 * it was.
417 *
418 * @throws IOException
419 * if it was closed
420 */
421 private void checkClose() throws IOException {
422 if (closed) {
423 throw new IOException(
424 "This NextableInputStream was closed, you cannot use it anymore.");
425 }
426 }
427 }