001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.cluster;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.ClusterException;
019    import com.liferay.portal.kernel.cluster.ClusterInvokeThreadLocal;
020    import com.liferay.portal.kernel.cluster.ClusterMessageType;
021    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
022    import com.liferay.portal.kernel.cluster.ClusterRequest;
023    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
024    import com.liferay.portal.kernel.log.Log;
025    import com.liferay.portal.kernel.log.LogFactoryUtil;
026    import com.liferay.portal.kernel.util.MethodHandler;
027    
028    import java.util.ArrayList;
029    import java.util.Collections;
030    import java.util.List;
031    
032    import org.jgroups.Message;
033    import org.jgroups.View;
034    
035    /**
036     * @author Michael C. Han
037     * @author Tina Tian
038     */
039    public class ClusterRequestReceiver extends BaseReceiver {
040    
041            public ClusterRequestReceiver(ClusterExecutorImpl clusterExecutorImpl) {
042                    _clusterExecutorImpl = clusterExecutorImpl;
043            }
044    
045            @Override
046            protected void doReceive(Message message) {
047                    Object obj = retrievePayload(message);
048    
049                    if (obj == null) {
050                            return;
051                    }
052    
053                    Address sourceAddress = new AddressImpl(message.getSrc());
054    
055                    if (sourceAddress.equals(
056                                    _clusterExecutorImpl.getLocalClusterNodeAddress())) {
057    
058                            boolean isProcessed = processLocalMessage(obj);
059    
060                            if (isProcessed) {
061                                    return;
062                            }
063                    }
064    
065                    if (obj instanceof ClusterRequest) {
066                            ClusterRequest clusterRequest = (ClusterRequest)obj;
067    
068                            processClusterRequest(clusterRequest, sourceAddress);
069                    }
070                    else if (obj instanceof ClusterNodeResponse) {
071                            ClusterNodeResponse clusterNodeResponse = (ClusterNodeResponse)obj;
072    
073                            processClusterResponse(clusterNodeResponse, sourceAddress);
074                    }
075                    else if (_log.isWarnEnabled()) {
076                            _log.warn(
077                                    "Unable to process message content of type " + obj.getClass());
078                    }
079            }
080    
081            @Override
082            protected void doViewAccepted(View oldView, View newView) {
083                    List<Address> departAddresses = getDepartAddresses(oldView, newView);
084                    List<Address> newAddresses = getNewAddresses(oldView, newView);
085    
086                    if (!newAddresses.isEmpty()) {
087                            _clusterExecutorImpl.sendNotifyRequest();
088                    }
089    
090                    if (!departAddresses.isEmpty()) {
091                            _clusterExecutorImpl.memberRemoved(departAddresses);
092                    }
093            }
094    
095            protected List<Address> getDepartAddresses(View oldView, View newView) {
096                    List<org.jgroups.Address> currentJGroupsAddresses =
097                            newView.getMembers();
098                    List<org.jgroups.Address> lastJGroupsAddresses = oldView.getMembers();
099    
100                    List<org.jgroups.Address> departJGroupsAddresses =
101                            new ArrayList<org.jgroups.Address>(lastJGroupsAddresses);
102    
103                    departJGroupsAddresses.removeAll(currentJGroupsAddresses);
104    
105                    if (departJGroupsAddresses.isEmpty()) {
106                            return Collections.emptyList();
107                    }
108    
109                    List<Address> departAddresses = new ArrayList<Address>(
110                            departJGroupsAddresses.size());
111    
112                    for (org.jgroups.Address departJGroupsAddress :
113                                    departJGroupsAddresses) {
114    
115                            Address departAddress = new AddressImpl(departJGroupsAddress);
116    
117                            departAddresses.add(departAddress);
118                    }
119    
120                    return departAddresses;
121            }
122    
123            protected List<Address> getNewAddresses(View oldView, View newView) {
124                    List<org.jgroups.Address> currentJGroupsAddresses =
125                            newView.getMembers();
126                    List<org.jgroups.Address> lastJGroupsAddresses = oldView.getMembers();
127    
128                    List<org.jgroups.Address> newJGroupsAddresses =
129                            new ArrayList<org.jgroups.Address>(currentJGroupsAddresses);
130    
131                    newJGroupsAddresses.removeAll(lastJGroupsAddresses);
132    
133                    if (newJGroupsAddresses.isEmpty()) {
134                            return Collections.emptyList();
135                    }
136    
137                    List<Address> newAddresses = new ArrayList<Address>(
138                            newJGroupsAddresses.size());
139    
140                    for (org.jgroups.Address newJGroupsAddress : newJGroupsAddresses) {
141                            Address newAddress = new AddressImpl(newJGroupsAddress);
142    
143                            newAddresses.add(newAddress);
144                    }
145    
146                    return newAddresses;
147            }
148    
149            protected void handleResponse(
150                    Address address, ClusterRequest clusterRequest, Object returnValue,
151                    Exception exception) {
152    
153                    ClusterNodeResponse clusterNodeResponse =
154                            _clusterExecutorImpl.generateClusterNodeResponse(
155                                    clusterRequest, returnValue, exception);
156    
157                    try {
158                            _clusterExecutorImpl.sendJGroupsMessage(
159                                    _clusterExecutorImpl.getControlChannel(),
160                                    (org.jgroups.Address)address.getRealAddress(),
161                                    clusterNodeResponse);
162                    }
163                    catch (Exception e) {
164                            _log.error(
165                                    "Unable to send response message " + clusterNodeResponse, e);
166                    }
167                    catch (Throwable t) {
168                            _log.error(t, t);
169                    }
170            }
171    
172            protected void processClusterRequest(
173                    ClusterRequest clusterRequest, Address sourceAddress) {
174    
175                    ClusterMessageType clusterMessageType =
176                            clusterRequest.getClusterMessageType();
177    
178                    if (clusterMessageType.equals(ClusterMessageType.NOTIFY) ||
179                            clusterMessageType.equals(ClusterMessageType.UPDATE)) {
180    
181                            _clusterExecutorImpl.memberJoined(
182                                    sourceAddress, clusterRequest.getOriginatingClusterNode());
183    
184                            if (clusterMessageType.equals(ClusterMessageType.NOTIFY)) {
185                                    handleResponse(sourceAddress, clusterRequest, null, null);
186                            }
187    
188                            return;
189                    }
190    
191                    MethodHandler methodHandler = clusterRequest.getMethodHandler();
192    
193                    Object returnValue = null;
194                    Exception exception = null;
195    
196                    if (methodHandler != null) {
197                            try {
198                                    ClusterInvokeThreadLocal.setEnabled(false);
199    
200                                    returnValue = methodHandler.invoke(true);
201                            }
202                            catch (Exception e) {
203                                    exception = e;
204    
205                                    _log.error("Unable to invoke method " + methodHandler, e);
206                            }
207                            finally {
208                                    ClusterInvokeThreadLocal.setEnabled(true);
209                            }
210                    }
211                    else {
212                            exception = new ClusterException(
213                                    "Payload is not of type " + MethodHandler.class.getName());
214                    }
215    
216                    if (!clusterRequest.isFireAndForget()) {
217                            handleResponse(
218                                    sourceAddress, clusterRequest, returnValue, exception);
219                    }
220            }
221    
222            protected void processClusterResponse(
223                    ClusterNodeResponse clusterNodeResponse, Address sourceAddress) {
224    
225                    ClusterMessageType clusterMessageType =
226                            clusterNodeResponse.getClusterMessageType();
227    
228                    if (clusterMessageType.equals(ClusterMessageType.NOTIFY)) {
229                            _clusterExecutorImpl.memberJoined(
230                                    sourceAddress, clusterNodeResponse.getClusterNode());
231    
232                            return;
233                    }
234    
235                    String uuid = clusterNodeResponse.getUuid();
236    
237                    FutureClusterResponses futureClusterResponses =
238                            _clusterExecutorImpl.getExecutionResults(uuid);
239    
240                    if (futureClusterResponses == null) {
241                            if (_log.isInfoEnabled()) {
242                                    _log.info("Unable to find response container for " + uuid);
243                            }
244    
245                            return;
246                    }
247    
248                    if (futureClusterResponses.expectsReply(sourceAddress)) {
249                            futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
250                    }
251                    else {
252                            if (_log.isWarnEnabled()) {
253                                    _log.warn("Unknown uuid " + uuid + " from " + sourceAddress);
254                            }
255                    }
256            }
257    
258            protected boolean processLocalMessage(Object message) {
259                    if (message instanceof ClusterRequest) {
260                            ClusterRequest clusterRequest = (ClusterRequest)message;
261    
262                            if (clusterRequest.isSkipLocal()) {
263                                    return true;
264                            }
265                    }
266    
267                    if (_clusterExecutorImpl.isShortcutLocalMethod()) {
268                            return true;
269                    }
270    
271                    return false;
272            }
273    
274            private static Log _log = LogFactoryUtil.getLog(
275                    ClusterRequestReceiver.class);
276    
277            private ClusterExecutorImpl _clusterExecutorImpl;
278    
279    }