001
014
015 package com.liferay.portal.kernel.concurrent;
016
017 import java.util.ArrayList;
018 import java.util.HashSet;
019 import java.util.List;
020 import java.util.Set;
021 import java.util.concurrent.AbstractExecutorService;
022 import java.util.concurrent.Executors;
023 import java.util.concurrent.ThreadFactory;
024 import java.util.concurrent.TimeUnit;
025 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
026 import java.util.concurrent.locks.Condition;
027 import java.util.concurrent.locks.ReentrantLock;
028
029
036 public class ThreadPoolExecutor extends AbstractExecutorService {
037
038 public ThreadPoolExecutor(int corePoolSize, int maxPoolSize) {
039 this(
040 corePoolSize, maxPoolSize, 60, TimeUnit.SECONDS, false,
041 Integer.MAX_VALUE, new AbortPolicy(),
042 Executors.defaultThreadFactory(), new ThreadPoolHandlerAdapter());
043 }
044
045 public ThreadPoolExecutor(
046 int corePoolSize, int maxPoolSize, long keepAliveTime,
047 TimeUnit timeUnit, boolean allowCoreThreadTimeout, int maxQueueSize) {
048
049 this(
050 corePoolSize, maxPoolSize, keepAliveTime, timeUnit,
051 allowCoreThreadTimeout, maxQueueSize, new AbortPolicy(),
052 Executors.defaultThreadFactory(), new ThreadPoolHandlerAdapter());
053 }
054
055 public ThreadPoolExecutor(
056 int corePoolSize, int maxPoolSize, long keepAliveTime,
057 TimeUnit timeUnit, boolean allowCoreThreadTimeout, int maxQueueSize,
058 RejectedExecutionHandler rejectedExecutionHandler,
059 ThreadFactory threadFactory, ThreadPoolHandler threadPoolHandler) {
060
061 if ((corePoolSize < 0) || (maxPoolSize <= 0) ||
062 (maxPoolSize < corePoolSize) || (keepAliveTime < 0) ||
063 (maxQueueSize <= 0)) {
064
065 throw new IllegalArgumentException();
066 }
067
068 if ((rejectedExecutionHandler == null) || (threadFactory == null) ||
069 (threadPoolHandler == null)) {
070
071 throw new NullPointerException();
072 }
073
074 _corePoolSize = corePoolSize;
075 _maxPoolSize = maxPoolSize;
076 _keepAliveTime = timeUnit.toNanos(keepAliveTime);
077 _allowCoreThreadTimeout = allowCoreThreadTimeout;
078 _rejectedExecutionHandler = rejectedExecutionHandler;
079 _threadFactory = threadFactory;
080 _threadPoolHandler = threadPoolHandler;
081 _taskQueue = new TaskQueue<Runnable>(maxQueueSize);
082 _workerTasks = new HashSet<WorkerTask>();
083 }
084
085 public void adjustPoolSize(int newCorePoolSize, int newMaxPoolSize) {
086 if ((newCorePoolSize < 0) || (newMaxPoolSize <= 0) ||
087 (newMaxPoolSize < newCorePoolSize)) {
088
089 throw new IllegalArgumentException();
090 }
091
092 _mainLock.lock();
093
094 try {
095 int surplusCoreThreads = _corePoolSize - newCorePoolSize;
096 int surplusMaxPoolSize = _maxPoolSize - newMaxPoolSize;
097
098 _corePoolSize = newCorePoolSize;
099 _maxPoolSize = newMaxPoolSize;
100
101 if (((surplusCoreThreads > 0) && (_poolSize > _corePoolSize)) ||
102 ((surplusMaxPoolSize > 0) && (_poolSize > _maxPoolSize))) {
103
104 int interruptCount = Math.max(
105 surplusCoreThreads, surplusMaxPoolSize);
106
107 for (WorkerTask workerTask : _workerTasks) {
108 if (interruptCount > 0) {
109 if (workerTask._interruptIfWaiting()) {
110 interruptCount--;
111 }
112 }
113 else {
114 break;
115 }
116 }
117 }
118 else {
119 Runnable runnable = null;
120
121 while ((surplusCoreThreads++ < 0) &&
122 (_poolSize < _corePoolSize) &&
123 ((runnable = _taskQueue.poll()) != null)) {
124
125 _doAddWorkerThread(runnable);
126 }
127 }
128 }
129 finally {
130 _mainLock.unlock();
131 }
132 }
133
134 @Override
135 public boolean awaitTermination(long timeout, TimeUnit timeUnit)
136 throws InterruptedException {
137
138 long nanos = timeUnit.toNanos(timeout);
139
140 _mainLock.lock();
141
142 try {
143 while (true) {
144 if (_runState == _TERMINATED) {
145 return true;
146 }
147
148 if (nanos <= 0) {
149 return false;
150 }
151
152 nanos = _terminationCondition.awaitNanos(nanos);
153 }
154 }
155 finally {
156 _mainLock.unlock();
157 }
158 }
159
160 @Override
161 public void execute(Runnable runnable) {
162 if (runnable == null) {
163 throw new NullPointerException();
164 }
165
166 boolean[] hasWaiterMarker = new boolean[1];
167
168 if ((_runState == _RUNNING) &&
169 _taskQueue.offer(runnable, hasWaiterMarker)) {
170
171 if (_runState != _RUNNING) {
172 if (_taskQueue.remove(runnable)) {
173 _rejectedExecutionHandler.rejectedExecution(runnable, this);
174 }
175
176 return;
177 }
178
179 if (!hasWaiterMarker[0]) {
180 _addWorkerThread();
181 }
182
183 return;
184 }
185
186 _rejectedExecutionHandler.rejectedExecution(runnable, this);
187 }
188
189 public int getActiveCount() {
190 _mainLock.lock();
191
192 try {
193 int count = 0;
194
195 for (WorkerTask workerTask : _workerTasks) {
196 if (workerTask._isLocked()) {
197 count++;
198 }
199 }
200
201 return count;
202 }
203 finally {
204 _mainLock.unlock();
205 }
206 }
207
208 public long getCompletedTaskCount() {
209 _mainLock.lock();
210
211 try {
212 long count = _completedTaskCount;
213
214 for (WorkerTask workerTask : _workerTasks) {
215 count += workerTask._localCompletedTaskCount;
216 }
217
218 return count;
219 }
220 finally {
221 _mainLock.unlock();
222 }
223 }
224
225 public int getCorePoolSize() {
226 return _corePoolSize;
227 }
228
229 public long getKeepAliveTime(TimeUnit timeUnit) {
230 return timeUnit.convert(_keepAliveTime, TimeUnit.NANOSECONDS);
231 }
232
233 public int getLargestPoolSize() {
234 return _largestPoolSize;
235 }
236
237 public int getMaxPoolSize() {
238 return _maxPoolSize;
239 }
240
241 public int getPendingTaskCount() {
242 return _taskQueue.size();
243 }
244
245 public int getPoolSize() {
246 return _poolSize;
247 }
248
249 public RejectedExecutionHandler getRejectedExecutionHandler() {
250 return _rejectedExecutionHandler;
251 }
252
253 public int getRemainingTaskQueueCapacity() {
254 return _taskQueue.remainingCapacity();
255 }
256
257 public long getTaskCount() {
258 _mainLock.lock();
259
260 try {
261 long count = _completedTaskCount;
262
263 for (WorkerTask workerTask : _workerTasks) {
264 count += workerTask._localCompletedTaskCount;
265
266 if (workerTask._isLocked()) {
267 count++;
268 }
269 }
270
271 return count + _taskQueue.size();
272 }
273 finally {
274 _mainLock.unlock();
275 }
276 }
277
278 public ThreadFactory getThreadFactory() {
279 return _threadFactory;
280 }
281
282 public ThreadPoolHandler getThreadPoolHandler() {
283 return _threadPoolHandler;
284 }
285
286 public boolean isAllowCoreThreadTimeout() {
287 return _allowCoreThreadTimeout;
288 }
289
290 @Override
291 public boolean isShutdown() {
292 if (_runState != _RUNNING) {
293 return true;
294 }
295 else {
296 return false;
297 }
298 }
299
300 @Override
301 public boolean isTerminated() {
302 if (_runState == _TERMINATED) {
303 return true;
304 }
305 else {
306 return false;
307 }
308 }
309
310 public boolean isTerminating() {
311 if (_runState == _STOP) {
312 return true;
313 }
314 else {
315 return false;
316 }
317 }
318
319 public void setAllowCoreThreadTimeout(boolean allowCoreThreadTimeout) {
320 _allowCoreThreadTimeout = allowCoreThreadTimeout;
321 }
322
323 public void setKeepAliveTime(long keepAliveTime, TimeUnit timeUnit) {
324 if (keepAliveTime < 0) {
325 throw new IllegalArgumentException();
326 }
327
328 _keepAliveTime = timeUnit.toNanos(keepAliveTime);
329 }
330
331 public void setRejectedExecutionHandler(
332 RejectedExecutionHandler rejectedExecutionHandler) {
333
334 if (rejectedExecutionHandler == null) {
335 throw new NullPointerException();
336 }
337
338 _rejectedExecutionHandler = rejectedExecutionHandler;
339 }
340
341 public void setThreadFactory(ThreadFactory threadFactory) {
342 if (threadFactory == null) {
343 throw new NullPointerException();
344 }
345
346 _threadFactory = threadFactory;
347 }
348
349 public void setThreadPoolHandler(ThreadPoolHandler threadPoolHandler) {
350 if (threadPoolHandler == null) {
351 throw new NullPointerException();
352 }
353
354 _threadPoolHandler = threadPoolHandler;
355 }
356
357 @Override
358 public void shutdown() {
359 _mainLock.lock();
360
361 try {
362 int state = _runState;
363
364 if (state < _SHUTDOWN) {
365 _runState = _SHUTDOWN;
366 }
367
368 for (WorkerTask workerTask : _workerTasks) {
369 workerTask._interruptIfWaiting();
370 }
371
372 _tryTerminate();
373 }
374 finally {
375 _mainLock.unlock();
376 }
377 }
378
379 @Override
380 public List<Runnable> shutdownNow() {
381 _mainLock.lock();
382
383 try {
384 int state = _runState;
385
386 if (state < _STOP) {
387 _runState = _STOP;
388 }
389
390 for (WorkerTask workerTask : _workerTasks) {
391 workerTask._thread.interrupt();
392 }
393
394 List<Runnable> runnables = new ArrayList<Runnable>();
395
396 _taskQueue.drainTo(runnables);
397
398 _tryTerminate();
399
400 return runnables;
401 }
402 finally {
403 _mainLock.unlock();
404 }
405 }
406
407 @Override
408 protected void finalize() {
409 shutdown();
410 }
411
412 protected ReentrantLock getMainLock() {
413 return _mainLock;
414 }
415
416 protected TaskQueue<Runnable> getTaskQueue() {
417 return _taskQueue;
418 }
419
420 protected Set<WorkerTask> getWorkerTasks() {
421 return _workerTasks;
422 }
423
424 private void _addWorkerThread() {
425 int runState = _runState;
426 int poolSize = _poolSize;
427
428 if (((runState == _RUNNING) && (poolSize < _maxPoolSize)) ||
429 ((runState == _SHUTDOWN) && (poolSize == 0) &&
430 !_taskQueue.isEmpty())) {
431
432 _mainLock.lock();
433
434 try {
435 runState = _runState;
436 poolSize = _poolSize;
437
438 if (((runState == _RUNNING) && (poolSize < _maxPoolSize)) ||
439 ((runState == _SHUTDOWN) && (poolSize == 0) &&
440 !_taskQueue.isEmpty())) {
441
442 _doAddWorkerThread(_taskQueue.poll());
443 }
444 }
445 finally {
446 _mainLock.unlock();
447 }
448 }
449 }
450
451 private void _doAddWorkerThread(Runnable runnable) {
452 WorkerTask workerTask = new WorkerTask(runnable);
453
454 _workerTasks.add(workerTask);
455
456 int poolSize = ++_poolSize;
457
458 if (poolSize > _largestPoolSize) {
459 _largestPoolSize = poolSize;
460 }
461
462 workerTask._startWork();
463 }
464
465 private Runnable _getTask(WorkerTask workerTask, boolean[] cleanUpMarker) {
466 while (true) {
467 try {
468 int state = _runState;
469
470 if (state >= _STOP) {
471 return null;
472 }
473
474 Runnable runnable = null;
475
476 if (state == _SHUTDOWN) {
477 runnable = _taskQueue.poll();
478 }
479 else if ((_poolSize > _corePoolSize) ||
480 _allowCoreThreadTimeout) {
481
482 runnable = _taskQueue.poll(
483 _keepAliveTime, TimeUnit.NANOSECONDS);
484 }
485 else {
486 runnable = _taskQueue.take();
487 }
488
489 if (runnable != null) {
490 return runnable;
491 }
492
493 _mainLock.lock();
494
495 try {
496 if ((_runState >= _STOP) ||
497 ((_runState >= _SHUTDOWN) && _taskQueue.isEmpty()) ||
498 (_allowCoreThreadTimeout &&
499 ((_poolSize > 1) || _taskQueue.isEmpty())) ||
500 (!_allowCoreThreadTimeout &&
501 (_poolSize > _corePoolSize))) {
502
503 _completedTaskCount +=
504 workerTask._localCompletedTaskCount;
505
506 _workerTasks.remove(workerTask);
507
508 if (--_poolSize == 0) {
509 _tryTerminate();
510 }
511
512 cleanUpMarker[0] = true;
513
514 return null;
515 }
516 }
517 finally {
518 _mainLock.unlock();
519 }
520 }
521 catch (InterruptedException ie) {
522 }
523 }
524 }
525
526 private void _tryTerminate() {
527 if (_poolSize == 0) {
528 int state = _runState;
529
530 if ((state == _STOP) ||
531 ((state == _SHUTDOWN) && _taskQueue.isEmpty())) {
532
533 _runState = _TERMINATED;
534
535 _terminationCondition.signalAll();
536 _threadPoolHandler.terminated();
537
538 return;
539 }
540
541 if (!_taskQueue.isEmpty()) {
542 _doAddWorkerThread(_taskQueue.poll());
543 }
544 }
545 }
546
547 private static final int _RUNNING = 0;
548
549 private static final int _SHUTDOWN = 1;
550
551 private static final int _STOP = 2;
552
553 private static final int _TERMINATED = 3;
554
555 private volatile boolean _allowCoreThreadTimeout;
556 private long _completedTaskCount;
557 private volatile int _corePoolSize;
558 private volatile long _keepAliveTime;
559 private volatile int _largestPoolSize;
560 private final ReentrantLock _mainLock = new ReentrantLock();
561 private volatile int _maxPoolSize;
562 private volatile int _poolSize;
563 private volatile RejectedExecutionHandler _rejectedExecutionHandler;
564 private volatile int _runState;
565 private final TaskQueue<Runnable> _taskQueue;
566 private final Condition _terminationCondition = _mainLock.newCondition();
567 private volatile ThreadFactory _threadFactory;
568 private volatile ThreadPoolHandler _threadPoolHandler;
569 private final Set<WorkerTask> _workerTasks;
570
571 private class WorkerTask
572 extends AbstractQueuedSynchronizer implements Runnable {
573
574 public WorkerTask(Runnable runnable) {
575 _runnable = runnable;
576 }
577
578 @Override
579 public void run() {
580 boolean[] cleanUpMarker = new boolean[1];
581
582 try {
583 Runnable runnable = _runnable;
584
585 _runnable = null;
586
587 do {
588 if (runnable != null) {
589 _runTask(runnable);
590
591 runnable = null;
592 }
593 }
594 while ((runnable = _getTask(this, cleanUpMarker)) != null);
595 }
596 finally {
597 if (!cleanUpMarker[0]) {
598 _mainLock.lock();
599
600 try {
601 _completedTaskCount += _localCompletedTaskCount;
602
603 _workerTasks.remove(this);
604
605 if (--_poolSize == 0) {
606 _tryTerminate();
607 }
608 }
609 finally {
610 _mainLock.unlock();
611 }
612 }
613
614 _threadPoolHandler.beforeThreadEnd(_thread);
615 }
616 }
617
618 @Override
619 protected boolean isHeldExclusively() {
620 if (getState() == 1) {
621 return true;
622 }
623 else {
624 return false;
625 }
626 }
627
628 @Override
629 protected boolean tryAcquire(int unused) {
630 return compareAndSetState(0, 1);
631 }
632
633 @Override
634 protected boolean tryRelease(int unused) {
635 setState(0);
636
637 return true;
638 }
639
640 private boolean _interruptIfWaiting() {
641 if (!_thread.isInterrupted() && tryAcquire(1)) {
642 try {
643 _thread.interrupt();
644
645 return true;
646 }
647 finally {
648 _unlock();
649 }
650 }
651
652 return false;
653 }
654
655 private boolean _isLocked() {
656 return isHeldExclusively();
657 }
658
659 private void _lock() {
660 acquire(1);
661 }
662
663 private void _runTask(Runnable task) {
664 _lock();
665
666 try {
667 if ((_runState < _STOP) && Thread.interrupted() &&
668 (_runState >= _STOP)) {
669
670 _thread.interrupt();
671 }
672
673 Throwable throwable = null;
674
675 _threadPoolHandler.beforeExecute(_thread, task);
676
677 try {
678 task.run();
679
680 _localCompletedTaskCount++;
681 }
682 catch (RuntimeException re) {
683 throwable = re;
684
685 throw re;
686 }
687 finally {
688 _threadPoolHandler.afterExecute(task, throwable);
689 }
690 }
691 finally {
692 _unlock();
693 }
694 }
695
696 private void _startWork() {
697 _thread = _threadFactory.newThread(this);
698
699 _threadPoolHandler.beforeThreadStart(_thread);
700
701 _thread.start();
702 }
703
704 private void _unlock() {
705 release(1);
706 }
707
708 private volatile long _localCompletedTaskCount;
709 private Runnable _runnable;
710 private Thread _thread;
711
712 }
713
714 }