001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.cluster;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.ClusterException;
019    import com.liferay.portal.kernel.cluster.ClusterMessageType;
020    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
021    import com.liferay.portal.kernel.cluster.ClusterRequest;
022    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
023    import com.liferay.portal.kernel.log.Log;
024    import com.liferay.portal.kernel.log.LogFactoryUtil;
025    import com.liferay.portal.kernel.util.MethodHandler;
026    
027    import java.util.ArrayList;
028    import java.util.Collections;
029    import java.util.List;
030    import java.util.concurrent.CountDownLatch;
031    
032    import org.jgroups.Channel;
033    import org.jgroups.Message;
034    import org.jgroups.View;
035    
036    /**
037     * @author Michael C. Han
038     * @author Tina Tian
039     */
040    public class ClusterRequestReceiver extends BaseReceiver {
041    
042            public ClusterRequestReceiver(ClusterExecutorImpl clusterExecutorImpl) {
043                    _countDownLatch = new CountDownLatch(1);
044                    _clusterExecutorImpl = clusterExecutorImpl;
045            }
046    
047            public void openLatch() {
048                    _countDownLatch.countDown();
049            }
050    
051            @Override
052            public void receive(Message message) {
053                    try {
054                            _countDownLatch.await();
055                    }
056                    catch (InterruptedException ie) {
057                            _log.error(
058                                    "Latch opened prematurely by interruption. Dependence may " +
059                                            "not be ready.");
060                    }
061    
062                    Object obj = message.getObject();
063    
064                    if (obj == null) {
065                            if (_log.isWarnEnabled()) {
066                                    _log.warn("Message content is null");
067                            }
068    
069                            return;
070                    }
071    
072                    Address sourceAddress = new AddressImpl(message.getSrc());
073    
074                    if (sourceAddress.equals(
075                                    _clusterExecutorImpl.getLocalClusterNodeAddress())) {
076    
077                            boolean isProcessed = processLocalMessage(obj);
078    
079                            if (isProcessed) {
080                                    return;
081                            }
082                    }
083    
084                    if (obj instanceof ClusterRequest) {
085                            ClusterRequest clusterRequest = (ClusterRequest)obj;
086    
087                            processClusterRequest(clusterRequest, sourceAddress);
088                    }
089                    else if (obj instanceof ClusterNodeResponse) {
090                            ClusterNodeResponse clusterNodeResponse = (ClusterNodeResponse)obj;
091    
092                            processClusterResponse(clusterNodeResponse, sourceAddress);
093                    }
094                    else if (_log.isWarnEnabled()) {
095                            _log.warn(
096                                    "Unable to process message content of type " + obj.getClass());
097                    }
098            }
099    
100            @Override
101            public void viewAccepted(View view) {
102                    super.viewAccepted(view);
103    
104                    if (_lastView == null) {
105                            _lastView = view;
106    
107                            return;
108                    }
109    
110                    List<Address> departAddresses = getDepartAddresses(view);
111                    List<Address> newAddresses = getNewAddresses(view);
112    
113                    _lastView = view;
114    
115                    try {
116                            _countDownLatch.await();
117                    }
118                    catch (InterruptedException ie) {
119                            _log.error(
120                                    "Latch opened prematurely by interruption. Dependence may " +
121                                            "not be ready.");
122                    }
123    
124                    if (!newAddresses.isEmpty()) {
125                            _clusterExecutorImpl.sendNotifyRequest();
126                    }
127    
128                    if (!departAddresses.isEmpty()) {
129                            _clusterExecutorImpl.memberRemoved(departAddresses);
130                    }
131            }
132    
133            protected List<Address> getDepartAddresses(View view) {
134                    List<org.jgroups.Address> currentJGroupsAddresses = view.getMembers();
135                    List<org.jgroups.Address> lastJGroupsAddresses = _lastView.getMembers();
136    
137                    List<org.jgroups.Address> departJGroupsAddresses =
138                            new ArrayList<org.jgroups.Address>(lastJGroupsAddresses);
139    
140                    departJGroupsAddresses.removeAll(currentJGroupsAddresses);
141    
142                    if (departJGroupsAddresses.isEmpty()) {
143                            return Collections.emptyList();
144                    }
145    
146                    List<Address> departAddresses = new ArrayList<Address>(
147                            departJGroupsAddresses.size());
148    
149                    for (org.jgroups.Address departJGroupsAddress :
150                                    departJGroupsAddresses) {
151    
152                            Address departAddress = new AddressImpl(departJGroupsAddress);
153    
154                            departAddresses.add(departAddress);
155                    }
156    
157                    return departAddresses;
158            }
159    
160            protected List<Address> getNewAddresses(View view) {
161                    List<org.jgroups.Address> currentJGroupsAddresses = view.getMembers();
162                    List<org.jgroups.Address> lastJGroupsAddresses = _lastView.getMembers();
163    
164                    List<org.jgroups.Address> newJGroupsAddresses =
165                            new ArrayList<org.jgroups.Address>(currentJGroupsAddresses);
166    
167                    newJGroupsAddresses.removeAll(lastJGroupsAddresses);
168    
169                    if (newJGroupsAddresses.isEmpty()) {
170                            return Collections.emptyList();
171                    }
172    
173                    List<Address> newAddresses = new ArrayList<Address>(
174                            newJGroupsAddresses.size());
175    
176                    for (org.jgroups.Address newJGroupsAddress : newJGroupsAddresses) {
177                            Address newAddress = new AddressImpl(newJGroupsAddress);
178    
179                            newAddresses.add(newAddress);
180                    }
181    
182                    return newAddresses;
183            }
184    
185            protected void handleResponse(
186                    Address address, ClusterRequest clusterRequest, Object returnValue,
187                    Exception exception) {
188    
189                    ClusterNodeResponse clusterNodeResponse =
190                            _clusterExecutorImpl.generateClusterNodeResponse(
191                                    clusterRequest, returnValue, exception);
192    
193                    Channel channel = _clusterExecutorImpl.getControlChannel();
194    
195                    try {
196                            channel.send(
197                                    (org.jgroups.Address)address.getRealAddress(), null,
198                                    clusterNodeResponse);
199                    }
200                    catch (Exception e) {
201                            _log.error(
202                                    "Unable to send response message " + clusterNodeResponse, e);
203                    }
204                    catch (Throwable t) {
205                            _log.error(t, t);
206                    }
207            }
208    
209            protected void processClusterRequest(
210                    ClusterRequest clusterRequest, Address sourceAddress) {
211    
212                    ClusterMessageType clusterMessageType =
213                            clusterRequest.getClusterMessageType();
214    
215                    if (clusterMessageType.equals(ClusterMessageType.NOTIFY) ||
216                            clusterMessageType.equals(ClusterMessageType.UPDATE)) {
217    
218                            _clusterExecutorImpl.memberJoined(
219                                    sourceAddress, clusterRequest.getOriginatingClusterNode());
220    
221                            if (clusterMessageType.equals(ClusterMessageType.NOTIFY)) {
222                                    handleResponse(sourceAddress, clusterRequest, null, null);
223                            }
224    
225                            return;
226                    }
227    
228                    MethodHandler methodHandler = clusterRequest.getMethodHandler();
229    
230                    Object returnValue = null;
231                    Exception exception = null;
232    
233                    if (methodHandler != null) {
234                            try {
235                                    ClusterInvokeThreadLocal.setEnabled(false);
236    
237                                    returnValue = methodHandler.invoke(true);
238                            }
239                            catch (Exception e) {
240                                    exception = e;
241    
242                                    _log.error("Unable to invoke method " + methodHandler, e);
243                            }
244                            finally {
245                                    ClusterInvokeThreadLocal.setEnabled(true);
246                            }
247                    }
248                    else {
249                            exception = new ClusterException(
250                                    "Payload is not of type " + MethodHandler.class.getName());
251                    }
252    
253                    if (!clusterRequest.isFireAndForget()) {
254                            handleResponse(
255                                    sourceAddress, clusterRequest, returnValue, exception);
256                    }
257            }
258    
259            protected void processClusterResponse(
260                    ClusterNodeResponse clusterNodeResponse, Address sourceAddress) {
261    
262                    ClusterMessageType clusterMessageType =
263                            clusterNodeResponse.getClusterMessageType();
264    
265                    if (clusterMessageType.equals(ClusterMessageType.NOTIFY)) {
266                            _clusterExecutorImpl.memberJoined(
267                                    sourceAddress, clusterNodeResponse.getClusterNode());
268    
269                            return;
270                    }
271    
272                    String uuid = clusterNodeResponse.getUuid();
273    
274                    FutureClusterResponses futureClusterResponses =
275                            _clusterExecutorImpl.getExecutionResults(uuid);
276    
277                    if (futureClusterResponses == null) {
278                            if (_log.isInfoEnabled()) {
279                                    _log.info("Unable to find response container for " + uuid);
280                            }
281    
282                            return;
283                    }
284    
285                    if (futureClusterResponses.expectsReply(sourceAddress)) {
286                            futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
287                    }
288                    else {
289                            if (_log.isWarnEnabled()) {
290                                    _log.warn("Unknown uuid " + uuid + " from " + sourceAddress);
291                            }
292                    }
293            }
294    
295            protected boolean processLocalMessage(Object message) {
296    
297                    if (message instanceof ClusterRequest) {
298                            ClusterRequest clusterRequest = (ClusterRequest)message;
299    
300                            if (clusterRequest.isSkipLocal()) {
301                                    return true;
302                            }
303                    }
304    
305                    if (_clusterExecutorImpl.isShortcutLocalMethod()) {
306                            return true;
307                    }
308    
309                    return false;
310            }
311    
312            private static Log _log = LogFactoryUtil.getLog(
313                    ClusterRequestReceiver.class);
314    
315            private ClusterExecutorImpl _clusterExecutorImpl;
316            private CountDownLatch _countDownLatch;
317            private volatile View _lastView;
318    
319    }