001
014
015 package com.liferay.portal.kernel.messaging;
016
017 import com.liferay.portal.kernel.cluster.ClusterLinkUtil;
018 import com.liferay.portal.kernel.concurrent.RejectedExecutionHandler;
019 import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
020 import com.liferay.portal.kernel.concurrent.ThreadPoolHandlerAdapter;
021 import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
022 import com.liferay.portal.kernel.log.Log;
023 import com.liferay.portal.kernel.log.LogFactoryUtil;
024 import com.liferay.portal.kernel.messaging.proxy.MessageValuesThreadLocal;
025 import com.liferay.portal.kernel.util.NamedThreadFactory;
026 import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
027 import com.liferay.portal.kernel.util.Validator;
028 import com.liferay.portal.model.User;
029 import com.liferay.portal.security.auth.CompanyThreadLocal;
030 import com.liferay.portal.security.auth.PrincipalThreadLocal;
031 import com.liferay.portal.security.permission.PermissionChecker;
032 import com.liferay.portal.security.permission.PermissionCheckerFactoryUtil;
033 import com.liferay.portal.security.permission.PermissionThreadLocal;
034 import com.liferay.portal.service.UserLocalServiceUtil;
035
036 import java.util.Set;
037 import java.util.concurrent.TimeUnit;
038
039
043 public abstract class BaseAsyncDestination extends BaseDestination {
044
045 public BaseAsyncDestination() {
046 }
047
048
051 public BaseAsyncDestination(String name) {
052 this(name, _WORKERS_CORE_SIZE, _WORKERS_MAX_SIZE);
053 }
054
055
058 public BaseAsyncDestination(
059 String name, int workersCoreSize, int workersMaxSize) {
060
061 this.name = name;
062 _workersCoreSize = workersCoreSize;
063 _workersMaxSize = workersMaxSize;
064
065 open();
066 }
067
068 @Override
069 public void close(boolean force) {
070 PortalExecutorManagerUtil.shutdown(getName(), force);
071 }
072
073 @Override
074 public DestinationStatistics getDestinationStatistics() {
075 DestinationStatistics destinationStatistics =
076 new DestinationStatistics();
077
078 destinationStatistics.setActiveThreadCount(
079 _threadPoolExecutor.getActiveCount());
080 destinationStatistics.setCurrentThreadCount(
081 _threadPoolExecutor.getPoolSize());
082 destinationStatistics.setLargestThreadCount(
083 _threadPoolExecutor.getLargestPoolSize());
084 destinationStatistics.setMaxThreadPoolSize(
085 _threadPoolExecutor.getMaxPoolSize());
086 destinationStatistics.setMinThreadPoolSize(
087 _threadPoolExecutor.getCorePoolSize());
088 destinationStatistics.setPendingMessageCount(
089 _threadPoolExecutor.getPendingTaskCount());
090 destinationStatistics.setSentMessageCount(
091 _threadPoolExecutor.getCompletedTaskCount());
092
093 return destinationStatistics;
094 }
095
096 public int getMaximumQueueSize() {
097 return _maximumQueueSize;
098 }
099
100 public int getWorkersCoreSize() {
101 return _workersCoreSize;
102 }
103
104 public int getWorkersMaxSize() {
105 return _workersMaxSize;
106 }
107
108 @Override
109 public void open() {
110 if ((_threadPoolExecutor != null) &&
111 !_threadPoolExecutor.isShutdown()) {
112
113 return;
114 }
115
116 ClassLoader classLoader = PortalClassLoaderUtil.getClassLoader();
117
118 if (_rejectedExecutionHandler == null) {
119 _rejectedExecutionHandler = createRejectionExecutionHandler();
120 }
121
122 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
123 _workersCoreSize, _workersMaxSize, 60L, TimeUnit.SECONDS, false,
124 _maximumQueueSize, _rejectedExecutionHandler,
125 new NamedThreadFactory(
126 getName(), Thread.NORM_PRIORITY, classLoader),
127 new ThreadPoolHandlerAdapter());
128
129 ThreadPoolExecutor oldThreadPoolExecutor =
130 PortalExecutorManagerUtil.registerPortalExecutor(
131 getName(), threadPoolExecutor);
132
133 if (oldThreadPoolExecutor != null) {
134 if (_log.isWarnEnabled()) {
135 _log.warn(
136 "Abort creating a new thread pool for destination " +
137 getName() + " and reuse previous one");
138 }
139
140 threadPoolExecutor.shutdownNow();
141
142 threadPoolExecutor = oldThreadPoolExecutor;
143 }
144
145 _threadPoolExecutor = threadPoolExecutor;
146 }
147
148 @Override
149 public void send(Message message) {
150 if (messageListeners.isEmpty()) {
151 if (_log.isDebugEnabled()) {
152 _log.debug("No message listeners for destination " + getName());
153 }
154
155 return;
156 }
157
158 ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
159
160 if (threadPoolExecutor.isShutdown()) {
161 throw new IllegalStateException(
162 "Destination " + getName() + " is shutdown and cannot " +
163 "receive more messages");
164 }
165
166 populateMessageFromThreadLocals(message);
167
168 dispatch(messageListeners, message);
169 }
170
171 public void setMaximumQueueSize(int maximumQueueSize) {
172 _maximumQueueSize = maximumQueueSize;
173 }
174
175 public void setRejectedExecutionHandler(
176 RejectedExecutionHandler rejectedExecutionHandler) {
177
178 _rejectedExecutionHandler = rejectedExecutionHandler;
179 }
180
181 public void setWorkersCoreSize(int workersCoreSize) {
182 _workersCoreSize = workersCoreSize;
183 }
184
185 public void setWorkersMaxSize(int workersMaxSize) {
186 _workersMaxSize = workersMaxSize;
187 }
188
189 protected RejectedExecutionHandler createRejectionExecutionHandler() {
190 return new RejectedExecutionHandler() {
191
192 @Override
193 public void rejectedExecution(
194 Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
195
196 if (!_log.isWarnEnabled()) {
197 return;
198 }
199
200 MessageRunnable messageRunnable = (MessageRunnable)runnable;
201
202 _log.warn(
203 "Discarding message " + messageRunnable.getMessage() +
204 " because it exceeds the maximum queue size of " +
205 _maximumQueueSize);
206 }
207
208 };
209 }
210
211 protected abstract void dispatch(
212 Set<MessageListener> messageListeners, Message message);
213
214 protected ThreadPoolExecutor getThreadPoolExecutor() {
215 return _threadPoolExecutor;
216 }
217
218 protected void populateMessageFromThreadLocals(Message message) {
219 if (!message.contains("companyId")) {
220 message.put("companyId", CompanyThreadLocal.getCompanyId());
221 }
222
223 if (!message.contains("permissionChecker")) {
224 message.put(
225 "permissionChecker",
226 PermissionThreadLocal.getPermissionChecker());
227 }
228
229 if (!message.contains("principalName")) {
230 message.put("principalName", PrincipalThreadLocal.getName());
231 }
232
233 if (!message.contains("principalPassword")) {
234 message.put(
235 "principalPassword", PrincipalThreadLocal.getPassword());
236 }
237 }
238
239 protected void populateThreadLocalsFromMessage(Message message) {
240 long companyId = message.getLong("companyId");
241
242 if (companyId > 0) {
243 CompanyThreadLocal.setCompanyId(companyId);
244 }
245
246 PermissionChecker permissionChecker = (PermissionChecker)message.get(
247 "permissionChecker");
248
249 String principalName = message.getString("principalName");
250
251 if (Validator.isNotNull(principalName)) {
252 PrincipalThreadLocal.setName(principalName);
253 }
254
255 if ((permissionChecker == null) && Validator.isNotNull(principalName)) {
256 try {
257 User user = UserLocalServiceUtil.fetchUser(
258 PrincipalThreadLocal.getUserId());
259
260 permissionChecker = PermissionCheckerFactoryUtil.create(user);
261 }
262 catch (Exception e) {
263 throw new RuntimeException(e);
264 }
265 }
266
267 if (permissionChecker != null) {
268 PermissionThreadLocal.setPermissionChecker(permissionChecker);
269 }
270
271 String principalPassword = message.getString("principalPassword");
272
273 if (Validator.isNotNull(principalPassword)) {
274 PrincipalThreadLocal.setPassword(principalPassword);
275 }
276
277 Boolean clusterForwardMessage = (Boolean)message.get(
278 ClusterLinkUtil.CLUSTER_FORWARD_MESSAGE);
279
280 if (clusterForwardMessage != null) {
281 MessageValuesThreadLocal.setValue(
282 ClusterLinkUtil.CLUSTER_FORWARD_MESSAGE, clusterForwardMessage);
283 }
284 }
285
286 private static final int _WORKERS_CORE_SIZE = 2;
287
288 private static final int _WORKERS_MAX_SIZE = 5;
289
290 private static Log _log = LogFactoryUtil.getLog(BaseAsyncDestination.class);
291
292 private int _maximumQueueSize = Integer.MAX_VALUE;
293 private RejectedExecutionHandler _rejectedExecutionHandler;
294 private ThreadPoolExecutor _threadPoolExecutor;
295 private int _workersCoreSize = _WORKERS_CORE_SIZE;
296 private int _workersMaxSize = _WORKERS_MAX_SIZE;
297
298 }