001
014
015 package com.liferay.portal.kernel.io;
016
017 import com.liferay.portal.kernel.util.StringPool;
018
019 import java.io.IOException;
020 import java.io.Reader;
021 import java.io.Writer;
022
023 import java.nio.CharBuffer;
024
025 import java.util.concurrent.locks.Condition;
026 import java.util.concurrent.locks.Lock;
027 import java.util.concurrent.locks.ReentrantLock;
028
029
/**
 * An in-memory character pipe backed by a fixed-size circular buffer.
 *
 * <p>
 * Characters written to {@link #getWriter()} become readable from
 * {@link #getReader()}. All buffer state is guarded by a single lock with two
 * conditions: readers wait on <code>notEmpty</code> while the buffer is empty
 * and writers wait on <code>notFull</code> while it is full.
 * </p>
 *
 * <p>
 * End of stream is reported to the reader only after {@link #close()} (or
 * <code>close(false)</code>) has been called and the buffer has been drained.
 * </p>
 */
public class CharPipe {

    /**
     * Creates a pipe with the default buffer size (8192 characters).
     */
    public CharPipe() {
        this(_DEFAULT_BUFFER_SIZE);
    }

    /**
     * Creates a pipe with the given buffer capacity.
     *
     * @param  bufferSize the capacity of the circular buffer, in characters
     * @throws IllegalArgumentException if <code>bufferSize</code> is not
     *         positive; a zero-length buffer is permanently "full" and would
     *         block every writer forever
     */
    public CharPipe(int bufferSize) {
        if (bufferSize <= 0) {
            throw new IllegalArgumentException(
                "Buffer size must be positive: " + bufferSize);
        }

        buffer = new char[bufferSize];
        count = 0;
        readIndex = 0;
        writeIndex = 0;
    }

    /**
     * Closes the writing side and marks the pipe as finished, so the reader
     * can drain the remaining characters and then observe end of stream.
     */
    public void close() {
        close(false);
    }

    /**
     * Closes the pipe.
     *
     * @param force if <code>true</code>, the reader is closed as well and the
     *        buffer is released, discarding any unread characters; if
     *        <code>false</code>, only the writer is closed and the pipe is
     *        marked finished so the reader can still drain the buffer
     */
    public void close(boolean force) {

        // Every state change happens under the lock so that a concurrent
        // reader or writer never observes a half-closed pipe (for example a
        // released buffer while its own closed flag is not yet visible).

        bufferLock.lock();

        try {
            _pipeWriter.close();

            if (force) {
                _pipeReader.close();

                buffer = null;
            }
            else {
                finished = true;

                notEmpty.signalAll();
            }
        }
        finally {
            bufferLock.unlock();
        }
    }

    /**
     * Returns the reading end of this pipe. The same instance is returned on
     * every call.
     */
    public Reader getReader() {
        return _pipeReader;
    }

    /**
     * Returns the writing end of this pipe. The same instance is returned on
     * every call.
     */
    public Writer getWriter() {
        return _pipeWriter;
    }

    // Circular buffer state; every access must hold bufferLock.

    protected char[] buffer;
    protected Lock bufferLock = new ReentrantLock();
    protected int count;
    protected boolean finished;
    protected Condition notEmpty = bufferLock.newCondition();
    protected Condition notFull = bufferLock.newCondition();
    protected int readIndex;
    protected int writeIndex;

    private static final int _DEFAULT_BUFFER_SIZE = 1024 * 8;

    // Text appended in place of a null character sequence, as required by
    // the java.io.Writer contract.

    private static final String _NULL = "null";

    private final PipeReader _pipeReader = new PipeReader();
    private final PipeWriter _pipeWriter = new PipeWriter();

    /**
     * The reading end of the pipe. Mark and reset are not supported.
     */
    private class PipeReader extends Reader {

        /**
         * Closes the reader and wakes every waiting thread: blocked readers
         * fail with an <code>IOException</code>, and blocked writers are
         * released because nothing will drain the buffer anymore.
         */
        @Override
        public void close() {
            bufferLock.lock();

            try {
                _closed = true;

                notEmpty.signalAll();
                notFull.signalAll();
            }
            finally {
                bufferLock.unlock();
            }
        }

        @Override
        public void mark(int readAheadLimit) throws IOException {
            throw new IOException("Mark is not supported");
        }

        @Override
        public boolean markSupported() {
            return false;
        }

        /**
         * Reads a single character, blocking while the pipe is empty.
         *
         * @return the character read, or <code>-1</code> if the pipe is
         *         finished and drained
         * @throws IOException if the reader is closed
         */
        @Override
        public int read() throws IOException {
            ensureOpen();

            bufferLock.lock();

            try {
                if (waitUntilNotEmpty()) {
                    return -1;
                }

                char result = buffer[readIndex];

                increaseReadIndex(1);

                return result;
            }
            finally {
                bufferLock.unlock();
            }
        }

        @Override
        public int read(char[] chars) throws IOException {
            return read(chars, 0, chars.length);
        }

