001
014
015 package com.liferay.portal.executor;
016
017 import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
018 import com.liferay.portal.kernel.executor.PortalExecutorFactory;
019 import com.liferay.portal.kernel.executor.PortalExecutorManager;
020 import com.liferay.portal.kernel.log.Log;
021 import com.liferay.portal.kernel.log.LogFactoryUtil;
022 import com.liferay.portal.kernel.security.pacl.DoPrivileged;
023
024 import java.util.Map;
025 import java.util.concurrent.Callable;
026 import java.util.concurrent.ConcurrentHashMap;
027 import java.util.concurrent.ExecutionException;
028 import java.util.concurrent.Future;
029 import java.util.concurrent.TimeUnit;
030 import java.util.concurrent.TimeoutException;
031
032
035 @DoPrivileged
036 public class PortalExecutorManagerImpl implements PortalExecutorManager {
037
038 public void afterPropertiesSet() {
039 if (_portalExecutorFactory == null) {
040 throw new IllegalArgumentException(
041 "Portal executor factory is null");
042 }
043 }
044
045 @Override
046 public <T> Future<T> execute(String name, Callable<T> callable) {
047 ThreadPoolExecutor threadPoolExecutor = getPortalExecutor(name);
048
049 return threadPoolExecutor.submit(callable);
050 }
051
052 @Override
053 public <T> T execute(
054 String name, Callable<T> callable, long timeout, TimeUnit timeUnit)
055 throws ExecutionException, InterruptedException, TimeoutException {
056
057 ThreadPoolExecutor threadPoolExecutor = getPortalExecutor(name);
058
059 Future<T> future = threadPoolExecutor.submit(callable);
060
061 return future.get(timeout, timeUnit);
062 }
063
064 @Override
065 public ThreadPoolExecutor getPortalExecutor(String name) {
066 return getPortalExecutor(name, true);
067 }
068
069 @Override
070 public ThreadPoolExecutor getPortalExecutor(
071 String name, boolean createIfAbsent) {
072
073 ThreadPoolExecutor threadPoolExecutor = _threadPoolExecutors.get(name);
074
075 if ((threadPoolExecutor == null) && createIfAbsent) {
076 synchronized (_threadPoolExecutors) {
077 threadPoolExecutor = _threadPoolExecutors.get(name);
078
079 if (threadPoolExecutor == null) {
080 threadPoolExecutor =
081 _portalExecutorFactory.createPortalExecutor(name);
082
083 _threadPoolExecutors.put(name, threadPoolExecutor);
084 }
085 }
086 }
087
088 return threadPoolExecutor;
089 }
090
091 @Override
092 public ThreadPoolExecutor registerPortalExecutor(
093 String name, ThreadPoolExecutor threadPoolExecutor) {
094
095 ThreadPoolExecutor oldThreadPoolExecutor = _threadPoolExecutors.get(
096 name);
097
098 if (oldThreadPoolExecutor == null) {
099 synchronized (_threadPoolExecutors) {
100 oldThreadPoolExecutor = _threadPoolExecutors.get(name);
101
102 if (oldThreadPoolExecutor == null) {
103 oldThreadPoolExecutor = _threadPoolExecutors.put(
104 name, threadPoolExecutor);
105 }
106 }
107 }
108
109 return oldThreadPoolExecutor;
110 }
111
112 public void setPortalExecutorFactory(
113 PortalExecutorFactory portalExecutorFactory) {
114
115 _portalExecutorFactory = portalExecutorFactory;
116 }
117
118 public void setPortalExecutors(
119 Map<String, ThreadPoolExecutor> threadPoolExecutors) {
120
121 if (threadPoolExecutors != null) {
122 _threadPoolExecutors =
123 new ConcurrentHashMap<String, ThreadPoolExecutor>(
124 threadPoolExecutors);
125 }
126 }
127
128 @Override
129 public void shutdown() {
130 shutdown(false);
131 }
132
133 @Override
134 public void shutdown(boolean interrupt) {
135 for (Map.Entry<String, ThreadPoolExecutor> entry :
136 _threadPoolExecutors.entrySet()) {
137
138 ThreadPoolExecutor threadPoolExecutor = entry.getValue();
139
140 if (interrupt) {
141 threadPoolExecutor.shutdownNow();
142 }
143 else {
144 threadPoolExecutor.shutdown();
145 }
146 }
147
148 _threadPoolExecutors.clear();
149 }
150
151 @Override
152 public void shutdown(String name) {
153 shutdown(name, false);
154 }
155
156 @Override
157 public void shutdown(String name, boolean interrupt) {
158 ThreadPoolExecutor threadPoolExecutor = _threadPoolExecutors.remove(
159 name);
160
161 if (threadPoolExecutor == null) {
162 if (_log.isDebugEnabled()) {
163 _log.debug("No portal executor found for name " + name);
164 }
165
166 return;
167 }
168
169 if (interrupt) {
170 threadPoolExecutor.shutdownNow();
171 }
172 else {
173 threadPoolExecutor.shutdown();
174 }
175 }
176
177 private static Log _log = LogFactoryUtil.getLog(
178 PortalExecutorManagerImpl.class);
179
180 private PortalExecutorFactory _portalExecutorFactory;
181 private Map<String, ThreadPoolExecutor> _threadPoolExecutors =
182 new ConcurrentHashMap<String, ThreadPoolExecutor>();
183
184 }