001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.concurrent;
016    
017    import java.util.Comparator;
018    import java.util.concurrent.atomic.AtomicInteger;
019    import java.util.concurrent.atomic.AtomicLong;
020    import java.util.concurrent.locks.Condition;
021    import java.util.concurrent.locks.ReentrantLock;
022    
/**
 * A blocking FIFO pipe that coalesces equivalent pending elements.
 *
 * <p>
 * When an element is put into the pipe and an equivalent element is already
 * pending, the new element is dropped and the coalesced counter is
 * incremented. Equivalence is decided by the comparator supplied at
 * construction time (a result of <code>0</code> means equivalent) or, when no
 * comparator was supplied, by <code>equals</code>.
 * </p>
 *
 * <p>
 * Internally the pipe is a singly linked list guarded by two locks: a put lock
 * that serializes producers and a take lock that serializes consumers. The
 * coalescing scan and {@link #takeSnapshot()} hold both locks, always
 * acquiring the put lock first.
 * </p>
 *
 * @author Shuyang Zhou
 */
public class CoalescedPipe<E> {

	/**
	 * Creates a pipe that uses <code>equals</code> to detect equivalent
	 * elements.
	 */
	public CoalescedPipe() {
		this(null);
	}

	/**
	 * Creates a pipe that uses the given comparator to detect equivalent
	 * elements.
	 *
	 * @param comparator the comparator used for coalescing; two elements are
	 *        coalesced when it returns <code>0</code>. May be
	 *        <code>null</code>, in which case <code>equals</code> is used.
	 */
	public CoalescedPipe(Comparator<? super E> comparator) {
		_comparator = comparator;
		_notEmptyCondition = _takeLock.newCondition();

		// The head link is a sentinel that never carries a pending element;
		// the first pending element lives in its successor.

		_headElementLink = new ElementLink<E>(null);
		_lastElementLink = _headElementLink;
	}

	/**
	 * Returns the number of elements that were dropped by {@link
	 * #put(Object)} because an equivalent element was already pending.
	 *
	 * @return the number of coalesced elements
	 */
	public long coalescedCount() {
		return _coalescedCount.get();
	}

	/**
	 * Returns the number of elements currently waiting to be taken.
	 *
	 * @return the number of pending elements
	 */
	public int pendingCount() {
		return _pendingCount.get();
	}

	/**
	 * Appends the element to the end of the pipe, unless an equivalent
	 * element is already pending, in which case the element is dropped and
	 * the coalesced counter is incremented.
	 *
	 * @param  e the element to put; must not be <code>null</code>
	 * @throws InterruptedException if the current thread is interrupted while
	 *         acquiring the put lock
	 * @throws NullPointerException if the element is <code>null</code>
	 */
	public void put(E e) throws InterruptedException {
		if (e == null) {
			throw new NullPointerException();
		}

		int pendingElements = -1;

		_putLock.lockInterruptibly();

		try {
			if (_coalesceElement(e)) {
				return;
			}

			_lastElementLink._nextElementLink = new ElementLink<E>(e);

			_lastElementLink = _lastElementLink._nextElementLink;

			pendingElements = _pendingCount.getAndIncrement();
		}
		finally {
			_putLock.unlock();
		}

		// The pipe went from empty to non-empty, so wake up one consumer that
		// may be waiting in take()

		if (pendingElements == 0) {
			_takeLock.lock();

			try {
				_notEmptyCondition.signal();
			}
			finally {
				_takeLock.unlock();
			}
		}
	}