        /**
         * Reads up to <code>length</code> characters, blocking only while the
         * pipe is empty; it returns as soon as at least one character is
         * available.
         *
         * @return the number of characters read, <code>0</code> if
         *         <code>length</code> is not positive, or <code>-1</code> if
         *         the pipe is finished and drained
         * @throws IOException if the reader is closed
         */
        @Override
        public int read(char[] chars, int offset, int length)
            throws IOException {

            ensureOpen();

            if (length <= 0) {
                return 0;
            }

            bufferLock.lock();

            try {
                if (waitUntilNotEmpty()) {
                    return -1;
                }

                int read = Math.min(length, count);

                int tailLength = buffer.length - readIndex;

                if (tailLength >= read) {

                    // The requested range is contiguous

                    System.arraycopy(buffer, readIndex, chars, offset, read);
                }
                else {

                    // The requested range wraps around the end of the buffer

                    int headLength = read - tailLength;

                    System.arraycopy(
                        buffer, readIndex, chars, offset, tailLength);
                    System.arraycopy(
                        buffer, 0, chars, offset + tailLength, headLength);
                }

                increaseReadIndex(read);

                return read;
            }
            finally {
                bufferLock.unlock();
            }
        }

        @Override
        public int read(CharBuffer charBuffer) throws IOException {
            ensureOpen();

            int length = charBuffer.remaining();

            if (length <= 0) {
                return 0;
            }

            char[] tempBuffer = new char[length];

            int read = read(tempBuffer, 0, length);

            if (read > 0) {
                charBuffer.put(tempBuffer, 0, read);
            }

            return read;
        }

        /**
         * Returns <code>true</code> if at least one character can be read
         * without blocking.
         */
        @Override
        public boolean ready() throws IOException {
            ensureOpen();

            bufferLock.lock();

            try {
                return count > 0;
            }
            finally {
                bufferLock.unlock();
            }
        }

        @Override
        public void reset() throws IOException {
            throw new IOException("Reset is not supported");
        }

        /**
         * Skips up to <code>skip</code> characters by reading them into a
         * scratch buffer. Blocks until that many characters were skipped or
         * the pipe is finished and drained.
         *
         * @return the number of characters actually skipped
         * @throws IllegalArgumentException if <code>skip</code> is negative
         * @throws IOException if the reader is closed
         */
        @Override
        public long skip(long skip) throws IOException {
            if (skip < 0) {
                throw new IllegalArgumentException("Skip value is negative");
            }

            ensureOpen();

            int skipBufferSize = (int)Math.min(skip, _MAX_SKIP_BUFFER_SIZE);

            // The lock is reentrant, so the nested read calls below acquire
            // it again without blocking.

            bufferLock.lock();

            try {
                if ((_skipBuffer == null) ||
                    (_skipBuffer.length < skipBufferSize)) {

                    _skipBuffer = new char[skipBufferSize];
                }

                long remaining = skip;

                while (remaining > 0) {
                    int skipped = read(
                        _skipBuffer, 0,
                        (int)Math.min(remaining, skipBufferSize));

                    if (skipped == -1) {
                        break;
                    }

                    remaining -= skipped;
                }

                return skip - remaining;
            }
            finally {
                bufferLock.unlock();
            }
        }

        /**
         * Blocks, with the lock held by the caller, until the buffer holds at
         * least one character or the pipe is finished.
         *
         * @return <code>true</code> if the pipe is finished and drained (end
         *         of stream), <code>false</code> if characters are available
         * @throws IOException if the reader is closed before or while waiting
         */
        protected boolean waitUntilNotEmpty() throws IOException {

            // Recheck under the lock: the reader may have been closed between
            // the caller's unlocked check and the lock acquisition, in which
            // case the wake-up signal has already been sent.

            ensureOpen();

            while ((count == 0) && !finished) {
                notEmpty.awaitUninterruptibly();

                ensureOpen();
            }

            return (count == 0) && finished;
        }

        boolean isClosed() {
            return _closed;
        }

        private void ensureOpen() throws IOException {
            if (_closed) {
                throw new IOException("Stream closed");
            }
        }

        private void increaseReadIndex(int consumed) {
            readIndex += consumed;

            if (readIndex >= buffer.length) {
                readIndex -= buffer.length;
            }

            // Writers only wait while the buffer is completely full, so they
            // need a wake-up only on the full -> not full transition.

            if (count == buffer.length) {
                notFull.signalAll();
            }

            count -= consumed;
        }

        private static final int _MAX_SKIP_BUFFER_SIZE = 8192;

        private volatile boolean _closed;
        private char[] _skipBuffer;

    }

    /**
     * The writing end of the pipe. Writes block while the buffer is full.
     */
    private class PipeWriter extends Writer {

        @Override
        public Writer append(char c) throws IOException {
            write(c);

            return this;
        }

        /**
         * Appends the character sequence; a <code>null</code> sequence is
         * appended as the four characters "null".
         */
        @Override
        public Writer append(CharSequence charSequence) throws IOException {
            String string = String.valueOf(charSequence);

            write(string, 0, string.length());

            return this;
        }

