001
014
015 package com.liferay.portal.kernel.messaging;
016
017 import com.liferay.portal.kernel.log.Log;
018 import com.liferay.portal.kernel.log.LogFactoryUtil;
019 import com.liferay.portal.kernel.util.ConcurrentHashSet;
020 import com.liferay.portal.kernel.util.NamedThreadFactory;
021 import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
022 import com.liferay.portal.kernel.util.StringPool;
023 import com.liferay.portal.kernel.util.Validator;
024
025 import java.util.List;
026 import java.util.Set;
027 import java.util.concurrent.LinkedBlockingQueue;
028 import java.util.concurrent.ThreadPoolExecutor;
029 import java.util.concurrent.TimeUnit;
030
031
034 public abstract class BaseDestination implements Destination {
035
036 public BaseDestination() {
037 }
038
039
042 public BaseDestination(String name) {
043 this(name, _WORKERS_CORE_SIZE, _WORKERS_MAX_SIZE);
044 }
045
046
049 public BaseDestination(
050 String name, int workersCoreSize, int workersMaxSize) {
051
052 _name = name;
053 _workersCoreSize = workersCoreSize;
054 _workersMaxSize = workersMaxSize;
055
056 open();
057 }
058
059 public void addDestinationEventListener(
060 DestinationEventListener destinationEventListener) {
061
062 _destinationEventListeners.add(destinationEventListener);
063 }
064
065 public void afterPropertiesSet() {
066 if (Validator.isNull(_name)) {
067 throw new IllegalArgumentException("Name is null");
068 }
069
070 open();
071 }
072
073 public synchronized void close() {
074 close(false);
075 }
076
077 public synchronized void close(boolean force) {
078 doClose(force);
079 }
080
081 public void copyDestinationEventListeners(Destination destination) {
082 for (DestinationEventListener destinationEventListener :
083 _destinationEventListeners) {
084
085 destination.addDestinationEventListener(
086 destinationEventListener);
087 }
088 }
089
090 public void copyMessageListeners(Destination destination) {
091 for (MessageListener messageListener : _messageListeners) {
092 InvokerMessageListener invokerMessageListener =
093 (InvokerMessageListener)messageListener;
094
095 destination.register(
096 invokerMessageListener.getMessageListener(),
097 invokerMessageListener.getClassLoader());
098 }
099 }
100
101 public DestinationStatistics getDestinationStatistics() {
102 DestinationStatistics destinationStatistics =
103 new DestinationStatistics();
104
105 destinationStatistics.setActiveThreadCount(
106 _threadPoolExecutor.getActiveCount());
107 destinationStatistics.setCurrentThreadCount(
108 _threadPoolExecutor.getPoolSize());
109 destinationStatistics.setLargestThreadCount(
110 _threadPoolExecutor.getLargestPoolSize());
111 destinationStatistics.setMaxThreadPoolSize(
112 _threadPoolExecutor.getMaximumPoolSize());
113 destinationStatistics.setMinThreadPoolSize(
114 _threadPoolExecutor.getCorePoolSize());
115 destinationStatistics.setPendingMessageCount(
116 _threadPoolExecutor.getQueue().size());
117 destinationStatistics.setSentMessageCount(
118 _threadPoolExecutor.getCompletedTaskCount());
119
120 return destinationStatistics;
121 }
122
123 public int getMaximumQueueSize() {
124 return _maximumQueueSize;
125 }
126
127 public int getMessageListenerCount() {
128 return _messageListeners.size();
129 }
130
131 public String getName() {
132 return _name;
133 }
134
135 public int getWorkersCoreSize() {
136 return _workersCoreSize;
137 }
138
139 public int getWorkersMaxSize() {
140 return _workersMaxSize;
141 }
142
143 public boolean isRegistered() {
144 if (getMessageListenerCount() > 0) {
145 return true;
146 }
147 else {
148 return false;
149 }
150 }
151
152 public synchronized void open() {
153 doOpen();
154 }
155
156 public boolean register(MessageListener messageListener) {
157 InvokerMessageListener invokerMessageListener =
158 new InvokerMessageListener(messageListener);
159
160 return registerMessageListener(invokerMessageListener);
161 }
162
163 public boolean register(
164 MessageListener messageListener, ClassLoader classloader) {
165
166 InvokerMessageListener invokerMessageListener =
167 new InvokerMessageListener(messageListener, classloader);
168
169 return registerMessageListener(invokerMessageListener);
170 }
171
172 public void removeDestinationEventListener(
173 DestinationEventListener destinationEventListener) {
174
175 _destinationEventListeners.remove(destinationEventListener);
176 }
177
178 public void removeDestinationEventListeners() {
179 _destinationEventListeners.clear();
180 }
181
182 public void send(Message message) {
183 if (_messageListeners.isEmpty()) {
184 if (_log.isDebugEnabled()) {
185 _log.debug("No message listeners for destination " + getName());
186 }
187
188 return;
189 }
190
191 ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
192
193 if (threadPoolExecutor.isShutdown()) {
194 throw new IllegalStateException(
195 "Destination " + getName() + " is shutdown and cannot " +
196 "receive more messages");
197 }
198
199 if ((_maximumQueueSize > -1) &&
200 (threadPoolExecutor.getQueue().size() > _maximumQueueSize)) {
201
202 throw new IllegalStateException(
203 threadPoolExecutor.getQueue().size() +
204 " messages exceeds the maximum queue size of " +
205 _maximumQueueSize);
206 }
207
208 dispatch(_messageListeners, message);
209 }
210
211 public void setMaximumQueueSize(int maximumQueueSize) {
212 _maximumQueueSize = maximumQueueSize;
213 }
214
215 public void setName(String name) {
216 _name = name;
217 }
218
219 public void setWorkersCoreSize(int workersCoreSize) {
220 _workersCoreSize = workersCoreSize;
221 }
222
223 public void setWorkersMaxSize(int workersMaxSize) {
224 _workersMaxSize = workersMaxSize;
225 }
226
227 public boolean unregister(MessageListener messageListener) {
228 InvokerMessageListener invokerMessageListener =
229 new InvokerMessageListener(messageListener);
230
231 return unregisterMessageListener(invokerMessageListener);
232 }
233
234 public boolean unregister(
235 MessageListener messageListener, ClassLoader classloader) {
236
237 InvokerMessageListener invokerMessageListener =
238 new InvokerMessageListener(messageListener, classloader);
239
240 return unregisterMessageListener(invokerMessageListener);
241 }
242
243 public void unregisterMessageListeners() {
244 for (MessageListener messageListener : _messageListeners) {
245 unregisterMessageListener((InvokerMessageListener)messageListener);
246 }
247 }
248
249 protected abstract void dispatch(
250 Set<MessageListener> messageListeners, Message message);
251
252 protected void doClose(boolean force) {
253 if (!_threadPoolExecutor.isShutdown() &&
254 !_threadPoolExecutor.isTerminating()) {
255
256 if (!force) {
257 _threadPoolExecutor.shutdown();
258 }
259 else {
260 List<Runnable> pendingTasks = _threadPoolExecutor.shutdownNow();
261
262 if (_log.isInfoEnabled()) {
263 _log.info(
264 "The following " + pendingTasks.size() + " tasks " +
265 "were not executed due to shutown: " +
266 pendingTasks);
267 }
268 }
269 }
270 }
271
272 protected void doOpen() {
273 if ((_threadPoolExecutor == null) || _threadPoolExecutor.isShutdown()) {
274 ClassLoader classLoader = PortalClassLoaderUtil.getClassLoader();
275
276 _threadPoolExecutor = new ThreadPoolExecutor(
277 _workersCoreSize, _workersMaxSize, 0L, TimeUnit.MILLISECONDS,
278 new LinkedBlockingQueue<Runnable>(),
279 new NamedThreadFactory(
280 getName(), Thread.NORM_PRIORITY, classLoader));
281 }
282 }
283
284 protected void fireMessageListenerRegisteredEvent(
285 MessageListener messageListener) {
286
287 for (DestinationEventListener destinationEventListener :
288 _destinationEventListeners) {
289
290 destinationEventListener.messageListenerRegistered(
291 getName(), messageListener);
292 }
293 }
294
295 protected void fireMessageListenerUnregisteredEvent(
296 MessageListener messageListener) {
297
298 for (DestinationEventListener listener : _destinationEventListeners) {
299 listener.messageListenerUnregistered(getName(), messageListener);
300 }
301 }
302
303 protected ThreadPoolExecutor getThreadPoolExecutor() {
304 return _threadPoolExecutor;
305 }
306
307 protected boolean registerMessageListener(
308 InvokerMessageListener invokerMessageListener) {
309
310 boolean registered = _messageListeners.add(invokerMessageListener);
311
312 if (registered) {
313 fireMessageListenerRegisteredEvent(
314 invokerMessageListener.getMessageListener());
315 }
316
317 return registered;
318 }
319
320 protected boolean unregisterMessageListener(
321 InvokerMessageListener invokerMessageListener) {
322
323 boolean unregistered = _messageListeners.remove(invokerMessageListener);
324
325 if (unregistered) {
326 fireMessageListenerUnregisteredEvent(
327 invokerMessageListener.getMessageListener());
328 }
329
330 return unregistered;
331 }
332
333 private static final int _WORKERS_CORE_SIZE = 2;
334
335 private static final int _WORKERS_MAX_SIZE = 5;
336
337 private static Log _log = LogFactoryUtil.getLog(BaseDestination.class);
338
339 private Set<DestinationEventListener> _destinationEventListeners =
340 new ConcurrentHashSet<DestinationEventListener>();
341 private int _maximumQueueSize = -1;
342 private Set<MessageListener> _messageListeners =
343 new ConcurrentHashSet<MessageListener>();
344 private String _name = StringPool.BLANK;
345 private ThreadPoolExecutor _threadPoolExecutor;
346 private int _workersCoreSize = _WORKERS_CORE_SIZE;
347 private int _workersMaxSize = _WORKERS_MAX_SIZE;
348
349 }