7ced59804b02ce5ccad360cf8c193ef1992765f2
[nikiroo-utils.git] / src / be / nikiroo / utils / NextableInputStream.java
1 package be.nikiroo.utils;
2
3 import java.io.IOException;
4 import java.io.InputStream;
5 import java.util.Arrays;
6
7 /**
8 * This {@link InputStream} can be separated into sub-streams (you can process
9 * it as a normal {@link InputStream} but, when it is spent, you can call
10 * {@link NextableInputStream#next()} on it to unlock new data).
11 * <p>
12 * The separation in sub-streams is done via {@link NextableInputStreamStep}.
13 *
14 * @author niki
15 */
16 public class NextableInputStream extends InputStream {
17 private NextableInputStreamStep step;
18 private boolean started;
19 private boolean stopped;
20 private boolean closed;
21
22 private InputStream in;
23 private int openCounter;
24 private boolean eof;
25 private int pos;
26 private int len;
27 private byte[] buffer;
28
29 // special use, prefetched next buffer
30 private byte[] buffer2;
31 private int pos2;
32 private int len2;
33 private byte[] originalBuffer;
34
35 private long bytesRead;
36
37 /**
38 * Create a new {@link NextableInputStream} that wraps the given
39 * {@link InputStream}.
40 *
41 * @param in
42 * the {@link InputStream} to wrap
43 * @param step
44 * how to separate it into sub-streams (can be NULL, but in that
45 * case it will behave as a normal {@link InputStream})
46 */
47 public NextableInputStream(InputStream in, NextableInputStreamStep step) {
48 this.in = in;
49 this.step = step;
50
51 this.buffer = new byte[4096];
52 this.originalBuffer = this.buffer;
53 this.pos = 0;
54 this.len = 0;
55 }
56
57 /**
58 * Create a new {@link NextableInputStream} that wraps the given bytes array
59 * as a data source.
60 *
61 * @param in
62 * the array to wrap, cannot be NULL
63 * @param step
64 * how to separate it into sub-streams (can be NULL, but in that
65 * case it will behave as a normal {@link InputStream})
66 */
67 public NextableInputStream(byte[] in, NextableInputStreamStep step) {
68 this(in, step, 0, in.length);
69 }
70
71 /**
72 * Create a new {@link NextableInputStream} that wraps the given bytes array
73 * as a data source.
74 *
75 * @param in
76 * the array to wrap, cannot be NULL
77 * @param step
78 * how to separate it into sub-streams (can be NULL, but in that
79 * case it will behave as a normal {@link InputStream})
80 * @param offset
81 * the offset to start the reading at
82 * @param length
83 * the number of bytes to take into account in the array,
84 * starting from the offset
85 *
86 * @throws NullPointerException
87 * if the array is NULL
88 * @throws IndexOutOfBoundsException
89 * if the offset and length do not correspond to the given array
90 */
91 public NextableInputStream(byte[] in, NextableInputStreamStep step,
92 int offset, int length) {
93 if (in == null) {
94 throw new NullPointerException();
95 } else if (offset < 0 || length < 0 || length > in.length - offset) {
96 throw new IndexOutOfBoundsException();
97 }
98
99 this.in = null;
100 this.step = step;
101
102 this.buffer = in;
103 this.originalBuffer = this.buffer;
104 this.pos = offset;
105 this.len = length;
106
107 checkBuffer(true);
108 }
109
110 /**
111 * Return this very same {@link NextableInputStream}, but keep a counter of
112 * how many streams were open this way. When calling
113 * {@link NextableInputStream#close()}, decrease this counter if it is not
114 * already zero instead of actually closing the stream.
115 * <p>
116 * You are now responsible for it &mdash; you <b>must</b> close it.
117 * <p>
118 * This method allows you to use a wrapping stream around this one and still
119 * close the wrapping stream.
120 *
121 * @return the same stream, but you are now responsible for closing it
122 *
123 * @throws IOException
124 * in case of I/O error or if the stream is closed
125 */
126 public synchronized InputStream open() throws IOException {
127 checkClose();
128 openCounter++;
129 return this;
130 }
131
132 /**
133 * Unblock the processing of the next sub-stream.
134 * <p>
135 * It can only be called when the "current" stream is spent (i.e., you must
136 * first process the stream until it is spent).
137 * <p>
138 * We consider that when the under-laying {@link InputStream} is also spent,
139 * we cannot have a next sub-stream (it will thus return FALSE).
140 * <p>
141 * {@link IOException}s can happen when we have no data available in the
142 * buffer; in that case, we fetch more data to know if we can have a next
143 * sub-stream or not.
144 *
145 * @return TRUE if we unblocked the next sub-stream, FALSE if not
146 *
147 * @throws IOException
148 * in case of I/O error or if the stream is closed
149 */
150 public boolean next() throws IOException {
151 return next(false);
152 }
153
154 /**
155 * Unblock the next sub-stream as would have done
156 * {@link NextableInputStream#next()}, but disable the sub-stream systems.
157 * <p>
158 * That is, the next stream, if any, will be the last one and will not be
159 * subject to the {@link NextableInputStreamStep}.
160 *
161 * @return TRUE if we unblocked the next sub-stream, FALSE if not
162 *
163 * @throws IOException
164 * in case of I/O error or if the stream is closed
165 */
166 public boolean nextAll() throws IOException {
167 return next(true);
168 }
169
170 // max is buffer.size !
171 public boolean startsWiths(String search) throws IOException {
172 return startsWith(search.getBytes("UTF-8"));
173 }
174
175 // max is buffer.size !
176 public boolean startsWith(byte[] search) throws IOException {
177 if (search.length > originalBuffer.length) {
178 throw new IOException(
179 "This stream does not support searching for more than "
180 + buffer.length + " bytes");
181 }
182
183 checkClose();
184
185 if (available() < search.length) {
186 preRead();
187 }
188
189 if (available() >= search.length) {
190 // Easy path
191 return startsWith(search, buffer, pos);
192 } else if (!eof) {
193 // Harder path
194 if (buffer2 == null && buffer.length == originalBuffer.length) {
195 buffer2 = Arrays.copyOf(buffer, buffer.length * 2);
196
197 pos2 = buffer.length;
198 len2 = in.read(buffer2, pos2, buffer.length);
199 if (len2 > 0) {
200 bytesRead += len2;
201 }
202
203 // Note: here, len/len2 = INDEX of last good byte
204 len2 += pos2;
205 }
206
207 if (available() + (len2 - pos2) >= search.length) {
208 return startsWith(search, buffer2, pos2);
209 }
210 }
211
212 return false;
213 }
214
215 /**
216 * The number of bytes read from the under-laying {@link InputStream}.
217 *
218 * @return the number of bytes
219 */
220 public long getBytesRead() {
221 return bytesRead;
222 }
223
224 /**
225 * Check if this stream is totally spent (no more data to read or to
226 * process).
227 *
228 * @return TRUE if it is
229 */
230 public boolean eof() {
231 return closed || (len < 0 && !hasMoreData());
232 }
233
234 @Override
235 public int read() throws IOException {
236 checkClose();
237
238 preRead();
239 if (eof) {
240 return -1;
241 }
242
243 return buffer[pos++];
244 }
245
246 @Override
247 public int read(byte[] b) throws IOException {
248 return read(b, 0, b.length);
249 }
250
251 @Override
252 public int read(byte[] b, int boff, int blen) throws IOException {
253 checkClose();
254
255 if (b == null) {
256 throw new NullPointerException();
257 } else if (boff < 0 || blen < 0 || blen > b.length - boff) {
258 throw new IndexOutOfBoundsException();
259 } else if (blen == 0) {
260 return 0;
261 }
262
263 int done = 0;
264 while (hasMoreData() && done < blen) {
265 preRead();
266 if (hasMoreData()) {
267 for (int i = pos; i < blen && i < len; i++) {
268 b[boff + done] = buffer[i];
269 pos++;
270 done++;
271 }
272 }
273 }
274
275 return done > 0 ? done : -1;
276 }
277
278 @Override
279 public long skip(long n) throws IOException {
280 // TODO Auto-generated method stub
281 return super.skip(n);
282 }
283
284 @Override
285 public int available() {
286 if (closed) {
287 return 0;
288 }
289
290 return Math.max(0, len - pos);
291 }
292
293 /**
294 * Closes this stream and releases any system resources associated with the
295 * stream.
296 * <p>
297 * Including the under-laying {@link InputStream}.
298 * <p>
299 * <b>Note:</b> if you called the {@link NextableInputStream#open()} method
300 * prior to this one, it will just decrease the internal count of how many
301 * open streams it held and do nothing else. The stream will actually be
302 * closed when you have called {@link NextableInputStream#close()} once more
303 * than {@link NextableInputStream#open()}.
304 *
305 * @exception IOException
306 * in case of I/O error
307 */
308 @Override
309 public synchronized void close() throws IOException {
310 close(true);
311 }
312
313 /**
314 * Closes this stream and releases any system resources associated with the
315 * stream.
316 * <p>
317 * Including the under-laying {@link InputStream} if
318 * <tt>incudingSubStream</tt> is true.
319 * <p>
320 * You can call this method multiple times, it will not cause an
321 * {@link IOException} for subsequent calls.
322 * <p>
323 * <b>Note:</b> if you called the {@link NextableInputStream#open()} method
324 * prior to this one, it will just decrease the internal count of how many
325 * open streams it held and do nothing else. The stream will actually be
326 * closed when you have called {@link NextableInputStream#close()} once more
327 * than {@link NextableInputStream#open()}.
328 *
329 * @exception IOException
330 * in case of I/O error
331 */
332 public synchronized void close(boolean includingSubStream)
333 throws IOException {
334 if (!closed) {
335 if (openCounter > 0) {
336 openCounter--;
337 } else {
338 closed = true;
339 if (includingSubStream && in != null) {
340 in.close();
341 }
342 }
343 }
344 }
345
346 /**
347 * Check if we still have some data in the buffer and, if not, fetch some.
348 *
349 * @return TRUE if we fetched some data, FALSE if there are still some in
350 * the buffer
351 *
352 * @throws IOException
353 * in case of I/O error
354 */
355 private boolean preRead() throws IOException {
356 boolean hasRead = false;
357 if (!eof && in != null && pos >= len && !stopped) {
358 pos = 0;
359 if (buffer2 != null) {
360 buffer = buffer2;
361 pos = pos2;
362 len = len2;
363
364 buffer2 = null;
365 pos2 = 0;
366 len2 = 0;
367 } else {
368 buffer = originalBuffer;
369 len = in.read(buffer);
370 if (len > 0) {
371 bytesRead += len;
372 }
373 }
374
375 checkBuffer(true);
376 hasRead = true;
377 }
378
379 if (pos >= len) {
380 eof = true;
381 }
382
383 return hasRead;
384 }
385
386 /**
387 * We have more data available in the buffer or we can fetch more.
388 *
389 * @return TRUE if it is the case, FALSE if not
390 */
391 private boolean hasMoreData() {
392 return !closed && started && !(eof && pos >= len);
393 }
394
395 /**
396 * Check that the buffer didn't overshot to the next item, and fix
397 * {@link NextableInputStream#len} if needed.
398 * <p>
399 * If {@link NextableInputStream#len} is fixed,
400 * {@link NextableInputStream#eof} and {@link NextableInputStream#stopped}
401 * are set to TRUE.
402 *
403 * @param newBuffer
404 * we changed the buffer, we need to clear some information in
405 * the {@link NextableInputStreamStep}
406 */
407 private void checkBuffer(boolean newBuffer) {
408 if (step != null && len > 0) {
409 if (newBuffer) {
410 step.clearBuffer();
411 }
412
413 int stopAt = step.stop(buffer, pos, len);
414 if (stopAt >= 0) {
415 len = stopAt;
416 eof = true;
417 stopped = true;
418 }
419 }
420 }
421
422 /**
423 * The implementation of {@link NextableInputStream#next()} and
424 * {@link NextableInputStream#nextAll()}.
425 *
426 * @param all
427 * TRUE for {@link NextableInputStream#nextAll()}, FALSE for
428 * {@link NextableInputStream#next()}
429 *
430 * @return TRUE if we unblocked the next sub-stream, FALSE if not
431 *
432 * @throws IOException
433 * in case of I/O error or if the stream is closed
434 */
435 private boolean next(boolean all) throws IOException {
436 checkClose();
437
438 if (!started) {
439 // First call before being allowed to read
440 started = true;
441
442 if (all) {
443 step = null;
444 }
445
446 return true;
447 }
448
449 if (step != null && !hasMoreData() && stopped) {
450 len = step.getResumeLen();
451 pos += step.getResumeSkip();
452 eof = false;
453
454 if (all) {
455 step = null;
456 }
457
458 if (!preRead()) {
459 checkBuffer(false);
460 }
461
462 // consider that if EOF, there is no next
463 return hasMoreData();
464 }
465
466 return false;
467 }
468
469 /**
470 * Check that the stream was not closed, and throw an {@link IOException} if
471 * it was.
472 *
473 * @throws IOException
474 * if it was closed
475 */
476 private void checkClose() throws IOException {
477 if (closed) {
478 throw new IOException(
479 "This NextableInputStream was closed, you cannot use it anymore.");
480 }
481 }
482
483 // buffer must be > search
484 static private boolean startsWith(byte[] search, byte[] buffer,
485 int offset) {
486 boolean same = true;
487 for (int i = 0; i < search.length; i++) {
488 if (search[i] != buffer[offset + i]) {
489 same = false;
490 break;
491 }
492 }
493
494 return same;
495 }
496 }