	/**
	 * Removes and returns the oldest pending element, waiting until one is
	 * available.
	 *
	 * @return the oldest pending element
	 * @throws InterruptedException if the current thread is interrupted while
	 *         acquiring the take lock or while waiting for an element
	 */
	public E take() throws InterruptedException {
		E element = null;

		_takeLock.lockInterruptibly();

		try {
			while (_pendingCount.get() == 0) {
				_notEmptyCondition.await();
			}

			// Advance the sentinel: the first pending link becomes the new
			// head, and the old head is unlinked so it can be collected

			ElementLink<E> garbageElementLink = _headElementLink;

			_headElementLink = _headElementLink._nextElementLink;

			garbageElementLink._nextElementLink = null;

			element = _headElementLink._element;

			_headElementLink._element = null;

			int pendingElements = _pendingCount.getAndDecrement();

			// More elements remain, so pass the signal on to another waiting
			// consumer

			if (pendingElements > 1) {
				_notEmptyCondition.signal();
			}
		}
		finally {
			_takeLock.unlock();
		}

		return element;
	}

	/**
	 * Returns an array holding the pending elements in the order they would
	 * be taken. The pipe itself is not modified.
	 *
	 * @return a newly allocated array of the pending elements
	 */
	public Object[] takeSnapshot() {

		// Acquire in the same order as put() (put lock, then take lock) and
		// release in reverse order. Nesting the try blocks guarantees the put
		// lock is released even if acquiring the take lock fails.

		_putLock.lock();

		try {
			_takeLock.lock();

			try {
				Object[] pendingElements = new Object[_pendingCount.get()];

				ElementLink<E> currentElementLink =
					_headElementLink._nextElementLink;

				for (int i = 0; currentElementLink != null; i++) {
					pendingElements[i] = currentElementLink._element;

					currentElementLink = currentElementLink._nextElementLink;
				}

				return pendingElements;
			}
			finally {
				_takeLock.unlock();
			}
		}
		finally {
			_putLock.unlock();
		}
	}

	/**
	 * Scans the pending elements for one that is equivalent to the given
	 * element and, if found, counts the given element as coalesced.
	 *
	 * <p>
	 * Called by {@link #put(Object)} while the put lock is held. If the
	 * current thread is interrupted while acquiring the take lock, the scan is
	 * skipped, the interrupt status is restored and <code>false</code> is
	 * returned so that the element still enters the pipe.
	 * </p>
	 *
	 * @param  e the element about to be put
	 * @return <code>true</code> if an equivalent element is already pending
	 *         and the given element must be dropped; <code>false</code>
	 *         otherwise
	 */
	private boolean _coalesceElement(E e) {
		try {
			_takeLock.lockInterruptibly();
		}
		catch (InterruptedException ie) {

			// Continue to let the current element enter the pipe, but keep
			// the interrupt visible to the caller instead of swallowing it

			Thread.currentThread().interrupt();

			return false;
		}

		try {
			ElementLink<E> currentElementLink =
				_headElementLink._nextElementLink;

			while (currentElementLink != null) {
				if (_isEquivalent(currentElementLink._element, e)) {
					_coalescedCount.incrementAndGet();

					return true;
				}

				currentElementLink = currentElementLink._nextElementLink;
			}

			return false;
		}
		finally {
			_takeLock.unlock();
		}
	}

	/**
	 * Returns whether the pending element and the new element are
	 * equivalent, using the comparator when one was supplied and
	 * <code>equals</code> otherwise.
	 */
	private boolean _isEquivalent(E pendingElement, E e) {
		if (_comparator != null) {
			return _comparator.compare(pendingElement, e) == 0;
		}

		return pendingElement.equals(e);
	}

	private final AtomicLong _coalescedCount = new AtomicLong(0);
	private final Comparator<? super E> _comparator;
	private ElementLink<E> _headElementLink;
	private ElementLink<E> _lastElementLink;
	private final Condition _notEmptyCondition;
	private final AtomicInteger _pendingCount = new AtomicInteger(0);
	private final ReentrantLock _putLock = new ReentrantLock();
	private final ReentrantLock _takeLock = new ReentrantLock();

	/**
	 * A node of the singly linked list backing the pipe.
	 */
	private static class ElementLink<E> {

		private ElementLink(E element) {
			_element = element;
		}

		private E _element;
		private ElementLink<E> _nextElementLink;

	}

}