001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.concurrent;
016    
017    import java.util.Collection;
018    import java.util.concurrent.TimeUnit;
019    import java.util.concurrent.atomic.AtomicInteger;
020    import java.util.concurrent.locks.Condition;
021    import java.util.concurrent.locks.ReentrantLock;
022    
023    /**
024     * @author Shuyang Zhou
025     */
026    public class TaskQueue<E> {
027    
028            public TaskQueue() {
029                    this(Integer.MAX_VALUE);
030            }
031    
032            public TaskQueue(int capacity) {
033                    if (capacity <= 0) {
034                            throw new IllegalArgumentException();
035                    }
036    
037                    _capacity = capacity;
038    
039                    _headNode = new Node<E>(null);
040                    _tailNode = _headNode;
041    
042                    _notEmptyCondition = _takeLock.newCondition();
043            }
044    
045            public int drainTo(Collection<E> collection) {
046                    if (collection == null) {
047                            throw new NullPointerException();
048                    }
049    
050                    _takeLock.lock();
051    
052                    try {
053                            Node<E> headNode = _headNode;
054    
055                            int size = _count.get();
056    
057                            int count = 0;
058    
059                            try {
060                                    while (count < size) {
061                                            Node<E> currentNode = headNode._nextNode;
062    
063                                            collection.add(currentNode._element);
064    
065                                            currentNode._element = null;
066    
067                                            headNode._nextNode = null;
068    
069                                            headNode = currentNode;
070    
071                                            count++;
072                                    }
073    
074                                    return count;
075                            }
076                            finally {
077                                    if (count > 0) {
078                                            _headNode = headNode;
079    
080                                            _count.getAndAdd(-count);
081                                    }
082                            }
083                    }
084                    finally {
085                            _takeLock.unlock();
086                    }
087            }
088    
089            public boolean isEmpty() {
090                    if (_count.get() == 0) {
091                            return true;
092                    }
093                    else {
094                            return false;
095                    }
096            }
097    
098            public boolean offer(E element, boolean[] hasWaiterMarker) {
099                    if ((element == null) || (hasWaiterMarker == null)) {
100                            throw new NullPointerException();
101                    }
102    
103                    if (hasWaiterMarker.length == 0) {
104                            throw new IllegalArgumentException();
105                    }
106    
107                    if (_count.get() == _capacity) {
108                            return false;
109                    }
110    
111                    int count = -1;
112    
113                    _putLock.lock();
114    
115                    try {
116                            if (_count.get() < _capacity) {
117                                    _enqueue(element);
118    
119                                    count = _count.getAndIncrement();
120    
121                                    _takeLock.lock();
122    
123                                    try {
124                                            hasWaiterMarker[0] = _takeLock.hasWaiters(
125                                                    _notEmptyCondition);
126    
127                                            if (!hasWaiterMarker[0] && (count >= _count.get())) {
128                                                    hasWaiterMarker[0] = true;
129                                            }
130                                    }
131                                    finally {
132                                            _takeLock.unlock();
133                                    }
134                            }
135                    }
136                    finally {
137                            _putLock.unlock();
138                    }
139    
140                    if (count == 0) {
141                            _takeLock.lock();
142    
143                            try {
144                                    _notEmptyCondition.signal();
145                            }
146                            finally {
147                                    _takeLock.unlock();
148                            }
149                    }
150    
151                    return count >= 0;
152            }
153    
154            public E poll() {
155                    if (_count.get() == 0) {
156                            return null;
157                    }
158    
159                    E element = null;
160    
161                    _takeLock.lock();
162    
163                    try {
164                            if (_count.get() > 0) {
165                                    element = _dequeue();
166    
167                                    if (_count.getAndDecrement() > 1) {
168                                            _notEmptyCondition.signal();
169                                    }
170                            }
171                    }
172                    finally {
173                            _takeLock.unlock();
174                    }
175    
176                    return element;
177            }
178    
179            public E poll(long timeout, TimeUnit timeUnit) throws InterruptedException {
180                    E element = null;
181    
182                    long nanos = timeUnit.toNanos(timeout);
183    
184                    _takeLock.lockInterruptibly();
185    
186                    try {
187                            while (_count.get() == 0) {
188                                    if (nanos <= 0) {
189                                            return null;
190                                    }
191    
192                                    nanos = _notEmptyCondition.awaitNanos(nanos);
193                            }
194    
195                            element = _dequeue();
196    
197                            if (_count.getAndDecrement() > 1) {
198                                    _notEmptyCondition.signal();
199                            }
200                    }
201                    finally {
202                            _takeLock.unlock();
203                    }
204    
205                    return element;
206            }
207    
208            public int remainingCapacity() {
209                    return _capacity - _count.get();
210            }
211    
212            public boolean remove(E element) {
213                    if (element == null) {
214                            return false;
215                    }
216    
217                    _fullyLock();
218    
219                    try {
220                            Node<E> previousNode = _headNode;
221                            Node<E> currentNode = previousNode._nextNode;
222    
223                            while (currentNode != null) {
224                                    if (element.equals(currentNode._element)) {
225                                            _unlink(currentNode, previousNode);
226    
227                                            return true;
228                                    }
229    
230                                    previousNode = currentNode;
231                                    currentNode = currentNode._nextNode;
232                            }
233    
234                            return false;
235                    }
236                    finally {
237                            _fullyUnlock();
238                    }
239            }
240    
241            public int size() {
242                    return _count.get();
243            }
244    
245            public E take() throws InterruptedException {
246                    E element = null;
247    
248                    _takeLock.lockInterruptibly();
249    
250                    try {
251                            while (_count.get() == 0) {
252                                    _notEmptyCondition.await();
253                            }
254    
255                            element = _dequeue();
256    
257                            if (_count.getAndDecrement() > 1) {
258                                    _notEmptyCondition.signal();
259                            }
260                    }
261                    finally {
262                            _takeLock.unlock();
263                    }
264    
265                    return element;
266            }
267    
268            protected ReentrantLock getPutLock() {
269                    return _putLock;
270            }
271    
272            protected ReentrantLock getTakeLock() {
273                    return _takeLock;
274            }
275    
276            private E _dequeue() {
277                    Node<E> headNode = _headNode;
278                    Node<E> firstNode = headNode._nextNode;
279    
280                    headNode._nextNode = null;
281    
282                    _headNode = firstNode;
283    
284                    E element = firstNode._element;
285    
286                    firstNode._element = null;
287    
288                    return element;
289            }
290    
291            private void _enqueue(E element) {
292                    _tailNode._nextNode = new Node<E>(element);
293    
294                    _tailNode = _tailNode._nextNode;
295            }
296    
297            private void _fullyLock() {
298                    _putLock.lock();
299                    _takeLock.lock();
300            }
301    
302            private void _fullyUnlock() {
303                    _takeLock.unlock();
304                    _putLock.unlock();
305            }
306    
307            private void _unlink(Node<E> currentNode, Node<E> previousNode) {
308                    currentNode._element = null;
309                    previousNode._nextNode = currentNode._nextNode;
310    
311                    if (_tailNode == currentNode) {
312                            _tailNode = previousNode;
313                    }
314    
315                    _count.getAndDecrement();
316            }
317    
318            private final int _capacity;
319            private final AtomicInteger _count = new AtomicInteger();
320            private Node<E> _headNode;
321            private final Condition _notEmptyCondition;
322            private final ReentrantLock _putLock = new ReentrantLock();
323            private Node<E> _tailNode;
324            private final ReentrantLock _takeLock = new ReentrantLock();
325    
326            private static class Node<E> {
327    
328                    private Node(E element) {
329                            _element = element;
330                    }
331    
332                    private E _element;
333                    private Node<E> _nextNode;
334    
335            }
336    
337    }