Merge branch 'subtree'
[nikiroo-utils.git] / src / be / nikiroo / utils / streams / BufferedInputStream.java
1 package be.nikiroo.utils.streams;
2
3 import java.io.IOException;
4 import java.io.InputStream;
5 import java.util.AbstractMap;
6 import java.util.ArrayList;
7 import java.util.List;
8 import java.util.Map.Entry;
9
10 import be.nikiroo.utils.StringUtils;
11
12 /**
13 * A simple {@link InputStream} that is buffered with a bytes array.
14 * <p>
15 * It is mostly intended to be used as a base class to create new
16 * {@link InputStream}s with special operation modes, and to give some default
17 * methods.
18 *
19 * @author niki
20 */
21 public class BufferedInputStream extends InputStream {
22 /**
23 * The size of the internal buffer (can be different if you pass your own
24 * buffer, of course, and can also expand to search for longer "startsWith"
25 * data).
26 * <p>
27 * Note that special "push-back" buffers can also be created during the life
28 * of this stream.
29 */
30 static private final int BUFFER_SIZE = 4096;
31
32 /** The current position in the buffer. */
33 protected int start;
34 /** The index of the last usable position of the buffer. */
35 protected int stop;
36 /** The buffer itself. */
37 protected byte[] buffer;
38 /** An End-Of-File (or {@link InputStream}, here) marker. */
39 protected boolean eof;
40
41 private boolean closed;
42 private InputStream in;
43 private int openCounter;
44 private byte[] singleByteReader = new byte[1];
45
46 /** array + offset of pushed-back buffers */
47 private List<Entry<byte[], Integer>> backBuffers;
48
49 private long bytesRead;
50
51 /**
52 * Create a new {@link BufferedInputStream} that wraps the given
53 * {@link InputStream}.
54 *
55 * @param in
56 * the {@link InputStream} to wrap
57 */
58 public BufferedInputStream(InputStream in) {
59 this.in = in;
60
61 this.buffer = new byte[BUFFER_SIZE];
62 this.start = 0;
63 this.stop = 0;
64 this.backBuffers = new ArrayList<Entry<byte[], Integer>>();
65 }
66
67 /**
68 * Create a new {@link BufferedInputStream} that wraps the given bytes array
69 * as a data source.
70 *
71 * @param in
72 * the array to wrap, cannot be NULL
73 */
74 public BufferedInputStream(byte[] in) {
75 this(in, 0, in.length);
76 }
77
78 /**
79 * Create a new {@link BufferedInputStream} that wraps the given bytes array
80 * as a data source.
81 *
82 * @param in
83 * the array to wrap, cannot be NULL
84 * @param offset
85 * the offset to start the reading at
86 * @param length
87 * the number of bytes to take into account in the array,
88 * starting from the offset
89 *
90 * @throws NullPointerException
91 * if the array is NULL
92 * @throws IndexOutOfBoundsException
93 * if the offset and length do not correspond to the given array
94 */
95 public BufferedInputStream(byte[] in, int offset, int length) {
96 if (in == null) {
97 throw new NullPointerException();
98 } else if (offset < 0 || length < 0 || length > in.length - offset) {
99 throw new IndexOutOfBoundsException();
100 }
101
102 this.in = null;
103
104 this.buffer = in;
105 this.start = offset;
106 this.stop = length;
107 this.backBuffers = new ArrayList<Entry<byte[], Integer>>();
108 }
109
110 /**
111 * Return this very same {@link BufferedInputStream}, but keep a counter of
112 * how many streams were open this way. When calling
113 * {@link BufferedInputStream#close()}, decrease this counter if it is not
114 * already zero instead of actually closing the stream.
115 * <p>
116 * You are now responsible for it &mdash; you <b>must</b> close it.
117 * <p>
118 * This method allows you to use a wrapping stream around this one and still
119 * close the wrapping stream.
120 *
121 * @return the same stream, but you are now responsible for closing it
122 *
123 * @throws IOException
124 * in case of I/O error or if the stream is closed
125 */
126 public synchronized InputStream open() throws IOException {
127 checkClose();
128 openCounter++;
129 return this;
130 }
131
132 /**
133 * Check if the current content (until eof) is equal to the given search
134 * term.
135 * <p>
136 * Note: the search term size <b>must</b> be smaller or equal the internal
137 * buffer size.
138 *
139 * @param search
140 * the term to search for
141 *
142 * @return TRUE if the content that will be read starts with it
143 *
144 * @throws IOException
145 * in case of I/O error or if the size of the search term is
146 * greater than the internal buffer
147 */
148 public boolean is(String search) throws IOException {
149 return is(StringUtils.getBytes(search));
150 }
151
152 /**
153 * Check if the current content (until eof) is equal to the given search
154 * term.
155 * <p>
156 * Note: the search term size <b>must</b> be smaller or equal the internal
157 * buffer size.
158 *
159 * @param search
160 * the term to search for
161 *
162 * @return TRUE if the content that will be read starts with it
163 *
164 * @throws IOException
165 * in case of I/O error or if the size of the search term is
166 * greater than the internal buffer
167 */
168 public boolean is(byte[] search) throws IOException {
169 if (startsWith(search)) {
170 return available() == search.length;
171 }
172
173 return false;
174 }
175
176 /**
177 * Check if the current content (what will be read next) starts with the
178 * given search term.
179 * <p>
180 * Note: the search term size <b>must</b> be smaller or equal the internal
181 * buffer size.
182 *
183 * @param search
184 * the term to search for
185 *
186 * @return TRUE if the content that will be read starts with it
187 *
188 * @throws IOException
189 * in case of I/O error or if the size of the search term is
190 * greater than the internal buffer
191 */
192 public boolean startsWith(String search) throws IOException {
193 return startsWith(StringUtils.getBytes(search));
194 }
195
196 /**
197 * Check if the current content (what will be read next) starts with the
198 * given search term.
199 * <p>
200 * An empty string will always return true (unless the stream is closed,
201 * which would throw an {@link IOException}).
202 * <p>
203 * Note: the search term size <b>must</b> be smaller or equal the internal
204 * buffer size.
205 *
206 * @param search
207 * the term to search for
208 *
209 * @return TRUE if the content that will be read starts with it
210 *
211 * @throws IOException
212 * in case of I/O error or if the size of the search term is
213 * greater than the internal buffer
214 */
215 public boolean startsWith(byte[] search) throws IOException {
216 checkClose();
217
218 while (consolidatePushBack(search.length) < search.length) {
219 preRead();
220 if (start >= stop) {
221 // Not enough data left to start with that
222 return false;
223 }
224
225 byte[] newBuffer = new byte[stop - start];
226 System.arraycopy(buffer, start, newBuffer, 0, stop - start);
227 pushback(newBuffer, 0);
228 start = stop;
229 }
230
231 Entry<byte[], Integer> bb = backBuffers.get(backBuffers.size() - 1);
232 byte[] bbBuffer = bb.getKey();
233 int bbOffset = bb.getValue();
234
235 return StreamUtils.startsWith(search, bbBuffer, bbOffset,
236 bbBuffer.length);
237 }
238
239 /**
240 * The number of bytes read from the under-laying {@link InputStream}.
241 *
242 * @return the number of bytes
243 */
244 public long getBytesRead() {
245 return bytesRead;
246 }
247
248 /**
249 * Check if this stream is spent (no more data to read or to process).
250 *
251 * @return TRUE if it is
252 *
253 * @throws IOException
254 * in case of I/O error
255 */
256 public boolean eof() throws IOException {
257 if (closed) {
258 return true;
259 }
260
261 preRead();
262 return !hasMoreData();
263 }
264
265 /**
266 * Read the whole {@link InputStream} until the end and return the number of
267 * bytes read.
268 *
269 * @return the number of bytes read
270 *
271 * @throws IOException
272 * in case of I/O error
273 */
274 public long end() throws IOException {
275 long skipped = 0;
276 while (hasMoreData()) {
277 skipped += skip(buffer.length);
278 }
279
280 return skipped;
281 }
282
283 @Override
284 public int read() throws IOException {
285 if (read(singleByteReader) < 0) {
286 return -1;
287 }
288
289 return singleByteReader[0];
290 }
291
292 @Override
293 public int read(byte[] b) throws IOException {
294 return read(b, 0, b.length);
295 }
296
297 @Override
298 public int read(byte[] b, int boff, int blen) throws IOException {
299 checkClose();
300
301 if (b == null) {
302 throw new NullPointerException();
303 } else if (boff < 0 || blen < 0 || blen > b.length - boff) {
304 throw new IndexOutOfBoundsException();
305 } else if (blen == 0) {
306 return 0;
307 }
308
309 // Read from the pushed-back buffers if any
310 if (backBuffers.isEmpty()) {
311 preRead(); // an implementation could pushback in preRead()
312 }
313
314 if (!backBuffers.isEmpty()) {
315 int read = 0;
316
317 Entry<byte[], Integer> bb = backBuffers
318 .remove(backBuffers.size() - 1);
319 byte[] bbBuffer = bb.getKey();
320 int bbOffset = bb.getValue();
321 int bbSize = bbBuffer.length - bbOffset;
322
323 if (bbSize > blen) {
324 read = blen;
325 System.arraycopy(bbBuffer, bbOffset, b, boff, read);
326 pushback(bbBuffer, bbOffset + read);
327 } else {
328 read = bbSize;
329 System.arraycopy(bbBuffer, bbOffset, b, boff, read);
330 }
331
332 return read;
333 }
334
335 int done = 0;
336 while (hasMoreData() && done < blen) {
337 preRead();
338 if (hasMoreData()) {
339 int now = Math.min(blen - done, stop - start);
340 if (now > 0) {
341 System.arraycopy(buffer, start, b, boff + done, now);
342 start += now;
343 done += now;
344 }
345 }
346 }
347
348 return done > 0 ? done : -1;
349 }
350
351 @Override
352 public long skip(long n) throws IOException {
353 if (n <= 0) {
354 return 0;
355 }
356
357 long skipped = 0;
358 while (!backBuffers.isEmpty() && n > 0) {
359 Entry<byte[], Integer> bb = backBuffers
360 .remove(backBuffers.size() - 1);
361 byte[] bbBuffer = bb.getKey();
362 int bbOffset = bb.getValue();
363 int bbSize = bbBuffer.length - bbOffset;
364
365 int localSkip = 0;
366 localSkip = (int) Math.min(n, bbSize);
367
368 n -= localSkip;
369 bbSize -= localSkip;
370
371 if (bbSize > 0) {
372 pushback(bbBuffer, bbOffset + localSkip);
373 }
374 }
375 while (hasMoreData() && n > 0) {
376 preRead();
377
378 long inBuffer = Math.min(n, available());
379 start += inBuffer;
380 n -= inBuffer;
381 skipped += inBuffer;
382 }
383
384 return skipped;
385 }
386
387 @Override
388 public int available() {
389 if (closed) {
390 return 0;
391 }
392
393 int avail = 0;
394 for (Entry<byte[], Integer> entry : backBuffers) {
395 avail += entry.getKey().length - entry.getValue();
396 }
397
398 return avail + Math.max(0, stop - start);
399 }
400
401 /**
402 * Closes this stream and releases any system resources associated with the
403 * stream.
404 * <p>
405 * Including the under-laying {@link InputStream}.
406 * <p>
407 * <b>Note:</b> if you called the {@link BufferedInputStream#open()} method
408 * prior to this one, it will just decrease the internal count of how many
409 * open streams it held and do nothing else. The stream will actually be
410 * closed when you have called {@link BufferedInputStream#close()} once more
411 * than {@link BufferedInputStream#open()}.
412 *
413 * @exception IOException
414 * in case of I/O error
415 */
416 @Override
417 public synchronized void close() throws IOException {
418 close(true);
419 }
420
421 /**
422 * Closes this stream and releases any system resources associated with the
423 * stream.
424 * <p>
425 * Including the under-laying {@link InputStream} if
426 * <tt>incudingSubStream</tt> is true.
427 * <p>
428 * You can call this method multiple times, it will not cause an
429 * {@link IOException} for subsequent calls.
430 * <p>
431 * <b>Note:</b> if you called the {@link BufferedInputStream#open()} method
432 * prior to this one, it will just decrease the internal count of how many
433 * open streams it held and do nothing else. The stream will actually be
434 * closed when you have called {@link BufferedInputStream#close()} once more
435 * than {@link BufferedInputStream#open()}.
436 *
437 * @param includingSubStream
438 * also close the under-laying stream
439 *
440 * @exception IOException
441 * in case of I/O error
442 */
443 public synchronized void close(boolean includingSubStream)
444 throws IOException {
445 if (!closed) {
446 if (openCounter > 0) {
447 openCounter--;
448 } else {
449 closed = true;
450 if (includingSubStream && in != null) {
451 in.close();
452 }
453 }
454 }
455 }
456
457 /**
458 * Consolidate the push-back buffers so the last one is at least the given
459 * size, if possible.
460 * <p>
461 * If there is not enough data in the push-back buffers, they will all be
462 * consolidated.
463 *
464 * @param size
465 * the minimum size of the consolidated buffer, or -1 to force
466 * the consolidation of all push-back buffers
467 *
468 * @return the size of the last, consolidated buffer; can be less than the
469 * requested size if not enough data
470 */
471 protected int consolidatePushBack(int size) {
472 int bbIndex = -1;
473 int bbUpToSize = 0;
474 for (Entry<byte[], Integer> entry : backBuffers) {
475 bbIndex++;
476 bbUpToSize += entry.getKey().length - entry.getValue();
477
478 if (size >= 0 && bbUpToSize >= size) {
479 break;
480 }
481 }
482
483 // Index 0 means "the last buffer is already big enough"
484 if (bbIndex > 0) {
485 byte[] consolidatedBuffer = new byte[bbUpToSize];
486 int consolidatedPos = 0;
487 for (int i = 0; i <= bbIndex; i++) {
488 Entry<byte[], Integer> bb = backBuffers
489 .remove(backBuffers.size() - 1);
490 byte[] bbBuffer = bb.getKey();
491 int bbOffset = bb.getValue();
492 int bbSize = bbBuffer.length - bbOffset;
493 System.arraycopy(bbBuffer, bbOffset, consolidatedBuffer,
494 consolidatedPos, bbSize);
495 }
496
497 pushback(consolidatedBuffer, 0);
498 }
499
500 return bbUpToSize;
501 }
502
503 /**
504 * Check if we still have some data in the buffer and, if not, fetch some.
505 *
506 * @return TRUE if we fetched some data, FALSE if there are still some in
507 * the buffer
508 *
509 * @throws IOException
510 * in case of I/O error
511 */
512 protected boolean preRead() throws IOException {
513 boolean hasRead = false;
514 if (in != null && !eof && start >= stop) {
515 start = 0;
516 stop = read(in, buffer);
517 if (stop > 0) {
518 bytesRead += stop;
519 }
520
521 hasRead = true;
522 }
523
524 if (start >= stop) {
525 eof = true;
526 }
527
528 return hasRead;
529 }
530
531 /**
532 * Push back some data that will be read again at the next read call.
533 *
534 * @param buffer
535 * the buffer to push back
536 * @param offset
537 * the offset at which to start reading in the buffer
538 */
539 protected void pushback(byte[] buffer, int offset) {
540 backBuffers.add(
541 new AbstractMap.SimpleEntry<byte[], Integer>(buffer, offset));
542 }
543
544 /**
545 * Push back some data that will be read again at the next read call.
546 *
547 * @param buffer
548 * the buffer to push back
549 * @param offset
550 * the offset at which to start reading in the buffer
551 * @param len
552 * the length to copy
553 */
554 protected void pushback(byte[] buffer, int offset, int len) {
555 // TODO: not efficient!
556 if (buffer.length != len) {
557 byte[] lenNotSupportedYet = new byte[len];
558 System.arraycopy(buffer, offset, lenNotSupportedYet, 0, len);
559 buffer = lenNotSupportedYet;
560 offset = 0;
561 }
562
563 pushback(buffer, offset);
564 }
565
566 /**
567 * Read the under-laying stream into the given local buffer.
568 *
569 * @param in
570 * the under-laying {@link InputStream}
571 * @param buffer
572 * the buffer we use in this {@link BufferedInputStream}
573 *
574 * @return the number of bytes read
575 *
576 * @throws IOException
577 * in case of I/O error
578 */
579 protected int read(InputStream in, byte[] buffer) throws IOException {
580 return in.read(buffer, 0, buffer.length);
581 }
582
583 /**
584 * We have more data available in the buffer <b>or</b> we can, maybe, fetch
585 * more.
586 *
587 * @return TRUE if it is the case, FALSE if not
588 */
589 protected boolean hasMoreData() {
590 if (closed) {
591 return false;
592 }
593
594 return !backBuffers.isEmpty() || (start < stop) || !eof;
595 }
596
597 /**
598 * Check that the stream was not closed, and throw an {@link IOException} if
599 * it was.
600 *
601 * @throws IOException
602 * if it was closed
603 */
604 protected void checkClose() throws IOException {
605 if (closed) {
606 throw new IOException(
607 "This BufferedInputStream was closed, you cannot use it anymore.");
608 }
609 }
610 }