Commit | Line | Data |
---|---|---|
2e7584da NR |
1 | package be.nikiroo.utils; |
2 | ||
3 | import java.io.IOException; | |
4 | import java.io.InputStream; | |
32d89af1 | 5 | import java.util.Arrays; |
2e7584da | 6 | |
63b46ca9 NR |
7 | /** |
8 | * This {@link InputStream} can be separated into sub-streams (you can process | |
9 | * it as a normal {@link InputStream} but, when it is spent, you can call | |
10 | * {@link NextableInputStream#next()} on it to unlock new data). | |
11 | * <p> | |
12 | * The separation in sub-streams is done via {@link NextableInputStreamStep}. | |
13 | * | |
14 | * @author niki | |
15 | */ | |
2e7584da | 16 | public class NextableInputStream extends InputStream { |
63b46ca9 | 17 | private NextableInputStreamStep step; |
d6f9bd9f | 18 | private boolean started; |
63b46ca9 | 19 | private boolean stopped; |
d6f9bd9f | 20 | private boolean closed; |
4098af70 | 21 | |
2e7584da | 22 | private InputStream in; |
d6f9bd9f | 23 | private int openCounter; |
2e7584da | 24 | private boolean eof; |
d6f9bd9f NR |
25 | private int pos; |
26 | private int len; | |
27 | private byte[] buffer; | |
28 | ||
32d89af1 N |
29 | // special use, prefetched next buffer |
30 | private byte[] buffer2; | |
31 | private int pos2; | |
32 | private int len2; | |
33 | private byte[] originalBuffer; | |
34 | ||
d6f9bd9f | 35 | private long bytesRead; |
2e7584da | 36 | |
63b46ca9 NR |
/**
 * Build a {@link NextableInputStream} on top of another
 * {@link InputStream}, using an internal buffer of 4096 bytes.
 *
 * @param in
 *            the {@link InputStream} to wrap
 * @param step
 *            how to separate it into sub-streams (can be NULL, but in that
 *            case it will behave as a normal {@link InputStream})
 */
public NextableInputStream(InputStream in, NextableInputStreamStep step) {
    byte[] data = new byte[4096];

    this.step = step;
    this.in = in;

    // The same array is used as working buffer and as reference buffer
    this.originalBuffer = data;
    this.buffer = data;

    // Nothing has been fetched yet
    this.len = 0;
    this.pos = 0;
}
56 | ||
/**
 * Build a {@link NextableInputStream} that uses the whole given bytes array
 * as its data source.
 *
 * @param in
 *            the array to wrap, cannot be NULL
 * @param step
 *            how to separate it into sub-streams (can be NULL, but in that
 *            case it will behave as a normal {@link InputStream})
 *
 * @throws NullPointerException
 *             if the array is NULL
 */
public NextableInputStream(byte[] in, NextableInputStreamStep step) {
    // Whole array: from the first byte, for in.length bytes
    this(in, step, 0, in.length);
}
70 | ||
/**
 * Create a new {@link NextableInputStream} that wraps a part of the given
 * bytes array as a data source.
 * <p>
 * The array is used directly (it is not copied).
 *
 * @param in
 *            the array to wrap, cannot be NULL
 * @param step
 *            how to separate it into sub-streams (can be NULL, but in that
 *            case it will behave as a normal {@link InputStream})
 * @param offset
 *            the offset to start the reading at
 * @param length
 *            the number of bytes to take into account in the array,
 *            starting from the offset
 *
 * @throws NullPointerException
 *             if the array is NULL
 * @throws IndexOutOfBoundsException
 *             if the offset and length do not correspond to the given array
 */
public NextableInputStream(byte[] in, NextableInputStreamStep step,
        int offset, int length) {
    if (in == null) {
        throw new NullPointerException();
    } else if (offset < 0 || length < 0 || length > in.length - offset) {
        throw new IndexOutOfBoundsException();
    }

    this.in = null;
    this.step = step;

    this.buffer = in;
    this.originalBuffer = this.buffer;
    this.pos = offset;
    // len is an END index (the rest of the class uses buffer[pos..len[),
    // not a count: it must include the offset. This cannot overflow since
    // length <= in.length - offset was checked above.
    this.len = offset + length;

    // The data is already there: look for a stop position right away
    checkBuffer(true);
}
109 | ||
110 | /** | |
111 | * Return this very same {@link NextableInputStream}, but keep a counter of | |
112 | * how many streams were open this way. When calling | |
113 | * {@link NextableInputStream#close()}, decrease this counter if it is not | |
114 | * already zero instead of actually closing the stream. | |
115 | * <p> | |
116 | * You are now responsible for it — you <b>must</b> close it. | |
117 | * <p> | |
118 | * This method allows you to use a wrapping stream around this one and still | |
119 | * close the wrapping stream. | |
120 | * | |
121 | * @return the same stream, but you are now responsible for closing it | |
122 | * | |
123 | * @throws IOException | |
124 | * in case of I/O error or if the stream is closed | |
125 | */ | |
126 | public synchronized InputStream open() throws IOException { | |
127 | checkClose(); | |
128 | openCounter++; | |
129 | return this; | |
2e7584da NR |
130 | } |
131 | ||
63b46ca9 NR |
132 | /** |
133 | * Unblock the processing of the next sub-stream. | |
134 | * <p> | |
135 | * It can only be called when the "current" stream is spent (i.e., you must | |
136 | * first process the stream until it is spent). | |
137 | * <p> | |
138 | * We consider that when the under-laying {@link InputStream} is also spent, | |
139 | * we cannot have a next sub-stream (it will thus return FALSE). | |
140 | * <p> | |
141 | * {@link IOException}s can happen when we have no data available in the | |
142 | * buffer; in that case, we fetch more data to know if we can have a next | |
143 | * sub-stream or not. | |
144 | * | |
145 | * @return TRUE if we unblocked the next sub-stream, FALSE if not | |
146 | * | |
147 | * @throws IOException | |
d6f9bd9f | 148 | * in case of I/O error or if the stream is closed |
63b46ca9 NR |
149 | */ |
150 | public boolean next() throws IOException { | |
d6f9bd9f NR |
151 | return next(false); |
152 | } | |
4098af70 | 153 | |
d6f9bd9f NR |
154 | /** |
155 | * Unblock the next sub-stream as would have done | |
156 | * {@link NextableInputStream#next()}, but disable the sub-stream systems. | |
157 | * <p> | |
158 | * That is, the next stream, if any, will be the last one and will not be | |
159 | * subject to the {@link NextableInputStreamStep}. | |
160 | * | |
161 | * @return TRUE if we unblocked the next sub-stream, FALSE if not | |
162 | * | |
163 | * @throws IOException | |
164 | * in case of I/O error or if the stream is closed | |
165 | */ | |
166 | public boolean nextAll() throws IOException { | |
167 | return next(true); | |
168 | } | |
63b46ca9 | 169 | |
32d89af1 N |
170 | // max is buffer.size ! |
171 | public boolean startsWiths(String search) throws IOException { | |
172 | return startsWith(search.getBytes("UTF-8")); | |
d6f9bd9f | 173 | } |
4098af70 | 174 | |
32d89af1 N |
175 | // max is buffer.size ! |
176 | public boolean startsWith(byte[] search) throws IOException { | |
177 | if (search.length > originalBuffer.length) { | |
178 | throw new IOException( | |
179 | "This stream does not support searching for more than " | |
180 | + buffer.length + " bytes"); | |
181 | } | |
182 | ||
183 | checkClose(); | |
184 | ||
185 | if (available() < search.length) { | |
186 | preRead(); | |
187 | } | |
188 | ||
189 | if (available() >= search.length) { | |
190 | // Easy path | |
191 | return startsWith(search, buffer, pos); | |
192 | } else if (!eof) { | |
193 | // Harder path | |
194 | if (buffer2 == null && buffer.length == originalBuffer.length) { | |
195 | buffer2 = Arrays.copyOf(buffer, buffer.length * 2); | |
196 | ||
197 | pos2 = buffer.length; | |
198 | len2 = in.read(buffer2, pos2, buffer.length); | |
199 | if (len2 > 0) { | |
200 | bytesRead += len2; | |
201 | } | |
202 | ||
203 | // Note: here, len/len2 = INDEX of last good byte | |
204 | len2 += pos2; | |
205 | } | |
206 | ||
207 | if (available() + (len2 - pos2) >= search.length) { | |
208 | return startsWith(search, buffer2, pos2); | |
209 | } | |
210 | } | |
211 | ||
4098af70 N |
212 | return false; |
213 | } | |
214 | ||
d6f9bd9f NR |
215 | /** |
216 | * The number of bytes read from the under-laying {@link InputStream}. | |
217 | * | |
218 | * @return the number of bytes | |
219 | */ | |
220 | public long getBytesRead() { | |
221 | return bytesRead; | |
222 | } | |
223 | ||
224 | /** | |
225 | * Check if this stream is totally spent (no more data to read or to | |
226 | * process). | |
59509e75 NR |
227 | * <p> |
228 | * Note: an empty stream that is still not started will return FALSE, as we | |
229 | * don't know yet if it is empty. | |
d6f9bd9f NR |
230 | * |
231 | * @return TRUE if it is | |
232 | */ | |
233 | public boolean eof() { | |
234 | return closed || (len < 0 && !hasMoreData()); | |
235 | } | |
236 | ||
2e7584da NR |
237 | @Override |
238 | public int read() throws IOException { | |
d6f9bd9f NR |
239 | checkClose(); |
240 | ||
2e7584da NR |
241 | preRead(); |
242 | if (eof) { | |
243 | return -1; | |
244 | } | |
245 | ||
246 | return buffer[pos++]; | |
247 | } | |
248 | ||
249 | @Override | |
250 | public int read(byte[] b) throws IOException { | |
251 | return read(b, 0, b.length); | |
252 | } | |
253 | ||
254 | @Override | |
255 | public int read(byte[] b, int boff, int blen) throws IOException { | |
d6f9bd9f NR |
256 | checkClose(); |
257 | ||
2e7584da NR |
258 | if (b == null) { |
259 | throw new NullPointerException(); | |
260 | } else if (boff < 0 || blen < 0 || blen > b.length - boff) { | |
261 | throw new IndexOutOfBoundsException(); | |
262 | } else if (blen == 0) { | |
263 | return 0; | |
264 | } | |
265 | ||
266 | int done = 0; | |
4098af70 | 267 | while (hasMoreData() && done < blen) { |
2e7584da | 268 | preRead(); |
4098af70 | 269 | if (hasMoreData()) { |
59509e75 NR |
270 | int now = Math.min(blen, len) - pos; |
271 | if (now > 0) { | |
272 | System.arraycopy(buffer, pos, b, boff, now); | |
273 | pos += now; | |
274 | done += now; | |
4098af70 | 275 | } |
2e7584da NR |
276 | } |
277 | } | |
278 | ||
279 | return done > 0 ? done : -1; | |
280 | } | |
281 | ||
4098af70 | 282 | @Override |
d6f9bd9f | 283 | public long skip(long n) throws IOException { |
473e5f31 N |
284 | if (n <= 0) { |
285 | return 0; | |
286 | } | |
287 | ||
288 | long skipped = 0; | |
289 | while (hasMoreData() && n > 0) { | |
290 | preRead(); | |
291 | ||
292 | long inBuffer = Math.min(n, available()); | |
293 | pos += inBuffer; | |
294 | n -= inBuffer; | |
295 | skipped += inBuffer; | |
296 | } | |
297 | ||
298 | return skipped; | |
d6f9bd9f NR |
299 | } |
300 | ||
301 | @Override | |
302 | public int available() { | |
303 | if (closed) { | |
304 | return 0; | |
305 | } | |
306 | ||
4098af70 N |
307 | return Math.max(0, len - pos); |
308 | } | |
309 | ||
d6f9bd9f NR |
310 | /** |
311 | * Closes this stream and releases any system resources associated with the | |
312 | * stream. | |
313 | * <p> | |
314 | * Including the under-laying {@link InputStream}. | |
315 | * <p> | |
316 | * <b>Note:</b> if you called the {@link NextableInputStream#open()} method | |
317 | * prior to this one, it will just decrease the internal count of how many | |
318 | * open streams it held and do nothing else. The stream will actually be | |
319 | * closed when you have called {@link NextableInputStream#close()} once more | |
320 | * than {@link NextableInputStream#open()}. | |
321 | * | |
322 | * @exception IOException | |
323 | * in case of I/O error | |
324 | */ | |
325 | @Override | |
326 | public synchronized void close() throws IOException { | |
327 | close(true); | |
328 | } | |
329 | ||
330 | /** | |
331 | * Closes this stream and releases any system resources associated with the | |
332 | * stream. | |
333 | * <p> | |
334 | * Including the under-laying {@link InputStream} if | |
335 | * <tt>incudingSubStream</tt> is true. | |
336 | * <p> | |
337 | * You can call this method multiple times, it will not cause an | |
338 | * {@link IOException} for subsequent calls. | |
339 | * <p> | |
340 | * <b>Note:</b> if you called the {@link NextableInputStream#open()} method | |
341 | * prior to this one, it will just decrease the internal count of how many | |
342 | * open streams it held and do nothing else. The stream will actually be | |
343 | * closed when you have called {@link NextableInputStream#close()} once more | |
344 | * than {@link NextableInputStream#open()}. | |
345 | * | |
346 | * @exception IOException | |
347 | * in case of I/O error | |
348 | */ | |
349 | public synchronized void close(boolean includingSubStream) | |
350 | throws IOException { | |
351 | if (!closed) { | |
352 | if (openCounter > 0) { | |
353 | openCounter--; | |
354 | } else { | |
355 | closed = true; | |
356 | if (includingSubStream && in != null) { | |
357 | in.close(); | |
358 | } | |
359 | } | |
360 | } | |
361 | } | |
362 | ||
63b46ca9 NR |
363 | /** |
364 | * Check if we still have some data in the buffer and, if not, fetch some. | |
365 | * | |
366 | * @return TRUE if we fetched some data, FALSE if there are still some in | |
367 | * the buffer | |
368 | * | |
369 | * @throws IOException | |
370 | * in case of I/O error | |
371 | */ | |
372 | private boolean preRead() throws IOException { | |
373 | boolean hasRead = false; | |
374 | if (!eof && in != null && pos >= len && !stopped) { | |
2e7584da | 375 | pos = 0; |
32d89af1 N |
376 | if (buffer2 != null) { |
377 | buffer = buffer2; | |
378 | pos = pos2; | |
379 | len = len2; | |
380 | ||
381 | buffer2 = null; | |
382 | pos2 = 0; | |
383 | len2 = 0; | |
384 | } else { | |
385 | buffer = originalBuffer; | |
386 | len = in.read(buffer); | |
387 | if (len > 0) { | |
388 | bytesRead += len; | |
389 | } | |
d6f9bd9f NR |
390 | } |
391 | ||
63b46ca9 NR |
392 | checkBuffer(true); |
393 | hasRead = true; | |
2e7584da NR |
394 | } |
395 | ||
396 | if (pos >= len) { | |
397 | eof = true; | |
398 | } | |
63b46ca9 NR |
399 | |
400 | return hasRead; | |
2e7584da | 401 | } |
4098af70 | 402 | |
63b46ca9 NR |
403 | /** |
404 | * We have more data available in the buffer or we can fetch more. | |
405 | * | |
406 | * @return TRUE if it is the case, FALSE if not | |
407 | */ | |
4098af70 | 408 | private boolean hasMoreData() { |
d6f9bd9f | 409 | return !closed && started && !(eof && pos >= len); |
4098af70 N |
410 | } |
411 | ||
63b46ca9 NR |
412 | /** |
413 | * Check that the buffer didn't overshot to the next item, and fix | |
414 | * {@link NextableInputStream#len} if needed. | |
415 | * <p> | |
416 | * If {@link NextableInputStream#len} is fixed, | |
417 | * {@link NextableInputStream#eof} and {@link NextableInputStream#stopped} | |
418 | * are set to TRUE. | |
419 | * | |
420 | * @param newBuffer | |
421 | * we changed the buffer, we need to clear some information in | |
422 | * the {@link NextableInputStreamStep} | |
423 | */ | |
424 | private void checkBuffer(boolean newBuffer) { | |
d6f9bd9f | 425 | if (step != null && len > 0) { |
63b46ca9 NR |
426 | if (newBuffer) { |
427 | step.clearBuffer(); | |
428 | } | |
4098af70 | 429 | |
63b46ca9 NR |
430 | int stopAt = step.stop(buffer, pos, len); |
431 | if (stopAt >= 0) { | |
432 | len = stopAt; | |
433 | eof = true; | |
434 | stopped = true; | |
4098af70 N |
435 | } |
436 | } | |
437 | } | |
d6f9bd9f NR |
438 | |
439 | /** | |
440 | * The implementation of {@link NextableInputStream#next()} and | |
441 | * {@link NextableInputStream#nextAll()}. | |
442 | * | |
443 | * @param all | |
444 | * TRUE for {@link NextableInputStream#nextAll()}, FALSE for | |
445 | * {@link NextableInputStream#next()} | |
446 | * | |
447 | * @return TRUE if we unblocked the next sub-stream, FALSE if not | |
448 | * | |
449 | * @throws IOException | |
450 | * in case of I/O error or if the stream is closed | |
451 | */ | |
452 | private boolean next(boolean all) throws IOException { | |
453 | checkClose(); | |
454 | ||
455 | if (!started) { | |
456 | // First call before being allowed to read | |
457 | started = true; | |
458 | ||
459 | if (all) { | |
460 | step = null; | |
461 | } | |
462 | ||
463 | return true; | |
464 | } | |
465 | ||
466 | if (step != null && !hasMoreData() && stopped) { | |
467 | len = step.getResumeLen(); | |
468 | pos += step.getResumeSkip(); | |
469 | eof = false; | |
470 | ||
471 | if (all) { | |
472 | step = null; | |
473 | } | |
474 | ||
475 | if (!preRead()) { | |
476 | checkBuffer(false); | |
477 | } | |
478 | ||
479 | // consider that if EOF, there is no next | |
480 | return hasMoreData(); | |
481 | } | |
482 | ||
483 | return false; | |
484 | } | |
485 | ||
486 | /** | |
487 | * Check that the stream was not closed, and throw an {@link IOException} if | |
488 | * it was. | |
489 | * | |
490 | * @throws IOException | |
491 | * if it was closed | |
492 | */ | |
493 | private void checkClose() throws IOException { | |
494 | if (closed) { | |
495 | throw new IOException( | |
496 | "This NextableInputStream was closed, you cannot use it anymore."); | |
497 | } | |
498 | } | |
32d89af1 N |
499 | |
500 | // buffer must be > search | |
59509e75 | 501 | static private boolean startsWith(byte[] search, byte[] buffer, int offset) { |
32d89af1 N |
502 | boolean same = true; |
503 | for (int i = 0; i < search.length; i++) { | |
504 | if (search[i] != buffer[offset + i]) { | |
505 | same = false; | |
506 | break; | |
507 | } | |
508 | } | |
509 | ||
510 | return same; | |
511 | } | |
2e7584da | 512 | } |