Merge commit '7ce18848c8327967ce27b90abf2e280953530b5f'
[nikiroo-utils.git] / streams / NextableInputStream.java
1 package be.nikiroo.utils.streams;
2
3 import java.io.IOException;
4 import java.io.InputStream;
5 import java.io.UnsupportedEncodingException;
6 import java.util.Arrays;
7
8 /**
9 * This {@link InputStream} can be separated into sub-streams (you can process
10 * it as a normal {@link InputStream} but, when it is spent, you can call
11 * {@link NextableInputStream#next()} on it to unlock new data).
12 * <p>
13 * The separation in sub-streams is done via {@link NextableInputStreamStep}.
14 *
15 * @author niki
16 */
17 public class NextableInputStream extends BufferedInputStream {
18 private NextableInputStreamStep step;
19 private boolean started;
20 private boolean stopped;
21
22 /**
23 * Create a new {@link NextableInputStream} that wraps the given
24 * {@link InputStream}.
25 *
26 * @param in
27 * the {@link InputStream} to wrap
28 * @param step
29 * how to separate it into sub-streams (can be NULL, but in that
30 * case it will behave as a normal {@link InputStream})
31 */
32 public NextableInputStream(InputStream in, NextableInputStreamStep step) {
33 super(in);
34 this.step = step;
35 }
36
37 /**
38 * Create a new {@link NextableInputStream} that wraps the given bytes array
39 * as a data source.
40 *
41 * @param in
42 * the array to wrap, cannot be NULL
43 * @param step
44 * how to separate it into sub-streams (can be NULL, but in that
45 * case it will behave as a normal {@link InputStream})
46 */
47 public NextableInputStream(byte[] in, NextableInputStreamStep step) {
48 this(in, step, 0, in.length);
49 }
50
51 /**
52 * Create a new {@link NextableInputStream} that wraps the given bytes array
53 * as a data source.
54 *
55 * @param in
56 * the array to wrap, cannot be NULL
57 * @param step
58 * how to separate it into sub-streams (can be NULL, but in that
59 * case it will behave as a normal {@link InputStream})
60 * @param offset
61 * the offset to start the reading at
62 * @param length
63 * the number of bytes to take into account in the array,
64 * starting from the offset
65 *
66 * @throws NullPointerException
67 * if the array is NULL
68 * @throws IndexOutOfBoundsException
69 * if the offset and length do not correspond to the given array
70 */
71 public NextableInputStream(byte[] in, NextableInputStreamStep step,
72 int offset, int length) {
73 super(in, offset, length);
74 this.step = step;
75 checkBuffer(true);
76 }
77
78 /**
79 * Unblock the processing of the next sub-stream.
80 * <p>
81 * It can only be called when the "current" stream is spent (i.e., you must
82 * first process the stream until it is spent).
83 * <p>
84 * {@link IOException}s can happen when we have no data available in the
85 * buffer; in that case, we fetch more data to know if we can have a next
86 * sub-stream or not.
87 * <p>
88 * This is can be a blocking call when data need to be fetched.
89 *
90 * @return TRUE if we unblocked the next sub-stream, FALSE if not (i.e.,
91 * FALSE when there are no more sub-streams to fetch)
92 *
93 * @throws IOException
94 * in case of I/O error or if the stream is closed
95 */
96 public boolean next() throws IOException {
97 return next(false);
98 }
99
100 /**
101 * Unblock the next sub-stream as would have done
102 * {@link NextableInputStream#next()}, but disable the sub-stream systems.
103 * <p>
104 * That is, the next stream, if any, will be the last one and will not be
105 * subject to the {@link NextableInputStreamStep}.
106 * <p>
107 * This is can be a blocking call when data need to be fetched.
108 *
109 * @return TRUE if we unblocked the next sub-stream, FALSE if not
110 *
111 * @throws IOException
112 * in case of I/O error or if the stream is closed
113 */
114 public boolean nextAll() throws IOException {
115 return next(true);
116 }
117
118 /**
119 * Check if this stream is totally spent (no more data to read or to
120 * process).
121 * <p>
122 * Note: when the stream is divided into sub-streams, each sub-stream will
123 * report its own eof when spent.
124 *
125 * @return TRUE if it is
126 *
127 * @throws IOException
128 * in case of I/O error
129 */
130 @Override
131 public boolean eof() throws IOException {
132 return super.eof();
133 }
134
135 /**
136 * Check if we still have some data in the buffer and, if not, fetch some.
137 *
138 * @return TRUE if we fetched some data, FALSE if there are still some in
139 * the buffer
140 *
141 * @throws IOException
142 * in case of I/O error
143 */
144 @Override
145 protected boolean preRead() throws IOException {
146 if (!stopped) {
147 boolean bufferChanged = super.preRead();
148 checkBuffer(bufferChanged);
149 return bufferChanged;
150 }
151
152 if (start >= stop) {
153 eof = true;
154 }
155
156 return false;
157 }
158
159 @Override
160 protected boolean hasMoreData() {
161 return started && super.hasMoreData();
162 }
163
164 /**
165 * Check that the buffer didn't overshot to the next item, and fix
166 * {@link NextableInputStream#stop} if needed.
167 * <p>
168 * If {@link NextableInputStream#stop} is fixed,
169 * {@link NextableInputStream#eof} and {@link NextableInputStream#stopped}
170 * are set to TRUE.
171 *
172 * @param newBuffer
173 * we changed the buffer, we need to clear some information in
174 * the {@link NextableInputStreamStep}
175 */
176 private void checkBuffer(boolean newBuffer) {
177 if (step != null && stop >= 0) {
178 if (newBuffer) {
179 step.clearBuffer();
180 }
181
182 int stopAt = step.stop(buffer, start, stop, eof);
183 if (stopAt >= 0) {
184 stop = stopAt;
185 eof = true;
186 stopped = true;
187 }
188 }
189 }
190
191 /**
192 * The implementation of {@link NextableInputStream#next()} and
193 * {@link NextableInputStream#nextAll()}.
194 * <p>
195 * This is can be a blocking call when data need to be fetched.
196 *
197 * @param all
198 * TRUE for {@link NextableInputStream#nextAll()}, FALSE for
199 * {@link NextableInputStream#next()}
200 *
201 * @return TRUE if we unblocked the next sub-stream, FALSE if not (i.e.,
202 * FALSE when there are no more sub-streams to fetch)
203 *
204 * @throws IOException
205 * in case of I/O error or if the stream is closed
206 */
207 private boolean next(boolean all) throws IOException {
208 checkClose();
209
210 if (!started) {
211 // First call before being allowed to read
212 started = true;
213
214 if (all) {
215 step = null;
216 }
217
218 return true;
219 }
220
221 // If started, must be stopped and no more data to continue
222 // i.e., sub-stream must be spent
223 if (!stopped || hasMoreData()) {
224 return false;
225 }
226
227 if (step != null) {
228 stop = step.getResumeLen();
229 start += step.getResumeSkip();
230 eof = step.getResumeEof();
231 stopped = false;
232
233 if (all) {
234 step = null;
235 }
236
237 checkBuffer(false);
238
239 return true;
240 }
241
242 return false;
243
244 // // consider that if EOF, there is no next
245 // if (start >= stop) {
246 // // Make sure, block if necessary
247 // preRead();
248 //
249 // return hasMoreData();
250 // }
251 //
252 // return true;
253 }
254
255 /**
256 * Display a DEBUG {@link String} representation of this object.
257 * <p>
258 * Do <b>not</b> use for release code.
259 */
260 @Override
261 public String toString() {
262 String data = "";
263 if (stop > 0) {
264 try {
265 data = new String(Arrays.copyOfRange(buffer, 0, stop), "UTF-8");
266 } catch (UnsupportedEncodingException e) {
267 }
268 if (data.length() > 200) {
269 data = data.substring(0, 197) + "...";
270 }
271 }
272 String rep = String.format(
273 "Nextable %s: %d -> %d [eof: %s] [more data: %s]: %s",
274 (stopped ? "stopped" : "running"), start, stop, "" + eof, ""
275 + hasMoreData(), data);
276
277 return rep;
278 }
279 }