001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.messaging;
016    
017    import com.liferay.portal.kernel.cluster.ClusterLinkUtil;
018    import com.liferay.portal.kernel.concurrent.RejectedExecutionHandler;
019    import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
020    import com.liferay.portal.kernel.concurrent.ThreadPoolHandlerAdapter;
021    import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
022    import com.liferay.portal.kernel.log.Log;
023    import com.liferay.portal.kernel.log.LogFactoryUtil;
024    import com.liferay.portal.kernel.messaging.proxy.MessageValuesThreadLocal;
025    import com.liferay.portal.kernel.util.NamedThreadFactory;
026    import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
027    import com.liferay.portal.kernel.util.Validator;
028    import com.liferay.portal.model.User;
029    import com.liferay.portal.security.auth.CompanyThreadLocal;
030    import com.liferay.portal.security.auth.PrincipalThreadLocal;
031    import com.liferay.portal.security.permission.PermissionChecker;
032    import com.liferay.portal.security.permission.PermissionCheckerFactoryUtil;
033    import com.liferay.portal.security.permission.PermissionThreadLocal;
034    import com.liferay.portal.service.UserLocalServiceUtil;
035    
036    import java.util.Set;
037    import java.util.concurrent.TimeUnit;
038    
039    /**
040     * @author Michael C. Han
041     * @author Shuyang Zhou
042     */
043    public abstract class BaseAsyncDestination extends BaseDestination {
044    
045            public BaseAsyncDestination() {
046            }
047    
048            /**
049             * @deprecated
050             */
051            public BaseAsyncDestination(String name) {
052                    this(name, _WORKERS_CORE_SIZE, _WORKERS_MAX_SIZE);
053            }
054    
055            /**
056             * @deprecated
057             */
058            public BaseAsyncDestination(
059                    String name, int workersCoreSize, int workersMaxSize) {
060    
061                    this.name = name;
062                    _workersCoreSize = workersCoreSize;
063                    _workersMaxSize = workersMaxSize;
064    
065                    open();
066            }
067    
068            @Override
069            public void close(boolean force) {
070                    PortalExecutorManagerUtil.shutdown(getName(), force);
071            }
072    
073            @Override
074            public DestinationStatistics getDestinationStatistics() {
075                    DestinationStatistics destinationStatistics =
076                            new DestinationStatistics();
077    
078                    destinationStatistics.setActiveThreadCount(
079                            _threadPoolExecutor.getActiveCount());
080                    destinationStatistics.setCurrentThreadCount(
081                            _threadPoolExecutor.getPoolSize());
082                    destinationStatistics.setLargestThreadCount(
083                            _threadPoolExecutor.getLargestPoolSize());
084                    destinationStatistics.setMaxThreadPoolSize(
085                            _threadPoolExecutor.getMaxPoolSize());
086                    destinationStatistics.setMinThreadPoolSize(
087                            _threadPoolExecutor.getCorePoolSize());
088                    destinationStatistics.setPendingMessageCount(
089                            _threadPoolExecutor.getPendingTaskCount());
090                    destinationStatistics.setSentMessageCount(
091                            _threadPoolExecutor.getCompletedTaskCount());
092    
093                    return destinationStatistics;
094            }
095    
096            public int getMaximumQueueSize() {
097                    return _maximumQueueSize;
098            }
099    
100            public int getWorkersCoreSize() {
101                    return _workersCoreSize;
102            }
103    
104            public int getWorkersMaxSize() {
105                    return _workersMaxSize;
106            }
107    
108            @Override
109            public void open() {
110                    if ((_threadPoolExecutor != null) &&
111                            !_threadPoolExecutor.isShutdown()) {
112    
113                            return;
114                    }
115    
116                    ClassLoader classLoader = PortalClassLoaderUtil.getClassLoader();
117    
118                    if (_rejectedExecutionHandler == null) {
119                            _rejectedExecutionHandler = createRejectionExecutionHandler();
120                    }
121    
122                    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
123                            _workersCoreSize, _workersMaxSize, 60L, TimeUnit.SECONDS, false,
124                            _maximumQueueSize, _rejectedExecutionHandler,
125                            new NamedThreadFactory(
126                                    getName(), Thread.NORM_PRIORITY, classLoader),
127                            new ThreadPoolHandlerAdapter());
128    
129                    ThreadPoolExecutor oldThreadPoolExecutor =
130                            PortalExecutorManagerUtil.registerPortalExecutor(
131                                    getName(), threadPoolExecutor);
132    
133                    if (oldThreadPoolExecutor != null) {
134                            if (_log.isWarnEnabled()) {
135                                    _log.warn(
136                                            "Abort creating a new thread pool for destination " +
137                                                    getName() + " and reuse previous one");
138                            }
139    
140                            threadPoolExecutor.shutdownNow();
141    
142                            threadPoolExecutor = oldThreadPoolExecutor;
143                    }
144    
145                    _threadPoolExecutor = threadPoolExecutor;
146            }
147    
148            @Override
149            public void send(Message message) {
150                    if (messageListeners.isEmpty()) {
151                            if (_log.isDebugEnabled()) {
152                                    _log.debug("No message listeners for destination " + getName());
153                            }
154    
155                            return;
156                    }
157    
158                    ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
159    
160                    if (threadPoolExecutor.isShutdown()) {
161                            throw new IllegalStateException(
162                                    "Destination " + getName() + " is shutdown and cannot " +
163                                            "receive more messages");
164                    }
165    
166                    populateMessageFromThreadLocals(message);
167    
168                    dispatch(messageListeners, message);
169            }
170    
171            public void setMaximumQueueSize(int maximumQueueSize) {
172                    _maximumQueueSize = maximumQueueSize;
173            }
174    
175            public void setRejectedExecutionHandler(
176                    RejectedExecutionHandler rejectedExecutionHandler) {
177    
178                    _rejectedExecutionHandler = rejectedExecutionHandler;
179            }
180    
181            public void setWorkersCoreSize(int workersCoreSize) {
182                    _workersCoreSize = workersCoreSize;
183            }
184    
185            public void setWorkersMaxSize(int workersMaxSize) {
186                    _workersMaxSize = workersMaxSize;
187            }
188    
189            protected RejectedExecutionHandler createRejectionExecutionHandler() {
190                    return new RejectedExecutionHandler() {
191    
192                            @Override
193                            public void rejectedExecution(
194                                    Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
195    
196                                    if (!_log.isWarnEnabled()) {
197                                            return;
198                                    }
199    
200                                    MessageRunnable messageRunnable = (MessageRunnable)runnable;
201    
202                                    _log.warn(
203                                            "Discarding message " + messageRunnable.getMessage() +
204                                                    " because it exceeds the maximum queue size of " +
205                                                            _maximumQueueSize);
206                            }
207    
208                    };
209            }
210    
211            protected abstract void dispatch(
212                    Set<MessageListener> messageListeners, Message message);
213    
214            protected ThreadPoolExecutor getThreadPoolExecutor() {
215                    return _threadPoolExecutor;
216            }
217    
218            protected void populateMessageFromThreadLocals(Message message) {
219                    if (!message.contains("companyId")) {
220                            message.put("companyId", CompanyThreadLocal.getCompanyId());
221                    }
222    
223                    if (!message.contains("permissionChecker")) {
224                            message.put(
225                                    "permissionChecker",
226                                    PermissionThreadLocal.getPermissionChecker());
227                    }
228    
229                    if (!message.contains("principalName")) {
230                            message.put("principalName", PrincipalThreadLocal.getName());
231                    }
232    
233                    if (!message.contains("principalPassword")) {
234                            message.put(
235                                    "principalPassword", PrincipalThreadLocal.getPassword());
236                    }
237            }
238    
239            protected void populateThreadLocalsFromMessage(Message message) {
240                    long companyId = message.getLong("companyId");
241    
242                    if (companyId > 0) {
243                            CompanyThreadLocal.setCompanyId(companyId);
244                    }
245    
246                    PermissionChecker permissionChecker = (PermissionChecker)message.get(
247                            "permissionChecker");
248    
249                    String principalName = message.getString("principalName");
250    
251                    if (Validator.isNotNull(principalName)) {
252                            PrincipalThreadLocal.setName(principalName);
253                    }
254    
255                    if ((permissionChecker == null) && Validator.isNotNull(principalName)) {
256                            try {
257                                    User user = UserLocalServiceUtil.fetchUser(
258                                            PrincipalThreadLocal.getUserId());
259    
260                                    permissionChecker = PermissionCheckerFactoryUtil.create(user);
261                            }
262                            catch (Exception e) {
263                                    throw new RuntimeException(e);
264                            }
265                    }
266    
267                    if (permissionChecker != null) {
268                            PermissionThreadLocal.setPermissionChecker(permissionChecker);
269                    }
270    
271                    String principalPassword = message.getString("principalPassword");
272    
273                    if (Validator.isNotNull(principalPassword)) {
274                            PrincipalThreadLocal.setPassword(principalPassword);
275                    }
276    
277                    Boolean clusterForwardMessage = (Boolean)message.get(
278                            ClusterLinkUtil.CLUSTER_FORWARD_MESSAGE);
279    
280                    if (clusterForwardMessage != null) {
281                            MessageValuesThreadLocal.setValue(
282                                    ClusterLinkUtil.CLUSTER_FORWARD_MESSAGE, clusterForwardMessage);
283                    }
284            }
285    
286            private static final int _WORKERS_CORE_SIZE = 2;
287    
288            private static final int _WORKERS_MAX_SIZE = 5;
289    
290            private static Log _log = LogFactoryUtil.getLog(BaseAsyncDestination.class);
291    
292            private int _maximumQueueSize = Integer.MAX_VALUE;
293            private RejectedExecutionHandler _rejectedExecutionHandler;
294            private ThreadPoolExecutor _threadPoolExecutor;
295            private int _workersCoreSize = _WORKERS_CORE_SIZE;
296            private int _workersMaxSize = _WORKERS_MAX_SIZE;
297    
298    }