Commit | Line | Data |
---|---|---|
2e7584da NR |
1 | package be.nikiroo.utils; |
2 | ||
3 | import java.io.IOException; | |
4 | import java.io.InputStream; | |
32d89af1 | 5 | import java.util.Arrays; |
2e7584da | 6 | |
63b46ca9 NR |
7 | /** |
8 | * This {@link InputStream} can be separated into sub-streams (you can process | |
9 | * it as a normal {@link InputStream} but, when it is spent, you can call | |
10 | * {@link NextableInputStream#next()} on it to unlock new data). | |
11 | * <p> | |
12 | * The separation in sub-streams is done via {@link NextableInputStreamStep}. | |
13 | * | |
14 | * @author niki | |
15 | */ | |
2e7584da | 16 | public class NextableInputStream extends InputStream { |
63b46ca9 | 17 | private NextableInputStreamStep step; |
d6f9bd9f | 18 | private boolean started; |
63b46ca9 | 19 | private boolean stopped; |
d6f9bd9f | 20 | private boolean closed; |
4098af70 | 21 | |
2e7584da | 22 | private InputStream in; |
d6f9bd9f | 23 | private int openCounter; |
2e7584da | 24 | private boolean eof; |
d6f9bd9f NR |
25 | private int pos; |
26 | private int len; | |
27 | private byte[] buffer; | |
28 | ||
32d89af1 N |
29 | // special use, prefetched next buffer |
30 | private byte[] buffer2; | |
31 | private int pos2; | |
32 | private int len2; | |
33 | private byte[] originalBuffer; | |
34 | ||
d6f9bd9f | 35 | private long bytesRead; |
2e7584da | 36 | |
63b46ca9 NR |
37 | /** |
38 | * Create a new {@link NextableInputStream} that wraps the given | |
39 | * {@link InputStream}. | |
40 | * | |
41 | * @param in | |
42 | * the {@link InputStream} to wrap | |
43 | * @param step | |
44 | * how to separate it into sub-streams (can be NULL, but in that | |
45 | * case it will behave as a normal {@link InputStream}) | |
46 | */ | |
47 | public NextableInputStream(InputStream in, NextableInputStreamStep step) { | |
2e7584da | 48 | this.in = in; |
63b46ca9 | 49 | this.step = step; |
d6f9bd9f NR |
50 | |
51 | this.buffer = new byte[4096]; | |
32d89af1 | 52 | this.originalBuffer = this.buffer; |
d6f9bd9f NR |
53 | this.pos = 0; |
54 | this.len = 0; | |
55 | } | |
56 | ||
57 | /** | |
58 | * Create a new {@link NextableInputStream} that wraps the given bytes array | |
59 | * as a data source. | |
60 | * | |
61 | * @param in | |
62 | * the array to wrap, cannot be NULL | |
63 | * @param step | |
64 | * how to separate it into sub-streams (can be NULL, but in that | |
65 | * case it will behave as a normal {@link InputStream}) | |
66 | */ | |
67 | public NextableInputStream(byte[] in, NextableInputStreamStep step) { | |
68 | this(in, step, 0, in.length); | |
69 | } | |
70 | ||
71 | /** | |
72 | * Create a new {@link NextableInputStream} that wraps the given bytes array | |
73 | * as a data source. | |
74 | * | |
75 | * @param in | |
76 | * the array to wrap, cannot be NULL | |
77 | * @param step | |
78 | * how to separate it into sub-streams (can be NULL, but in that | |
79 | * case it will behave as a normal {@link InputStream}) | |
80 | * @param offset | |
81 | * the offset to start the reading at | |
82 | * @param length | |
83 | * the number of bytes to take into account in the array, | |
84 | * starting from the offset | |
85 | * | |
86 | * @throws NullPointerException | |
87 | * if the array is NULL | |
88 | * @throws IndexOutOfBoundsException | |
89 | * if the offset and length do not correspond to the given array | |
90 | */ | |
91 | public NextableInputStream(byte[] in, NextableInputStreamStep step, | |
92 | int offset, int length) { | |
93 | if (in == null) { | |
94 | throw new NullPointerException(); | |
95 | } else if (offset < 0 || length < 0 || length > in.length - offset) { | |
96 | throw new IndexOutOfBoundsException(); | |
97 | } | |
98 | ||
99 | this.in = null; | |
100 | this.step = step; | |
101 | ||
102 | this.buffer = in; | |
32d89af1 | 103 | this.originalBuffer = this.buffer; |
d6f9bd9f NR |
104 | this.pos = offset; |
105 | this.len = length; | |
106 | ||
107 | checkBuffer(true); | |
108 | } | |
109 | ||
110 | /** | |
111 | * Return this very same {@link NextableInputStream}, but keep a counter of | |
112 | * how many streams were open this way. When calling | |
113 | * {@link NextableInputStream#close()}, decrease this counter if it is not | |
114 | * already zero instead of actually closing the stream. | |
115 | * <p> | |
116 | * You are now responsible for it — you <b>must</b> close it. | |
117 | * <p> | |
118 | * This method allows you to use a wrapping stream around this one and still | |
119 | * close the wrapping stream. | |
120 | * | |
121 | * @return the same stream, but you are now responsible for closing it | |
122 | * | |
123 | * @throws IOException | |
124 | * in case of I/O error or if the stream is closed | |
125 | */ | |
126 | public synchronized InputStream open() throws IOException { | |
127 | checkClose(); | |
128 | openCounter++; | |
129 | return this; | |
2e7584da NR |
130 | } |
131 | ||
63b46ca9 NR |
132 | /** |
133 | * Unblock the processing of the next sub-stream. | |
134 | * <p> | |
135 | * It can only be called when the "current" stream is spent (i.e., you must | |
136 | * first process the stream until it is spent). | |
137 | * <p> | |
138 | * We consider that when the under-laying {@link InputStream} is also spent, | |
139 | * we cannot have a next sub-stream (it will thus return FALSE). | |
140 | * <p> | |
141 | * {@link IOException}s can happen when we have no data available in the | |
142 | * buffer; in that case, we fetch more data to know if we can have a next | |
143 | * sub-stream or not. | |
144 | * | |
145 | * @return TRUE if we unblocked the next sub-stream, FALSE if not | |
146 | * | |
147 | * @throws IOException | |
d6f9bd9f | 148 | * in case of I/O error or if the stream is closed |
63b46ca9 NR |
149 | */ |
150 | public boolean next() throws IOException { | |
d6f9bd9f NR |
151 | return next(false); |
152 | } | |
4098af70 | 153 | |
d6f9bd9f NR |
154 | /** |
155 | * Unblock the next sub-stream as would have done | |
156 | * {@link NextableInputStream#next()}, but disable the sub-stream systems. | |
157 | * <p> | |
158 | * That is, the next stream, if any, will be the last one and will not be | |
159 | * subject to the {@link NextableInputStreamStep}. | |
160 | * | |
161 | * @return TRUE if we unblocked the next sub-stream, FALSE if not | |
162 | * | |
163 | * @throws IOException | |
164 | * in case of I/O error or if the stream is closed | |
165 | */ | |
166 | public boolean nextAll() throws IOException { | |
167 | return next(true); | |
168 | } | |
63b46ca9 | 169 | |
32d89af1 N |
170 | // max is buffer.size ! |
171 | public boolean startsWiths(String search) throws IOException { | |
172 | return startsWith(search.getBytes("UTF-8")); | |
d6f9bd9f | 173 | } |
4098af70 | 174 | |
32d89af1 N |
175 | // max is buffer.size ! |
176 | public boolean startsWith(byte[] search) throws IOException { | |
177 | if (search.length > originalBuffer.length) { | |
178 | throw new IOException( | |
179 | "This stream does not support searching for more than " | |
180 | + buffer.length + " bytes"); | |
181 | } | |
182 | ||
183 | checkClose(); | |
184 | ||
185 | if (available() < search.length) { | |
186 | preRead(); | |
187 | } | |
188 | ||
189 | if (available() >= search.length) { | |
190 | // Easy path | |
191 | return startsWith(search, buffer, pos); | |
192 | } else if (!eof) { | |
193 | // Harder path | |
194 | if (buffer2 == null && buffer.length == originalBuffer.length) { | |
195 | buffer2 = Arrays.copyOf(buffer, buffer.length * 2); | |
196 | ||
197 | pos2 = buffer.length; | |
198 | len2 = in.read(buffer2, pos2, buffer.length); | |
199 | if (len2 > 0) { | |
200 | bytesRead += len2; | |
201 | } | |
202 | ||
203 | // Note: here, len/len2 = INDEX of last good byte | |
204 | len2 += pos2; | |
205 | } | |
206 | ||
207 | if (available() + (len2 - pos2) >= search.length) { | |
208 | return startsWith(search, buffer2, pos2); | |
209 | } | |
210 | } | |
211 | ||
4098af70 N |
212 | return false; |
213 | } | |
214 | ||
d6f9bd9f NR |
215 | /** |
216 | * The number of bytes read from the under-laying {@link InputStream}. | |
217 | * | |
218 | * @return the number of bytes | |
219 | */ | |
220 | public long getBytesRead() { | |
221 | return bytesRead; | |
222 | } | |
223 | ||
224 | /** | |
225 | * Check if this stream is totally spent (no more data to read or to | |
226 | * process). | |
227 | * | |
228 | * @return TRUE if it is | |
229 | */ | |
230 | public boolean eof() { | |
231 | return closed || (len < 0 && !hasMoreData()); | |
232 | } | |
233 | ||
2e7584da NR |
234 | @Override |
235 | public int read() throws IOException { | |
d6f9bd9f NR |
236 | checkClose(); |
237 | ||
2e7584da NR |
238 | preRead(); |
239 | if (eof) { | |
240 | return -1; | |
241 | } | |
242 | ||
243 | return buffer[pos++]; | |
244 | } | |
245 | ||
246 | @Override | |
247 | public int read(byte[] b) throws IOException { | |
248 | return read(b, 0, b.length); | |
249 | } | |
250 | ||
251 | @Override | |
252 | public int read(byte[] b, int boff, int blen) throws IOException { | |
d6f9bd9f NR |
253 | checkClose(); |
254 | ||
2e7584da NR |
255 | if (b == null) { |
256 | throw new NullPointerException(); | |
257 | } else if (boff < 0 || blen < 0 || blen > b.length - boff) { | |
258 | throw new IndexOutOfBoundsException(); | |
259 | } else if (blen == 0) { | |
260 | return 0; | |
261 | } | |
262 | ||
263 | int done = 0; | |
4098af70 | 264 | while (hasMoreData() && done < blen) { |
2e7584da | 265 | preRead(); |
4098af70 N |
266 | if (hasMoreData()) { |
267 | for (int i = pos; i < blen && i < len; i++) { | |
268 | b[boff + done] = buffer[i]; | |
269 | pos++; | |
270 | done++; | |
271 | } | |
2e7584da NR |
272 | } |
273 | } | |
274 | ||
275 | return done > 0 ? done : -1; | |
276 | } | |
277 | ||
4098af70 | 278 | @Override |
d6f9bd9f | 279 | public long skip(long n) throws IOException { |
473e5f31 N |
280 | if (n <= 0) { |
281 | return 0; | |
282 | } | |
283 | ||
284 | long skipped = 0; | |
285 | while (hasMoreData() && n > 0) { | |
286 | preRead(); | |
287 | ||
288 | long inBuffer = Math.min(n, available()); | |
289 | pos += inBuffer; | |
290 | n -= inBuffer; | |
291 | skipped += inBuffer; | |
292 | } | |
293 | ||
294 | return skipped; | |
d6f9bd9f NR |
295 | } |
296 | ||
297 | @Override | |
298 | public int available() { | |
299 | if (closed) { | |
300 | return 0; | |
301 | } | |
302 | ||
4098af70 N |
303 | return Math.max(0, len - pos); |
304 | } | |
305 | ||
d6f9bd9f NR |
306 | /** |
307 | * Closes this stream and releases any system resources associated with the | |
308 | * stream. | |
309 | * <p> | |
310 | * Including the under-laying {@link InputStream}. | |
311 | * <p> | |
312 | * <b>Note:</b> if you called the {@link NextableInputStream#open()} method | |
313 | * prior to this one, it will just decrease the internal count of how many | |
314 | * open streams it held and do nothing else. The stream will actually be | |
315 | * closed when you have called {@link NextableInputStream#close()} once more | |
316 | * than {@link NextableInputStream#open()}. | |
317 | * | |
318 | * @exception IOException | |
319 | * in case of I/O error | |
320 | */ | |
321 | @Override | |
322 | public synchronized void close() throws IOException { | |
323 | close(true); | |
324 | } | |
325 | ||
326 | /** | |
327 | * Closes this stream and releases any system resources associated with the | |
328 | * stream. | |
329 | * <p> | |
330 | * Including the under-laying {@link InputStream} if | |
331 | * <tt>incudingSubStream</tt> is true. | |
332 | * <p> | |
333 | * You can call this method multiple times, it will not cause an | |
334 | * {@link IOException} for subsequent calls. | |
335 | * <p> | |
336 | * <b>Note:</b> if you called the {@link NextableInputStream#open()} method | |
337 | * prior to this one, it will just decrease the internal count of how many | |
338 | * open streams it held and do nothing else. The stream will actually be | |
339 | * closed when you have called {@link NextableInputStream#close()} once more | |
340 | * than {@link NextableInputStream#open()}. | |
341 | * | |
342 | * @exception IOException | |
343 | * in case of I/O error | |
344 | */ | |
345 | public synchronized void close(boolean includingSubStream) | |
346 | throws IOException { | |
347 | if (!closed) { | |
348 | if (openCounter > 0) { | |
349 | openCounter--; | |
350 | } else { | |
351 | closed = true; | |
352 | if (includingSubStream && in != null) { | |
353 | in.close(); | |
354 | } | |
355 | } | |
356 | } | |
357 | } | |
358 | ||
63b46ca9 NR |
359 | /** |
360 | * Check if we still have some data in the buffer and, if not, fetch some. | |
361 | * | |
362 | * @return TRUE if we fetched some data, FALSE if there are still some in | |
363 | * the buffer | |
364 | * | |
365 | * @throws IOException | |
366 | * in case of I/O error | |
367 | */ | |
368 | private boolean preRead() throws IOException { | |
369 | boolean hasRead = false; | |
370 | if (!eof && in != null && pos >= len && !stopped) { | |
2e7584da | 371 | pos = 0; |
32d89af1 N |
372 | if (buffer2 != null) { |
373 | buffer = buffer2; | |
374 | pos = pos2; | |
375 | len = len2; | |
376 | ||
377 | buffer2 = null; | |
378 | pos2 = 0; | |
379 | len2 = 0; | |
380 | } else { | |
381 | buffer = originalBuffer; | |
382 | len = in.read(buffer); | |
383 | if (len > 0) { | |
384 | bytesRead += len; | |
385 | } | |
d6f9bd9f NR |
386 | } |
387 | ||
63b46ca9 NR |
388 | checkBuffer(true); |
389 | hasRead = true; | |
2e7584da NR |
390 | } |
391 | ||
392 | if (pos >= len) { | |
393 | eof = true; | |
394 | } | |
63b46ca9 NR |
395 | |
396 | return hasRead; | |
2e7584da | 397 | } |
4098af70 | 398 | |
63b46ca9 NR |
399 | /** |
400 | * We have more data available in the buffer or we can fetch more. | |
401 | * | |
402 | * @return TRUE if it is the case, FALSE if not | |
403 | */ | |
4098af70 | 404 | private boolean hasMoreData() { |
d6f9bd9f | 405 | return !closed && started && !(eof && pos >= len); |
4098af70 N |
406 | } |
407 | ||
63b46ca9 NR |
408 | /** |
409 | * Check that the buffer didn't overshot to the next item, and fix | |
410 | * {@link NextableInputStream#len} if needed. | |
411 | * <p> | |
412 | * If {@link NextableInputStream#len} is fixed, | |
413 | * {@link NextableInputStream#eof} and {@link NextableInputStream#stopped} | |
414 | * are set to TRUE. | |
415 | * | |
416 | * @param newBuffer | |
417 | * we changed the buffer, we need to clear some information in | |
418 | * the {@link NextableInputStreamStep} | |
419 | */ | |
420 | private void checkBuffer(boolean newBuffer) { | |
d6f9bd9f | 421 | if (step != null && len > 0) { |
63b46ca9 NR |
422 | if (newBuffer) { |
423 | step.clearBuffer(); | |
424 | } | |
4098af70 | 425 | |
63b46ca9 NR |
426 | int stopAt = step.stop(buffer, pos, len); |
427 | if (stopAt >= 0) { | |
428 | len = stopAt; | |
429 | eof = true; | |
430 | stopped = true; | |
4098af70 N |
431 | } |
432 | } | |
433 | } | |
d6f9bd9f NR |
434 | |
435 | /** | |
436 | * The implementation of {@link NextableInputStream#next()} and | |
437 | * {@link NextableInputStream#nextAll()}. | |
438 | * | |
439 | * @param all | |
440 | * TRUE for {@link NextableInputStream#nextAll()}, FALSE for | |
441 | * {@link NextableInputStream#next()} | |
442 | * | |
443 | * @return TRUE if we unblocked the next sub-stream, FALSE if not | |
444 | * | |
445 | * @throws IOException | |
446 | * in case of I/O error or if the stream is closed | |
447 | */ | |
448 | private boolean next(boolean all) throws IOException { | |
449 | checkClose(); | |
450 | ||
451 | if (!started) { | |
452 | // First call before being allowed to read | |
453 | started = true; | |
454 | ||
455 | if (all) { | |
456 | step = null; | |
457 | } | |
458 | ||
459 | return true; | |
460 | } | |
461 | ||
462 | if (step != null && !hasMoreData() && stopped) { | |
463 | len = step.getResumeLen(); | |
464 | pos += step.getResumeSkip(); | |
465 | eof = false; | |
466 | ||
467 | if (all) { | |
468 | step = null; | |
469 | } | |
470 | ||
471 | if (!preRead()) { | |
472 | checkBuffer(false); | |
473 | } | |
474 | ||
475 | // consider that if EOF, there is no next | |
476 | return hasMoreData(); | |
477 | } | |
478 | ||
479 | return false; | |
480 | } | |
481 | ||
482 | /** | |
483 | * Check that the stream was not closed, and throw an {@link IOException} if | |
484 | * it was. | |
485 | * | |
486 | * @throws IOException | |
487 | * if it was closed | |
488 | */ | |
489 | private void checkClose() throws IOException { | |
490 | if (closed) { | |
491 | throw new IOException( | |
492 | "This NextableInputStream was closed, you cannot use it anymore."); | |
493 | } | |
494 | } | |
32d89af1 N |
495 | |
496 | // buffer must be > search | |
497 | static private boolean startsWith(byte[] search, byte[] buffer, | |
498 | int offset) { | |
499 | boolean same = true; | |
500 | for (int i = 0; i < search.length; i++) { | |
501 | if (search[i] != buffer[offset + i]) { | |
502 | same = false; | |
503 | break; | |
504 | } | |
505 | } | |
506 | ||
507 | return same; | |
508 | } | |
2e7584da | 509 | } |