Commit | Line | Data |
---|---|---|
2e7584da NR |
1 | package be.nikiroo.utils; |
2 | ||
3 | import java.io.IOException; | |
4 | import java.io.InputStream; | |
32d89af1 | 5 | import java.util.Arrays; |
2e7584da | 6 | |
63b46ca9 NR |
7 | /** |
8 | * This {@link InputStream} can be separated into sub-streams (you can process | |
9 | * it as a normal {@link InputStream} but, when it is spent, you can call | |
10 | * {@link NextableInputStream#next()} on it to unlock new data). | |
11 | * <p> | |
12 | * The separation in sub-streams is done via {@link NextableInputStreamStep}. | |
13 | * | |
14 | * @author niki | |
15 | */ | |
2e7584da | 16 | public class NextableInputStream extends InputStream { |
63b46ca9 | 17 | private NextableInputStreamStep step; |
d6f9bd9f | 18 | private boolean started; |
63b46ca9 | 19 | private boolean stopped; |
d6f9bd9f | 20 | private boolean closed; |
4098af70 | 21 | |
2e7584da | 22 | private InputStream in; |
d6f9bd9f | 23 | private int openCounter; |
2e7584da | 24 | private boolean eof; |
d6f9bd9f NR |
25 | private int pos; |
26 | private int len; | |
27 | private byte[] buffer; | |
28 | ||
32d89af1 N |
29 | // special use, prefetched next buffer |
30 | private byte[] buffer2; | |
31 | private int pos2; | |
32 | private int len2; | |
33 | private byte[] originalBuffer; | |
34 | ||
d6f9bd9f | 35 | private long bytesRead; |
2e7584da | 36 | |
63b46ca9 NR |
37 | /** |
38 | * Create a new {@link NextableInputStream} that wraps the given | |
39 | * {@link InputStream}. | |
40 | * | |
41 | * @param in | |
42 | * the {@link InputStream} to wrap | |
43 | * @param step | |
44 | * how to separate it into sub-streams (can be NULL, but in that | |
45 | * case it will behave as a normal {@link InputStream}) | |
46 | */ | |
47 | public NextableInputStream(InputStream in, NextableInputStreamStep step) { | |
2e7584da | 48 | this.in = in; |
63b46ca9 | 49 | this.step = step; |
d6f9bd9f NR |
50 | |
51 | this.buffer = new byte[4096]; | |
32d89af1 | 52 | this.originalBuffer = this.buffer; |
d6f9bd9f NR |
53 | this.pos = 0; |
54 | this.len = 0; | |
55 | } | |
56 | ||
57 | /** | |
58 | * Create a new {@link NextableInputStream} that wraps the given bytes array | |
59 | * as a data source. | |
60 | * | |
61 | * @param in | |
62 | * the array to wrap, cannot be NULL | |
63 | * @param step | |
64 | * how to separate it into sub-streams (can be NULL, but in that | |
65 | * case it will behave as a normal {@link InputStream}) | |
66 | */ | |
67 | public NextableInputStream(byte[] in, NextableInputStreamStep step) { | |
68 | this(in, step, 0, in.length); | |
69 | } | |
70 | ||
71 | /** | |
72 | * Create a new {@link NextableInputStream} that wraps the given bytes array | |
73 | * as a data source. | |
74 | * | |
75 | * @param in | |
76 | * the array to wrap, cannot be NULL | |
77 | * @param step | |
78 | * how to separate it into sub-streams (can be NULL, but in that | |
79 | * case it will behave as a normal {@link InputStream}) | |
80 | * @param offset | |
81 | * the offset to start the reading at | |
82 | * @param length | |
83 | * the number of bytes to take into account in the array, | |
84 | * starting from the offset | |
85 | * | |
86 | * @throws NullPointerException | |
87 | * if the array is NULL | |
88 | * @throws IndexOutOfBoundsException | |
89 | * if the offset and length do not correspond to the given array | |
90 | */ | |
91 | public NextableInputStream(byte[] in, NextableInputStreamStep step, | |
92 | int offset, int length) { | |
93 | if (in == null) { | |
94 | throw new NullPointerException(); | |
95 | } else if (offset < 0 || length < 0 || length > in.length - offset) { | |
96 | throw new IndexOutOfBoundsException(); | |
97 | } | |
98 | ||
99 | this.in = null; | |
100 | this.step = step; | |
101 | ||
102 | this.buffer = in; | |
32d89af1 | 103 | this.originalBuffer = this.buffer; |
d6f9bd9f NR |
104 | this.pos = offset; |
105 | this.len = length; | |
106 | ||
107 | checkBuffer(true); | |
108 | } | |
109 | ||
110 | /** | |
111 | * Return this very same {@link NextableInputStream}, but keep a counter of | |
112 | * how many streams were open this way. When calling | |
113 | * {@link NextableInputStream#close()}, decrease this counter if it is not | |
114 | * already zero instead of actually closing the stream. | |
115 | * <p> | |
116 | * You are now responsible for it — you <b>must</b> close it. | |
117 | * <p> | |
118 | * This method allows you to use a wrapping stream around this one and still | |
119 | * close the wrapping stream. | |
120 | * | |
121 | * @return the same stream, but you are now responsible for closing it | |
122 | * | |
123 | * @throws IOException | |
124 | * in case of I/O error or if the stream is closed | |
125 | */ | |
126 | public synchronized InputStream open() throws IOException { | |
127 | checkClose(); | |
128 | openCounter++; | |
129 | return this; | |
2e7584da NR |
130 | } |
131 | ||
63b46ca9 NR |
132 | /** |
133 | * Unblock the processing of the next sub-stream. | |
134 | * <p> | |
135 | * It can only be called when the "current" stream is spent (i.e., you must | |
136 | * first process the stream until it is spent). | |
137 | * <p> | |
138 | * We consider that when the under-laying {@link InputStream} is also spent, | |
139 | * we cannot have a next sub-stream (it will thus return FALSE). | |
140 | * <p> | |
141 | * {@link IOException}s can happen when we have no data available in the | |
142 | * buffer; in that case, we fetch more data to know if we can have a next | |
143 | * sub-stream or not. | |
144 | * | |
145 | * @return TRUE if we unblocked the next sub-stream, FALSE if not | |
146 | * | |
147 | * @throws IOException | |
d6f9bd9f | 148 | * in case of I/O error or if the stream is closed |
63b46ca9 NR |
149 | */ |
150 | public boolean next() throws IOException { | |
d6f9bd9f NR |
151 | return next(false); |
152 | } | |
4098af70 | 153 | |
d6f9bd9f NR |
154 | /** |
155 | * Unblock the next sub-stream as would have done | |
156 | * {@link NextableInputStream#next()}, but disable the sub-stream systems. | |
157 | * <p> | |
158 | * That is, the next stream, if any, will be the last one and will not be | |
159 | * subject to the {@link NextableInputStreamStep}. | |
160 | * | |
161 | * @return TRUE if we unblocked the next sub-stream, FALSE if not | |
162 | * | |
163 | * @throws IOException | |
164 | * in case of I/O error or if the stream is closed | |
165 | */ | |
166 | public boolean nextAll() throws IOException { | |
167 | return next(true); | |
168 | } | |
63b46ca9 | 169 | |
32d89af1 N |
170 | // max is buffer.size ! |
171 | public boolean startsWiths(String search) throws IOException { | |
172 | return startsWith(search.getBytes("UTF-8")); | |
d6f9bd9f | 173 | } |
4098af70 | 174 | |
32d89af1 N |
175 | // max is buffer.size ! |
176 | public boolean startsWith(byte[] search) throws IOException { | |
177 | if (search.length > originalBuffer.length) { | |
178 | throw new IOException( | |
179 | "This stream does not support searching for more than " | |
180 | + buffer.length + " bytes"); | |
181 | } | |
182 | ||
183 | checkClose(); | |
184 | ||
185 | if (available() < search.length) { | |
186 | preRead(); | |
187 | } | |
188 | ||
189 | if (available() >= search.length) { | |
190 | // Easy path | |
191 | return startsWith(search, buffer, pos); | |
192 | } else if (!eof) { | |
193 | // Harder path | |
194 | if (buffer2 == null && buffer.length == originalBuffer.length) { | |
195 | buffer2 = Arrays.copyOf(buffer, buffer.length * 2); | |
196 | ||
197 | pos2 = buffer.length; | |
198 | len2 = in.read(buffer2, pos2, buffer.length); | |
199 | if (len2 > 0) { | |
200 | bytesRead += len2; | |
201 | } | |
202 | ||
203 | // Note: here, len/len2 = INDEX of last good byte | |
204 | len2 += pos2; | |
205 | } | |
206 | ||
207 | if (available() + (len2 - pos2) >= search.length) { | |
208 | return startsWith(search, buffer2, pos2); | |
209 | } | |
210 | } | |
211 | ||
4098af70 N |
212 | return false; |
213 | } | |
214 | ||
d6f9bd9f NR |
215 | /** |
216 | * The number of bytes read from the under-laying {@link InputStream}. | |
217 | * | |
218 | * @return the number of bytes | |
219 | */ | |
220 | public long getBytesRead() { | |
221 | return bytesRead; | |
222 | } | |
223 | ||
224 | /** | |
225 | * Check if this stream is totally spent (no more data to read or to | |
226 | * process). | |
227 | * | |
228 | * @return TRUE if it is | |
229 | */ | |
230 | public boolean eof() { | |
231 | return closed || (len < 0 && !hasMoreData()); | |
232 | } | |
233 | ||
2e7584da NR |
234 | @Override |
235 | public int read() throws IOException { | |
d6f9bd9f NR |
236 | checkClose(); |
237 | ||
2e7584da NR |
238 | preRead(); |
239 | if (eof) { | |
240 | return -1; | |
241 | } | |
242 | ||
243 | return buffer[pos++]; | |
244 | } | |
245 | ||
246 | @Override | |
247 | public int read(byte[] b) throws IOException { | |
248 | return read(b, 0, b.length); | |
249 | } | |
250 | ||
251 | @Override | |
252 | public int read(byte[] b, int boff, int blen) throws IOException { | |
d6f9bd9f NR |
253 | checkClose(); |
254 | ||
2e7584da NR |
255 | if (b == null) { |
256 | throw new NullPointerException(); | |
257 | } else if (boff < 0 || blen < 0 || blen > b.length - boff) { | |
258 | throw new IndexOutOfBoundsException(); | |
259 | } else if (blen == 0) { | |
260 | return 0; | |
261 | } | |
262 | ||
263 | int done = 0; | |
4098af70 | 264 | while (hasMoreData() && done < blen) { |
2e7584da | 265 | preRead(); |
4098af70 N |
266 | if (hasMoreData()) { |
267 | for (int i = pos; i < blen && i < len; i++) { | |
268 | b[boff + done] = buffer[i]; | |
269 | pos++; | |
270 | done++; | |
271 | } | |
2e7584da NR |
272 | } |
273 | } | |
274 | ||
275 | return done > 0 ? done : -1; | |
276 | } | |
277 | ||
4098af70 | 278 | @Override |
d6f9bd9f NR |
279 | public long skip(long n) throws IOException { |
280 | // TODO Auto-generated method stub | |
281 | return super.skip(n); | |
282 | } | |
283 | ||
284 | @Override | |
285 | public int available() { | |
286 | if (closed) { | |
287 | return 0; | |
288 | } | |
289 | ||
4098af70 N |
290 | return Math.max(0, len - pos); |
291 | } | |
292 | ||
d6f9bd9f NR |
293 | /** |
294 | * Closes this stream and releases any system resources associated with the | |
295 | * stream. | |
296 | * <p> | |
297 | * Including the under-laying {@link InputStream}. | |
298 | * <p> | |
299 | * <b>Note:</b> if you called the {@link NextableInputStream#open()} method | |
300 | * prior to this one, it will just decrease the internal count of how many | |
301 | * open streams it held and do nothing else. The stream will actually be | |
302 | * closed when you have called {@link NextableInputStream#close()} once more | |
303 | * than {@link NextableInputStream#open()}. | |
304 | * | |
305 | * @exception IOException | |
306 | * in case of I/O error | |
307 | */ | |
308 | @Override | |
309 | public synchronized void close() throws IOException { | |
310 | close(true); | |
311 | } | |
312 | ||
313 | /** | |
314 | * Closes this stream and releases any system resources associated with the | |
315 | * stream. | |
316 | * <p> | |
317 | * Including the under-laying {@link InputStream} if | |
318 | * <tt>incudingSubStream</tt> is true. | |
319 | * <p> | |
320 | * You can call this method multiple times, it will not cause an | |
321 | * {@link IOException} for subsequent calls. | |
322 | * <p> | |
323 | * <b>Note:</b> if you called the {@link NextableInputStream#open()} method | |
324 | * prior to this one, it will just decrease the internal count of how many | |
325 | * open streams it held and do nothing else. The stream will actually be | |
326 | * closed when you have called {@link NextableInputStream#close()} once more | |
327 | * than {@link NextableInputStream#open()}. | |
328 | * | |
329 | * @exception IOException | |
330 | * in case of I/O error | |
331 | */ | |
332 | public synchronized void close(boolean includingSubStream) | |
333 | throws IOException { | |
334 | if (!closed) { | |
335 | if (openCounter > 0) { | |
336 | openCounter--; | |
337 | } else { | |
338 | closed = true; | |
339 | if (includingSubStream && in != null) { | |
340 | in.close(); | |
341 | } | |
342 | } | |
343 | } | |
344 | } | |
345 | ||
63b46ca9 NR |
346 | /** |
347 | * Check if we still have some data in the buffer and, if not, fetch some. | |
348 | * | |
349 | * @return TRUE if we fetched some data, FALSE if there are still some in | |
350 | * the buffer | |
351 | * | |
352 | * @throws IOException | |
353 | * in case of I/O error | |
354 | */ | |
355 | private boolean preRead() throws IOException { | |
356 | boolean hasRead = false; | |
357 | if (!eof && in != null && pos >= len && !stopped) { | |
2e7584da | 358 | pos = 0; |
32d89af1 N |
359 | if (buffer2 != null) { |
360 | buffer = buffer2; | |
361 | pos = pos2; | |
362 | len = len2; | |
363 | ||
364 | buffer2 = null; | |
365 | pos2 = 0; | |
366 | len2 = 0; | |
367 | } else { | |
368 | buffer = originalBuffer; | |
369 | len = in.read(buffer); | |
370 | if (len > 0) { | |
371 | bytesRead += len; | |
372 | } | |
d6f9bd9f NR |
373 | } |
374 | ||
63b46ca9 NR |
375 | checkBuffer(true); |
376 | hasRead = true; | |
2e7584da NR |
377 | } |
378 | ||
379 | if (pos >= len) { | |
380 | eof = true; | |
381 | } | |
63b46ca9 NR |
382 | |
383 | return hasRead; | |
2e7584da | 384 | } |
4098af70 | 385 | |
63b46ca9 NR |
386 | /** |
387 | * We have more data available in the buffer or we can fetch more. | |
388 | * | |
389 | * @return TRUE if it is the case, FALSE if not | |
390 | */ | |
4098af70 | 391 | private boolean hasMoreData() { |
d6f9bd9f | 392 | return !closed && started && !(eof && pos >= len); |
4098af70 N |
393 | } |
394 | ||
63b46ca9 NR |
395 | /** |
396 | * Check that the buffer didn't overshot to the next item, and fix | |
397 | * {@link NextableInputStream#len} if needed. | |
398 | * <p> | |
399 | * If {@link NextableInputStream#len} is fixed, | |
400 | * {@link NextableInputStream#eof} and {@link NextableInputStream#stopped} | |
401 | * are set to TRUE. | |
402 | * | |
403 | * @param newBuffer | |
404 | * we changed the buffer, we need to clear some information in | |
405 | * the {@link NextableInputStreamStep} | |
406 | */ | |
407 | private void checkBuffer(boolean newBuffer) { | |
d6f9bd9f | 408 | if (step != null && len > 0) { |
63b46ca9 NR |
409 | if (newBuffer) { |
410 | step.clearBuffer(); | |
411 | } | |
4098af70 | 412 | |
63b46ca9 NR |
413 | int stopAt = step.stop(buffer, pos, len); |
414 | if (stopAt >= 0) { | |
415 | len = stopAt; | |
416 | eof = true; | |
417 | stopped = true; | |
4098af70 N |
418 | } |
419 | } | |
420 | } | |
d6f9bd9f NR |
421 | |
422 | /** | |
423 | * The implementation of {@link NextableInputStream#next()} and | |
424 | * {@link NextableInputStream#nextAll()}. | |
425 | * | |
426 | * @param all | |
427 | * TRUE for {@link NextableInputStream#nextAll()}, FALSE for | |
428 | * {@link NextableInputStream#next()} | |
429 | * | |
430 | * @return TRUE if we unblocked the next sub-stream, FALSE if not | |
431 | * | |
432 | * @throws IOException | |
433 | * in case of I/O error or if the stream is closed | |
434 | */ | |
435 | private boolean next(boolean all) throws IOException { | |
436 | checkClose(); | |
437 | ||
438 | if (!started) { | |
439 | // First call before being allowed to read | |
440 | started = true; | |
441 | ||
442 | if (all) { | |
443 | step = null; | |
444 | } | |
445 | ||
446 | return true; | |
447 | } | |
448 | ||
449 | if (step != null && !hasMoreData() && stopped) { | |
450 | len = step.getResumeLen(); | |
451 | pos += step.getResumeSkip(); | |
452 | eof = false; | |
453 | ||
454 | if (all) { | |
455 | step = null; | |
456 | } | |
457 | ||
458 | if (!preRead()) { | |
459 | checkBuffer(false); | |
460 | } | |
461 | ||
462 | // consider that if EOF, there is no next | |
463 | return hasMoreData(); | |
464 | } | |
465 | ||
466 | return false; | |
467 | } | |
468 | ||
469 | /** | |
470 | * Check that the stream was not closed, and throw an {@link IOException} if | |
471 | * it was. | |
472 | * | |
473 | * @throws IOException | |
474 | * if it was closed | |
475 | */ | |
476 | private void checkClose() throws IOException { | |
477 | if (closed) { | |
478 | throw new IOException( | |
479 | "This NextableInputStream was closed, you cannot use it anymore."); | |
480 | } | |
481 | } | |
32d89af1 N |
482 | |
483 | // buffer must be > search | |
484 | static private boolean startsWith(byte[] search, byte[] buffer, | |
485 | int offset) { | |
486 | boolean same = true; | |
487 | for (int i = 0; i < search.length; i++) { | |
488 | if (search[i] != buffer[offset + i]) { | |
489 | same = false; | |
490 | break; | |
491 | } | |
492 | } | |
493 | ||
494 | return same; | |
495 | } | |
2e7584da | 496 | } |