001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.messaging;
016    
017    import com.liferay.portal.kernel.concurrent.ConcurrentHashSet;
018    import com.liferay.portal.kernel.log.Log;
019    import com.liferay.portal.kernel.log.LogFactoryUtil;
020    
021    import java.util.Collection;
022    import java.util.HashMap;
023    import java.util.Map;
024    import java.util.Set;
025    
026    /**
027     * @author Michael C. Han
028     */
029    public class DefaultMessageBus implements MessageBus {
030    
031            @Override
032            public synchronized void addDestination(Destination destination) {
033                    _destinations.put(destination.getName(), destination);
034    
035                    fireDestinationAddedEvent(destination);
036            }
037    
038            @Override
039            public void addDestinationEventListener(
040                    DestinationEventListener destinationEventListener) {
041    
042                    _destinationEventListeners.add(destinationEventListener);
043            }
044    
045            @Override
046            public void addDestinationEventListener(
047                    String destinationName,
048                    DestinationEventListener destinationEventListener) {
049    
050                    Destination destination = _destinations.get(destinationName);
051    
052                    if (destination != null) {
053                            destination.addDestinationEventListener(destinationEventListener);
054                    }
055            }
056    
057            public void destroy() {
058                    shutdown(true);
059            }
060    
061            @Override
062            public Destination getDestination(String destinationName) {
063                    return _destinations.get(destinationName);
064            }
065    
066            @Override
067            public int getDestinationCount() {
068                    return _destinations.size();
069            }
070    
071            @Override
072            public Collection<String> getDestinationNames() {
073                    return _destinations.keySet();
074            }
075    
076            @Override
077            public Collection<Destination> getDestinations() {
078                    return _destinations.values();
079            }
080    
081            @Override
082            public boolean hasDestination(String destinationName) {
083                    return _destinations.containsKey(destinationName);
084            }
085    
086            @Override
087            public boolean hasMessageListener(String destinationName) {
088                    Destination destination = _destinations.get(destinationName);
089    
090                    if ((destination != null) && destination.isRegistered()) {
091                            return true;
092                    }
093                    else {
094                            return false;
095                    }
096            }
097    
098            @Override
099            public synchronized boolean registerMessageListener(
100                    String destinationName, MessageListener messageListener) {
101    
102                    Destination destination = _destinations.get(destinationName);
103    
104                    if (destination == null) {
105                            throw new IllegalStateException(
106                                    "Destination " + destinationName + " is not configured");
107                    }
108    
109                    boolean registered = destination.register(messageListener);
110    
111                    if (registered) {
112                            fireMessageListenerRegisteredEvent(destination, messageListener);
113                    }
114    
115                    return registered;
116            }
117    
118            @Override
119            public synchronized Destination removeDestination(String destinationName) {
120                    Destination destinationModel = _destinations.remove(destinationName);
121    
122                    if (destinationModel != null) {
123                            destinationModel.removeDestinationEventListeners();
124                            destinationModel.unregisterMessageListeners();
125    
126                            fireDestinationRemovedEvent(destinationModel);
127                    }
128    
129                    return destinationModel;
130            }
131    
132            @Override
133            public void removeDestinationEventListener(
134                    DestinationEventListener destinationEventListener) {
135    
136                    _destinationEventListeners.remove(destinationEventListener);
137            }
138    
139            @Override
140            public void removeDestinationEventListener(
141                    String destinationName,
142                    DestinationEventListener destinationEventListener) {
143    
144                    Destination destination = _destinations.get(destinationName);
145    
146                    if (destination != null) {
147                            destination.removeDestinationEventListener(
148                                    destinationEventListener);
149                    }
150            }
151    
152            @Override
153            public void replace(Destination destination) {
154                    Destination oldDestination = _destinations.get(destination.getName());
155    
156                    oldDestination.copyDestinationEventListeners(destination);
157                    oldDestination.copyMessageListeners(destination);
158    
159                    removeDestination(oldDestination.getName());
160    
161                    addDestination(destination);
162            }
163    
164            @Override
165            public void sendMessage(String destinationName, Message message) {
166                    Destination destination = _destinations.get(destinationName);
167    
168                    if (destination == null) {
169                            if (_log.isWarnEnabled()) {
170                                    _log.warn(
171                                            "Destination " + destinationName + " is not configured");
172                            }
173    
174                            return;
175                    }
176    
177                    message.setDestinationName(destinationName);
178    
179                    destination.send(message);
180            }
181    
182            @Override
183            public void shutdown() {
184                    shutdown(false);
185            }
186    
187            @Override
188            public synchronized void shutdown(boolean force) {
189                    for (Destination destination : _destinations.values()) {
190                            destination.close(force);
191                    }
192            }
193    
194            @Override
195            public synchronized boolean unregisterMessageListener(
196                    String destinationName, MessageListener messageListener) {
197    
198                    Destination destination = _destinations.get(destinationName);
199    
200                    if (destination == null) {
201                            return false;
202                    }
203    
204                    boolean unregistered = destination.unregister(messageListener);
205    
206                    if (unregistered) {
207                            fireMessageListenerUnregisteredEvent(destination, messageListener);
208                    }
209    
210                    return unregistered;
211            }
212    
213            protected void fireDestinationAddedEvent(Destination destination) {
214                    for (DestinationEventListener listener : _destinationEventListeners) {
215                            listener.destinationAdded(destination);
216                    }
217            }
218    
219            protected void fireDestinationRemovedEvent(Destination destination) {
220                    for (DestinationEventListener listener : _destinationEventListeners) {
221                            listener.destinationRemoved(destination);
222                    }
223            }
224    
225            protected void fireMessageListenerRegisteredEvent(
226                    Destination destination, MessageListener messageListener) {
227    
228                    for (DestinationEventListener destinationEventListener :
229                                    _destinationEventListeners) {
230    
231                            destinationEventListener.messageListenerRegistered(
232                                    destination.getName(), messageListener);
233                    }
234            }
235    
236            protected void fireMessageListenerUnregisteredEvent(
237                    Destination destination, MessageListener messageListener) {
238    
239                    for (DestinationEventListener destinationEventListener :
240                                    _destinationEventListeners) {
241    
242                            destinationEventListener.messageListenerUnregistered(
243                                    destination.getName(), messageListener);
244                    }
245            }
246    
247            private static Log _log = LogFactoryUtil.getLog(DefaultMessageBus.class);
248    
249            private Set<DestinationEventListener> _destinationEventListeners =
250                    new ConcurrentHashSet<DestinationEventListener>();
251            private Map<String, Destination> _destinations =
252                    new HashMap<String, Destination>();
253    
254    }