1
22
23 package com.liferay.portal.kernel.messaging;
24
25 import com.liferay.portal.kernel.log.Log;
26 import com.liferay.portal.kernel.log.LogFactoryUtil;
27 import com.liferay.portal.kernel.uuid.PortalUUIDUtil;
28
29 import java.util.HashMap;
30 import java.util.Map;
31
32
38 public class DefaultMessageBus implements MessageBus {
39
40 public synchronized void addDestination(Destination destination) {
41 _destinations.put(destination.getName(), destination);
42
43 Destination responseDestination = getResponseDestination(destination);
44
45 _destinations.put(responseDestination.getName(), responseDestination);
46 }
47
48 public synchronized void registerMessageListener(
49 String destination, MessageListener listener) {
50
51 Destination destinationModel = _destinations.get(destination);
52
53 if (destinationModel == null) {
54 throw new IllegalStateException(
55 "Destination " + destination + " is not configured");
56 }
57
58 destinationModel.register(listener);
59 }
60
61 public synchronized void removeDestination(String destination) {
62 _destinations.remove(destination);
63
64 String responseDestination = getResponseDestination(destination);
65
66 _destinations.remove(responseDestination);
67 }
68
69 public void sendMessage(String destination, Object message) {
70 Destination destinationModel = _destinations.get(destination);
71
72 if (destinationModel == null) {
73 if (_log.isWarnEnabled()) {
74 _log.warn("Destination " + destination + " is not configured");
75 }
76
77 return;
78 }
79
80 destinationModel.send(message);
81 }
82
83 public void sendMessage(String destination, String message) {
84 Destination destinationModel = _destinations.get(destination);
85
86 if (destinationModel == null) {
87 if (_log.isWarnEnabled()) {
88 _log.warn("Destination " + destination + " is not configured");
89 }
90
91 return;
92 }
93
94 destinationModel.send(message);
95 }
96
97 public Object sendSynchronizedMessage(
98 String destination, Message message, long timeout)
99 throws MessageBusException {
100
101 Destination destinationModel = _destinations.get(destination);
102
103 if (destinationModel == null) {
104 if (_log.isWarnEnabled()) {
105 _log.warn("Destination " + destination + " is not configured");
106 }
107
108 return null;
109 }
110
111 Destination responseDestinationModel = _destinations.get(
112 getResponseDestination(destination));
113
114 if (responseDestinationModel == null) {
115 _log.error(
116 "Response destination " + destination + " is not configured");
117
118 return null;
119 }
120
121 ObjectResponseMessageListener responseMessageListener =
122 new ObjectResponseMessageListener(
123 destinationModel, responseDestinationModel, getNextResponseId(),
124 timeout);
125
126 responseDestinationModel.register(responseMessageListener);
127
128 try {
129 return responseMessageListener.send(message);
130 }
131 finally {
132 responseDestinationModel.unregister(responseMessageListener);
133 }
134 }
135
136 public String sendSynchronizedMessage(
137 String destination, String message, long timeout)
138 throws MessageBusException {
139
140 Destination destinationModel = _destinations.get(destination);
141
142 if (destinationModel == null) {
143 if (_log.isWarnEnabled()) {
144 _log.warn("Destination " + destination + " is not configured");
145 }
146
147 return null;
148 }
149
150 Destination responseDestinationModel = _destinations.get(
151 getResponseDestination(destination));
152
153 if (responseDestinationModel == null) {
154 _log.error(
155 "Response destination " + destination + " is not configured");
156
157 return null;
158 }
159
160 StringResponseMessageListener responseMessageListener =
161 new StringResponseMessageListener(
162 destinationModel, responseDestinationModel, getNextResponseId(),
163 timeout);
164
165 responseDestinationModel.register(responseMessageListener);
166
167 try {
168 return responseMessageListener.send(message);
169 }
170 finally {
171 responseDestinationModel.unregister(responseMessageListener);
172 }
173 }
174
175 public void shutdown() {
176 shutdown(false);
177 }
178
179 public synchronized void shutdown(boolean force) {
180 for (Destination destination : _destinations.values()) {
181 destination.close(force);
182 }
183 }
184
185 public synchronized boolean unregisterMessageListener(
186 String destination, MessageListener listener) {
187
188 Destination destinationModel = _destinations.get(destination);
189
190 if (destinationModel == null) {
191 return false;
192 }
193
194 return destinationModel.unregister(listener);
195 }
196
197 protected String getNextResponseId() {
198 return PortalUUIDUtil.generate();
199 }
200
201 protected String getResponseDestination(String destination) {
202 return destination + _RESPONSE_DESTINATION_SUFFIX;
203 }
204
205 protected Destination getResponseDestination(Destination destination) {
206 return new TempDestination(
207 getResponseDestination(destination.getName()));
208 }
209
210 private static final String _RESPONSE_DESTINATION_SUFFIX = "/response";
211
212 private static Log _log = LogFactoryUtil.getLog(DefaultMessageBus.class);
213
214 private Map<String, Destination> _destinations =
215 new HashMap<String, Destination>();
216
217 }