001
014
015 package com.liferay.portal.cluster;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.ClusterException;
019 import com.liferay.portal.kernel.cluster.ClusterMessageType;
020 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
021 import com.liferay.portal.kernel.cluster.ClusterRequest;
022 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
023 import com.liferay.portal.kernel.log.Log;
024 import com.liferay.portal.kernel.log.LogFactoryUtil;
025 import com.liferay.portal.kernel.util.MethodHandler;
026
027 import java.util.ArrayList;
028 import java.util.Collections;
029 import java.util.List;
030 import java.util.concurrent.CountDownLatch;
031
032 import org.jgroups.Channel;
033 import org.jgroups.Message;
034 import org.jgroups.View;
035
036
040 public class ClusterRequestReceiver extends BaseReceiver {
041
042 public ClusterRequestReceiver(ClusterExecutorImpl clusterExecutorImpl) {
043 _countDownLatch = new CountDownLatch(1);
044 _clusterExecutorImpl = clusterExecutorImpl;
045 }
046
047 public void openLatch() {
048 _countDownLatch.countDown();
049 }
050
051 @Override
052 public void receive(Message message) {
053 try {
054 _countDownLatch.await();
055 }
056 catch (InterruptedException ie) {
057 _log.error(
058 "Latch opened prematurely by interruption. Dependence may " +
059 "not be ready.");
060 }
061
062 Object obj = message.getObject();
063
064 if (obj == null) {
065 if (_log.isWarnEnabled()) {
066 _log.warn("Message content is null");
067 }
068
069 return;
070 }
071
072 Address sourceAddress = new AddressImpl(message.getSrc());
073
074 if (sourceAddress.equals(
075 _clusterExecutorImpl.getLocalClusterNodeAddress())) {
076
077 boolean isProcessed = processLocalMessage(obj);
078
079 if (isProcessed) {
080 return;
081 }
082 }
083
084 if (obj instanceof ClusterRequest) {
085 ClusterRequest clusterRequest = (ClusterRequest)obj;
086
087 processClusterRequest(clusterRequest, sourceAddress);
088 }
089 else if (obj instanceof ClusterNodeResponse) {
090 ClusterNodeResponse clusterNodeResponse = (ClusterNodeResponse)obj;
091
092 processClusterResponse(clusterNodeResponse, sourceAddress);
093 }
094 else if (_log.isWarnEnabled()) {
095 _log.warn(
096 "Unable to process message content of type " + obj.getClass());
097 }
098 }
099
100 @Override
101 public void viewAccepted(View view) {
102 super.viewAccepted(view);
103
104 if (_lastView == null) {
105 _lastView = view;
106
107 return;
108 }
109
110 List<Address> departAddresses = getDepartAddresses(view);
111 List<Address> newAddresses = getNewAddresses(view);
112
113 _lastView = view;
114
115 try {
116 _countDownLatch.await();
117 }
118 catch (InterruptedException ie) {
119 _log.error(
120 "Latch opened prematurely by interruption. Dependence may " +
121 "not be ready.");
122 }
123
124 if (!newAddresses.isEmpty()) {
125 _clusterExecutorImpl.sendNotifyRequest();
126 }
127
128 if (!departAddresses.isEmpty()) {
129 _clusterExecutorImpl.memberRemoved(departAddresses);
130 }
131 }
132
133 protected List<Address> getDepartAddresses(View view) {
134 List<org.jgroups.Address> currentJGroupsAddresses = view.getMembers();
135 List<org.jgroups.Address> lastJGroupsAddresses = _lastView.getMembers();
136
137 List<org.jgroups.Address> departJGroupsAddresses =
138 new ArrayList<org.jgroups.Address>(lastJGroupsAddresses);
139
140 departJGroupsAddresses.removeAll(currentJGroupsAddresses);
141
142 if (departJGroupsAddresses.isEmpty()) {
143 return Collections.emptyList();
144 }
145
146 List<Address> departAddresses = new ArrayList<Address>(
147 departJGroupsAddresses.size());
148
149 for (org.jgroups.Address departJGroupsAddress :
150 departJGroupsAddresses) {
151
152 Address departAddress = new AddressImpl(departJGroupsAddress);
153
154 departAddresses.add(departAddress);
155 }
156
157 return departAddresses;
158 }
159
160 protected List<Address> getNewAddresses(View view) {
161 List<org.jgroups.Address> currentJGroupsAddresses = view.getMembers();
162 List<org.jgroups.Address> lastJGroupsAddresses = _lastView.getMembers();
163
164 List<org.jgroups.Address> newJGroupsAddresses =
165 new ArrayList<org.jgroups.Address>(currentJGroupsAddresses);
166
167 newJGroupsAddresses.removeAll(lastJGroupsAddresses);
168
169 if (newJGroupsAddresses.isEmpty()) {
170 return Collections.emptyList();
171 }
172
173 List<Address> newAddresses = new ArrayList<Address>(
174 newJGroupsAddresses.size());
175
176 for (org.jgroups.Address newJGroupsAddress : newJGroupsAddresses) {
177 Address newAddress = new AddressImpl(newJGroupsAddress);
178
179 newAddresses.add(newAddress);
180 }
181
182 return newAddresses;
183 }
184
185 protected void handleResponse(
186 Address address, ClusterRequest clusterRequest, Object returnValue,
187 Exception exception) {
188
189 ClusterNodeResponse clusterNodeResponse =
190 _clusterExecutorImpl.generateClusterNodeResponse(
191 clusterRequest, returnValue, exception);
192
193 Channel channel = _clusterExecutorImpl.getControlChannel();
194
195 try {
196 channel.send(
197 (org.jgroups.Address)address.getRealAddress(), null,
198 clusterNodeResponse);
199 }
200 catch (Exception e) {
201 _log.error(
202 "Unable to send response message " + clusterNodeResponse, e);
203 }
204 catch (Throwable t) {
205 _log.error(t, t);
206 }
207 }
208
209 protected void processClusterRequest(
210 ClusterRequest clusterRequest, Address sourceAddress) {
211
212 ClusterMessageType clusterMessageType =
213 clusterRequest.getClusterMessageType();
214
215 if (clusterMessageType.equals(ClusterMessageType.NOTIFY) ||
216 clusterMessageType.equals(ClusterMessageType.UPDATE)) {
217
218 _clusterExecutorImpl.memberJoined(
219 sourceAddress, clusterRequest.getOriginatingClusterNode());
220
221 if (clusterMessageType.equals(ClusterMessageType.NOTIFY)) {
222 handleResponse(sourceAddress, clusterRequest, null, null);
223 }
224
225 return;
226 }
227
228 MethodHandler methodHandler = clusterRequest.getMethodHandler();
229
230 Object returnValue = null;
231 Exception exception = null;
232
233 if (methodHandler != null) {
234 try {
235 ClusterInvokeThreadLocal.setEnabled(false);
236
237 returnValue = methodHandler.invoke(true);
238 }
239 catch (Exception e) {
240 exception = e;
241
242 _log.error("Unable to invoke method " + methodHandler, e);
243 }
244 finally {
245 ClusterInvokeThreadLocal.setEnabled(true);
246 }
247 }
248 else {
249 exception = new ClusterException(
250 "Payload is not of type " + MethodHandler.class.getName());
251 }
252
253 if (!clusterRequest.isFireAndForget()) {
254 handleResponse(
255 sourceAddress, clusterRequest, returnValue, exception);
256 }
257 }
258
259 protected void processClusterResponse(
260 ClusterNodeResponse clusterNodeResponse, Address sourceAddress) {
261
262 ClusterMessageType clusterMessageType =
263 clusterNodeResponse.getClusterMessageType();
264
265 if (clusterMessageType.equals(ClusterMessageType.NOTIFY)) {
266 _clusterExecutorImpl.memberJoined(
267 sourceAddress, clusterNodeResponse.getClusterNode());
268
269 return;
270 }
271
272 String uuid = clusterNodeResponse.getUuid();
273
274 FutureClusterResponses futureClusterResponses =
275 _clusterExecutorImpl.getExecutionResults(uuid);
276
277 if (futureClusterResponses == null) {
278 if (_log.isInfoEnabled()) {
279 _log.info("Unable to find response container for " + uuid);
280 }
281
282 return;
283 }
284
285 if (futureClusterResponses.expectsReply(sourceAddress)) {
286 futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
287 }
288 else {
289 if (_log.isWarnEnabled()) {
290 _log.warn("Unknown uuid " + uuid + " from " + sourceAddress);
291 }
292 }
293 }
294
295 protected boolean processLocalMessage(Object message) {
296
297 if (message instanceof ClusterRequest) {
298 ClusterRequest clusterRequest = (ClusterRequest)message;
299
300 if (clusterRequest.isSkipLocal()) {
301 return true;
302 }
303 }
304
305 if (_clusterExecutorImpl.isShortcutLocalMethod()) {
306 return true;
307 }
308
309 return false;
310 }
311
312 private static Log _log = LogFactoryUtil.getLog(
313 ClusterRequestReceiver.class);
314
315 private ClusterExecutorImpl _clusterExecutorImpl;
316 private CountDownLatch _countDownLatch;
317 private volatile View _lastView;
318
319 }