001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.io;
016    
017    import com.liferay.portal.kernel.util.StringPool;
018    
019    import java.io.IOException;
020    import java.io.Reader;
021    import java.io.Writer;
022    
023    import java.nio.CharBuffer;
024    
025    import java.util.concurrent.locks.Condition;
026    import java.util.concurrent.locks.Lock;
027    import java.util.concurrent.locks.ReentrantLock;
028    
029    /**
030     * @author Shuyang Zhou
031     */
032    public class CharPipe {
033    
034            public CharPipe() {
035                    this(_DEFAULT_BUFFER_SIZE);
036            }
037    
038            public CharPipe(int bufferSize) {
039                    buffer = new char[bufferSize];
040                    count = 0;
041                    readIndex = 0;
042                    writeIndex = 0;
043            }
044    
045            public void close() {
046                    close(false);
047            }
048    
049            public void close(boolean force) {
050                    _pipeWriter.close();
051    
052                    if (force) {
053                            _pipeReader.close();
054                            buffer = null;
055                    }
056                    else {
057                            bufferLock.lock();
058    
059                            finished = true;
060    
061                            try {
062                                    notEmpty.signalAll();
063                            }
064                            finally {
065                                    bufferLock.unlock();
066                            }
067                    }
068            }
069    
070            public Reader getReader() {
071                    return _pipeReader;
072            }
073    
074            public Writer getWriter() {
075                    return _pipeWriter;
076            }
077    
078            protected char[] buffer;
079            protected Lock bufferLock = new ReentrantLock();
080            protected int count;
081            protected boolean finished;
082            protected Condition notEmpty = bufferLock.newCondition();
083            protected Condition notFull = bufferLock.newCondition();
084            protected int readIndex;
085            protected int writeIndex;
086    
087            private static final int _DEFAULT_BUFFER_SIZE = 1024 * 8;
088    
089            private PipeReader _pipeReader = new PipeReader();
090            private PipeWriter _pipeWriter = new PipeWriter();
091    
092            private class PipeReader extends Reader {
093    
094                    @Override
095                    public void close() {
096                            bufferLock.lock();
097    
098                            try {
099                                    _closed = true;
100    
101                                    notEmpty.signalAll();
102                            }
103                            finally {
104                                    bufferLock.unlock();
105                            }
106                    }
107    
108                    @Override
109                    public void mark(int readAheadLimit) throws IOException {
110                            throw new IOException("Mark is not supported");
111                    }
112    
113                    @Override
114                    public boolean markSupported() {
115                            return false;
116                    }
117    
118                    @Override
119                    public int read() throws IOException {
120                            if (_closed) {
121                                    throw new IOException("Stream closed");
122                            }
123    
124                            bufferLock.lock();
125    
126                            try {
127                                    if (waitUntilNotEmpty()) {
128                                            return -1;
129                                    }
130    
131                                    char result = buffer[readIndex];
132    
133                                    increaseReadIndex(1);
134    
135                                    return result;
136                            }
137                            finally {
138                                    bufferLock.unlock();
139                            }
140                    }
141    
142                    @Override
143                    public int read(char[] chars) throws IOException {
144                            return read(chars, 0, chars.length);
145                    }
146    
147                    @Override
148                    public int read(char[] chars, int offset, int length)
149                            throws IOException {
150    
151                            if (_closed) {
152                                    throw new IOException("Stream closed");
153                            }
154    
155                            if (length <= 0) {
156                                    return 0;
157                            }
158    
159                            bufferLock.lock();
160    
161                            try {
162                                    if (waitUntilNotEmpty()) {
163                                            return -1;
164                                    }
165    
166                                    int read = length;
167    
168                                    if (length > count) {
169                                            read = count;
170                                    }
171    
172                                    if ((buffer.length - readIndex) >= read) {
173    
174                                            // One step read
175    
176                                            System.arraycopy(buffer, readIndex, chars, offset, read);
177                                    }
178                                    else {
179    
180                                            // Two step read
181    
182                                            int tailLength = buffer.length - readIndex;
183                                            int headLength = read - tailLength;
184    
185                                            System.arraycopy(
186                                                    buffer, readIndex, chars, offset, tailLength);
187                                            System.arraycopy(
188                                                    buffer, 0, chars, offset + tailLength, headLength);
189                                    }
190    
191                                    increaseReadIndex(read);
192    
193                                    return read;
194                            }
195                            finally {
196                                    bufferLock.unlock();
197                            }
198                    }
199    
200                    @Override
201                    public int read(CharBuffer charBuffer) throws IOException {
202                            if (_closed) {
203                                    throw new IOException("Stream closed");
204                            }
205    
206                            int length = charBuffer.remaining();
207    
208                            if (length <= 0) {
209                                    return 0;
210                            }
211    
212                            char[] tempBuffer = new char[length];
213    
214                            int read = read(tempBuffer, 0, length);
215    
216                            if (read > 0) {
217                                    charBuffer.put(tempBuffer, 0, read);
218                            }
219    
220                            return read;
221                    }
222    
223                    @Override
224                    public boolean ready() throws IOException {
225                            if (_closed) {
226                                    throw new IOException("Stream closed");
227                            }
228    
229                            bufferLock.lock();
230    
231                            try {
232                                    return count > 0;
233                            }
234                            finally {
235                                    bufferLock.unlock();
236                            }
237                    }
238    
239                    @Override
240                    public void reset() throws IOException {
241                            throw new IOException("Reset is not supported");
242                    }
243    
244                    @Override
245                    public long skip(long skip) throws IOException {
246                            if (skip < 0) {
247                                    throw new IllegalArgumentException("Skip value is negative");
248                            }
249    
250                            if (_closed) {
251                                    throw new IOException("Stream closed");
252                            }
253    
254                            int skipBufferSize = (int)Math.min(skip, _MAX_SKIP_BUFFER_SIZE);
255    
256                            bufferLock.lock();
257    
258                            try {
259                                    if ((_skipBuffer == null) ||
260                                            (_skipBuffer.length < skipBufferSize)) {
261    
262                                            _skipBuffer = new char[skipBufferSize];
263                                    }
264    
265                                    long remaining = skip;
266    
267                                    while (remaining > 0) {
268                                            int skipped = read(
269                                                    _skipBuffer, 0,
270                                                    (int)Math.min(remaining, skipBufferSize));
271    
272                                            if (skipped == -1) {
273                                                    break;
274                                            }
275    
276                                            remaining -= skipped;
277                                    }
278    
279                                    return skip - remaining;
280                            }
281                            finally {
282                                    bufferLock.unlock();
283                            }
284                    }
285    
286                    protected boolean waitUntilNotEmpty() throws IOException {
287                            while ((count == 0) && !finished) {
288                                    notEmpty.awaitUninterruptibly();
289    
290                                    if (_closed) {
291                                            throw new IOException("Stream closed");
292                                    }
293                            }
294    
295                            if ((count == 0) && finished) {
296                                    return true;
297                            }
298                            else {
299                                    return false;
300                            }
301                    }
302    
303                    private void increaseReadIndex(int consumed) {
304                            readIndex += consumed;
305    
306                            if (readIndex >= buffer.length) {
307                                    readIndex -= buffer.length;
308                            }
309    
310                            if (count == buffer.length) {
311                                    notFull.signalAll();
312                            }
313    
314                            count -= consumed;
315                    }
316    
317                    private static final int _MAX_SKIP_BUFFER_SIZE = 8192;
318    
319                    private volatile boolean _closed;
320                    private char[] _skipBuffer;
321    
322            }
323    
324            private class PipeWriter extends Writer {
325    
326                    @Override
327                    public Writer append(char c) throws IOException {
328                            write(c);
329    
330                            return this;
331                    }
332    
333                    @Override
334                    public Writer append(CharSequence charSequence) throws IOException {
335                            String string = null;
336    
337                            if (charSequence == null) {
338                                    string = StringPool.NULL;
339                            }
340                            else {
341                                    string = charSequence.toString();
342                            }
343    
344                            write(string, 0, string.length());
345    
346                            return this;
347                    }
348    
349                    @Override
350                    public Writer append(CharSequence charSequence, int start, int end)
351                            throws IOException {
352    
353                            String string = null;
354    
355                            if (charSequence == null) {
356                                    string = StringPool.NULL;
357                            }
358                            else {
359                                    string = charSequence.subSequence(start, end).toString();
360                            }
361    
362                            write(string, 0, string.length());
363    
364                            return this;
365                    }
366    
367                    @Override
368                    public void close() {
369                            bufferLock.lock();
370    
371                            try {
372                                    _closed = true;
373    
374                                    notFull.signalAll();
375                            }
376                            finally {
377                                    bufferLock.unlock();
378                            }
379                    }
380    
381                    @Override
382                    public void flush() {
383                    }
384    
385                    @Override
386                    public void write(char[] chars) throws IOException {
387                            write(chars, 0, chars.length);
388                    }
389    
390                    @Override
391                    public void write(char[] chars, int offset, int length)
392                            throws IOException {
393    
394                            if (_closed) {
395                                    throw new IOException("Stream closed");
396                            }
397    
398                            if (length <= 0) {
399                                    return;
400                            }
401    
402                            bufferLock.lock();
403    
404                            try {
405                                    int remaining = length;
406    
407                                    while (remaining > 0) {
408                                            waitUntilNotFull();
409    
410                                            int write = remaining;
411    
412                                            if (remaining > (buffer.length - count)) {
413                                                    write = buffer.length - count;
414                                            }
415    
416                                            int sourceBegin = offset + length - remaining;
417    
418                                            if ((buffer.length - writeIndex) >= write) {
419    
420                                                    // One step write
421    
422                                                    System.arraycopy(
423                                                            chars, sourceBegin, buffer, writeIndex, write);
424                                            }
425                                            else {
426    
427                                                    // Two step write
428    
429                                                    int tailLength = buffer.length - writeIndex;
430                                                    int headLength = write - tailLength;
431    
432                                                    System.arraycopy(
433                                                            chars, sourceBegin, buffer, writeIndex, tailLength);
434                                                    System.arraycopy(
435                                                            chars, sourceBegin + tailLength, buffer, 0,
436                                                            headLength);
437                                            }
438    
439                                            increaseWriteIndex(write);
440    
441                                            remaining -= write;
442                                    }
443                            }
444                            finally {
445                                    bufferLock.unlock();
446                            }
447                    }
448    
449                    @Override
450                    public void write(int c) throws IOException {
451                            if (_closed) {
452                                    throw new IOException("Stream closed");
453                            }
454    
455                            bufferLock.lock();
456    
457                            try {
458                                    waitUntilNotFull();
459    
460                                    buffer[writeIndex] = (char)c;
461    
462                                    increaseWriteIndex(1);
463                            }
464                            finally {
465                                    bufferLock.unlock();
466                            }
467                    }
468    
469                    @Override
470                    public void write(String string) throws IOException {
471                            write(string, 0, string.length());
472                    }
473    
474                    @Override
475                    public void write(String string, int offset, int length)
476                            throws IOException {
477    
478                            if (_closed) {
479                                    throw new IOException("Stream closed");
480                            }
481    
482                            if (length <= 0) {
483                                    return;
484                            }
485    
486                            bufferLock.lock();
487    
488                            try {
489                                    int remaining = length;
490    
491                                    while (remaining > 0) {
492                                            waitUntilNotFull();
493    
494                                            int write = remaining;
495    
496                                            if (remaining > (buffer.length - count)) {
497                                                    write = buffer.length - count;
498                                            }
499    
500                                            int sourceBegin = offset + length - remaining;
501    
502                                            if ((buffer.length - writeIndex) >= write) {
503    
504                                                    // One step write
505    
506                                                    string.getChars(
507                                                            sourceBegin, sourceBegin + write, buffer,
508                                                            writeIndex);
509                                            }
510                                            else {
511    
512                                                    // Two step write
513    
514                                                    int tailLength = buffer.length - writeIndex;
515                                                    int headLength = write - tailLength;
516    
517                                                    string.getChars(
518                                                            sourceBegin, sourceBegin + tailLength, buffer,
519                                                            writeIndex);
520                                                    string.getChars(
521                                                            sourceBegin + tailLength,
522                                                            sourceBegin + tailLength + headLength, buffer, 0);
523                                            }
524    
525                                            increaseWriteIndex(write);
526    
527                                            remaining -= write;
528                                    }
529                            }
530                            finally {
531                                    bufferLock.unlock();
532                            }
533                    }
534    
535                    protected void waitUntilNotFull() throws IOException {
536                            while (count == buffer.length) {
537                                    notFull.awaitUninterruptibly();
538    
539                                    if (_closed) {
540                                            throw new IOException("Stream closed");
541                                    }
542                            }
543                    }
544    
545                    private void increaseWriteIndex(int produced) {
546                            writeIndex += produced;
547    
548                            if (writeIndex >= buffer.length) {
549                                    writeIndex -= buffer.length;
550                            }
551    
552                            if (count == 0) {
553                                    notEmpty.signalAll();
554                            }
555    
556                            count += produced;
557                    }
558    
559                    private volatile boolean _closed;
560    
561            }
562    
563    }