001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.io.unsync;
016    
017    import java.io.IOException;
018    import java.io.InputStream;
019    
020    /**
021     * <p>
022     * See http://issues.liferay.com/browse/LPS-6648.
023     * </p>
024     *
025     * @author Shuyang Zhou
026     */
027    public class UnsyncBufferedInputStream extends UnsyncFilterInputStream {
028    
029            public UnsyncBufferedInputStream(InputStream inputStream) {
030                    this(inputStream, _DEFAULT_BUFFER_SIZE);
031            }
032    
033            public UnsyncBufferedInputStream(InputStream inputStream, int size) {
034                    super(inputStream);
035    
036                    if (size <= 0) {
037                            throw new IllegalArgumentException("Size is less than 0");
038                    }
039    
040                    buffer = new byte[size];
041            }
042    
043            @Override
044            public int available() throws IOException {
045                    if (inputStream == null) {
046                            throw new IOException("Input stream is null");
047                    }
048    
049                    return inputStream.available() + (firstInvalidIndex - index);
050            }
051    
052            @Override
053            public void close() throws IOException {
054                    if (inputStream != null) {
055                            inputStream.close();
056    
057                            inputStream = null;
058                            buffer = null;
059                    }
060            }
061    
062            @Override
063            public void mark(int readLimit) {
064                    if (readLimit <= 0) {
065                            return;
066                    }
067    
068                    markLimitIndex = readLimit;
069    
070                    if (index == 0) {
071                            return;
072                    }
073    
074                    int available = firstInvalidIndex - index;
075    
076                    if (available > 0) {
077    
078                            // Shuffle mark beginning to buffer beginning
079    
080                            System.arraycopy(buffer, index, buffer, 0, available);
081    
082                            index = 0;
083    
084                            firstInvalidIndex = available;
085                    }
086                    else {
087    
088                            // Reset buffer states
089    
090                            index = firstInvalidIndex = 0;
091                    }
092            }
093    
094            @Override
095            public boolean markSupported() {
096                    return true;
097            }
098    
099            @Override
100            public int read() throws IOException {
101                    if (inputStream == null) {
102                            throw new IOException("Input stream is null");
103                    }
104    
105                    if (index >= firstInvalidIndex) {
106                            fillInBuffer();
107    
108                            if (index >= firstInvalidIndex) {
109                                    return -1;
110                            }
111                    }
112    
113                    return buffer[index++] & 0xff;
114            }
115    
116            @Override
117            public int read(byte[] bytes) throws IOException {
118                    return read(bytes, 0, bytes.length);
119            }
120    
121            @Override
122            public int read(byte[] bytes, int offset, int length) throws IOException {
123                    if (inputStream == null) {
124                            throw new IOException("Input stream is null");
125                    }
126    
127                    if (length <= 0) {
128                            return 0;
129                    }
130    
131                    int read = 0;
132    
133                    while (true) {
134    
135                            // Try to at least read some data
136    
137                            int currentRead = readOnce(bytes, offset + read, length - read);
138    
139                            if (currentRead <= 0) {
140                                    if (read == 0) {
141                                            read = currentRead;
142                                    }
143    
144                                    break;
145                            }
146    
147                            read += currentRead;
148    
149                            if ((read >= length) || (inputStream.available() <= 0)) {
150    
151                                    // Read enough or further reading may be blocked, stop reading
152    
153                                    break;
154                            }
155                    }
156    
157                    return read;
158            }
159    
160            @Override
161            public void reset() throws IOException {
162                    if (inputStream == null) {
163                            throw new IOException("Input stream is null");
164                    }
165    
166                    if (markLimitIndex < 0) {
167                            throw new IOException("Resetting to invalid mark");
168                    }
169    
170                    index = 0;
171            }
172    
173            @Override
174            public long skip(long skip) throws IOException {
175                    if (inputStream == null) {
176                            throw new IOException("Input stream is null");
177                    }
178    
179                    if (skip <= 0) {
180                            return 0;
181                    }
182    
183                    long available = firstInvalidIndex - index;
184    
185                    if (available <= 0) {
186                            if (markLimitIndex < 0) {
187    
188                                    // No mark required, skip the underlying input stream
189    
190                                    return inputStream.skip(skip);
191                            }
192                            else {
193    
194                                    // Mark required, save the skipped data
195    
196                                    fillInBuffer();
197    
198                                    available = firstInvalidIndex - index;
199    
200                                    if (available <= 0) {
201                                            return 0;
202                                    }
203                            }
204                    }
205    
206                    // Skip the data in buffer
207    
208                    if (available < skip) {
209                            skip = available;
210                    }
211    
212                    index += skip;
213    
214                    return skip;
215            }
216    
217            protected void fillInBuffer() throws IOException {
218                    if (markLimitIndex < 0) {
219    
220                            // No mark required, fill the buffer
221    
222                            index = firstInvalidIndex = 0;
223    
224                            int number = inputStream.read(buffer);
225    
226                            if (number > 0) {
227                                    firstInvalidIndex = number;
228                            }
229    
230                            return;
231                    }
232    
233                    // Mark required
234    
235                    if (index >= markLimitIndex) {
236    
237                            // Passed mark limit indexs, get rid of all cache data
238    
239                            markLimitIndex = -1;
240    
241                            index = firstInvalidIndex = 0;
242                    }
243                    else if (index == buffer.length) {
244    
245                            // Cannot get rid of cache data and there is no room to read in any
246                            // more data, so grow the buffer
247    
248                            int newBufferSize = buffer.length * 2;
249    
250                            if (newBufferSize > markLimitIndex) {
251                                    newBufferSize = markLimitIndex;
252                            }
253    
254                            byte[] newBuffer = new byte[newBufferSize];
255    
256                            System.arraycopy(buffer, 0, newBuffer, 0, buffer.length);
257    
258                            buffer = newBuffer;
259                    }
260    
261                    // Read underlying input stream since the buffer has more space
262    
263                    firstInvalidIndex = index;
264    
265                    int number = inputStream.read(buffer, index, buffer.length - index);
266    
267                    if (number > 0) {
268                            firstInvalidIndex += number;
269                    }
270            }
271    
272            protected int readOnce(byte[] bytes, int offset, int length)
273                    throws IOException {
274    
275                    int available = firstInvalidIndex - index;
276    
277                    if (available <= 0) {
278    
279                            // Buffer is empty, read from under input stream
280    
281                            if ((markLimitIndex < 0) && (length >= buffer.length)) {
282    
283                                    // No mark required, left read block is no less than buffer,
284                                    // read through buffer is inefficient, so directly read from
285                                    // underlying input stream
286    
287                                    return inputStream.read(bytes, offset, length);
288                            }
289                            else {
290    
291                                    // Mark is required, has to read through the buffer to remember
292                                    // data
293    
294                                    fillInBuffer();
295    
296                                    available = firstInvalidIndex - index;
297    
298                                    if (available <= 0) {
299                                            return -1;
300                                    }
301                            }
302                    }
303    
304                    if (length > available) {
305                            length = available;
306                    }
307    
308                    System.arraycopy(buffer, index, bytes, offset, length);
309    
310                    index += length;
311    
312                    return length;
313            }
314    
315            protected byte[] buffer;
316            protected int firstInvalidIndex;
317            protected int index;
318            protected int markLimitIndex = -1;
319    
320            private static final int _DEFAULT_BUFFER_SIZE = 8192;
321    
322    }