001    /**
002     * Copyright (c) 2000-2010 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.messaging;
016    
017    import com.liferay.portal.kernel.log.Log;
018    import com.liferay.portal.kernel.log.LogFactoryUtil;
019    import com.liferay.portal.kernel.util.ConcurrentHashSet;
020    import com.liferay.portal.kernel.util.NamedThreadFactory;
021    import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
022    import com.liferay.portal.kernel.util.StringPool;
023    import com.liferay.portal.kernel.util.Validator;
024    
025    import java.util.List;
026    import java.util.Set;
027    import java.util.concurrent.LinkedBlockingQueue;
028    import java.util.concurrent.ThreadPoolExecutor;
029    import java.util.concurrent.TimeUnit;
030    
031    /**
032     * @author Michael C. Han
033     */
034    public abstract class BaseDestination implements Destination {
035    
036            public BaseDestination() {
037            }
038    
039            /**
040             * @deprecated
041             */
042            public BaseDestination(String name) {
043                    this(name, _WORKERS_CORE_SIZE, _WORKERS_MAX_SIZE);
044            }
045    
046            /**
047             * @deprecated
048             */
049            public BaseDestination(
050                    String name, int workersCoreSize, int workersMaxSize) {
051    
052                    _name = name;
053                    _workersCoreSize = workersCoreSize;
054                    _workersMaxSize = workersMaxSize;
055    
056                    open();
057            }
058    
059            public void addDestinationEventListener(
060                    DestinationEventListener destinationEventListener) {
061    
062                    _destinationEventListeners.add(destinationEventListener);
063            }
064    
065            public void afterPropertiesSet() {
066                    if (Validator.isNull(_name)) {
067                            throw new IllegalArgumentException("Name is null");
068                    }
069    
070                    open();
071            }
072    
073            public synchronized void close() {
074                    close(false);
075            }
076    
077            public synchronized void close(boolean force) {
078                    doClose(force);
079            }
080    
081            public void copyDestinationEventListeners(Destination destination) {
082                    for (DestinationEventListener destinationEventListener :
083                                    _destinationEventListeners) {
084    
085                            destination.addDestinationEventListener(
086                                    destinationEventListener);
087                    }
088            }
089    
090            public void copyMessageListeners(Destination destination) {
091                    for (MessageListener messageListener : _messageListeners) {
092                            InvokerMessageListener invokerMessageListener =
093                                    (InvokerMessageListener)messageListener;
094    
095                            destination.register(
096                                    invokerMessageListener.getMessageListener(),
097                                    invokerMessageListener.getClassLoader());
098                    }
099            }
100    
101            public DestinationStatistics getDestinationStatistics() {
102                    DestinationStatistics destinationStatistics =
103                            new DestinationStatistics();
104    
105                    destinationStatistics.setActiveThreadCount(
106                            _threadPoolExecutor.getActiveCount());
107                    destinationStatistics.setCurrentThreadCount(
108                            _threadPoolExecutor.getPoolSize());
109                    destinationStatistics.setLargestThreadCount(
110                            _threadPoolExecutor.getLargestPoolSize());
111                    destinationStatistics.setMaxThreadPoolSize(
112                            _threadPoolExecutor.getMaximumPoolSize());
113                    destinationStatistics.setMinThreadPoolSize(
114                            _threadPoolExecutor.getCorePoolSize());
115                    destinationStatistics.setPendingMessageCount(
116                            _threadPoolExecutor.getQueue().size());
117                    destinationStatistics.setSentMessageCount(
118                            _threadPoolExecutor.getCompletedTaskCount());
119    
120                    return destinationStatistics;
121            }
122    
123            public int getMaximumQueueSize() {
124                    return _maximumQueueSize;
125            }
126    
127            public int getMessageListenerCount() {
128                    return _messageListeners.size();
129            }
130    
131            public String getName() {
132                    return _name;
133            }
134    
135            public int getWorkersCoreSize() {
136                    return _workersCoreSize;
137            }
138    
139            public int getWorkersMaxSize() {
140                    return _workersMaxSize;
141            }
142    
143            public boolean isRegistered() {
144                    if (getMessageListenerCount() > 0) {
145                            return true;
146                    }
147                    else {
148                            return false;
149                    }
150            }
151    
152            public synchronized void open() {
153                    doOpen();
154            }
155    
156            public boolean register(MessageListener messageListener) {
157                    InvokerMessageListener invokerMessageListener =
158                            new InvokerMessageListener(messageListener);
159    
160                    return registerMessageListener(invokerMessageListener);
161            }
162    
163            public boolean register(
164                    MessageListener messageListener, ClassLoader classloader) {
165    
166                    InvokerMessageListener invokerMessageListener =
167                            new InvokerMessageListener(messageListener, classloader);
168    
169                    return registerMessageListener(invokerMessageListener);
170            }
171    
172            public void removeDestinationEventListener(
173                    DestinationEventListener destinationEventListener) {
174    
175                    _destinationEventListeners.remove(destinationEventListener);
176            }
177    
178            public void removeDestinationEventListeners() {
179                    _destinationEventListeners.clear();
180            }
181    
182            public void send(Message message) {
183                    if (_messageListeners.isEmpty()) {
184                            if (_log.isDebugEnabled()) {
185                                    _log.debug("No message listeners for destination " + getName());
186                            }
187    
188                            return;
189                    }
190    
191                    ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
192    
193                    if (threadPoolExecutor.isShutdown()) {
194                            throw new IllegalStateException(
195                                    "Destination " + getName() + " is shutdown and cannot " +
196                                            "receive more messages");
197                    }
198    
199                    if ((_maximumQueueSize > -1) &&
200                            (threadPoolExecutor.getQueue().size() > _maximumQueueSize)) {
201    
202                            throw new IllegalStateException(
203                                    threadPoolExecutor.getQueue().size() +
204                                            " messages exceeds the maximum queue size of " +
205                                                    _maximumQueueSize);
206                    }
207    
208                    dispatch(_messageListeners, message);
209            }
210    
211            public void setMaximumQueueSize(int maximumQueueSize) {
212                    _maximumQueueSize = maximumQueueSize;
213            }
214    
215            public void setName(String name) {
216                    _name = name;
217            }
218    
219            public void setWorkersCoreSize(int workersCoreSize) {
220                    _workersCoreSize = workersCoreSize;
221            }
222    
223            public void setWorkersMaxSize(int workersMaxSize) {
224                    _workersMaxSize = workersMaxSize;
225            }
226    
227            public boolean unregister(MessageListener messageListener) {
228                    InvokerMessageListener invokerMessageListener =
229                            new InvokerMessageListener(messageListener);
230    
231                    return unregisterMessageListener(invokerMessageListener);
232            }
233    
234            public boolean unregister(
235                    MessageListener messageListener, ClassLoader classloader) {
236    
237                    InvokerMessageListener invokerMessageListener =
238                            new InvokerMessageListener(messageListener, classloader);
239    
240                    return unregisterMessageListener(invokerMessageListener);
241            }
242    
243            public void unregisterMessageListeners() {
244                    for (MessageListener messageListener : _messageListeners) {
245                            unregisterMessageListener((InvokerMessageListener)messageListener);
246                    }
247            }
248    
249            protected abstract void dispatch(
250                    Set<MessageListener> messageListeners, Message message);
251    
252            protected void doClose(boolean force) {
253                    if (!_threadPoolExecutor.isShutdown() &&
254                            !_threadPoolExecutor.isTerminating()) {
255    
256                            if (!force) {
257                                    _threadPoolExecutor.shutdown();
258                            }
259                            else {
260                                    List<Runnable> pendingTasks = _threadPoolExecutor.shutdownNow();
261    
262                                    if (_log.isInfoEnabled()) {
263                                            _log.info(
264                                                    "The following " + pendingTasks.size() + " tasks " +
265                                                            "were not executed due to shutown: " +
266                                                                    pendingTasks);
267                                    }
268                            }
269                    }
270            }
271    
272            protected void doOpen() {
273                    if ((_threadPoolExecutor == null) || _threadPoolExecutor.isShutdown()) {
274                            ClassLoader classLoader = PortalClassLoaderUtil.getClassLoader();
275    
276                            _threadPoolExecutor = new ThreadPoolExecutor(
277                                    _workersCoreSize, _workersMaxSize, 0L, TimeUnit.MILLISECONDS,
278                                    new LinkedBlockingQueue<Runnable>(),
279                                    new NamedThreadFactory(
280                                            getName(), Thread.NORM_PRIORITY, classLoader));
281                    }
282            }
283    
284            protected void fireMessageListenerRegisteredEvent(
285                    MessageListener messageListener) {
286    
287                    for (DestinationEventListener destinationEventListener :
288                                    _destinationEventListeners) {
289    
290                            destinationEventListener.messageListenerRegistered(
291                                    getName(), messageListener);
292                    }
293            }
294    
295            protected void fireMessageListenerUnregisteredEvent(
296                    MessageListener messageListener) {
297    
298                    for (DestinationEventListener listener : _destinationEventListeners) {
299                            listener.messageListenerUnregistered(getName(), messageListener);
300                    }
301            }
302    
303            protected ThreadPoolExecutor getThreadPoolExecutor() {
304                    return _threadPoolExecutor;
305            }
306    
307            protected boolean registerMessageListener(
308                    InvokerMessageListener invokerMessageListener) {
309    
310                    boolean registered = _messageListeners.add(invokerMessageListener);
311    
312                    if (registered) {
313                            fireMessageListenerRegisteredEvent(
314                                    invokerMessageListener.getMessageListener());
315                    }
316    
317                    return registered;
318            }
319    
320            protected boolean unregisterMessageListener(
321                    InvokerMessageListener invokerMessageListener) {
322    
323                    boolean unregistered = _messageListeners.remove(invokerMessageListener);
324    
325                    if (unregistered) {
326                            fireMessageListenerUnregisteredEvent(
327                                    invokerMessageListener.getMessageListener());
328                    }
329    
330                    return unregistered;
331            }
332    
333            private static final int _WORKERS_CORE_SIZE = 2;
334    
335            private static final int _WORKERS_MAX_SIZE = 5;
336    
337            private static Log _log = LogFactoryUtil.getLog(BaseDestination.class);
338    
339            private Set<DestinationEventListener> _destinationEventListeners =
340                    new ConcurrentHashSet<DestinationEventListener>();
341            private int _maximumQueueSize = -1;
342            private Set<MessageListener> _messageListeners =
343                    new ConcurrentHashSet<MessageListener>();
344            private String _name = StringPool.BLANK;
345            private ThreadPoolExecutor _threadPoolExecutor;
346            private int _workersCoreSize = _WORKERS_CORE_SIZE;
347            private int _workersMaxSize = _WORKERS_MAX_SIZE;
348    
349    }