        /**
         * Appends the subsequence <code>[start, end)</code>. As specified by
         * <code>java.io.Writer</code>, a <code>null</code> sequence is
         * treated as if it contained the four characters "null", and the
         * range is applied to that text.
         */
        @Override
        public Writer append(CharSequence charSequence, int start, int end)
            throws IOException {

            CharSequence source = charSequence;

            if (source == null) {
                source = _NULL;
            }

            String string = source.subSequence(start, end).toString();

            write(string, 0, string.length());

            return this;
        }

        /**
         * Closes the writer and wakes writers blocked on a full buffer so
         * they fail with an <code>IOException</code>.
         */
        @Override
        public void close() {
            bufferLock.lock();

            try {
                _closed = true;

                notFull.signalAll();
            }
            finally {
                bufferLock.unlock();
            }
        }

        /**
         * Does nothing; written characters are immediately visible to the
         * reader.
         */
        @Override
        public void flush() {
        }

        @Override
        public void write(char[] chars) throws IOException {
            write(chars, 0, chars.length);
        }

        /**
         * Writes <code>length</code> characters, blocking whenever the buffer
         * is full until the reader frees some space.
         *
         * @throws IOException if the writer is closed, or if the buffer is
         *         full and the reader has been closed
         */
        @Override
        public void write(char[] chars, int offset, int length)
            throws IOException {

            ensureOpen();

            if (length <= 0) {
                return;
            }

            bufferLock.lock();

            try {
                int remaining = length;

                while (remaining > 0) {
                    waitUntilNotFull();

                    int write = Math.min(remaining, buffer.length - count);

                    int sourceBegin = offset + length - remaining;

                    int tailLength = buffer.length - writeIndex;

                    if (tailLength >= write) {

                        // The free range is contiguous

                        System.arraycopy(
                            chars, sourceBegin, buffer, writeIndex, write);
                    }
                    else {

                        // The free range wraps around the end of the buffer

                        int headLength = write - tailLength;

                        System.arraycopy(
                            chars, sourceBegin, buffer, writeIndex,
                            tailLength);
                        System.arraycopy(
                            chars, sourceBegin + tailLength, buffer, 0,
                            headLength);
                    }

                    increaseWriteIndex(write);

                    remaining -= write;
                }
            }
            finally {
                bufferLock.unlock();
            }
        }

        /**
         * Writes a single character, blocking while the buffer is full.
         *
         * @throws IOException if the writer is closed, or if the buffer is
         *         full and the reader has been closed
         */
        @Override
        public void write(int c) throws IOException {
            ensureOpen();

            bufferLock.lock();

            try {
                waitUntilNotFull();

                buffer[writeIndex] = (char)c;

                increaseWriteIndex(1);
            }
            finally {
                bufferLock.unlock();
            }
        }

        @Override
        public void write(String string) throws IOException {
            write(string, 0, string.length());
        }

        /**
         * Writes <code>length</code> characters of the string, blocking
         * whenever the buffer is full until the reader frees some space.
         *
         * @throws IOException if the writer is closed, or if the buffer is
         *         full and the reader has been closed
         */
        @Override
        public void write(String string, int offset, int length)
            throws IOException {

            ensureOpen();

            if (length <= 0) {
                return;
            }

            bufferLock.lock();

            try {
                int remaining = length;

                while (remaining > 0) {
                    waitUntilNotFull();

                    int write = Math.min(remaining, buffer.length - count);

                    int sourceBegin = offset + length - remaining;

                    int tailLength = buffer.length - writeIndex;

                    if (tailLength >= write) {

                        // The free range is contiguous

                        string.getChars(
                            sourceBegin, sourceBegin + write, buffer,
                            writeIndex);
                    }
                    else {

                        // The free range wraps around the end of the buffer

                        int headLength = write - tailLength;

                        string.getChars(
                            sourceBegin, sourceBegin + tailLength, buffer,
                            writeIndex);
                        string.getChars(
                            sourceBegin + tailLength,
                            sourceBegin + tailLength + headLength, buffer, 0);
                    }

                    increaseWriteIndex(write);

                    remaining -= write;
                }
            }
            finally {
                bufferLock.unlock();
            }
        }

        /**
         * Blocks, with the lock held by the caller, until the buffer has room
         * for at least one character.
         *
         * @throws IOException if the writer is closed before or while
         *         waiting, or if the buffer is full and the reader has been
         *         closed (nothing would ever free space again)
         */
        protected void waitUntilNotFull() throws IOException {

            // Recheck under the lock; after a forced close the buffer is
            // released and must not be touched.

            ensureOpen();

            while (count == buffer.length) {
                if (_pipeReader.isClosed()) {
                    throw new IOException("Reader closed");
                }

                notFull.awaitUninterruptibly();

                ensureOpen();
            }
        }

        private void ensureOpen() throws IOException {
            if (_closed) {
                throw new IOException("Stream closed");
            }
        }

        private void increaseWriteIndex(int produced) {
            writeIndex += produced;

            if (writeIndex >= buffer.length) {
                writeIndex -= buffer.length;
            }

            // Readers only wait while the buffer is empty, so they need a
            // wake-up only on the empty -> not empty transition.

            if (count == 0) {
                notEmpty.signalAll();
            }

            count += produced;
        }

        private volatile boolean _closed;

    }

}