d9f27ceff87446bf2e6f5bfe49578b4bf687986c
[nikiroo-utils.git] / src / be / nikiroo / utils / NextableInputStream.java
1 package be.nikiroo.utils;
2
3 import java.io.IOException;
4 import java.io.InputStream;
5 import java.util.Arrays;
6
7 /**
8 * This {@link InputStream} can be separated into sub-streams (you can process
9 * it as a normal {@link InputStream} but, when it is spent, you can call
10 * {@link NextableInputStream#next()} on it to unlock new data).
11 * <p>
12 * The separation in sub-streams is done via {@link NextableInputStreamStep}.
13 *
14 * @author niki
15 */
16 public class NextableInputStream extends InputStream {
17 private NextableInputStreamStep step;
18 private boolean started;
19 private boolean stopped;
20 private boolean closed;
21
22 private InputStream in;
23 private int openCounter;
24 private boolean eof;
25 private int pos;
26 private int len;
27 private byte[] buffer;
28
29 // special use, prefetched next buffer
30 private byte[] buffer2;
31 private int pos2;
32 private int len2;
33 private byte[] originalBuffer;
34
35 private long bytesRead;
36
37 /**
38 * Create a new {@link NextableInputStream} that wraps the given
39 * {@link InputStream}.
40 *
41 * @param in
42 * the {@link InputStream} to wrap
43 * @param step
44 * how to separate it into sub-streams (can be NULL, but in that
45 * case it will behave as a normal {@link InputStream})
46 */
47 public NextableInputStream(InputStream in, NextableInputStreamStep step) {
48 this.in = in;
49 this.step = step;
50
51 this.buffer = new byte[4096];
52 this.originalBuffer = this.buffer;
53 this.pos = 0;
54 this.len = 0;
55 }
56
57 /**
58 * Create a new {@link NextableInputStream} that wraps the given bytes array
59 * as a data source.
60 *
61 * @param in
62 * the array to wrap, cannot be NULL
63 * @param step
64 * how to separate it into sub-streams (can be NULL, but in that
65 * case it will behave as a normal {@link InputStream})
66 */
67 public NextableInputStream(byte[] in, NextableInputStreamStep step) {
68 this(in, step, 0, in.length);
69 }
70
71 /**
72 * Create a new {@link NextableInputStream} that wraps the given bytes array
73 * as a data source.
74 *
75 * @param in
76 * the array to wrap, cannot be NULL
77 * @param step
78 * how to separate it into sub-streams (can be NULL, but in that
79 * case it will behave as a normal {@link InputStream})
80 * @param offset
81 * the offset to start the reading at
82 * @param length
83 * the number of bytes to take into account in the array,
84 * starting from the offset
85 *
86 * @throws NullPointerException
87 * if the array is NULL
88 * @throws IndexOutOfBoundsException
89 * if the offset and length do not correspond to the given array
90 */
91 public NextableInputStream(byte[] in, NextableInputStreamStep step,
92 int offset, int length) {
93 if (in == null) {
94 throw new NullPointerException();
95 } else if (offset < 0 || length < 0 || length > in.length - offset) {
96 throw new IndexOutOfBoundsException();
97 }
98
99 this.in = null;
100 this.step = step;
101
102 this.buffer = in;
103 this.originalBuffer = this.buffer;
104 this.pos = offset;
105 this.len = length;
106
107 checkBuffer(true);
108 }
109
110 /**
111 * Return this very same {@link NextableInputStream}, but keep a counter of
112 * how many streams were open this way. When calling
113 * {@link NextableInputStream#close()}, decrease this counter if it is not
114 * already zero instead of actually closing the stream.
115 * <p>
116 * You are now responsible for it &mdash; you <b>must</b> close it.
117 * <p>
118 * This method allows you to use a wrapping stream around this one and still
119 * close the wrapping stream.
120 *
121 * @return the same stream, but you are now responsible for closing it
122 *
123 * @throws IOException
124 * in case of I/O error or if the stream is closed
125 */
126 public synchronized InputStream open() throws IOException {
127 checkClose();
128 openCounter++;
129 return this;
130 }
131
132 /**
133 * Unblock the processing of the next sub-stream.
134 * <p>
135 * It can only be called when the "current" stream is spent (i.e., you must
136 * first process the stream until it is spent).
137 * <p>
138 * We consider that when the under-laying {@link InputStream} is also spent,
139 * we cannot have a next sub-stream (it will thus return FALSE).
140 * <p>
141 * {@link IOException}s can happen when we have no data available in the
142 * buffer; in that case, we fetch more data to know if we can have a next
143 * sub-stream or not.
144 *
145 * @return TRUE if we unblocked the next sub-stream, FALSE if not
146 *
147 * @throws IOException
148 * in case of I/O error or if the stream is closed
149 */
150 public boolean next() throws IOException {
151 return next(false);
152 }
153
154 /**
155 * Unblock the next sub-stream as would have done
156 * {@link NextableInputStream#next()}, but disable the sub-stream systems.
157 * <p>
158 * That is, the next stream, if any, will be the last one and will not be
159 * subject to the {@link NextableInputStreamStep}.
160 *
161 * @return TRUE if we unblocked the next sub-stream, FALSE if not
162 *
163 * @throws IOException
164 * in case of I/O error or if the stream is closed
165 */
166 public boolean nextAll() throws IOException {
167 return next(true);
168 }
169
170 // max is buffer.size !
171 public boolean startsWiths(String search) throws IOException {
172 return startsWith(search.getBytes("UTF-8"));
173 }
174
175 // max is buffer.size !
176 public boolean startsWith(byte[] search) throws IOException {
177 if (search.length > originalBuffer.length) {
178 throw new IOException(
179 "This stream does not support searching for more than "
180 + buffer.length + " bytes");
181 }
182
183 checkClose();
184
185 if (available() < search.length) {
186 preRead();
187 }
188
189 if (available() >= search.length) {
190 // Easy path
191 return startsWith(search, buffer, pos);
192 } else if (!eof) {
193 // Harder path
194 if (buffer2 == null && buffer.length == originalBuffer.length) {
195 buffer2 = Arrays.copyOf(buffer, buffer.length * 2);
196
197 pos2 = buffer.length;
198 len2 = in.read(buffer2, pos2, buffer.length);
199 if (len2 > 0) {
200 bytesRead += len2;
201 }
202
203 // Note: here, len/len2 = INDEX of last good byte
204 len2 += pos2;
205 }
206
207 if (available() + (len2 - pos2) >= search.length) {
208 return startsWith(search, buffer2, pos2);
209 }
210 }
211
212 return false;
213 }
214
215 /**
216 * The number of bytes read from the under-laying {@link InputStream}.
217 *
218 * @return the number of bytes
219 */
220 public long getBytesRead() {
221 return bytesRead;
222 }
223
224 /**
225 * Check if this stream is totally spent (no more data to read or to
226 * process).
227 * <p>
228 * Note: an empty stream that is still not started will return FALSE, as we
229 * don't know yet if it is empty.
230 *
231 * @return TRUE if it is
232 */
233 public boolean eof() {
234 return closed || (len < 0 && !hasMoreData());
235 }
236
237 @Override
238 public int read() throws IOException {
239 checkClose();
240
241 preRead();
242 if (eof) {
243 return -1;
244 }
245
246 return buffer[pos++];
247 }
248
249 @Override
250 public int read(byte[] b) throws IOException {
251 return read(b, 0, b.length);
252 }
253
254 @Override
255 public int read(byte[] b, int boff, int blen) throws IOException {
256 checkClose();
257
258 if (b == null) {
259 throw new NullPointerException();
260 } else if (boff < 0 || blen < 0 || blen > b.length - boff) {
261 throw new IndexOutOfBoundsException();
262 } else if (blen == 0) {
263 return 0;
264 }
265
266 int done = 0;
267 while (hasMoreData() && done < blen) {
268 preRead();
269 if (hasMoreData()) {
270 int now = Math.min(blen, len) - pos;
271 if (now > 0) {
272 System.arraycopy(buffer, pos, b, boff, now);
273 pos += now;
274 done += now;
275 }
276 }
277 }
278
279 return done > 0 ? done : -1;
280 }
281
282 @Override
283 public long skip(long n) throws IOException {
284 if (n <= 0) {
285 return 0;
286 }
287
288 long skipped = 0;
289 while (hasMoreData() && n > 0) {
290 preRead();
291
292 long inBuffer = Math.min(n, available());
293 pos += inBuffer;
294 n -= inBuffer;
295 skipped += inBuffer;
296 }
297
298 return skipped;
299 }
300
301 @Override
302 public int available() {
303 if (closed) {
304 return 0;
305 }
306
307 return Math.max(0, len - pos);
308 }
309
310 /**
311 * Closes this stream and releases any system resources associated with the
312 * stream.
313 * <p>
314 * Including the under-laying {@link InputStream}.
315 * <p>
316 * <b>Note:</b> if you called the {@link NextableInputStream#open()} method
317 * prior to this one, it will just decrease the internal count of how many
318 * open streams it held and do nothing else. The stream will actually be
319 * closed when you have called {@link NextableInputStream#close()} once more
320 * than {@link NextableInputStream#open()}.
321 *
322 * @exception IOException
323 * in case of I/O error
324 */
325 @Override
326 public synchronized void close() throws IOException {
327 close(true);
328 }
329
330 /**
331 * Closes this stream and releases any system resources associated with the
332 * stream.
333 * <p>
334 * Including the under-laying {@link InputStream} if
335 * <tt>incudingSubStream</tt> is true.
336 * <p>
337 * You can call this method multiple times, it will not cause an
338 * {@link IOException} for subsequent calls.
339 * <p>
340 * <b>Note:</b> if you called the {@link NextableInputStream#open()} method
341 * prior to this one, it will just decrease the internal count of how many
342 * open streams it held and do nothing else. The stream will actually be
343 * closed when you have called {@link NextableInputStream#close()} once more
344 * than {@link NextableInputStream#open()}.
345 *
346 * @exception IOException
347 * in case of I/O error
348 */
349 public synchronized void close(boolean includingSubStream)
350 throws IOException {
351 if (!closed) {
352 if (openCounter > 0) {
353 openCounter--;
354 } else {
355 closed = true;
356 if (includingSubStream && in != null) {
357 in.close();
358 }
359 }
360 }
361 }
362
363 /**
364 * Check if we still have some data in the buffer and, if not, fetch some.
365 *
366 * @return TRUE if we fetched some data, FALSE if there are still some in
367 * the buffer
368 *
369 * @throws IOException
370 * in case of I/O error
371 */
372 private boolean preRead() throws IOException {
373 boolean hasRead = false;
374 if (!eof && in != null && pos >= len && !stopped) {
375 pos = 0;
376 if (buffer2 != null) {
377 buffer = buffer2;
378 pos = pos2;
379 len = len2;
380
381 buffer2 = null;
382 pos2 = 0;
383 len2 = 0;
384 } else {
385 buffer = originalBuffer;
386 len = in.read(buffer);
387 if (len > 0) {
388 bytesRead += len;
389 }
390 }
391
392 checkBuffer(true);
393 hasRead = true;
394 }
395
396 if (pos >= len) {
397 eof = true;
398 }
399
400 return hasRead;
401 }
402
403 /**
404 * We have more data available in the buffer or we can fetch more.
405 *
406 * @return TRUE if it is the case, FALSE if not
407 */
408 private boolean hasMoreData() {
409 return !closed && started && !(eof && pos >= len);
410 }
411
412 /**
413 * Check that the buffer didn't overshot to the next item, and fix
414 * {@link NextableInputStream#len} if needed.
415 * <p>
416 * If {@link NextableInputStream#len} is fixed,
417 * {@link NextableInputStream#eof} and {@link NextableInputStream#stopped}
418 * are set to TRUE.
419 *
420 * @param newBuffer
421 * we changed the buffer, we need to clear some information in
422 * the {@link NextableInputStreamStep}
423 */
424 private void checkBuffer(boolean newBuffer) {
425 if (step != null && len > 0) {
426 if (newBuffer) {
427 step.clearBuffer();
428 }
429
430 int stopAt = step.stop(buffer, pos, len);
431 if (stopAt >= 0) {
432 len = stopAt;
433 eof = true;
434 stopped = true;
435 }
436 }
437 }
438
439 /**
440 * The implementation of {@link NextableInputStream#next()} and
441 * {@link NextableInputStream#nextAll()}.
442 *
443 * @param all
444 * TRUE for {@link NextableInputStream#nextAll()}, FALSE for
445 * {@link NextableInputStream#next()}
446 *
447 * @return TRUE if we unblocked the next sub-stream, FALSE if not
448 *
449 * @throws IOException
450 * in case of I/O error or if the stream is closed
451 */
452 private boolean next(boolean all) throws IOException {
453 checkClose();
454
455 if (!started) {
456 // First call before being allowed to read
457 started = true;
458
459 if (all) {
460 step = null;
461 }
462
463 return true;
464 }
465
466 if (step != null && !hasMoreData() && stopped) {
467 len = step.getResumeLen();
468 pos += step.getResumeSkip();
469 eof = false;
470
471 if (all) {
472 step = null;
473 }
474
475 if (!preRead()) {
476 checkBuffer(false);
477 }
478
479 // consider that if EOF, there is no next
480 return hasMoreData();
481 }
482
483 return false;
484 }
485
486 /**
487 * Check that the stream was not closed, and throw an {@link IOException} if
488 * it was.
489 *
490 * @throws IOException
491 * if it was closed
492 */
493 private void checkClose() throws IOException {
494 if (closed) {
495 throw new IOException(
496 "This NextableInputStream was closed, you cannot use it anymore.");
497 }
498 }
499
500 // buffer must be > search
501 static private boolean startsWith(byte[] search, byte[] buffer, int offset) {
502 boolean same = true;
503 for (int i = 0; i < search.length; i++) {
504 if (search[i] != buffer[offset + i]) {
505 same = false;
506 break;
507 }
508 }
509
510 return same;
511 }
512 }