NextableInputStream: better skip() + more tests
[nikiroo-utils.git] / src / be / nikiroo / utils / NextableInputStream.java
1 package be.nikiroo.utils;
2
3 import java.io.IOException;
4 import java.io.InputStream;
5 import java.util.Arrays;
6
7 /**
8 * This {@link InputStream} can be separated into sub-streams (you can process
9 * it as a normal {@link InputStream} but, when it is spent, you can call
10 * {@link NextableInputStream#next()} on it to unlock new data).
11 * <p>
12 * The separation in sub-streams is done via {@link NextableInputStreamStep}.
13 *
14 * @author niki
15 */
16 public class NextableInputStream extends InputStream {
17 private NextableInputStreamStep step;
18 private boolean started;
19 private boolean stopped;
20 private boolean closed;
21
22 private InputStream in;
23 private int openCounter;
24 private boolean eof;
25 private int pos;
26 private int len;
27 private byte[] buffer;
28
29 // special use, prefetched next buffer
30 private byte[] buffer2;
31 private int pos2;
32 private int len2;
33 private byte[] originalBuffer;
34
35 private long bytesRead;
36
37 /**
38 * Create a new {@link NextableInputStream} that wraps the given
39 * {@link InputStream}.
40 *
41 * @param in
42 * the {@link InputStream} to wrap
43 * @param step
44 * how to separate it into sub-streams (can be NULL, but in that
45 * case it will behave as a normal {@link InputStream})
46 */
47 public NextableInputStream(InputStream in, NextableInputStreamStep step) {
48 this.in = in;
49 this.step = step;
50
51 this.buffer = new byte[4096];
52 this.originalBuffer = this.buffer;
53 this.pos = 0;
54 this.len = 0;
55 }
56
57 /**
58 * Create a new {@link NextableInputStream} that wraps the given bytes array
59 * as a data source.
60 *
61 * @param in
62 * the array to wrap, cannot be NULL
63 * @param step
64 * how to separate it into sub-streams (can be NULL, but in that
65 * case it will behave as a normal {@link InputStream})
66 */
67 public NextableInputStream(byte[] in, NextableInputStreamStep step) {
68 this(in, step, 0, in.length);
69 }
70
71 /**
72 * Create a new {@link NextableInputStream} that wraps the given bytes array
73 * as a data source.
74 *
75 * @param in
76 * the array to wrap, cannot be NULL
77 * @param step
78 * how to separate it into sub-streams (can be NULL, but in that
79 * case it will behave as a normal {@link InputStream})
80 * @param offset
81 * the offset to start the reading at
82 * @param length
83 * the number of bytes to take into account in the array,
84 * starting from the offset
85 *
86 * @throws NullPointerException
87 * if the array is NULL
88 * @throws IndexOutOfBoundsException
89 * if the offset and length do not correspond to the given array
90 */
91 public NextableInputStream(byte[] in, NextableInputStreamStep step,
92 int offset, int length) {
93 if (in == null) {
94 throw new NullPointerException();
95 } else if (offset < 0 || length < 0 || length > in.length - offset) {
96 throw new IndexOutOfBoundsException();
97 }
98
99 this.in = null;
100 this.step = step;
101
102 this.buffer = in;
103 this.originalBuffer = this.buffer;
104 this.pos = offset;
105 this.len = length;
106
107 checkBuffer(true);
108 }
109
110 /**
111 * Return this very same {@link NextableInputStream}, but keep a counter of
112 * how many streams were open this way. When calling
113 * {@link NextableInputStream#close()}, decrease this counter if it is not
114 * already zero instead of actually closing the stream.
115 * <p>
116 * You are now responsible for it &mdash; you <b>must</b> close it.
117 * <p>
118 * This method allows you to use a wrapping stream around this one and still
119 * close the wrapping stream.
120 *
121 * @return the same stream, but you are now responsible for closing it
122 *
123 * @throws IOException
124 * in case of I/O error or if the stream is closed
125 */
126 public synchronized InputStream open() throws IOException {
127 checkClose();
128 openCounter++;
129 return this;
130 }
131
132 /**
133 * Unblock the processing of the next sub-stream.
134 * <p>
135 * It can only be called when the "current" stream is spent (i.e., you must
136 * first process the stream until it is spent).
137 * <p>
138 * We consider that when the under-laying {@link InputStream} is also spent,
139 * we cannot have a next sub-stream (it will thus return FALSE).
140 * <p>
141 * {@link IOException}s can happen when we have no data available in the
142 * buffer; in that case, we fetch more data to know if we can have a next
143 * sub-stream or not.
144 *
145 * @return TRUE if we unblocked the next sub-stream, FALSE if not
146 *
147 * @throws IOException
148 * in case of I/O error or if the stream is closed
149 */
150 public boolean next() throws IOException {
151 return next(false);
152 }
153
154 /**
155 * Unblock the next sub-stream as would have done
156 * {@link NextableInputStream#next()}, but disable the sub-stream systems.
157 * <p>
158 * That is, the next stream, if any, will be the last one and will not be
159 * subject to the {@link NextableInputStreamStep}.
160 *
161 * @return TRUE if we unblocked the next sub-stream, FALSE if not
162 *
163 * @throws IOException
164 * in case of I/O error or if the stream is closed
165 */
166 public boolean nextAll() throws IOException {
167 return next(true);
168 }
169
170 // max is buffer.size !
171 public boolean startsWiths(String search) throws IOException {
172 return startsWith(search.getBytes("UTF-8"));
173 }
174
175 // max is buffer.size !
176 public boolean startsWith(byte[] search) throws IOException {
177 if (search.length > originalBuffer.length) {
178 throw new IOException(
179 "This stream does not support searching for more than "
180 + buffer.length + " bytes");
181 }
182
183 checkClose();
184
185 if (available() < search.length) {
186 preRead();
187 }
188
189 if (available() >= search.length) {
190 // Easy path
191 return startsWith(search, buffer, pos);
192 } else if (!eof) {
193 // Harder path
194 if (buffer2 == null && buffer.length == originalBuffer.length) {
195 buffer2 = Arrays.copyOf(buffer, buffer.length * 2);
196
197 pos2 = buffer.length;
198 len2 = in.read(buffer2, pos2, buffer.length);
199 if (len2 > 0) {
200 bytesRead += len2;
201 }
202
203 // Note: here, len/len2 = INDEX of last good byte
204 len2 += pos2;
205 }
206
207 if (available() + (len2 - pos2) >= search.length) {
208 return startsWith(search, buffer2, pos2);
209 }
210 }
211
212 return false;
213 }
214
215 /**
216 * The number of bytes read from the under-laying {@link InputStream}.
217 *
218 * @return the number of bytes
219 */
220 public long getBytesRead() {
221 return bytesRead;
222 }
223
224 /**
225 * Check if this stream is totally spent (no more data to read or to
226 * process).
227 *
228 * @return TRUE if it is
229 */
230 public boolean eof() {
231 return closed || (len < 0 && !hasMoreData());
232 }
233
234 @Override
235 public int read() throws IOException {
236 checkClose();
237
238 preRead();
239 if (eof) {
240 return -1;
241 }
242
243 return buffer[pos++];
244 }
245
246 @Override
247 public int read(byte[] b) throws IOException {
248 return read(b, 0, b.length);
249 }
250
251 @Override
252 public int read(byte[] b, int boff, int blen) throws IOException {
253 checkClose();
254
255 if (b == null) {
256 throw new NullPointerException();
257 } else if (boff < 0 || blen < 0 || blen > b.length - boff) {
258 throw new IndexOutOfBoundsException();
259 } else if (blen == 0) {
260 return 0;
261 }
262
263 int done = 0;
264 while (hasMoreData() && done < blen) {
265 preRead();
266 if (hasMoreData()) {
267 for (int i = pos; i < blen && i < len; i++) {
268 b[boff + done] = buffer[i];
269 pos++;
270 done++;
271 }
272 }
273 }
274
275 return done > 0 ? done : -1;
276 }
277
278 @Override
279 public long skip(long n) throws IOException {
280 if (n <= 0) {
281 return 0;
282 }
283
284 long skipped = 0;
285 while (hasMoreData() && n > 0) {
286 preRead();
287
288 long inBuffer = Math.min(n, available());
289 pos += inBuffer;
290 n -= inBuffer;
291 skipped += inBuffer;
292 }
293
294 return skipped;
295 }
296
297 @Override
298 public int available() {
299 if (closed) {
300 return 0;
301 }
302
303 return Math.max(0, len - pos);
304 }
305
306 /**
307 * Closes this stream and releases any system resources associated with the
308 * stream.
309 * <p>
310 * Including the under-laying {@link InputStream}.
311 * <p>
312 * <b>Note:</b> if you called the {@link NextableInputStream#open()} method
313 * prior to this one, it will just decrease the internal count of how many
314 * open streams it held and do nothing else. The stream will actually be
315 * closed when you have called {@link NextableInputStream#close()} once more
316 * than {@link NextableInputStream#open()}.
317 *
318 * @exception IOException
319 * in case of I/O error
320 */
321 @Override
322 public synchronized void close() throws IOException {
323 close(true);
324 }
325
326 /**
327 * Closes this stream and releases any system resources associated with the
328 * stream.
329 * <p>
330 * Including the under-laying {@link InputStream} if
331 * <tt>incudingSubStream</tt> is true.
332 * <p>
333 * You can call this method multiple times, it will not cause an
334 * {@link IOException} for subsequent calls.
335 * <p>
336 * <b>Note:</b> if you called the {@link NextableInputStream#open()} method
337 * prior to this one, it will just decrease the internal count of how many
338 * open streams it held and do nothing else. The stream will actually be
339 * closed when you have called {@link NextableInputStream#close()} once more
340 * than {@link NextableInputStream#open()}.
341 *
342 * @exception IOException
343 * in case of I/O error
344 */
345 public synchronized void close(boolean includingSubStream)
346 throws IOException {
347 if (!closed) {
348 if (openCounter > 0) {
349 openCounter--;
350 } else {
351 closed = true;
352 if (includingSubStream && in != null) {
353 in.close();
354 }
355 }
356 }
357 }
358
359 /**
360 * Check if we still have some data in the buffer and, if not, fetch some.
361 *
362 * @return TRUE if we fetched some data, FALSE if there are still some in
363 * the buffer
364 *
365 * @throws IOException
366 * in case of I/O error
367 */
368 private boolean preRead() throws IOException {
369 boolean hasRead = false;
370 if (!eof && in != null && pos >= len && !stopped) {
371 pos = 0;
372 if (buffer2 != null) {
373 buffer = buffer2;
374 pos = pos2;
375 len = len2;
376
377 buffer2 = null;
378 pos2 = 0;
379 len2 = 0;
380 } else {
381 buffer = originalBuffer;
382 len = in.read(buffer);
383 if (len > 0) {
384 bytesRead += len;
385 }
386 }
387
388 checkBuffer(true);
389 hasRead = true;
390 }
391
392 if (pos >= len) {
393 eof = true;
394 }
395
396 return hasRead;
397 }
398
399 /**
400 * We have more data available in the buffer or we can fetch more.
401 *
402 * @return TRUE if it is the case, FALSE if not
403 */
404 private boolean hasMoreData() {
405 return !closed && started && !(eof && pos >= len);
406 }
407
408 /**
409 * Check that the buffer didn't overshot to the next item, and fix
410 * {@link NextableInputStream#len} if needed.
411 * <p>
412 * If {@link NextableInputStream#len} is fixed,
413 * {@link NextableInputStream#eof} and {@link NextableInputStream#stopped}
414 * are set to TRUE.
415 *
416 * @param newBuffer
417 * we changed the buffer, we need to clear some information in
418 * the {@link NextableInputStreamStep}
419 */
420 private void checkBuffer(boolean newBuffer) {
421 if (step != null && len > 0) {
422 if (newBuffer) {
423 step.clearBuffer();
424 }
425
426 int stopAt = step.stop(buffer, pos, len);
427 if (stopAt >= 0) {
428 len = stopAt;
429 eof = true;
430 stopped = true;
431 }
432 }
433 }
434
435 /**
436 * The implementation of {@link NextableInputStream#next()} and
437 * {@link NextableInputStream#nextAll()}.
438 *
439 * @param all
440 * TRUE for {@link NextableInputStream#nextAll()}, FALSE for
441 * {@link NextableInputStream#next()}
442 *
443 * @return TRUE if we unblocked the next sub-stream, FALSE if not
444 *
445 * @throws IOException
446 * in case of I/O error or if the stream is closed
447 */
448 private boolean next(boolean all) throws IOException {
449 checkClose();
450
451 if (!started) {
452 // First call before being allowed to read
453 started = true;
454
455 if (all) {
456 step = null;
457 }
458
459 return true;
460 }
461
462 if (step != null && !hasMoreData() && stopped) {
463 len = step.getResumeLen();
464 pos += step.getResumeSkip();
465 eof = false;
466
467 if (all) {
468 step = null;
469 }
470
471 if (!preRead()) {
472 checkBuffer(false);
473 }
474
475 // consider that if EOF, there is no next
476 return hasMoreData();
477 }
478
479 return false;
480 }
481
482 /**
483 * Check that the stream was not closed, and throw an {@link IOException} if
484 * it was.
485 *
486 * @throws IOException
487 * if it was closed
488 */
489 private void checkClose() throws IOException {
490 if (closed) {
491 throw new IOException(
492 "This NextableInputStream was closed, you cannot use it anymore.");
493 }
494 }
495
496 // buffer must be > search
497 static private boolean startsWith(byte[] search, byte[] buffer,
498 int offset) {
499 boolean same = true;
500 for (int i = 0; i < search.length; i++) {
501 if (search[i] != buffer[offset + i]) {
502 same = false;
503 break;
504 }
505 }
506
507 return same;
508 }
509 }