001
014
015 package com.liferay.portal.kernel.concurrent;
016
017 import java.util.ArrayList;
018 import java.util.HashSet;
019 import java.util.List;
020 import java.util.Set;
021 import java.util.concurrent.AbstractExecutorService;
022 import java.util.concurrent.Executors;
023 import java.util.concurrent.ThreadFactory;
024 import java.util.concurrent.TimeUnit;
025 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
026 import java.util.concurrent.locks.Condition;
027 import java.util.concurrent.locks.ReentrantLock;
028
029
036 public class ThreadPoolExecutor extends AbstractExecutorService {
037
038 public ThreadPoolExecutor(int corePoolSize, int maxPoolSize) {
039 this(
040 corePoolSize, maxPoolSize, 60, TimeUnit.SECONDS, false,
041 Integer.MAX_VALUE, new AbortPolicy(),
042 Executors.defaultThreadFactory(), new ThreadPoolHandlerAdapter());
043 }
044
045 public ThreadPoolExecutor(
046 int corePoolSize, int maxPoolSize, long keepAliveTime,
047 TimeUnit timeUnit, boolean allowCoreThreadTimeout, int maxQueueSize) {
048
049 this(
050 corePoolSize, maxPoolSize, keepAliveTime, timeUnit,
051 allowCoreThreadTimeout, maxQueueSize, new AbortPolicy(),
052 Executors.defaultThreadFactory(), new ThreadPoolHandlerAdapter());
053 }
054
055 public ThreadPoolExecutor(
056 int corePoolSize, int maxPoolSize, long keepAliveTime,
057 TimeUnit timeUnit, boolean allowCoreThreadTimeout, int maxQueueSize,
058 RejectedExecutionHandler rejectedExecutionHandler,
059 ThreadFactory threadFactory, ThreadPoolHandler threadPoolHandler) {
060
061 if ((corePoolSize < 0) || (maxPoolSize <= 0) ||
062 (maxPoolSize < corePoolSize) || (keepAliveTime < 0) ||
063 (maxQueueSize <= 0)) {
064
065 throw new IllegalArgumentException();
066 }
067
068 if ((rejectedExecutionHandler == null) || (threadFactory == null) ||
069 (threadPoolHandler == null)) {
070
071 throw new NullPointerException();
072 }
073
074 _corePoolSize = corePoolSize;
075 _maxPoolSize = maxPoolSize;
076 _keepAliveTime = timeUnit.toNanos(keepAliveTime);
077 _allowCoreThreadTimeout = allowCoreThreadTimeout;
078 _rejectedExecutionHandler = rejectedExecutionHandler;
079 _threadFactory = threadFactory;
080 _threadPoolHandler = threadPoolHandler;
081 _taskQueue = new TaskQueue<Runnable>(maxQueueSize);
082 _workerTasks = new HashSet<WorkerTask>();
083 }
084
085 public void adjustPoolSize(int newCorePoolSize, int newMaxPoolSize) {
086 if ((newCorePoolSize < 0) || (newMaxPoolSize <= 0) ||
087 (newMaxPoolSize < newCorePoolSize)) {
088
089 throw new IllegalArgumentException();
090 }
091
092 _mainLock.lock();
093
094 try {
095 int surplusCoreThreads = _corePoolSize - newCorePoolSize;
096 int surplusMaxPoolSize = _maxPoolSize - newMaxPoolSize;
097
098 _corePoolSize = newCorePoolSize;
099 _maxPoolSize = newMaxPoolSize;
100
101 if (((surplusCoreThreads > 0) && (_poolSize > _corePoolSize)) ||
102 ((surplusMaxPoolSize > 0) && (_poolSize > _maxPoolSize))) {
103
104 int interruptCount = Math.max(
105 surplusCoreThreads, surplusMaxPoolSize);
106
107 for (WorkerTask workerTask : _workerTasks) {
108 if (interruptCount > 0) {
109 if (workerTask._interruptIfWaiting()) {
110 interruptCount--;
111 }
112 }
113 else {
114 break;
115 }
116 }
117 }
118 else {
119 Runnable runnable = null;
120
121 while ((surplusCoreThreads++ < 0) &&
122 (_poolSize < _corePoolSize) &&
123 ((runnable = _taskQueue.poll()) != null)) {
124
125 _doAddWorkerThread(runnable);
126 }
127 }
128 }
129 finally {
130 _mainLock.unlock();
131 }
132 }
133
134 @Override
135 public boolean awaitTermination(long timeout, TimeUnit timeUnit)
136 throws InterruptedException {
137
138 long nanos = timeUnit.toNanos(timeout);
139
140 _mainLock.lock();
141
142 try {
143 while (true) {
144 if (_runState == _TERMINATED) {
145 return true;
146 }
147
148 if (nanos <= 0) {
149 return false;
150 }
151
152 nanos = _terminationCondition.awaitNanos(nanos);
153 }
154 }
155 finally {
156 _mainLock.unlock();
157 }
158 }
159
160 @Override
161 public void execute(Runnable runnable) {
162 if (runnable == null) {
163 throw new NullPointerException();
164 }
165
166 boolean[] hasWaiterMarker = new boolean[1];
167
168 if ((_runState == _RUNNING) &&
169 _taskQueue.offer(runnable, hasWaiterMarker)) {
170
171 if (_runState != _RUNNING) {
172 if (_taskQueue.remove(runnable)) {
173 _rejectedExecutionHandler.rejectedExecution(runnable, this);
174 }
175
176 return;
177 }
178
179 if (!hasWaiterMarker[0]) {
180 _addWorkerThread();
181 }
182
183 return;
184 }
185
186 _rejectedExecutionHandler.rejectedExecution(runnable, this);
187 }
188
189 public int getActiveCount() {
190 _mainLock.lock();
191
192 try {
193 int count = 0;
194
195 for (WorkerTask workerTask : _workerTasks) {
196 if (workerTask._isLocked()) {
197 count++;
198 }
199 }
200
201 return count;
202 }
203 finally {
204 _mainLock.unlock();
205 }
206 }
207
208 public long getCompletedTaskCount() {
209 _mainLock.lock();
210
211 try {
212 long count = _completedTaskCount;
213
214 for (WorkerTask workerTask : _workerTasks) {
215 count += workerTask._localCompletedTaskCount;
216 }
217
218 return count;
219 }
220 finally {
221 _mainLock.unlock();
222 }
223 }
224
225 public int getCorePoolSize() {
226 return _corePoolSize;
227 }
228
229 public long getKeepAliveTime(TimeUnit timeUnit) {
230 return timeUnit.convert(_keepAliveTime, TimeUnit.NANOSECONDS);
231 }
232
233 public int getLargestPoolSize() {
234 return _largestPoolSize;
235 }
236
237 public int getMaxPoolSize() {
238 return _maxPoolSize;
239 }
240
241 public int getPendingTaskCount() {
242 return _taskQueue.size();
243 }
244
245 public int getPoolSize() {
246 return _poolSize;
247 }
248
249 public RejectedExecutionHandler getRejectedExecutionHandler() {
250 return _rejectedExecutionHandler;
251 }
252
253 public int getRemainingTaskQueueCapacity() {
254 return _taskQueue.remainingCapacity();
255 }
256
257 public long getTaskCount() {
258 _mainLock.lock();
259
260 try {
261 long count = _completedTaskCount;
262
263 for (WorkerTask workerTask : _workerTasks) {
264 count += workerTask._localCompletedTaskCount;
265
266 if (workerTask._isLocked()) {
267 count++;
268 }
269 }
270
271 return count + _taskQueue.size();
272 }
273 finally {
274 _mainLock.unlock();
275 }
276 }
277
278 public ThreadFactory getThreadFactory() {
279 return _threadFactory;
280 }
281
282 public ThreadPoolHandler getThreadPoolHandler() {
283 return _threadPoolHandler;
284 }
285
286 public boolean isAllowCoreThreadTimeout() {
287 return _allowCoreThreadTimeout;
288 }
289
290 @Override
291 public boolean isShutdown() {
292 if (_runState != _RUNNING) {
293 return true;
294 }
295 else {
296 return false;
297 }
298 }
299
300 @Override
301 public boolean isTerminated() {
302 if (_runState == _TERMINATED) {
303 return true;
304 }
305 else {
306 return false;
307 }
308 }
309
310 public boolean isTerminating() {
311 if (_runState == _STOP) {
312 return true;
313 }
314 else {
315 return false;
316 }
317 }
318
319 public void setAllowCoreThreadTimeout(boolean allowCoreThreadTimeout) {
320 _allowCoreThreadTimeout = allowCoreThreadTimeout;
321 }
322
323 public void setKeepAliveTime(long keepAliveTime, TimeUnit timeUnit) {
324 if (keepAliveTime < 0) {
325 throw new IllegalArgumentException();
326 }
327
328 _keepAliveTime = timeUnit.toNanos(keepAliveTime);
329 }
330
331 public void setRejectedExecutionHandler(
332 RejectedExecutionHandler rejectedExecutionHandler) {
333
334 if (rejectedExecutionHandler == null) {
335 throw new NullPointerException();
336 }
337
338 _rejectedExecutionHandler = rejectedExecutionHandler;
339 }
340
341 public void setThreadFactory(ThreadFactory threadFactory) {
342 if (threadFactory == null) {
343 throw new NullPointerException();
344 }
345
346 _threadFactory = threadFactory;
347 }
348
349 public void setThreadPoolHandler(ThreadPoolHandler threadPoolHandler) {
350 if (threadPoolHandler == null) {
351 throw new NullPointerException();
352 }
353
354 _threadPoolHandler = threadPoolHandler;
355 }
356
357 @Override
358 public void shutdown() {
359 _mainLock.lock();
360
361 try {
362 int state = _runState;
363
364 if (state < _SHUTDOWN) {
365 _runState = _SHUTDOWN;
366 }
367
368 for (WorkerTask workerTask : _workerTasks) {
369 workerTask._interruptIfWaiting();
370 }
371
372 _tryTerminate();
373 }
374 finally {
375 _mainLock.unlock();
376 }
377 }
378
379 @Override
380 public List<Runnable> shutdownNow() {
381 _mainLock.lock();
382
383 try {
384 int state = _runState;
385
386 if (state < _STOP) {
387 _runState = _STOP;
388 }
389
390 for (WorkerTask workerTask : _workerTasks) {
391 workerTask._thread.interrupt();
392 }
393
394 List<Runnable> runnables = new ArrayList<Runnable>();
395
396 _taskQueue.drainTo(runnables);
397
398 _tryTerminate();
399
400 return runnables;
401 }
402 finally {
403 _mainLock.unlock();
404 }
405 }
406
407 @Override
408 protected void finalize() {
409 shutdown();
410 }
411
412 protected ReentrantLock getMainLock() {
413 return _mainLock;
414 }
415
416 protected TaskQueue<Runnable> getTaskQueue() {
417 return _taskQueue;
418 }
419
420 protected Set<WorkerTask> getWorkerTasks() {
421 return _workerTasks;
422 }
423
424 private void _addWorkerThread() {
425 int runState = _runState;
426 int poolSize = _poolSize;
427
428 if (((runState == _RUNNING) && (poolSize < _maxPoolSize)) ||
429 ((runState == _SHUTDOWN) && (poolSize == 0) &&
430 !_taskQueue.isEmpty())) {
431
432 _mainLock.lock();
433
434 try {
435 runState = _runState;
436 poolSize = _poolSize;
437
438 if (((runState == _RUNNING) && (poolSize < _maxPoolSize)) ||
439 ((runState == _SHUTDOWN) && (poolSize == 0) &&
440 !_taskQueue.isEmpty())) {
441
442 Runnable runnable = _taskQueue.poll();
443
444 if (runnable != null) {
445 _doAddWorkerThread(runnable);
446 }
447 }
448 }
449 finally {
450 _mainLock.unlock();
451 }
452 }
453 }
454
455 private void _doAddWorkerThread(Runnable runnable) {
456 WorkerTask workerTask = new WorkerTask(runnable);
457
458 _workerTasks.add(workerTask);
459
460 int poolSize = ++_poolSize;
461
462 if (poolSize > _largestPoolSize) {
463 _largestPoolSize = poolSize;
464 }
465
466 workerTask._startWork();
467 }
468
469 private Runnable _getTask(WorkerTask workerTask, boolean[] cleanUpMarker) {
470 while (true) {
471 try {
472 int state = _runState;
473
474 if (state >= _STOP) {
475 return null;
476 }
477
478 Runnable runnable = null;
479
480 if (state == _SHUTDOWN) {
481 runnable = _taskQueue.poll();
482 }
483 else if ((_poolSize > _corePoolSize) ||
484 _allowCoreThreadTimeout) {
485
486 runnable = _taskQueue.poll(
487 _keepAliveTime, TimeUnit.NANOSECONDS);
488 }
489 else {
490 runnable = _taskQueue.take();
491 }
492
493 if (runnable != null) {
494 return runnable;
495 }
496
497 _mainLock.lock();
498
499 try {
500 if ((_runState >= _STOP) ||
501 ((_runState >= _SHUTDOWN) && _taskQueue.isEmpty()) ||
502 (_allowCoreThreadTimeout &&
503 ((_poolSize > 1) || _taskQueue.isEmpty())) ||
504 (!_allowCoreThreadTimeout &&
505 (_poolSize > _corePoolSize))) {
506
507 _completedTaskCount +=
508 workerTask._localCompletedTaskCount;
509
510 _workerTasks.remove(workerTask);
511
512 if (--_poolSize == 0) {
513 _tryTerminate();
514 }
515
516 cleanUpMarker[0] = true;
517
518 return null;
519 }
520 }
521 finally {
522 _mainLock.unlock();
523 }
524 }
525 catch (InterruptedException ie) {
526 }
527 }
528 }
529
530 private void _tryTerminate() {
531 if (_poolSize == 0) {
532 int state = _runState;
533
534 if ((state == _STOP) ||
535 ((state == _SHUTDOWN) && _taskQueue.isEmpty())) {
536
537 _runState = _TERMINATED;
538
539 _terminationCondition.signalAll();
540 _threadPoolHandler.terminated();
541
542 return;
543 }
544
545 if (!_taskQueue.isEmpty()) {
546 _doAddWorkerThread(_taskQueue.poll());
547 }
548 }
549 }
550
551 private static final int _RUNNING = 0;
552
553 private static final int _SHUTDOWN = 1;
554
555 private static final int _STOP = 2;
556
557 private static final int _TERMINATED = 3;
558
559 private volatile boolean _allowCoreThreadTimeout;
560 private long _completedTaskCount;
561 private volatile int _corePoolSize;
562 private volatile long _keepAliveTime;
563 private volatile int _largestPoolSize;
564 private final ReentrantLock _mainLock = new ReentrantLock();
565 private volatile int _maxPoolSize;
566 private volatile int _poolSize;
567 private volatile RejectedExecutionHandler _rejectedExecutionHandler;
568 private volatile int _runState;
569 private final TaskQueue<Runnable> _taskQueue;
570 private final Condition _terminationCondition = _mainLock.newCondition();
571 private volatile ThreadFactory _threadFactory;
572 private volatile ThreadPoolHandler _threadPoolHandler;
573 private final Set<WorkerTask> _workerTasks;
574
575 private class WorkerTask
576 extends AbstractQueuedSynchronizer implements Runnable {
577
578 public WorkerTask(Runnable runnable) {
579 _runnable = runnable;
580 }
581
582 @Override
583 public void run() {
584 boolean[] cleanUpMarker = new boolean[1];
585
586 try {
587 Runnable runnable = _runnable;
588
589 _runnable = null;
590
591 do {
592 if (runnable != null) {
593 _runTask(runnable);
594
595 runnable = null;
596 }
597 }
598 while ((runnable = _getTask(this, cleanUpMarker)) != null);
599 }
600 finally {
601 if (!cleanUpMarker[0]) {
602 _mainLock.lock();
603
604 try {
605 _completedTaskCount += _localCompletedTaskCount;
606
607 _workerTasks.remove(this);
608
609 if (--_poolSize == 0) {
610 _tryTerminate();
611 }
612 }
613 finally {
614 _mainLock.unlock();
615 }
616 }
617
618 _threadPoolHandler.beforeThreadEnd(_thread);
619 }
620 }
621
622 @Override
623 protected boolean isHeldExclusively() {
624 if (getState() == 1) {
625 return true;
626 }
627 else {
628 return false;
629 }
630 }
631
632 @Override
633 protected boolean tryAcquire(int unused) {
634 return compareAndSetState(0, 1);
635 }
636
637 @Override
638 protected boolean tryRelease(int unused) {
639 setState(0);
640
641 return true;
642 }
643
644 private boolean _interruptIfWaiting() {
645 if (!_thread.isInterrupted() && tryAcquire(1)) {
646 try {
647 _thread.interrupt();
648
649 return true;
650 }
651 finally {
652 _unlock();
653 }
654 }
655
656 return false;
657 }
658
659 private boolean _isLocked() {
660 return isHeldExclusively();
661 }
662
663 private void _lock() {
664 acquire(1);
665 }
666
667 private void _runTask(Runnable task) {
668 _lock();
669
670 try {
671 if ((_runState < _STOP) && Thread.interrupted() &&
672 (_runState >= _STOP)) {
673
674 _thread.interrupt();
675 }
676
677 Throwable throwable = null;
678
679 _threadPoolHandler.beforeExecute(_thread, task);
680
681 try {
682 task.run();
683
684 _localCompletedTaskCount++;
685 }
686 catch (RuntimeException re) {
687 throwable = re;
688
689 throw re;
690 }
691 finally {
692 _threadPoolHandler.afterExecute(task, throwable);
693 }
694 }
695 finally {
696 _unlock();
697 }
698 }
699
700 private void _startWork() {
701 _thread = _threadFactory.newThread(this);
702
703 _threadPoolHandler.beforeThreadStart(_thread);
704
705 _thread.start();
706 }
707
708 private void _unlock() {
709 release(1);
710 }
711
712 private volatile long _localCompletedTaskCount;
713 private Runnable _runnable;
714 private Thread _thread;
715
716 }
717
718 }