001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.messaging.config;
016    
017    import com.liferay.portal.kernel.log.Log;
018    import com.liferay.portal.kernel.log.LogFactoryUtil;
019    import com.liferay.portal.kernel.messaging.Destination;
020    import com.liferay.portal.kernel.messaging.DestinationEventListener;
021    import com.liferay.portal.kernel.messaging.MessageBus;
022    import com.liferay.portal.kernel.messaging.MessageListener;
023    import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
024    import com.liferay.portal.kernel.nio.intraband.messaging.DestinationConfigurationProcessCallable;
025    import com.liferay.portal.kernel.nio.intraband.messaging.IntrabandBridgeDestination;
026    import com.liferay.portal.kernel.nio.intraband.rpc.IntrabandRPCUtil;
027    import com.liferay.portal.kernel.resiliency.spi.SPI;
028    import com.liferay.portal.kernel.resiliency.spi.SPIUtil;
029    import com.liferay.portal.kernel.security.pacl.permission.PortalMessageBusPermission;
030    import com.liferay.portal.kernel.util.ClassLoaderPool;
031    import com.liferay.portal.kernel.util.StringBundler;
032    
033    import java.lang.reflect.Method;
034    
035    import java.util.ArrayList;
036    import java.util.HashMap;
037    import java.util.List;
038    import java.util.Map;
039    
040    /**
041     * @author Michael C. Han
042     */
043    public abstract class AbstractMessagingConfigurator
044            implements MessagingConfigurator {
045    
046            public void afterPropertiesSet() {
047                    Thread currentThread = Thread.currentThread();
048    
049                    ClassLoader contextClassLoader = currentThread.getContextClassLoader();
050    
051                    ClassLoader operatingClassLoader = getOperatingClassloader();
052    
053                    if (contextClassLoader == operatingClassLoader) {
054                            _portalMessagingConfigurator = true;
055                    }
056    
057                    MessageBus messageBus = getMessageBus();
058    
059                    for (DestinationEventListener destinationEventListener :
060                                    _globalDestinationEventListeners) {
061    
062                            messageBus.addDestinationEventListener(destinationEventListener);
063                    }
064    
065                    for (Destination destination : _destinations) {
066                            if (SPIUtil.isSPI()) {
067                                    destination = new IntrabandBridgeDestination(destination);
068                            }
069    
070                            messageBus.addDestination(destination);
071                    }
072    
073                    for (Map.Entry<String, List<DestinationEventListener>>
074                                    destinationEventListeners :
075                                            _specificDestinationEventListeners.entrySet()) {
076    
077                            String destinationName = destinationEventListeners.getKey();
078    
079                            for (DestinationEventListener destinationEventListener :
080                                            destinationEventListeners.getValue()) {
081    
082                                    messageBus.addDestinationEventListener(
083                                            destinationName, destinationEventListener);
084                            }
085                    }
086    
087                    for (Destination destination : _replacementDestinations) {
088                            messageBus.replace(destination);
089                    }
090    
091                    connect();
092    
093                    String servletContextName = ClassLoaderPool.getContextName(
094                            operatingClassLoader);
095    
096                    MessagingConfiguratorRegistry.registerMessagingConfigurator(
097                            servletContextName, this);
098            }
099    
100            @Override
101            public void connect() {
102                    if (SPIUtil.isSPI() && _portalMessagingConfigurator) {
103                            return;
104                    }
105    
106                    MessageBus messageBus = getMessageBus();
107    
108                    Thread currentThread = Thread.currentThread();
109    
110                    ClassLoader contextClassLoader = currentThread.getContextClassLoader();
111    
112                    try {
113                            ClassLoader operatingClassLoader = getOperatingClassloader();
114    
115                            currentThread.setContextClassLoader(operatingClassLoader);
116    
117                            for (Map.Entry<String, List<MessageListener>> messageListeners :
118                                            _messageListeners.entrySet()) {
119    
120                                    String destinationName = messageListeners.getKey();
121    
122                                    if (SPIUtil.isSPI()) {
123                                            SPI spi = SPIUtil.getSPI();
124    
125                                            try {
126                                                    RegistrationReference registrationReference =
127                                                            spi.getRegistrationReference();
128    
129                                                    IntrabandRPCUtil.execute(
130                                                            registrationReference,
131                                                            new DestinationConfigurationProcessCallable(
132                                                                    destinationName));
133                                            }
134                                            catch (Exception e) {
135                                                    StringBundler sb = new StringBundler();
136    
137                                                    sb.append("Unable to install ");
138                                                    sb.append(
139                                                            DestinationConfigurationProcessCallable.class.
140                                                                    getName());
141                                                    sb.append(" on MPI for ");
142                                                    sb.append(destinationName);
143    
144                                                    _log.error(sb.toString(), e);
145                                            }
146                                    }
147    
148                                    for (MessageListener messageListener :
149                                                    messageListeners.getValue()) {
150    
151                                            messageBus.registerMessageListener(
152                                                    destinationName, messageListener);
153                                    }
154                            }
155                    }
156                    finally {
157                            currentThread.setContextClassLoader(contextClassLoader);
158                    }
159            }
160    
161            @Override
162            public void destroy() {
163                    disconnect();
164    
165                    MessageBus messageBus = getMessageBus();
166    
167                    for (Destination destination : _destinations) {
168                            messageBus.removeDestination(destination.getName());
169    
170                            destination.close();
171                    }
172    
173                    for (Map.Entry<String, List<DestinationEventListener>>
174                                    destinationEventListeners :
175                                            _specificDestinationEventListeners.entrySet()) {
176    
177                            String destinationName = destinationEventListeners.getKey();
178    
179                            for (DestinationEventListener destinationEventListener :
180                                            destinationEventListeners.getValue()) {
181    
182                                    messageBus.removeDestinationEventListener(
183                                            destinationName, destinationEventListener);
184                            }
185                    }
186    
187                    for (DestinationEventListener destinationEventListener :
188                                    _globalDestinationEventListeners) {
189    
190                            messageBus.removeDestinationEventListener(destinationEventListener);
191                    }
192    
193                    ClassLoader operatingClassLoader = getOperatingClassloader();
194    
195                    String servletContextName = ClassLoaderPool.getContextName(
196                            operatingClassLoader);
197    
198                    MessagingConfiguratorRegistry.unregisterMessagingConfigurator(
199                            servletContextName, this);
200            }
201    
202            @Override
203            public void disconnect() {
204                    if (SPIUtil.isSPI() && _portalMessagingConfigurator) {
205                            return;
206                    }
207    
208                    MessageBus messageBus = getMessageBus();
209    
210                    for (Map.Entry<String, List<MessageListener>> messageListeners :
211                                    _messageListeners.entrySet()) {
212    
213                            String destinationName = messageListeners.getKey();
214    
215                            for (MessageListener messageListener :
216                                            messageListeners.getValue()) {
217    
218                                    messageBus.unregisterMessageListener(
219                                            destinationName, messageListener);
220                            }
221                    }
222            }
223    
224            @Override
225            public void setDestinations(List<Destination> destinations) {
226                    for (Destination destination : destinations) {
227                            try {
228                                    PortalMessageBusPermission.checkListen(destination.getName());
229                            }
230                            catch (SecurityException se) {
231                                    if (_log.isInfoEnabled()) {
232                                            _log.info("Rejecting destination " + destination.getName());
233                                    }
234    
235                                    continue;
236                            }
237    
238                            _destinations.add(destination);
239                    }
240            }
241    
242            @Override
243            public void setGlobalDestinationEventListeners(
244                    List<DestinationEventListener> globalDestinationEventListeners) {
245    
246                    _globalDestinationEventListeners = globalDestinationEventListeners;
247            }
248    
249            @Override
250            public void setMessageListeners(
251                    Map<String, List<MessageListener>> messageListeners) {
252    
253                    _messageListeners = messageListeners;
254    
255                    for (List<MessageListener> messageListenersList :
256                                    _messageListeners.values()) {
257    
258                            for (MessageListener messageListener : messageListenersList) {
259                                    Class<?> messageListenerClass = messageListener.getClass();
260    
261                                    try {
262                                            Method setMessageBusMethod = messageListenerClass.getMethod(
263                                                    "setMessageBus", MessageBus.class);
264    
265                                            setMessageBusMethod.setAccessible(true);
266    
267                                            setMessageBusMethod.invoke(
268                                                    messageListener, getMessageBus());
269    
270                                            continue;
271                                    }
272                                    catch (Exception e) {
273                                    }
274    
275                                    try {
276                                            Method setMessageBusMethod =
277                                                    messageListenerClass.getDeclaredMethod(
278                                                            "setMessageBus", MessageBus.class);
279    
280                                            setMessageBusMethod.setAccessible(true);
281    
282                                            setMessageBusMethod.invoke(
283                                                    messageListener, getMessageBus());
284                                    }
285                                    catch (Exception e) {
286                                    }
287                            }
288                    }
289            }
290    
291            @Override
292            public void setReplacementDestinations(
293                    List<Destination> replacementDestinations) {
294    
295                    _replacementDestinations = replacementDestinations;
296            }
297    
298            @Override
299            public void setSpecificDestinationEventListener(
300                    Map<String, List<DestinationEventListener>>
301                            specificDestinationEventListeners) {
302    
303                    _specificDestinationEventListeners = specificDestinationEventListeners;
304            }
305    
306            protected abstract MessageBus getMessageBus();
307    
308            protected abstract ClassLoader getOperatingClassloader();
309    
310            private static Log _log = LogFactoryUtil.getLog(
311                    AbstractMessagingConfigurator.class);
312    
313            private List<Destination> _destinations = new ArrayList<Destination>();
314            private List<DestinationEventListener> _globalDestinationEventListeners =
315                    new ArrayList<DestinationEventListener>();
316            private Map<String, List<MessageListener>> _messageListeners =
317                    new HashMap<String, List<MessageListener>>();
318            private boolean _portalMessagingConfigurator;
319            private List<Destination> _replacementDestinations =
320                    new ArrayList<Destination>();
321            private Map<String, List<DestinationEventListener>>
322                    _specificDestinationEventListeners =
323                            new HashMap<String, List<DestinationEventListener>>();
324    
325    }