001
014
015 package com.liferay.portal.kernel.concurrent;
016
017 import java.util.Collection;
018 import java.util.concurrent.TimeUnit;
019 import java.util.concurrent.atomic.AtomicInteger;
020 import java.util.concurrent.locks.Condition;
021 import java.util.concurrent.locks.ReentrantLock;
022
023
026 public class TaskQueue<E> {
027
028 public TaskQueue() {
029 this(Integer.MAX_VALUE);
030 }
031
032 public TaskQueue(int capacity) {
033 if (capacity <= 0) {
034 throw new IllegalArgumentException();
035 }
036
037 _capacity = capacity;
038
039 _headNode = new Node<E>(null);
040 _tailNode = _headNode;
041
042 _notEmptyCondition = _takeLock.newCondition();
043 }
044
045 public int drainTo(Collection<E> collection) {
046 if (collection == null) {
047 throw new NullPointerException();
048 }
049
050 _takeLock.lock();
051
052 try {
053 Node<E> headNode = _headNode;
054
055 int size = _count.get();
056
057 int count = 0;
058
059 try {
060 while (count < size) {
061 Node<E> currentNode = headNode._nextNode;
062
063 collection.add(currentNode._element);
064
065 currentNode._element = null;
066
067 headNode._nextNode = null;
068
069 headNode = currentNode;
070
071 count++;
072 }
073
074 return count;
075 }
076 finally {
077 if (count > 0) {
078 _headNode = headNode;
079
080 _count.getAndAdd(-count);
081 }
082 }
083 }
084 finally {
085 _takeLock.unlock();
086 }
087 }
088
089 public boolean isEmpty() {
090 if (_count.get() == 0) {
091 return true;
092 }
093 else {
094 return false;
095 }
096 }
097
098 public boolean offer(E element, boolean[] hasWaiterMarker) {
099 if ((element == null) || (hasWaiterMarker == null)) {
100 throw new NullPointerException();
101 }
102
103 if (hasWaiterMarker.length == 0) {
104 throw new IllegalArgumentException();
105 }
106
107 if (_count.get() == _capacity) {
108 return false;
109 }
110
111 int count = -1;
112
113 _putLock.lock();
114
115 try {
116 if (_count.get() < _capacity) {
117 _enqueue(element);
118
119 count = _count.getAndIncrement();
120
121 _takeLock.lock();
122
123 try {
124 hasWaiterMarker[0] = _takeLock.hasWaiters(
125 _notEmptyCondition);
126
127 if (!hasWaiterMarker[0] && (count >= _count.get())) {
128 hasWaiterMarker[0] = true;
129 }
130 }
131 finally {
132 _takeLock.unlock();
133 }
134 }
135 }
136 finally {
137 _putLock.unlock();
138 }
139
140 if (count == 0) {
141 _takeLock.lock();
142
143 try {
144 _notEmptyCondition.signal();
145 }
146 finally {
147 _takeLock.unlock();
148 }
149 }
150
151 return count >= 0;
152 }
153
154 public E poll() {
155 if (_count.get() == 0) {
156 return null;
157 }
158
159 E element = null;
160
161 _takeLock.lock();
162
163 try {
164 if (_count.get() > 0) {
165 element = _dequeue();
166
167 if (_count.getAndDecrement() > 1) {
168 _notEmptyCondition.signal();
169 }
170 }
171 }
172 finally {
173 _takeLock.unlock();
174 }
175
176 return element;
177 }
178
179 public E poll(long timeout, TimeUnit timeUnit) throws InterruptedException {
180 E element = null;
181
182 long nanos = timeUnit.toNanos(timeout);
183
184 _takeLock.lockInterruptibly();
185
186 try {
187 while (_count.get() == 0) {
188 if (nanos <= 0) {
189 return null;
190 }
191
192 nanos = _notEmptyCondition.awaitNanos(nanos);
193 }
194
195 element = _dequeue();
196
197 if (_count.getAndDecrement() > 1) {
198 _notEmptyCondition.signal();
199 }
200 }
201 finally {
202 _takeLock.unlock();
203 }
204
205 return element;
206 }
207
208 public int remainingCapacity() {
209 return _capacity - _count.get();
210 }
211
212 public boolean remove(E element) {
213 if (element == null) {
214 return false;
215 }
216
217 _fullyLock();
218
219 try {
220 Node<E> previousNode = _headNode;
221 Node<E> currentNode = previousNode._nextNode;
222
223 while (currentNode != null) {
224 if (element.equals(currentNode._element)) {
225 _unlink(currentNode, previousNode);
226
227 return true;
228 }
229
230 previousNode = currentNode;
231 currentNode = currentNode._nextNode;
232 }
233
234 return false;
235 }
236 finally {
237 _fullyUnlock();
238 }
239 }
240
241 public int size() {
242 return _count.get();
243 }
244
245 public E take() throws InterruptedException {
246 E element = null;
247
248 _takeLock.lockInterruptibly();
249
250 try {
251 while (_count.get() == 0) {
252 _notEmptyCondition.await();
253 }
254
255 element = _dequeue();
256
257 if (_count.getAndDecrement() > 1) {
258 _notEmptyCondition.signal();
259 }
260 }
261 finally {
262 _takeLock.unlock();
263 }
264
265 return element;
266 }
267
268 protected ReentrantLock getPutLock() {
269 return _putLock;
270 }
271
272 protected ReentrantLock getTakeLock() {
273 return _takeLock;
274 }
275
276 private E _dequeue() {
277 Node<E> headNode = _headNode;
278 Node<E> firstNode = headNode._nextNode;
279
280 headNode._nextNode = null;
281
282 _headNode = firstNode;
283
284 E element = firstNode._element;
285
286 firstNode._element = null;
287
288 return element;
289 }
290
291 private void _enqueue(E element) {
292 _tailNode._nextNode = new Node<E>(element);
293
294 _tailNode = _tailNode._nextNode;
295 }
296
297 private void _fullyLock() {
298 _putLock.lock();
299 _takeLock.lock();
300 }
301
302 private void _fullyUnlock() {
303 _takeLock.unlock();
304 _putLock.unlock();
305 }
306
307 private void _unlink(Node<E> currentNode, Node<E> previousNode) {
308 currentNode._element = null;
309 previousNode._nextNode = currentNode._nextNode;
310
311 if (_tailNode == currentNode) {
312 _tailNode = previousNode;
313 }
314
315 _count.getAndDecrement();
316 }
317
318 private final int _capacity;
319 private final AtomicInteger _count = new AtomicInteger();
320 private Node<E> _headNode;
321 private final Condition _notEmptyCondition;
322 private final ReentrantLock _putLock = new ReentrantLock();
323 private Node<E> _tailNode;
324 private final ReentrantLock _takeLock = new ReentrantLock();
325
326 private static class Node<E> {
327
328 private Node(E element) {
329 _element = element;
330 }
331
332 private E _element;
333 private Node<E> _nextNode;
334
335 }
336
337 }