001
014
015 package com.liferay.portal.cluster;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.ClusterException;
019 import com.liferay.portal.kernel.cluster.ClusterInvokeThreadLocal;
020 import com.liferay.portal.kernel.cluster.ClusterMessageType;
021 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
022 import com.liferay.portal.kernel.cluster.ClusterRequest;
023 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
024 import com.liferay.portal.kernel.log.Log;
025 import com.liferay.portal.kernel.log.LogFactoryUtil;
026 import com.liferay.portal.kernel.util.MethodHandler;
027
028 import java.util.ArrayList;
029 import java.util.Collections;
030 import java.util.List;
031
032 import org.jgroups.Message;
033 import org.jgroups.View;
034
035
039 public class ClusterRequestReceiver extends BaseReceiver {
040
041 public ClusterRequestReceiver(ClusterExecutorImpl clusterExecutorImpl) {
042 _clusterExecutorImpl = clusterExecutorImpl;
043 }
044
045 @Override
046 protected void doReceive(Message message) {
047 Object obj = retrievePayload(message);
048
049 if (obj == null) {
050 return;
051 }
052
053 Address sourceAddress = new AddressImpl(message.getSrc());
054
055 if (sourceAddress.equals(
056 _clusterExecutorImpl.getLocalClusterNodeAddress())) {
057
058 boolean isProcessed = processLocalMessage(obj);
059
060 if (isProcessed) {
061 return;
062 }
063 }
064
065 if (obj instanceof ClusterRequest) {
066 ClusterRequest clusterRequest = (ClusterRequest)obj;
067
068 processClusterRequest(clusterRequest, sourceAddress);
069 }
070 else if (obj instanceof ClusterNodeResponse) {
071 ClusterNodeResponse clusterNodeResponse = (ClusterNodeResponse)obj;
072
073 processClusterResponse(clusterNodeResponse, sourceAddress);
074 }
075 else if (_log.isWarnEnabled()) {
076 _log.warn(
077 "Unable to process message content of type " + obj.getClass());
078 }
079 }
080
081 @Override
082 protected void doViewAccepted(View oldView, View newView) {
083 List<Address> departAddresses = getDepartAddresses(oldView, newView);
084 List<Address> newAddresses = getNewAddresses(oldView, newView);
085
086 if (!newAddresses.isEmpty()) {
087 _clusterExecutorImpl.sendNotifyRequest();
088 }
089
090 if (!departAddresses.isEmpty()) {
091 _clusterExecutorImpl.memberRemoved(departAddresses);
092 }
093 }
094
095 protected List<Address> getDepartAddresses(View oldView, View newView) {
096 List<org.jgroups.Address> currentJGroupsAddresses =
097 newView.getMembers();
098 List<org.jgroups.Address> lastJGroupsAddresses = oldView.getMembers();
099
100 List<org.jgroups.Address> departJGroupsAddresses =
101 new ArrayList<org.jgroups.Address>(lastJGroupsAddresses);
102
103 departJGroupsAddresses.removeAll(currentJGroupsAddresses);
104
105 if (departJGroupsAddresses.isEmpty()) {
106 return Collections.emptyList();
107 }
108
109 List<Address> departAddresses = new ArrayList<Address>(
110 departJGroupsAddresses.size());
111
112 for (org.jgroups.Address departJGroupsAddress :
113 departJGroupsAddresses) {
114
115 Address departAddress = new AddressImpl(departJGroupsAddress);
116
117 departAddresses.add(departAddress);
118 }
119
120 return departAddresses;
121 }
122
123 protected List<Address> getNewAddresses(View oldView, View newView) {
124 List<org.jgroups.Address> currentJGroupsAddresses =
125 newView.getMembers();
126 List<org.jgroups.Address> lastJGroupsAddresses = oldView.getMembers();
127
128 List<org.jgroups.Address> newJGroupsAddresses =
129 new ArrayList<org.jgroups.Address>(currentJGroupsAddresses);
130
131 newJGroupsAddresses.removeAll(lastJGroupsAddresses);
132
133 if (newJGroupsAddresses.isEmpty()) {
134 return Collections.emptyList();
135 }
136
137 List<Address> newAddresses = new ArrayList<Address>(
138 newJGroupsAddresses.size());
139
140 for (org.jgroups.Address newJGroupsAddress : newJGroupsAddresses) {
141 Address newAddress = new AddressImpl(newJGroupsAddress);
142
143 newAddresses.add(newAddress);
144 }
145
146 return newAddresses;
147 }
148
149 protected void handleResponse(
150 Address address, ClusterRequest clusterRequest, Object returnValue,
151 Exception exception) {
152
153 ClusterNodeResponse clusterNodeResponse =
154 _clusterExecutorImpl.generateClusterNodeResponse(
155 clusterRequest, returnValue, exception);
156
157 try {
158 _clusterExecutorImpl.sendJGroupsMessage(
159 _clusterExecutorImpl.getControlChannel(),
160 (org.jgroups.Address)address.getRealAddress(),
161 clusterNodeResponse);
162 }
163 catch (Exception e) {
164 _log.error(
165 "Unable to send response message " + clusterNodeResponse, e);
166 }
167 catch (Throwable t) {
168 _log.error(t, t);
169 }
170 }
171
172 protected void processClusterRequest(
173 ClusterRequest clusterRequest, Address sourceAddress) {
174
175 ClusterMessageType clusterMessageType =
176 clusterRequest.getClusterMessageType();
177
178 if (clusterMessageType.equals(ClusterMessageType.NOTIFY) ||
179 clusterMessageType.equals(ClusterMessageType.UPDATE)) {
180
181 _clusterExecutorImpl.memberJoined(
182 sourceAddress, clusterRequest.getOriginatingClusterNode());
183
184 if (clusterMessageType.equals(ClusterMessageType.NOTIFY)) {
185 handleResponse(sourceAddress, clusterRequest, null, null);
186 }
187
188 return;
189 }
190
191 MethodHandler methodHandler = clusterRequest.getMethodHandler();
192
193 Object returnValue = null;
194 Exception exception = null;
195
196 if (methodHandler != null) {
197 try {
198 ClusterInvokeThreadLocal.setEnabled(false);
199
200 returnValue = methodHandler.invoke(true);
201 }
202 catch (Exception e) {
203 exception = e;
204
205 _log.error("Unable to invoke method " + methodHandler, e);
206 }
207 finally {
208 ClusterInvokeThreadLocal.setEnabled(true);
209 }
210 }
211 else {
212 exception = new ClusterException(
213 "Payload is not of type " + MethodHandler.class.getName());
214 }
215
216 if (!clusterRequest.isFireAndForget()) {
217 handleResponse(
218 sourceAddress, clusterRequest, returnValue, exception);
219 }
220 }
221
222 protected void processClusterResponse(
223 ClusterNodeResponse clusterNodeResponse, Address sourceAddress) {
224
225 ClusterMessageType clusterMessageType =
226 clusterNodeResponse.getClusterMessageType();
227
228 if (clusterMessageType.equals(ClusterMessageType.NOTIFY)) {
229 _clusterExecutorImpl.memberJoined(
230 sourceAddress, clusterNodeResponse.getClusterNode());
231
232 return;
233 }
234
235 String uuid = clusterNodeResponse.getUuid();
236
237 FutureClusterResponses futureClusterResponses =
238 _clusterExecutorImpl.getExecutionResults(uuid);
239
240 if (futureClusterResponses == null) {
241 if (_log.isInfoEnabled()) {
242 _log.info("Unable to find response container for " + uuid);
243 }
244
245 return;
246 }
247
248 if (futureClusterResponses.expectsReply(sourceAddress)) {
249 futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
250 }
251 else {
252 if (_log.isWarnEnabled()) {
253 _log.warn("Unknown uuid " + uuid + " from " + sourceAddress);
254 }
255 }
256 }
257
258 protected boolean processLocalMessage(Object message) {
259 if (message instanceof ClusterRequest) {
260 ClusterRequest clusterRequest = (ClusterRequest)message;
261
262 if (clusterRequest.isSkipLocal()) {
263 return true;
264 }
265 }
266
267 if (_clusterExecutorImpl.isShortcutLocalMethod()) {
268 return true;
269 }
270
271 return false;
272 }
273
274 private static Log _log = LogFactoryUtil.getLog(
275 ClusterRequestReceiver.class);
276
277 private ClusterExecutorImpl _clusterExecutorImpl;
278
279 }