001
014
015 package com.liferay.portal.kernel.messaging;
016
import com.liferay.portal.kernel.concurrent.ConcurrentHashSet;
import com.liferay.portal.kernel.log.Log;
import com.liferay.portal.kernel.log.LogFactoryUtil;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
025
026
029 public class DefaultMessageBus implements MessageBus {
030
031 @Override
032 public synchronized void addDestination(Destination destination) {
033 _destinations.put(destination.getName(), destination);
034
035 fireDestinationAddedEvent(destination);
036 }
037
038 @Override
039 public void addDestinationEventListener(
040 DestinationEventListener destinationEventListener) {
041
042 _destinationEventListeners.add(destinationEventListener);
043 }
044
045 @Override
046 public void addDestinationEventListener(
047 String destinationName,
048 DestinationEventListener destinationEventListener) {
049
050 Destination destination = _destinations.get(destinationName);
051
052 if (destination != null) {
053 destination.addDestinationEventListener(destinationEventListener);
054 }
055 }
056
057 public void destroy() {
058 shutdown(true);
059 }
060
061 @Override
062 public Destination getDestination(String destinationName) {
063 return _destinations.get(destinationName);
064 }
065
066 @Override
067 public int getDestinationCount() {
068 return _destinations.size();
069 }
070
071 @Override
072 public Collection<String> getDestinationNames() {
073 return _destinations.keySet();
074 }
075
076 @Override
077 public Collection<Destination> getDestinations() {
078 return _destinations.values();
079 }
080
081 @Override
082 public boolean hasDestination(String destinationName) {
083 return _destinations.containsKey(destinationName);
084 }
085
086 @Override
087 public boolean hasMessageListener(String destinationName) {
088 Destination destination = _destinations.get(destinationName);
089
090 if ((destination != null) && destination.isRegistered()) {
091 return true;
092 }
093 else {
094 return false;
095 }
096 }
097
098 @Override
099 public synchronized boolean registerMessageListener(
100 String destinationName, MessageListener messageListener) {
101
102 Destination destination = _destinations.get(destinationName);
103
104 if (destination == null) {
105 throw new IllegalStateException(
106 "Destination " + destinationName + " is not configured");
107 }
108
109 boolean registered = destination.register(messageListener);
110
111 if (registered) {
112 fireMessageListenerRegisteredEvent(destination, messageListener);
113 }
114
115 return registered;
116 }
117
118 @Override
119 public synchronized Destination removeDestination(String destinationName) {
120 Destination destinationModel = _destinations.remove(destinationName);
121
122 if (destinationModel != null) {
123 destinationModel.removeDestinationEventListeners();
124 destinationModel.unregisterMessageListeners();
125
126 fireDestinationRemovedEvent(destinationModel);
127 }
128
129 return destinationModel;
130 }
131
132 @Override
133 public void removeDestinationEventListener(
134 DestinationEventListener destinationEventListener) {
135
136 _destinationEventListeners.remove(destinationEventListener);
137 }
138
139 @Override
140 public void removeDestinationEventListener(
141 String destinationName,
142 DestinationEventListener destinationEventListener) {
143
144 Destination destination = _destinations.get(destinationName);
145
146 if (destination != null) {
147 destination.removeDestinationEventListener(
148 destinationEventListener);
149 }
150 }
151
152 @Override
153 public void replace(Destination destination) {
154 Destination oldDestination = _destinations.get(destination.getName());
155
156 oldDestination.copyDestinationEventListeners(destination);
157 oldDestination.copyMessageListeners(destination);
158
159 removeDestination(oldDestination.getName());
160
161 addDestination(destination);
162 }
163
164 @Override
165 public void sendMessage(String destinationName, Message message) {
166 Destination destination = _destinations.get(destinationName);
167
168 if (destination == null) {
169 if (_log.isWarnEnabled()) {
170 _log.warn(
171 "Destination " + destinationName + " is not configured");
172 }
173
174 return;
175 }
176
177 message.setDestinationName(destinationName);
178
179 destination.send(message);
180 }
181
182 @Override
183 public void shutdown() {
184 shutdown(false);
185 }
186
187 @Override
188 public synchronized void shutdown(boolean force) {
189 for (Destination destination : _destinations.values()) {
190 destination.close(force);
191 }
192 }
193
194 @Override
195 public synchronized boolean unregisterMessageListener(
196 String destinationName, MessageListener messageListener) {
197
198 Destination destination = _destinations.get(destinationName);
199
200 if (destination == null) {
201 return false;
202 }
203
204 boolean unregistered = destination.unregister(messageListener);
205
206 if (unregistered) {
207 fireMessageListenerUnregisteredEvent(destination, messageListener);
208 }
209
210 return unregistered;
211 }
212
213 protected void fireDestinationAddedEvent(Destination destination) {
214 for (DestinationEventListener listener : _destinationEventListeners) {
215 listener.destinationAdded(destination);
216 }
217 }
218
219 protected void fireDestinationRemovedEvent(Destination destination) {
220 for (DestinationEventListener listener : _destinationEventListeners) {
221 listener.destinationRemoved(destination);
222 }
223 }
224
225 protected void fireMessageListenerRegisteredEvent(
226 Destination destination, MessageListener messageListener) {
227
228 for (DestinationEventListener destinationEventListener :
229 _destinationEventListeners) {
230
231 destinationEventListener.messageListenerRegistered(
232 destination.getName(), messageListener);
233 }
234 }
235
236 protected void fireMessageListenerUnregisteredEvent(
237 Destination destination, MessageListener messageListener) {
238
239 for (DestinationEventListener destinationEventListener :
240 _destinationEventListeners) {
241
242 destinationEventListener.messageListenerUnregistered(
243 destination.getName(), messageListener);
244 }
245 }
246
247 private static Log _log = LogFactoryUtil.getLog(DefaultMessageBus.class);
248
249 private Set<DestinationEventListener> _destinationEventListeners =
250 new ConcurrentHashSet<DestinationEventListener>();
251 private Map<String, Destination> _destinations =
252 new HashMap<String, Destination>();
253
254 }