From: Niki Roo
Date: Wed, 24 Apr 2019 19:42:42 +0000 (+0200)
Subject: Merge branch 'master' into streamify
X-Git-Url: http://git.nikiroo.be/?p=nikiroo-utils.git;a=commitdiff_plain;h=ecd81c088d8df94c47c85399afdab87bbd39afaf;hp=143a64f4c6804d0cd17595ffcf7a44f4a151ffdc

Merge branch 'master' into streamify
---

Note: relative to the commit as recorded, three one-line fixes were applied to
the added sources (unsigned byte in read(), loop bound in read(byte[], int,
int), byte comparison in stop()); hunk sizes are unchanged, but the blob ids on
the "index" lines no longer match the post-image.

diff --git a/src/be/nikiroo/utils/IOUtils.java b/src/be/nikiroo/utils/IOUtils.java
index c7cbe25..a263eb1 100644
--- a/src/be/nikiroo/utils/IOUtils.java
+++ b/src/be/nikiroo/utils/IOUtils.java
@@ -55,7 +55,7 @@ public class IOUtils {
 	 */
 	public static void write(InputStream in, OutputStream out)
 			throws IOException {
-		byte buffer[] = new byte[4069];
+		byte buffer[] = new byte[4096];
 		for (int len = 0; (len = in.read(buffer)) > 0;) {
 			out.write(buffer, 0, len);
 		}
diff --git a/src/be/nikiroo/utils/NextableInputStream.java b/src/be/nikiroo/utils/NextableInputStream.java
new file mode 100644
index 0000000..0def936
--- /dev/null
+++ b/src/be/nikiroo/utils/NextableInputStream.java
@@ -0,0 +1,180 @@
+package be.nikiroo.utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * This {@link InputStream} can be separated into sub-streams (you can process
+ * it as a normal {@link InputStream} but, when it is spent, you can call
+ * {@link NextableInputStream#next()} on it to unlock new data).
+ * <p>
+ * The separation in sub-streams is done via {@link NextableInputStreamStep}.
+ * 
+ * @author niki
+ */
+public class NextableInputStream extends InputStream {
+	private NextableInputStreamStep step;
+	private boolean stopped;
+
+	private InputStream in;
+	private boolean eof;
+	private int pos = 0;
+	private int len = 0;
+	private byte[] buffer = new byte[4096];
+
+	/**
+	 * Create a new {@link NextableInputStream} that wraps the given
+	 * {@link InputStream}.
+	 * 
+	 * @param in
+	 *            the {@link InputStream} to wrap
+	 * @param step
+	 *            how to separate it into sub-streams (can be NULL, but in that
+	 *            case it will behave as a normal {@link InputStream})
+	 */
+	public NextableInputStream(InputStream in, NextableInputStreamStep step) {
+		this.in = in;
+		this.step = step;
+	}
+
+	/**
+	 * Unblock the processing of the next sub-stream.
+	 * <p>
+	 * It can only be called when the "current" stream is spent (i.e., you must
+	 * first process the stream until it is spent).
+	 * <p>
+	 * We consider that when the under-laying {@link InputStream} is also spent,
+	 * we cannot have a next sub-stream (it will thus return FALSE).
+	 * <p>
+	 * {@link IOException}s can happen when we have no data available in the
+	 * buffer; in that case, we fetch more data to know if we can have a next
+	 * sub-stream or not.
+	 * 
+	 * @return TRUE if we unblocked the next sub-stream, FALSE if not
+	 * 
+	 * @throws IOException
+	 *             in case of I/O error
+	 */
+	public boolean next() throws IOException {
+		if (!hasMoreData() && stopped) {
+			len = step.getResumeLen();
+			pos += step.getResumeSkip();
+			eof = false;
+
+			if (!preRead()) {
+				checkBuffer(false);
+			}
+
+			// consider that if EOF, there is no next
+			return hasMoreData();
+		}
+
+		return false;
+	}
+
+	@Override
+	public int read() throws IOException {
+		preRead();
+		if (eof) {
+			return -1;
+		}
+
+		return buffer[pos++] & 0xff; // unsigned 0..255, as InputStream#read() requires
+	}
+
+	@Override
+	public int read(byte[] b) throws IOException {
+		return read(b, 0, b.length);
+	}
+
+	@Override
+	public int read(byte[] b, int boff, int blen) throws IOException {
+		if (b == null) {
+			throw new NullPointerException();
+		} else if (boff < 0 || blen < 0 || blen > b.length - boff) {
+			throw new IndexOutOfBoundsException();
+		} else if (blen == 0) {
+			return 0;
+		}
+
+		int done = 0;
+		while (hasMoreData() && done < blen) {
+			preRead();
+			if (hasMoreData()) {
+				for (int i = pos; done < blen && i < len; i++) { // bound by bytes copied, not by buffer index
+					b[boff + done] = buffer[i];
+					pos++;
+					done++;
+				}
+			}
+		}
+
+		return done > 0 ? done : -1;
+	}
+
+	@Override
+	public int available() throws IOException {
+		return Math.max(0, len - pos);
+	}
+
+	/**
+	 * Check if we still have some data in the buffer and, if not, fetch some.
+	 * 
+	 * @return TRUE if we fetched some data, FALSE if there are still some in
+	 *         the buffer
+	 * 
+	 * @throws IOException
+	 *             in case of I/O error
+	 */
+	private boolean preRead() throws IOException {
+		boolean hasRead = false;
+		if (!eof && in != null && pos >= len && !stopped) {
+			pos = 0;
+			len = in.read(buffer);
+			checkBuffer(true);
+			hasRead = true;
+		}
+
+		if (pos >= len) {
+			eof = true;
+		}
+
+		return hasRead;
+	}
+
+	/**
+	 * We have more data available in the buffer or we can fetch more.
+	 * 
+	 * @return TRUE if it is the case, FALSE if not
+	 */
+	private boolean hasMoreData() {
+		return !(eof && pos >= len);
+	}
+
+	/**
+	 * Check that the buffer didn't overshot to the next item, and fix
+	 * {@link NextableInputStream#len} if needed.
+	 * <p>
+	 * If {@link NextableInputStream#len} is fixed,
+	 * {@link NextableInputStream#eof} and {@link NextableInputStream#stopped}
+	 * are set to TRUE.
+	 * 
+	 * @param newBuffer
+	 *            we changed the buffer, we need to clear some information in
+	 *            the {@link NextableInputStreamStep}
+	 */
+	private void checkBuffer(boolean newBuffer) {
+		if (step != null) {
+			if (newBuffer) {
+				step.clearBuffer();
+			}
+
+			int stopAt = step.stop(buffer, pos, len);
+			if (stopAt >= 0) {
+				len = stopAt;
+				eof = true;
+				stopped = true;
+			}
+		}
+	}
+}
diff --git a/src/be/nikiroo/utils/NextableInputStreamStep.java b/src/be/nikiroo/utils/NextableInputStreamStep.java
new file mode 100755
index 0000000..a2ee039
--- /dev/null
+++ b/src/be/nikiroo/utils/NextableInputStreamStep.java
@@ -0,0 +1,96 @@
+package be.nikiroo.utils;
+
+import java.io.InputStream;
+
+/**
+ * Divide an {@link InputStream} into sub-streams.
+ * 
+ * @author niki
+ */
+public class NextableInputStreamStep {
+	private int stopAt;
+	private int resumeLen;
+	private int last = -1;
+	private int skip;
+
+	/**
+	 * Create a new divider that will separate the sub-streams each time it sees
+	 * this byte.
+	 * <p>
+	 * Note that the byte will be bypassed by the {@link InputStream} as far as
+	 * the consumers will be aware.
+	 * 
+	 * @param byt
+	 *            the byte at which to separate two sub-streams
+	 */
+	public NextableInputStreamStep(int byt) {
+		stopAt = byt;
+	}
+
+	/**
+	 * Check if we need to stop the {@link InputStream} reading at some point in
+	 * the current buffer.
+	 * <p>
+	 * If we do, return the index at which to stop; if not, return -1.
+	 * <p>
+	 * This method will not return the same index a second time (unless
+	 * we cleared the buffer).
+	 * 
+	 * @param buffer
+	 *            the buffer to check
+	 * @param pos
+	 *            the current position of what was read in the buffer
+	 * @param len
+	 *            the maximum index to use in the buffer (anything above that is
+	 *            not to be used)
+	 * 
+	 * @return the index at which to stop, or -1
+	 */
+	public int stop(byte[] buffer, int pos, int len) {
+		for (int i = pos; i < len; i++) {
+			if (buffer[i] == (byte) stopAt) { // compare as byte so separators above 127 can match
+				if (i > this.last) {
+					// we skip the sep
+					this.skip = 1;
+
+					this.resumeLen = len;
+					this.last = i;
+					return i;
+				}
+			}
+		}
+
+		return -1;
+	}
+
+	/**
+	 * Get the maximum index to use in the buffer used in
+	 * {@link NextableInputStreamStep#stop(byte[], int, int)} at resume time.
+	 * 
+	 * @return the index
+	 */
+	public int getResumeLen() {
+		return resumeLen;
+	}
+
+	/**
+	 * Get the number of bytes to skip at resume time.
+	 * 
+	 * @return the number of bytes to skip
+	 */
+	public int getResumeSkip() {
+		return skip;
+	}
+
+	/**
+	 * Clear the information we may have kept about the current buffer
+	 * <p>
+	 * You should call this method each time you change the content of the
+	 * buffer used in {@link NextableInputStreamStep#stop(byte[], int, int)}.
+	 */
+	public void clearBuffer() {
+		this.last = -1;
+		this.skip = 0;
+		this.resumeLen = 0;
+	}
+}
diff --git a/src/be/nikiroo/utils/test_code/NextableInputStreamTest.java b/src/be/nikiroo/utils/test_code/NextableInputStreamTest.java
new file mode 100644
index 0000000..87d64ac
--- /dev/null
+++ b/src/be/nikiroo/utils/test_code/NextableInputStreamTest.java
@@ -0,0 +1,111 @@
+package be.nikiroo.utils.test_code;
+
+import java.io.ByteArrayInputStream;
+
+import be.nikiroo.utils.IOUtils;
+import be.nikiroo.utils.NextableInputStream;
+import be.nikiroo.utils.NextableInputStreamStep;
+import be.nikiroo.utils.test.TestCase;
+import be.nikiroo.utils.test.TestLauncher;
+
+public class NextableInputStreamTest extends TestLauncher {
+	public NextableInputStreamTest(String[] args) {
+		super("NextableInputStream test", args);
+
+		addTest(new TestCase("Simple byte array reading") {
+			@Override
+			public void test() throws Exception {
+				byte[] expected = new byte[] { 42, 12, 0, 127 };
+				NextableInputStream in = new NextableInputStream(
+						new ByteArrayInputStream(expected), null);
+				byte[] actual = IOUtils.toByteArray(in);
+
+				assertEquals(
+						"The resulting array has not the same number of items",
+						expected.length, actual.length);
+				for (int i = 0; i < expected.length; i++) {
+					assertEquals("Item " + i + " (0-based) is not the same",
+							expected[i], actual[i]);
+				}
+			}
+		});
+
+		addTest(new TestCase("Stop at 12") {
+			@Override
+			public void test() throws Exception {
+				byte[] expected = new byte[] { 42, 12, 0, 127 };
+				NextableInputStream in = new NextableInputStream(
+						new ByteArrayInputStream(expected),
+						new NextableInputStreamStep(12));
+
+				checkNext(this, false, "FIRST", in, new byte[] { 42 });
+			}
+		});
+
+		addTest(new TestCase("Stop at 12, resume, stop again, resume") {
+			@Override
+			public void test() throws Exception {
+				byte[] data = new byte[] { 42, 12, 0, 127, 12, 51, 11, 12 };
+				NextableInputStream in = new NextableInputStream(
+						new ByteArrayInputStream(data),
+						new NextableInputStreamStep(12));
+
+				checkNext(this, false, "FIRST", in, new byte[] { 42 });
+				checkNext(this, true, "SECOND", in, new byte[] { 0, 127 });
+				checkNext(this, true, "THIRD", in, new byte[] { 51, 11 });
+			}
+		});
+
+		addTest(new TestCase("Encapsulation") {
+			@Override
+			public void test() throws Exception {
+				byte[] data = new byte[] { 42, 12, 0, 4, 127, 12, 5 };
+				NextableInputStream in4 = new NextableInputStream(
+						new ByteArrayInputStream(data),
+						new NextableInputStreamStep(4));
+				NextableInputStream subIn12 = new NextableInputStream(in4,
+						new NextableInputStreamStep(12));
+
+				checkNext(this, false, "SUB FIRST", subIn12, new byte[] { 42 });
+				checkNext(this, true, "SUB SECOND", subIn12, new byte[] { 0 });
+
+				assertEquals("The subIn still has some data", false,
+						subIn12.next());
+
+				checkNext(this, true, "MAIN LAST", in4,
+						new byte[] { 127, 12, 5 });
+			}
+		});
+
+		addTest(new TestCase("UTF-8 text lines test") {
+			@Override
+			public void test() throws Exception {
+				String ln1 = "Ligne première";
+				String ln2 = "Ligne la deuxième du nom";
+				byte[] data = (ln1 + "\n" + ln2).getBytes("UTF-8");
+				NextableInputStream in = new NextableInputStream(
+						new ByteArrayInputStream(data),
+						new NextableInputStreamStep('\n'));
+
+				checkNext(this, false, "FIRST", in, ln1.getBytes("UTF-8"));
+				checkNext(this, true, "SECOND", in, ln2.getBytes("UTF-8"));
+			}
+		});
+	}
+
+	static void checkNext(TestCase test, boolean callNext, String prefix,
+			NextableInputStream in, byte[] expected) throws Exception {
+		if (callNext) {
+			test.assertEquals("Cannot get " + prefix + " entry", true,
+					in.next());
+		}
+		byte[] actual = IOUtils.toByteArray(in);
+		test.assertEquals("The " + prefix
+				+ " resulting array has not the correct number of items",
+				expected.length, actual.length);
+		for (int i = 0; i < actual.length; i++) {
+			test.assertEquals("Item " + i + " (0-based) is not the same",
+					expected[i], actual[i]);
+		}
+	}
+}
diff --git a/src/be/nikiroo/utils/test_code/Test.java b/src/be/nikiroo/utils/test_code/Test.java
index f25c02d..01766ac 100644
--- a/src/be/nikiroo/utils/test_code/Test.java
+++ b/src/be/nikiroo/utils/test_code/Test.java
@@ -33,6 +33,7 @@ public class Test extends TestLauncher {
 		addSeries(new StringUtilsTest(args));
 		addSeries(new TempFilesTest(args));
 		addSeries(new CryptUtilsTest(args));
+		addSeries(new NextableInputStreamTest(args));
 
 		// TODO: test cache and downloader
 		Cache cache = null;