001
014
015 package com.liferay.portal.kernel.messaging.config;
016
017 import com.liferay.portal.kernel.log.Log;
018 import com.liferay.portal.kernel.log.LogFactoryUtil;
019 import com.liferay.portal.kernel.messaging.Destination;
020 import com.liferay.portal.kernel.messaging.DestinationEventListener;
021 import com.liferay.portal.kernel.messaging.MessageBus;
022 import com.liferay.portal.kernel.messaging.MessageListener;
023 import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
024 import com.liferay.portal.kernel.nio.intraband.messaging.DestinationConfigurationProcessCallable;
025 import com.liferay.portal.kernel.nio.intraband.messaging.IntrabandBridgeDestination;
026 import com.liferay.portal.kernel.nio.intraband.rpc.IntrabandRPCUtil;
027 import com.liferay.portal.kernel.resiliency.spi.SPI;
028 import com.liferay.portal.kernel.resiliency.spi.SPIUtil;
029 import com.liferay.portal.kernel.security.pacl.permission.PortalMessageBusPermission;
030 import com.liferay.portal.kernel.util.ClassLoaderPool;
031 import com.liferay.portal.kernel.util.StringBundler;
032
033 import java.lang.reflect.Method;
034
035 import java.util.ArrayList;
036 import java.util.HashMap;
037 import java.util.List;
038 import java.util.Map;
039
040
043 public abstract class AbstractMessagingConfigurator
044 implements MessagingConfigurator {
045
046 public void afterPropertiesSet() {
047 Thread currentThread = Thread.currentThread();
048
049 ClassLoader contextClassLoader = currentThread.getContextClassLoader();
050
051 ClassLoader operatingClassLoader = getOperatingClassloader();
052
053 if (contextClassLoader == operatingClassLoader) {
054 _portalMessagingConfigurator = true;
055 }
056
057 MessageBus messageBus = getMessageBus();
058
059 for (DestinationEventListener destinationEventListener :
060 _globalDestinationEventListeners) {
061
062 messageBus.addDestinationEventListener(destinationEventListener);
063 }
064
065 for (Destination destination : _destinations) {
066 if (SPIUtil.isSPI()) {
067 destination = new IntrabandBridgeDestination(destination);
068 }
069
070 messageBus.addDestination(destination);
071 }
072
073 for (Map.Entry<String, List<DestinationEventListener>>
074 destinationEventListeners :
075 _specificDestinationEventListeners.entrySet()) {
076
077 String destinationName = destinationEventListeners.getKey();
078
079 for (DestinationEventListener destinationEventListener :
080 destinationEventListeners.getValue()) {
081
082 messageBus.addDestinationEventListener(
083 destinationName, destinationEventListener);
084 }
085 }
086
087 for (Destination destination : _replacementDestinations) {
088 messageBus.replace(destination);
089 }
090
091 connect();
092
093 String servletContextName = ClassLoaderPool.getContextName(
094 operatingClassLoader);
095
096 MessagingConfiguratorRegistry.registerMessagingConfigurator(
097 servletContextName, this);
098 }
099
100 @Override
101 public void connect() {
102 if (SPIUtil.isSPI() && _portalMessagingConfigurator) {
103 return;
104 }
105
106 MessageBus messageBus = getMessageBus();
107
108 Thread currentThread = Thread.currentThread();
109
110 ClassLoader contextClassLoader = currentThread.getContextClassLoader();
111
112 try {
113 ClassLoader operatingClassLoader = getOperatingClassloader();
114
115 currentThread.setContextClassLoader(operatingClassLoader);
116
117 for (Map.Entry<String, List<MessageListener>> messageListeners :
118 _messageListeners.entrySet()) {
119
120 String destinationName = messageListeners.getKey();
121
122 if (SPIUtil.isSPI()) {
123 SPI spi = SPIUtil.getSPI();
124
125 try {
126 RegistrationReference registrationReference =
127 spi.getRegistrationReference();
128
129 IntrabandRPCUtil.execute(
130 registrationReference,
131 new DestinationConfigurationProcessCallable(
132 destinationName));
133 }
134 catch (Exception e) {
135 StringBundler sb = new StringBundler();
136
137 sb.append("Unable to install ");
138 sb.append(
139 DestinationConfigurationProcessCallable.class.
140 getName());
141 sb.append(" on MPI for ");
142 sb.append(destinationName);
143
144 _log.error(sb.toString(), e);
145 }
146 }
147
148 for (MessageListener messageListener :
149 messageListeners.getValue()) {
150
151 messageBus.registerMessageListener(
152 destinationName, messageListener);
153 }
154 }
155 }
156 finally {
157 currentThread.setContextClassLoader(contextClassLoader);
158 }
159 }
160
161 @Override
162 public void destroy() {
163 disconnect();
164
165 MessageBus messageBus = getMessageBus();
166
167 for (Destination destination : _destinations) {
168 messageBus.removeDestination(destination.getName());
169
170 destination.close();
171 }
172
173 for (Map.Entry<String, List<DestinationEventListener>>
174 destinationEventListeners :
175 _specificDestinationEventListeners.entrySet()) {
176
177 String destinationName = destinationEventListeners.getKey();
178
179 for (DestinationEventListener destinationEventListener :
180 destinationEventListeners.getValue()) {
181
182 messageBus.removeDestinationEventListener(
183 destinationName, destinationEventListener);
184 }
185 }
186
187 for (DestinationEventListener destinationEventListener :
188 _globalDestinationEventListeners) {
189
190 messageBus.removeDestinationEventListener(destinationEventListener);
191 }
192
193 ClassLoader operatingClassLoader = getOperatingClassloader();
194
195 String servletContextName = ClassLoaderPool.getContextName(
196 operatingClassLoader);
197
198 MessagingConfiguratorRegistry.unregisterMessagingConfigurator(
199 servletContextName, this);
200 }
201
202 @Override
203 public void disconnect() {
204 if (SPIUtil.isSPI() && _portalMessagingConfigurator) {
205 return;
206 }
207
208 MessageBus messageBus = getMessageBus();
209
210 for (Map.Entry<String, List<MessageListener>> messageListeners :
211 _messageListeners.entrySet()) {
212
213 String destinationName = messageListeners.getKey();
214
215 for (MessageListener messageListener :
216 messageListeners.getValue()) {
217
218 messageBus.unregisterMessageListener(
219 destinationName, messageListener);
220 }
221 }
222 }
223
224 @Override
225 public void setDestinations(List<Destination> destinations) {
226 for (Destination destination : destinations) {
227 try {
228 PortalMessageBusPermission.checkListen(destination.getName());
229 }
230 catch (SecurityException se) {
231 if (_log.isInfoEnabled()) {
232 _log.info("Rejecting destination " + destination.getName());
233 }
234
235 continue;
236 }
237
238 _destinations.add(destination);
239 }
240 }
241
242 @Override
243 public void setGlobalDestinationEventListeners(
244 List<DestinationEventListener> globalDestinationEventListeners) {
245
246 _globalDestinationEventListeners = globalDestinationEventListeners;
247 }
248
249 @Override
250 public void setMessageListeners(
251 Map<String, List<MessageListener>> messageListeners) {
252
253 _messageListeners = messageListeners;
254
255 for (List<MessageListener> messageListenersList :
256 _messageListeners.values()) {
257
258 for (MessageListener messageListener : messageListenersList) {
259 Class<?> messageListenerClass = messageListener.getClass();
260
261 try {
262 Method setMessageBusMethod = messageListenerClass.getMethod(
263 "setMessageBus", MessageBus.class);
264
265 setMessageBusMethod.setAccessible(true);
266
267 setMessageBusMethod.invoke(
268 messageListener, getMessageBus());
269
270 continue;
271 }
272 catch (Exception e) {
273 }
274
275 try {
276 Method setMessageBusMethod =
277 messageListenerClass.getDeclaredMethod(
278 "setMessageBus", MessageBus.class);
279
280 setMessageBusMethod.setAccessible(true);
281
282 setMessageBusMethod.invoke(
283 messageListener, getMessageBus());
284 }
285 catch (Exception e) {
286 }
287 }
288 }
289 }
290
291 @Override
292 public void setReplacementDestinations(
293 List<Destination> replacementDestinations) {
294
295 _replacementDestinations = replacementDestinations;
296 }
297
298 @Override
299 public void setSpecificDestinationEventListener(
300 Map<String, List<DestinationEventListener>>
301 specificDestinationEventListeners) {
302
303 _specificDestinationEventListeners = specificDestinationEventListeners;
304 }
305
306 protected abstract MessageBus getMessageBus();
307
308 protected abstract ClassLoader getOperatingClassloader();
309
310 private static Log _log = LogFactoryUtil.getLog(
311 AbstractMessagingConfigurator.class);
312
313 private List<Destination> _destinations = new ArrayList<Destination>();
314 private List<DestinationEventListener> _globalDestinationEventListeners =
315 new ArrayList<DestinationEventListener>();
316 private Map<String, List<MessageListener>> _messageListeners =
317 new HashMap<String, List<MessageListener>>();
318 private boolean _portalMessagingConfigurator;
319 private List<Destination> _replacementDestinations =
320 new ArrayList<Destination>();
321 private Map<String, List<DestinationEventListener>>
322 _specificDestinationEventListeners =
323 new HashMap<String, List<DestinationEventListener>>();
324
325 }