001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.concurrent;
016    
017    import java.util.ArrayList;
018    import java.util.HashSet;
019    import java.util.List;
020    import java.util.Set;
021    import java.util.concurrent.AbstractExecutorService;
022    import java.util.concurrent.Executors;
023    import java.util.concurrent.ThreadFactory;
024    import java.util.concurrent.TimeUnit;
025    import java.util.concurrent.locks.AbstractQueuedSynchronizer;
026    import java.util.concurrent.locks.Condition;
027    import java.util.concurrent.locks.ReentrantLock;
028    
029    /**
030     * <p>
031     * See http://issues.liferay.com/browse/LPS-14986.
032     * </p>
033     *
034     * @author Shuyang Zhou
035     */
036    public class ThreadPoolExecutor extends AbstractExecutorService {
037    
038            public ThreadPoolExecutor(int corePoolSize, int maxPoolSize) {
039                    this(
040                            corePoolSize, maxPoolSize, 60, TimeUnit.SECONDS, false,
041                            Integer.MAX_VALUE, new AbortPolicy(),
042                            Executors.defaultThreadFactory(), new ThreadPoolHandlerAdapter());
043            }
044    
045            public ThreadPoolExecutor(
046                    int corePoolSize, int maxPoolSize, long keepAliveTime,
047                    TimeUnit timeUnit, boolean allowCoreThreadTimeout, int maxQueueSize) {
048    
049                    this(
050                            corePoolSize, maxPoolSize, keepAliveTime, timeUnit,
051                            allowCoreThreadTimeout, maxQueueSize, new AbortPolicy(),
052                            Executors.defaultThreadFactory(), new ThreadPoolHandlerAdapter());
053            }
054    
055            public ThreadPoolExecutor(
056                    int corePoolSize, int maxPoolSize, long keepAliveTime,
057                    TimeUnit timeUnit, boolean allowCoreThreadTimeout, int maxQueueSize,
058                    RejectedExecutionHandler rejectedExecutionHandler,
059                    ThreadFactory threadFactory, ThreadPoolHandler threadPoolHandler) {
060    
061                    if ((corePoolSize < 0) || (maxPoolSize <= 0) ||
062                            (maxPoolSize < corePoolSize) || (keepAliveTime < 0) ||
063                            (maxQueueSize <= 0)) {
064    
065                            throw new IllegalArgumentException();
066                    }
067    
068                    if ((rejectedExecutionHandler == null) || (threadFactory == null) ||
069                            (threadPoolHandler == null)) {
070    
071                            throw new NullPointerException();
072                    }
073    
074                    _corePoolSize = corePoolSize;
075                    _maxPoolSize = maxPoolSize;
076                    _keepAliveTime = timeUnit.toNanos(keepAliveTime);
077                    _allowCoreThreadTimeout = allowCoreThreadTimeout;
078                    _rejectedExecutionHandler = rejectedExecutionHandler;
079                    _threadFactory = threadFactory;
080                    _threadPoolHandler = threadPoolHandler;
081                    _taskQueue = new TaskQueue<Runnable>(maxQueueSize);
082                    _workerTasks = new HashSet<WorkerTask>();
083            }
084    
085            public void adjustPoolSize(int newCorePoolSize, int newMaxPoolSize) {
086                    if ((newCorePoolSize < 0) || (newMaxPoolSize <= 0) ||
087                            (newMaxPoolSize < newCorePoolSize)) {
088    
089                            throw new IllegalArgumentException();
090                    }
091    
092                    _mainLock.lock();
093    
094                    try {
095                            int surplusCoreThreads = _corePoolSize - newCorePoolSize;
096                            int surplusMaxPoolSize = _maxPoolSize - newMaxPoolSize;
097    
098                            _corePoolSize = newCorePoolSize;
099                            _maxPoolSize = newMaxPoolSize;
100    
101                            if (((surplusCoreThreads > 0) && (_poolSize > _corePoolSize)) ||
102                                    ((surplusMaxPoolSize > 0) && (_poolSize > _maxPoolSize))) {
103    
104                                    int interruptCount = Math.max(
105                                            surplusCoreThreads, surplusMaxPoolSize);
106    
107                                    for (WorkerTask workerTask : _workerTasks) {
108                                            if (interruptCount > 0) {
109                                                    if (workerTask._interruptIfWaiting()) {
110                                                            interruptCount--;
111                                                    }
112                                            }
113                                            else {
114                                                    break;
115                                            }
116                                    }
117                            }
118                            else {
119                                    Runnable runnable = null;
120    
121                                    while ((surplusCoreThreads++ < 0) &&
122                                               (_poolSize < _corePoolSize) &&
123                                               ((runnable = _taskQueue.poll()) != null)) {
124    
125                                            _doAddWorkerThread(runnable);
126                                    }
127                            }
128                    }
129                    finally {
130                            _mainLock.unlock();
131                    }
132            }
133    
134            @Override
135            public boolean awaitTermination(long timeout, TimeUnit timeUnit)
136                    throws InterruptedException {
137    
138                    long nanos = timeUnit.toNanos(timeout);
139    
140                    _mainLock.lock();
141    
142                    try {
143                            while (true) {
144                                    if (_runState == _TERMINATED) {
145                                            return true;
146                                    }
147    
148                                    if (nanos <= 0) {
149                                            return false;
150                                    }
151    
152                                    nanos = _terminationCondition.awaitNanos(nanos);
153                            }
154                    }
155                    finally {
156                            _mainLock.unlock();
157                    }
158            }
159    
160            @Override
161            public void execute(Runnable runnable) {
162                    if (runnable == null) {
163                            throw new NullPointerException();
164                    }
165    
166                    boolean[] hasWaiterMarker = new boolean[1];
167    
168                    if ((_runState == _RUNNING) &&
169                            _taskQueue.offer(runnable, hasWaiterMarker)) {
170    
171                            if (_runState != _RUNNING) {
172                                    if (_taskQueue.remove(runnable)) {
173                                            _rejectedExecutionHandler.rejectedExecution(runnable, this);
174                                    }
175    
176                                    return;
177                            }
178    
179                            if (!hasWaiterMarker[0]) {
180                                    _addWorkerThread();
181                            }
182    
183                            return;
184                    }
185    
186                    _rejectedExecutionHandler.rejectedExecution(runnable, this);
187            }
188    
189            public int getActiveCount() {
190                    _mainLock.lock();
191    
192                    try {
193                            int count = 0;
194    
195                            for (WorkerTask workerTask : _workerTasks) {
196                                    if (workerTask._isLocked()) {
197                                            count++;
198                                    }
199                            }
200    
201                            return count;
202                    }
203                    finally {
204                            _mainLock.unlock();
205                    }
206            }
207    
208            public long getCompletedTaskCount() {
209                    _mainLock.lock();
210    
211                    try {
212                            long count = _completedTaskCount;
213    
214                            for (WorkerTask workerTask : _workerTasks) {
215                                    count += workerTask._localCompletedTaskCount;
216                            }
217    
218                            return count;
219                    }
220                    finally {
221                            _mainLock.unlock();
222                    }
223            }
224    
225            public int getCorePoolSize() {
226                    return _corePoolSize;
227            }
228    
229            public long getKeepAliveTime(TimeUnit timeUnit) {
230                    return timeUnit.convert(_keepAliveTime, TimeUnit.NANOSECONDS);
231            }
232    
233            public int getLargestPoolSize() {
234                    return _largestPoolSize;
235            }
236    
237            public int getMaxPoolSize() {
238                    return _maxPoolSize;
239            }
240    
241            public int getPendingTaskCount() {
242                    return _taskQueue.size();
243            }
244    
245            public int getPoolSize() {
246                    return _poolSize;
247            }
248    
249            public RejectedExecutionHandler getRejectedExecutionHandler() {
250                    return _rejectedExecutionHandler;
251            }
252    
253            public int getRemainingTaskQueueCapacity() {
254                    return _taskQueue.remainingCapacity();
255            }
256    
257            public long getTaskCount() {
258                    _mainLock.lock();
259    
260                    try {
261                            long count = _completedTaskCount;
262    
263                            for (WorkerTask workerTask : _workerTasks) {
264                                    count += workerTask._localCompletedTaskCount;
265    
266                                    if (workerTask._isLocked()) {
267                                            count++;
268                                    }
269                            }
270    
271                            return count + _taskQueue.size();
272                    }
273                    finally {
274                            _mainLock.unlock();
275                    }
276            }
277    
278            public ThreadFactory getThreadFactory() {
279                    return _threadFactory;
280            }
281    
282            public ThreadPoolHandler getThreadPoolHandler() {
283                    return _threadPoolHandler;
284            }
285    
286            public boolean isAllowCoreThreadTimeout() {
287                    return _allowCoreThreadTimeout;
288            }
289    
290            @Override
291            public boolean isShutdown() {
292                    if (_runState != _RUNNING) {
293                            return true;
294                    }
295                    else {
296                            return false;
297                    }
298            }
299    
300            @Override
301            public boolean isTerminated() {
302                    if (_runState == _TERMINATED) {
303                            return true;
304                    }
305                    else {
306                            return false;
307                    }
308            }
309    
310            public boolean isTerminating() {
311                    if (_runState == _STOP) {
312                            return true;
313                    }
314                    else {
315                            return false;
316                    }
317            }
318    
319            public void setAllowCoreThreadTimeout(boolean allowCoreThreadTimeout) {
320                    _allowCoreThreadTimeout = allowCoreThreadTimeout;
321            }
322    
323            public void setKeepAliveTime(long keepAliveTime, TimeUnit timeUnit) {
324                    if (keepAliveTime < 0) {
325                            throw new IllegalArgumentException();
326                    }
327    
328                    _keepAliveTime = timeUnit.toNanos(keepAliveTime);
329            }
330    
331            public void setRejectedExecutionHandler(
332                    RejectedExecutionHandler rejectedExecutionHandler) {
333    
334                    if (rejectedExecutionHandler == null) {
335                            throw new NullPointerException();
336                    }
337    
338                    _rejectedExecutionHandler = rejectedExecutionHandler;
339            }
340    
341            public void setThreadFactory(ThreadFactory threadFactory) {
342                    if (threadFactory == null) {
343                            throw new NullPointerException();
344                    }
345    
346                    _threadFactory = threadFactory;
347            }
348    
349            public void setThreadPoolHandler(ThreadPoolHandler threadPoolHandler) {
350                    if (threadPoolHandler == null) {
351                            throw new NullPointerException();
352                    }
353    
354                    _threadPoolHandler = threadPoolHandler;
355            }
356    
357            @Override
358            public void shutdown() {
359                    _mainLock.lock();
360    
361                    try {
362                            int state = _runState;
363    
364                            if (state < _SHUTDOWN) {
365                                    _runState = _SHUTDOWN;
366                            }
367    
368                            for (WorkerTask workerTask : _workerTasks) {
369                                    workerTask._interruptIfWaiting();
370                            }
371    
372                            _tryTerminate();
373                    }
374                    finally {
375                            _mainLock.unlock();
376                    }
377            }
378    
379            @Override
380            public List<Runnable> shutdownNow() {
381                    _mainLock.lock();
382    
383                    try {
384                            int state = _runState;
385    
386                            if (state < _STOP) {
387                                    _runState = _STOP;
388                            }
389    
390                            for (WorkerTask workerTask : _workerTasks) {
391                                    workerTask._thread.interrupt();
392                            }
393    
394                            List<Runnable> runnables = new ArrayList<Runnable>();
395    
396                            _taskQueue.drainTo(runnables);
397    
398                            _tryTerminate();
399    
400                            return runnables;
401                    }
402                    finally {
403                            _mainLock.unlock();
404                    }
405            }
406    
407            @Override
408            protected void finalize() {
409                    shutdown();
410            }
411    
412            protected ReentrantLock getMainLock() {
413                    return _mainLock;
414            }
415    
416            protected TaskQueue<Runnable> getTaskQueue() {
417                    return _taskQueue;
418            }
419    
420            protected Set<WorkerTask> getWorkerTasks() {
421                    return _workerTasks;
422            }
423    
424            private void _addWorkerThread() {
425                    int runState = _runState;
426                    int poolSize = _poolSize;
427    
428                    if (((runState == _RUNNING) && (poolSize < _maxPoolSize)) ||
429                            ((runState == _SHUTDOWN) && (poolSize == 0) &&
430                             !_taskQueue.isEmpty())) {
431    
432                            _mainLock.lock();
433    
434                            try {
435                                    runState = _runState;
436                                    poolSize = _poolSize;
437    
438                                    if (((runState == _RUNNING) && (poolSize < _maxPoolSize)) ||
439                                            ((runState == _SHUTDOWN) && (poolSize == 0) &&
440                                             !_taskQueue.isEmpty())) {
441    
442                                            Runnable runnable = _taskQueue.poll();
443    
444                                            if (runnable != null) {
445                                                    _doAddWorkerThread(runnable);
446                                            }
447                                    }
448                            }
449                            finally {
450                                    _mainLock.unlock();
451                            }
452                    }
453            }
454    
455            private void _doAddWorkerThread(Runnable runnable) {
456                    WorkerTask workerTask = new WorkerTask(runnable);
457    
458                    _workerTasks.add(workerTask);
459    
460                    int poolSize = ++_poolSize;
461    
462                    if (poolSize > _largestPoolSize) {
463                            _largestPoolSize = poolSize;
464                    }
465    
466                    workerTask._startWork();
467            }
468    
469            private Runnable _getTask(WorkerTask workerTask, boolean[] cleanUpMarker) {
470                    while (true) {
471                            try {
472                                    int state = _runState;
473    
474                                    if (state >= _STOP) {
475                                            return null;
476                                    }
477    
478                                    Runnable runnable = null;
479    
480                                    if (state == _SHUTDOWN) {
481                                            runnable = _taskQueue.poll();
482                                    }
483                                    else if ((_poolSize > _corePoolSize) ||
484                                                     _allowCoreThreadTimeout) {
485    
486                                            runnable = _taskQueue.poll(
487                                                    _keepAliveTime, TimeUnit.NANOSECONDS);
488                                    }
489                                    else {
490                                            runnable = _taskQueue.take();
491                                    }
492    
493                                    if (runnable != null) {
494                                            return runnable;
495                                    }
496    
497                                    _mainLock.lock();
498    
499                                    try {
500                                            if ((_runState >= _STOP) ||
501                                                    ((_runState >= _SHUTDOWN) && _taskQueue.isEmpty()) ||
502                                                    (_allowCoreThreadTimeout &&
503                                                     ((_poolSize > 1) || _taskQueue.isEmpty())) ||
504                                                    (!_allowCoreThreadTimeout &&
505                                                     (_poolSize > _corePoolSize))) {
506    
507                                                    _completedTaskCount +=
508                                                            workerTask._localCompletedTaskCount;
509    
510                                                    _workerTasks.remove(workerTask);
511    
512                                                    if (--_poolSize == 0) {
513                                                            _tryTerminate();
514                                                    }
515    
516                                                    cleanUpMarker[0] = true;
517    
518                                                    return null;
519                                            }
520                                    }
521                                    finally {
522                                            _mainLock.unlock();
523                                    }
524                            }
525                            catch (InterruptedException ie) {
526                            }
527                    }
528            }
529    
530            private void _tryTerminate() {
531                    if (_poolSize == 0) {
532                            int state = _runState;
533    
534                            if ((state == _STOP) ||
535                                    ((state == _SHUTDOWN) && _taskQueue.isEmpty())) {
536    
537                                    _runState = _TERMINATED;
538    
539                                    _terminationCondition.signalAll();
540                                    _threadPoolHandler.terminated();
541    
542                                    return;
543                            }
544    
545                            if (!_taskQueue.isEmpty()) {
546                                    _doAddWorkerThread(_taskQueue.poll());
547                            }
548                    }
549            }
550    
        // Run states. The numeric order matters because the code compares
        // states with < and >=: running, then shutdown, then stop, then
        // terminated.

        private static final int _RUNNING = 0;

        private static final int _SHUTDOWN = 1;

        private static final int _STOP = 2;

        private static final int _TERMINATED = 3;

        // Whether workers at or below the core size also use the timed poll

        private volatile boolean _allowCoreThreadTimeout;

        // Completed task total of workers that have already been removed; not
        // volatile, it is only accessed while holding _mainLock

        private long _completedTaskCount;
        private volatile int _corePoolSize;

        // Stored in nanoseconds

        private volatile long _keepAliveTime;
        private volatile int _largestPoolSize;

        // Held while changing the worker set, pool size and run state

        private final ReentrantLock _mainLock = new ReentrantLock();
        private volatile int _maxPoolSize;
        private volatile int _poolSize;
        private volatile RejectedExecutionHandler _rejectedExecutionHandler;
        private volatile int _runState;
        private final TaskQueue<Runnable> _taskQueue;

        // Signalled when the run state becomes terminated

        private final Condition _terminationCondition = _mainLock.newCondition();
        private volatile ThreadFactory _threadFactory;
        private volatile ThreadPoolHandler _threadPoolHandler;

        // Plain HashSet; only accessed in this class while holding _mainLock

        private final Set<WorkerTask> _workerTasks;
574    
575            private class WorkerTask
576                    extends AbstractQueuedSynchronizer implements Runnable {
577    
578                    public WorkerTask(Runnable runnable) {
579                            _runnable = runnable;
580                    }
581    
582                    @Override
583                    public void run() {
584                            boolean[] cleanUpMarker = new boolean[1];
585    
586                            try {
587                                    Runnable runnable = _runnable;
588    
589                                    _runnable = null;
590    
591                                    do {
592                                            if (runnable != null) {
593                                                    _runTask(runnable);
594    
595                                                    runnable = null;
596                                            }
597                                    }
598                                    while ((runnable = _getTask(this, cleanUpMarker)) != null);
599                            }
600                            finally {
601                                    if (!cleanUpMarker[0]) {
602                                            _mainLock.lock();
603    
604                                            try {
605                                                    _completedTaskCount += _localCompletedTaskCount;
606    
607                                                    _workerTasks.remove(this);
608    
609                                                    if (--_poolSize == 0) {
610                                                            _tryTerminate();
611                                                    }
612                                            }
613                                            finally {
614                                                    _mainLock.unlock();
615                                            }
616                                    }
617    
618                                    _threadPoolHandler.beforeThreadEnd(_thread);
619                            }
620                    }
621    
622                    @Override
623                    protected boolean isHeldExclusively() {
624                            if (getState() == 1) {
625                                    return true;
626                            }
627                            else {
628                                    return false;
629                            }
630                    }
631    
632                    @Override
633                    protected boolean tryAcquire(int unused) {
634                            return compareAndSetState(0, 1);
635                    }
636    
637                    @Override
638                    protected boolean tryRelease(int unused) {
639                            setState(0);
640    
641                            return true;
642                    }
643    
644                    private boolean _interruptIfWaiting() {
645                            if (!_thread.isInterrupted() && tryAcquire(1)) {
646                                    try {
647                                            _thread.interrupt();
648    
649                                            return true;
650                                    }
651                                    finally {
652                                            _unlock();
653                                    }
654                            }
655    
656                            return false;
657                    }
658    
659                    private boolean _isLocked() {
660                            return isHeldExclusively();
661                    }
662    
663                    private void _lock() {
664                            acquire(1);
665                    }
666    
667                    private void _runTask(Runnable task) {
668                            _lock();
669    
670                            try {
671                                    if ((_runState < _STOP) && Thread.interrupted() &&
672                                            (_runState >= _STOP)) {
673    
674                                            _thread.interrupt();
675                                    }
676    
677                                    Throwable throwable = null;
678    
679                                    _threadPoolHandler.beforeExecute(_thread, task);
680    
681                                    try {
682                                            task.run();
683    
684                                            _localCompletedTaskCount++;
685                                    }
686                                    catch (RuntimeException re) {
687                                            throwable = re;
688    
689                                            throw re;
690                                    }
691                                    finally {
692                                            _threadPoolHandler.afterExecute(task, throwable);
693                                    }
694                            }
695                            finally {
696                                    _unlock();
697                            }
698                    }
699    
700                    private void _startWork() {
701                            _thread = _threadFactory.newThread(this);
702    
703                            _threadPoolHandler.beforeThreadStart(_thread);
704    
705                            _thread.start();
706                    }
707    
708                    private void _unlock() {
709                            release(1);
710                    }
711    
712                    private volatile long _localCompletedTaskCount;
713                    private Runnable _runnable;
714                    private Thread _thread;
715    
716            }
717    
718    }