001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.executor;
016    
017    import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
018    import com.liferay.portal.kernel.executor.PortalExecutorFactory;
019    import com.liferay.portal.kernel.executor.PortalExecutorManager;
020    import com.liferay.portal.kernel.log.Log;
021    import com.liferay.portal.kernel.log.LogFactoryUtil;
022    import com.liferay.portal.kernel.security.pacl.DoPrivileged;
023    
024    import java.util.Map;
025    import java.util.concurrent.Callable;
026    import java.util.concurrent.ConcurrentHashMap;
027    import java.util.concurrent.ExecutionException;
028    import java.util.concurrent.Future;
029    import java.util.concurrent.TimeUnit;
030    import java.util.concurrent.TimeoutException;
031    
032    /**
033     * @author Shuyang Zhou
034     */
035    @DoPrivileged
036    public class PortalExecutorManagerImpl implements PortalExecutorManager {
037    
038            public void afterPropertiesSet() {
039                    if (_portalExecutorFactory == null) {
040                            throw new IllegalArgumentException(
041                                    "Portal executor factory is null");
042                    }
043            }
044    
045            @Override
046            public <T> Future<T> execute(String name, Callable<T> callable) {
047                    ThreadPoolExecutor threadPoolExecutor = getPortalExecutor(name);
048    
049                    return threadPoolExecutor.submit(callable);
050            }
051    
052            @Override
053            public <T> T execute(
054                            String name, Callable<T> callable, long timeout, TimeUnit timeUnit)
055                    throws ExecutionException, InterruptedException, TimeoutException {
056    
057                    ThreadPoolExecutor threadPoolExecutor = getPortalExecutor(name);
058    
059                    Future<T> future = threadPoolExecutor.submit(callable);
060    
061                    return future.get(timeout, timeUnit);
062            }
063    
064            @Override
065            public ThreadPoolExecutor getPortalExecutor(String name) {
066                    return getPortalExecutor(name, true);
067            }
068    
069            @Override
070            public ThreadPoolExecutor getPortalExecutor(
071                    String name, boolean createIfAbsent) {
072    
073                    ThreadPoolExecutor threadPoolExecutor = _threadPoolExecutors.get(name);
074    
075                    if ((threadPoolExecutor == null) && createIfAbsent) {
076                            synchronized (_threadPoolExecutors) {
077                                    threadPoolExecutor = _threadPoolExecutors.get(name);
078    
079                                    if (threadPoolExecutor == null) {
080                                            threadPoolExecutor =
081                                                    _portalExecutorFactory.createPortalExecutor(name);
082    
083                                            _threadPoolExecutors.put(name, threadPoolExecutor);
084                                    }
085                            }
086                    }
087    
088                    return threadPoolExecutor;
089            }
090    
091            @Override
092            public ThreadPoolExecutor registerPortalExecutor(
093                    String name, ThreadPoolExecutor threadPoolExecutor) {
094    
095                    ThreadPoolExecutor oldThreadPoolExecutor = _threadPoolExecutors.get(
096                            name);
097    
098                    if (oldThreadPoolExecutor == null) {
099                            synchronized (_threadPoolExecutors) {
100                                    oldThreadPoolExecutor = _threadPoolExecutors.get(name);
101    
102                                    if (oldThreadPoolExecutor == null) {
103                                            oldThreadPoolExecutor = _threadPoolExecutors.put(
104                                                    name, threadPoolExecutor);
105                                    }
106                            }
107                    }
108    
109                    return oldThreadPoolExecutor;
110            }
111    
112            public void setPortalExecutorFactory(
113                    PortalExecutorFactory portalExecutorFactory) {
114    
115                    _portalExecutorFactory = portalExecutorFactory;
116            }
117    
118            public void setPortalExecutors(
119                    Map<String, ThreadPoolExecutor> threadPoolExecutors) {
120    
121                    if (threadPoolExecutors != null) {
122                            _threadPoolExecutors =
123                                    new ConcurrentHashMap<String, ThreadPoolExecutor>(
124                                            threadPoolExecutors);
125                    }
126            }
127    
128            @Override
129            public void shutdown() {
130                    shutdown(false);
131            }
132    
133            @Override
134            public void shutdown(boolean interrupt) {
135                    for (Map.Entry<String, ThreadPoolExecutor> entry :
136                                    _threadPoolExecutors.entrySet()) {
137    
138                            ThreadPoolExecutor threadPoolExecutor = entry.getValue();
139    
140                            if (interrupt) {
141                                    threadPoolExecutor.shutdownNow();
142                            }
143                            else {
144                                    threadPoolExecutor.shutdown();
145                            }
146                    }
147    
148                    _threadPoolExecutors.clear();
149            }
150    
151            @Override
152            public void shutdown(String name) {
153                    shutdown(name, false);
154            }
155    
156            @Override
157            public void shutdown(String name, boolean interrupt) {
158                    ThreadPoolExecutor threadPoolExecutor = _threadPoolExecutors.remove(
159                            name);
160    
161                    if (threadPoolExecutor == null) {
162                            if (_log.isDebugEnabled()) {
163                                    _log.debug("No portal executor found for name " + name);
164                            }
165    
166                            return;
167                    }
168    
169                    if (interrupt) {
170                            threadPoolExecutor.shutdownNow();
171                    }
172                    else {
173                            threadPoolExecutor.shutdown();
174                    }
175            }
176    
177            private static Log _log = LogFactoryUtil.getLog(
178                    PortalExecutorManagerImpl.class);
179    
180            private PortalExecutorFactory _portalExecutorFactory;
181            private Map<String, ThreadPoolExecutor> _threadPoolExecutors =
182                    new ConcurrentHashMap<String, ThreadPoolExecutor>();
183    
184    }