001
014
015 package com.liferay.portal.kernel.messaging;
016
017 import com.liferay.portal.kernel.cluster.ClusterLink;
018 import com.liferay.portal.kernel.concurrent.RejectedExecutionHandler;
019 import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
020 import com.liferay.portal.kernel.concurrent.ThreadPoolHandlerAdapter;
021 import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
022 import com.liferay.portal.kernel.log.Log;
023 import com.liferay.portal.kernel.log.LogFactoryUtil;
024 import com.liferay.portal.kernel.messaging.proxy.MessageValuesThreadLocal;
025 import com.liferay.portal.kernel.util.GroupThreadLocal;
026 import com.liferay.portal.kernel.util.LocaleThreadLocal;
027 import com.liferay.portal.kernel.util.NamedThreadFactory;
028 import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
029 import com.liferay.portal.kernel.util.Validator;
030 import com.liferay.portal.model.User;
031 import com.liferay.portal.security.auth.CompanyThreadLocal;
032 import com.liferay.portal.security.auth.PrincipalThreadLocal;
033 import com.liferay.portal.security.permission.PermissionChecker;
034 import com.liferay.portal.security.permission.PermissionCheckerFactoryUtil;
035 import com.liferay.portal.security.permission.PermissionThreadLocal;
036 import com.liferay.portal.service.UserLocalServiceUtil;
037
038 import java.util.Locale;
039 import java.util.Set;
040 import java.util.concurrent.TimeUnit;
041
042
046 public abstract class BaseAsyncDestination extends BaseDestination {
047
048 public BaseAsyncDestination() {
049 }
050
051
054 public BaseAsyncDestination(String name) {
055 this(name, _WORKERS_CORE_SIZE, _WORKERS_MAX_SIZE);
056 }
057
058
061 public BaseAsyncDestination(
062 String name, int workersCoreSize, int workersMaxSize) {
063
064 this.name = name;
065 _workersCoreSize = workersCoreSize;
066 _workersMaxSize = workersMaxSize;
067
068 open();
069 }
070
071 @Override
072 public void close(boolean force) {
073 PortalExecutorManagerUtil.shutdown(getName(), force);
074 }
075
076 @Override
077 public DestinationStatistics getDestinationStatistics() {
078 DestinationStatistics destinationStatistics =
079 new DestinationStatistics();
080
081 destinationStatistics.setActiveThreadCount(
082 _threadPoolExecutor.getActiveCount());
083 destinationStatistics.setCurrentThreadCount(
084 _threadPoolExecutor.getPoolSize());
085 destinationStatistics.setLargestThreadCount(
086 _threadPoolExecutor.getLargestPoolSize());
087 destinationStatistics.setMaxThreadPoolSize(
088 _threadPoolExecutor.getMaxPoolSize());
089 destinationStatistics.setMinThreadPoolSize(
090 _threadPoolExecutor.getCorePoolSize());
091 destinationStatistics.setPendingMessageCount(
092 _threadPoolExecutor.getPendingTaskCount());
093 destinationStatistics.setSentMessageCount(
094 _threadPoolExecutor.getCompletedTaskCount());
095
096 return destinationStatistics;
097 }
098
099 public int getMaximumQueueSize() {
100 return _maximumQueueSize;
101 }
102
103 public int getWorkersCoreSize() {
104 return _workersCoreSize;
105 }
106
107 public int getWorkersMaxSize() {
108 return _workersMaxSize;
109 }
110
111 @Override
112 public void open() {
113 if ((_threadPoolExecutor != null) &&
114 !_threadPoolExecutor.isShutdown()) {
115
116 return;
117 }
118
119 ClassLoader classLoader = PortalClassLoaderUtil.getClassLoader();
120
121 if (_rejectedExecutionHandler == null) {
122 _rejectedExecutionHandler = createRejectionExecutionHandler();
123 }
124
125 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
126 _workersCoreSize, _workersMaxSize, 60L, TimeUnit.SECONDS, false,
127 _maximumQueueSize, _rejectedExecutionHandler,
128 new NamedThreadFactory(
129 getName(), Thread.NORM_PRIORITY, classLoader),
130 new ThreadPoolHandlerAdapter());
131
132 ThreadPoolExecutor oldThreadPoolExecutor =
133 PortalExecutorManagerUtil.registerPortalExecutor(
134 getName(), threadPoolExecutor);
135
136 if (oldThreadPoolExecutor != null) {
137 if (_log.isWarnEnabled()) {
138 _log.warn(
139 "Abort creating a new thread pool for destination " +
140 getName() + " and reuse previous one");
141 }
142
143 threadPoolExecutor.shutdownNow();
144
145 threadPoolExecutor = oldThreadPoolExecutor;
146 }
147
148 _threadPoolExecutor = threadPoolExecutor;
149 }
150
151 @Override
152 public void send(Message message) {
153 if (messageListeners.isEmpty()) {
154 if (_log.isDebugEnabled()) {
155 _log.debug("No message listeners for destination " + getName());
156 }
157
158 return;
159 }
160
161 ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
162
163 if (threadPoolExecutor.isShutdown()) {
164 throw new IllegalStateException(
165 "Destination " + getName() + " is shutdown and cannot " +
166 "receive more messages");
167 }
168
169 populateMessageFromThreadLocals(message);
170
171 if (_log.isDebugEnabled()) {
172 _log.debug(
173 "Sending message " + message + " from destination " +
174 getName() + " to message listeners " + messageListeners);
175 }
176
177 dispatch(messageListeners, message);
178 }
179
180 public void setMaximumQueueSize(int maximumQueueSize) {
181 _maximumQueueSize = maximumQueueSize;
182 }
183
184 public void setRejectedExecutionHandler(
185 RejectedExecutionHandler rejectedExecutionHandler) {
186
187 _rejectedExecutionHandler = rejectedExecutionHandler;
188 }
189
190 public void setWorkersCoreSize(int workersCoreSize) {
191 _workersCoreSize = workersCoreSize;
192 }
193
194 public void setWorkersMaxSize(int workersMaxSize) {
195 _workersMaxSize = workersMaxSize;
196 }
197
198 protected RejectedExecutionHandler createRejectionExecutionHandler() {
199 return new RejectedExecutionHandler() {
200
201 @Override
202 public void rejectedExecution(
203 Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
204
205 if (!_log.isWarnEnabled()) {
206 return;
207 }
208
209 MessageRunnable messageRunnable = (MessageRunnable)runnable;
210
211 _log.warn(
212 "Discarding message " + messageRunnable.getMessage() +
213 " because it exceeds the maximum queue size of " +
214 _maximumQueueSize);
215 }
216
217 };
218 }
219
220 protected abstract void dispatch(
221 Set<MessageListener> messageListeners, Message message);
222
223 protected ThreadPoolExecutor getThreadPoolExecutor() {
224 return _threadPoolExecutor;
225 }
226
227 protected void populateMessageFromThreadLocals(Message message) {
228 if (!message.contains("companyId")) {
229 message.put("companyId", CompanyThreadLocal.getCompanyId());
230 }
231
232 if (!message.contains("defaultLocale")) {
233 message.put("defaultLocale", LocaleThreadLocal.getDefaultLocale());
234 }
235
236 if (!message.contains("groupId")) {
237 message.put("groupId", GroupThreadLocal.getGroupId());
238 }
239
240 if (!message.contains("permissionChecker")) {
241 message.put(
242 "permissionChecker",
243 PermissionThreadLocal.getPermissionChecker());
244 }
245
246 if (!message.contains("principalName")) {
247 message.put("principalName", PrincipalThreadLocal.getName());
248 }
249
250 if (!message.contains("principalPassword")) {
251 message.put(
252 "principalPassword", PrincipalThreadLocal.getPassword());
253 }
254
255 if (!message.contains("siteDefaultLocale")) {
256 message.put(
257 "siteDefaultLocale", LocaleThreadLocal.getSiteDefaultLocale());
258 }
259
260 if (!message.contains("themeDisplayLocale")) {
261 message.put(
262 "themeDisplayLocale",
263 LocaleThreadLocal.getThemeDisplayLocale());
264 }
265 }
266
267 protected void populateThreadLocalsFromMessage(Message message) {
268 long companyId = message.getLong("companyId");
269
270 if (companyId > 0) {
271 CompanyThreadLocal.setCompanyId(companyId);
272 }
273
274 Locale defaultLocale = (Locale)message.get("defaultLocale");
275
276 if (defaultLocale != null) {
277 LocaleThreadLocal.setDefaultLocale(defaultLocale);
278 }
279
280 long groupId = message.getLong("groupId");
281
282 if (groupId > 0) {
283 GroupThreadLocal.setGroupId(groupId);
284 }
285
286 PermissionChecker permissionChecker = (PermissionChecker)message.get(
287 "permissionChecker");
288
289 String principalName = message.getString("principalName");
290
291 if (Validator.isNotNull(principalName)) {
292 PrincipalThreadLocal.setName(principalName);
293 }
294
295 if ((permissionChecker == null) && Validator.isNotNull(principalName)) {
296 try {
297 User user = UserLocalServiceUtil.fetchUser(
298 PrincipalThreadLocal.getUserId());
299
300 permissionChecker = PermissionCheckerFactoryUtil.create(user);
301 }
302 catch (Exception e) {
303 throw new RuntimeException(e);
304 }
305 }
306
307 if (permissionChecker != null) {
308 PermissionThreadLocal.setPermissionChecker(permissionChecker);
309 }
310
311 String principalPassword = message.getString("principalPassword");
312
313 if (Validator.isNotNull(principalPassword)) {
314 PrincipalThreadLocal.setPassword(principalPassword);
315 }
316
317 Boolean clusterForwardMessage = (Boolean)message.get(
318 ClusterLink.CLUSTER_FORWARD_MESSAGE);
319
320 if (clusterForwardMessage != null) {
321 MessageValuesThreadLocal.setValue(
322 ClusterLink.CLUSTER_FORWARD_MESSAGE, clusterForwardMessage);
323 }
324
325 Locale siteDefaultLocale = (Locale)message.get("siteDefaultLocale");
326
327 if (siteDefaultLocale != null) {
328 LocaleThreadLocal.setSiteDefaultLocale(siteDefaultLocale);
329 }
330
331 Locale themeDisplayLocale = (Locale)message.get("themeDisplayLocale");
332
333 if (themeDisplayLocale != null) {
334 LocaleThreadLocal.setThemeDisplayLocale(themeDisplayLocale);
335 }
336 }
337
338 private static final int _WORKERS_CORE_SIZE = 2;
339
340 private static final int _WORKERS_MAX_SIZE = 5;
341
342 private static Log _log = LogFactoryUtil.getLog(BaseAsyncDestination.class);
343
344 private int _maximumQueueSize = Integer.MAX_VALUE;
345 private RejectedExecutionHandler _rejectedExecutionHandler;
346 private ThreadPoolExecutor _threadPoolExecutor;
347 private int _workersCoreSize = _WORKERS_CORE_SIZE;
348 private int _workersMaxSize = _WORKERS_MAX_SIZE;
349
350 }