001
014
015 package com.liferay.portal.cluster;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.ClusterEvent;
019 import com.liferay.portal.kernel.cluster.ClusterEventListener;
020 import com.liferay.portal.kernel.cluster.ClusterException;
021 import com.liferay.portal.kernel.cluster.ClusterExecutor;
022 import com.liferay.portal.kernel.cluster.ClusterMessageType;
023 import com.liferay.portal.kernel.cluster.ClusterNode;
024 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025 import com.liferay.portal.kernel.cluster.ClusterRequest;
026 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
027 import com.liferay.portal.kernel.exception.SystemException;
028 import com.liferay.portal.kernel.log.Log;
029 import com.liferay.portal.kernel.log.LogFactoryUtil;
030 import com.liferay.portal.kernel.util.InetAddressUtil;
031 import com.liferay.portal.kernel.util.MethodHandler;
032 import com.liferay.portal.kernel.util.PropsKeys;
033 import com.liferay.portal.kernel.util.PropsUtil;
034 import com.liferay.portal.kernel.util.WeakValueConcurrentHashMap;
035 import com.liferay.portal.kernel.uuid.PortalUUIDUtil;
036 import com.liferay.portal.util.PortalPortEventListener;
037 import com.liferay.portal.util.PortalUtil;
038 import com.liferay.portal.util.PropsValues;
039
040 import java.io.Serializable;
041
042 import java.net.InetAddress;
043
044 import java.util.ArrayList;
045 import java.util.Collection;
046 import java.util.Collections;
047 import java.util.List;
048 import java.util.Map;
049 import java.util.Properties;
050 import java.util.concurrent.ConcurrentHashMap;
051 import java.util.concurrent.CopyOnWriteArrayList;
052
053 import org.jgroups.ChannelException;
054 import org.jgroups.JChannel;
055
056
060 public class ClusterExecutorImpl
061 extends ClusterBase implements ClusterExecutor, PortalPortEventListener {
062
063 public void addClusterEventListener(
064 ClusterEventListener clusterEventListener) {
065 if (!isEnabled()) {
066 return;
067 }
068
069 _clusterEventListeners.addIfAbsent(clusterEventListener);
070 }
071
072 public void afterPropertiesSet() {
073 super.afterPropertiesSet();
074
075 if (PropsValues.CLUSTER_EXECUTOR_DEBUG_ENABLED) {
076 addClusterEventListener(new DebuggingClusterEventListenerImpl());
077 }
078 }
079
080 public void destroy() {
081 if (!isEnabled()) {
082 return;
083 }
084
085 _controlChannel.close();
086 }
087
088 public FutureClusterResponses execute(ClusterRequest clusterRequest)
089 throws SystemException {
090
091 if (!isEnabled()) {
092 return null;
093 }
094
095 List<Address> addresses = prepareAddresses(clusterRequest);
096
097 FutureClusterResponses futureClusterResponses =
098 new FutureClusterResponses(addresses);
099
100 if (!clusterRequest.isFireAndForget()) {
101 String uuid = clusterRequest.getUuid();
102
103 _executionResultMap.put(uuid, futureClusterResponses);
104 }
105
106 if (!clusterRequest.isSkipLocal() && _shortcutLocalMethod &&
107 addresses.remove(getLocalControlAddress())) {
108
109 ClusterNodeResponse clusterNodeResponse = runLocalMethod(
110 clusterRequest.getMethodHandler());
111
112 clusterNodeResponse.setMulticast(clusterRequest.isMulticast());
113 clusterNodeResponse.setUuid(clusterRequest.getUuid());
114
115 futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
116 }
117
118 if (clusterRequest.isMulticast()) {
119 sendMulticastRequest(clusterRequest);
120 }
121 else {
122 sendUnicastRequest(clusterRequest, addresses);
123 }
124
125 return futureClusterResponses;
126 }
127
128 public List<ClusterEventListener> getClusterEventListeners() {
129 if (!isEnabled()) {
130 return Collections.EMPTY_LIST;
131 }
132
133 return Collections.unmodifiableList(_clusterEventListeners);
134 }
135
136 public List<ClusterNode> getClusterNodes() {
137 if (!isEnabled()) {
138 return Collections.EMPTY_LIST;
139 }
140
141 return new ArrayList<ClusterNode>(_addressMap.values());
142 }
143
144 public ClusterNode getLocalClusterNode() throws SystemException {
145 if (!isEnabled()) {
146 return null;
147 }
148
149 ClusterNode clusterNode = _addressMap.get(getLocalControlAddress());
150
151 if (clusterNode == null) {
152 _localClusterNodeId = PortalUUIDUtil.generate();
153
154 clusterNode = new ClusterNode(_localClusterNodeId);
155
156 clusterNode.setPort(PortalUtil.getPortalPort());
157
158 try {
159 InetAddress inetAddress = bindInetAddress;
160
161 if (inetAddress == null) {
162 inetAddress = InetAddressUtil.getLocalInetAddress();
163 }
164
165 clusterNode.setInetAddress(inetAddress);
166
167 clusterNode.setHostName(inetAddress.getHostName());
168 }
169 catch (Exception e) {
170 throw new SystemException(
171 "Unable to determine local network address", e);
172 }
173 }
174
175 return clusterNode;
176 }
177
178 public void initialize() {
179 if (!isEnabled()) {
180 return;
181 }
182
183 try {
184 PortalUtil.addPortalPortEventListener(this);
185
186 ClusterNode clusterNode = getLocalClusterNode();
187
188 ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
189 ClusterMessageType.NOTIFY, clusterNode);
190
191 _controlChannel.send(null, null, clusterRequest);
192 }
193 catch (ChannelException ce) {
194 _log.error("Unable to send multicast message ", ce);
195 }
196 catch (SystemException se) {
197 _log.error("Unable to determine local network address", se);
198 }
199 }
200
201 public boolean isClusterNodeAlive(String clusterNodeId) {
202 if (!isEnabled()) {
203 return false;
204 }
205
206 return _clusterNodeIdMap.containsKey(clusterNodeId);
207 }
208
209 public boolean isEnabled() {
210 return PropsValues.CLUSTER_LINK_ENABLED;
211 }
212
213 public void portalPortConfigured(int port) {
214 if (!isEnabled()) {
215 return;
216 }
217
218 try {
219 ClusterNode clusterNode = getLocalClusterNode();
220
221 clusterNode.setPort(port);
222
223 ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
224 ClusterMessageType.UPDATE, clusterNode);
225
226 _controlChannel.send(null, null, clusterRequest);
227 }
228 catch (Exception e) {
229 if (_log.isErrorEnabled()) {
230 _log.error("Unable to determine configure node port", e);
231 }
232 }
233 }
234
235 public void removeClusterEventListener(
236 ClusterEventListener clusterEventListener) {
237
238 if (!isEnabled()) {
239 return;
240 }
241
242 _clusterEventListeners.remove(clusterEventListener);
243 }
244
245 public void setClusterEventListeners(
246 List<ClusterEventListener> clusterEventListeners) {
247
248 if (!isEnabled()) {
249 return;
250 }
251
252 _clusterEventListeners.addAllAbsent(clusterEventListeners);
253 }
254
255 public void setShortcutLocalMethod(boolean shortcutLocalMethod) {
256 if (!isEnabled()) {
257 return;
258 }
259
260 _shortcutLocalMethod = shortcutLocalMethod;
261 }
262
263 protected void fireClusterEvent(ClusterEvent clusterEvent) {
264 for (ClusterEventListener listener : _clusterEventListeners) {
265 listener.processClusterEvent(clusterEvent);
266 }
267 }
268
269 protected JChannel getControlChannel() {
270 return _controlChannel;
271 }
272
273 protected FutureClusterResponses getExecutionResults(String uuid) {
274 return _executionResultMap.get(uuid);
275 }
276
277 protected Address getLocalControlAddress() {
278 return new AddressImpl(_controlChannel.getLocalAddress());
279 }
280
281 protected void initChannels() {
282 Properties controlProperties = PropsUtil.getProperties(
283 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL, false);
284
285 String controlProperty = controlProperties.getProperty(
286 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL);
287
288 ClusterRequestReceiver clusterInvokeReceiver =
289 new ClusterRequestReceiver(this);
290
291 try {
292 _controlChannel = createJChannel(
293 controlProperty, clusterInvokeReceiver, _DEFAULT_CLUSTER_NAME);
294 }
295 catch (ChannelException ce) {
296 _log.error(ce, ce);
297 }
298 catch (Exception e) {
299 _log.error(e, e);
300 }
301 }
302
303 protected boolean isShortcutLocalMethod() {
304 return _shortcutLocalMethod;
305 }
306
307 protected void memberJoined(Address joinAddress, ClusterNode clusterNode) {
308 _addressMap.put(joinAddress, clusterNode);
309
310 Address previousAddress = _clusterNodeIdMap.put(
311 clusterNode.getClusterNodeId(), joinAddress);
312
313 if ((previousAddress == null) &&
314 !getLocalControlAddress().equals(joinAddress)) {
315
316 ClusterEvent clusterEvent = ClusterEvent.join(clusterNode);
317
318 fireClusterEvent(clusterEvent);
319 }
320 }
321
322 protected void memberRemoved(List<Address> departAddresses) {
323 List<ClusterNode> departingClusterNodes = new ArrayList<ClusterNode>();
324
325 for (Address departAddress : departAddresses) {
326 ClusterNode departingClusterNode = _addressMap.remove(
327 departAddress);
328 if (departingClusterNode != null) {
329 departingClusterNodes.add(departingClusterNode);
330
331 _clusterNodeIdMap.remove(
332 departingClusterNode.getClusterNodeId());
333 }
334 }
335
336 if (departingClusterNodes.isEmpty()) {
337 return;
338 }
339
340 ClusterEvent clusterEvent = ClusterEvent.depart(departingClusterNodes);
341
342 fireClusterEvent(clusterEvent);
343 }
344
345 protected List<Address> prepareAddresses(ClusterRequest clusterRequest) {
346 boolean isMulticast = clusterRequest.isMulticast();
347
348 List<Address> addresses = null;
349
350 if (isMulticast) {
351 addresses = getAddresses(_controlChannel);
352 }
353 else {
354 Collection<String> clusterNodeIds =
355 clusterRequest.getTargetClusterNodeIds();
356
357 addresses = new ArrayList<Address>(clusterNodeIds.size());
358
359 for (String clusterNodeId : clusterNodeIds) {
360 Address address = _clusterNodeIdMap.get(clusterNodeId);
361
362 addresses.add(address);
363 }
364 }
365
366 return addresses;
367 }
368
369 protected ClusterNodeResponse runLocalMethod(MethodHandler methodHandler)
370 throws SystemException {
371
372 ClusterNodeResponse clusterNodeResponse = new ClusterNodeResponse();
373
374 ClusterNode localClusterNode = getLocalClusterNode();
375
376 clusterNodeResponse.setClusterNode(localClusterNode);
377 clusterNodeResponse.setClusterMessageType(ClusterMessageType.EXECUTE);
378
379 if (methodHandler == null) {
380 clusterNodeResponse.setException(
381 new ClusterException(
382 "Payload is not of type " + MethodHandler.class.getName()));
383
384 return clusterNodeResponse;
385 }
386
387 try {
388 Object returnValue = methodHandler.invoke(true);
389
390 if (returnValue instanceof Serializable) {
391 clusterNodeResponse.setResult(returnValue);
392 }
393 else if (returnValue != null) {
394 clusterNodeResponse.setException(
395 new ClusterException("Return value is not serializable"));
396 }
397 }
398 catch (Exception e) {
399 clusterNodeResponse.setException(e);
400 }
401
402 return clusterNodeResponse;
403 }
404
405 protected void sendMulticastRequest(ClusterRequest clusterRequest)
406 throws SystemException {
407
408 try {
409 _controlChannel.send(null, null, clusterRequest);
410 }
411 catch (ChannelException ce) {
412 _log.error(
413 "Unable to send multicast message " + clusterRequest, ce);
414
415 throw new SystemException(
416 "Unable to send multicast request", ce);
417 }
418 }
419
420 protected void sendUnicastRequest(
421 ClusterRequest clusterRequest, List<Address> addresses)
422 throws SystemException {
423
424 for (Address address : addresses) {
425 org.jgroups.Address jGroupsAddress =
426 (org.jgroups.Address)address.getRealAddress();
427
428 try {
429 _controlChannel.send(jGroupsAddress, null, clusterRequest);
430 }
431 catch (ChannelException ce) {
432 _log.error(
433 "Unable to send unicast message " + clusterRequest, ce);
434
435 throw new SystemException(
436 "Unable to send unicast request", ce);
437 }
438 }
439 }
440
441 private static final String _DEFAULT_CLUSTER_NAME =
442 "LIFERAY-CONTROL-CHANNEL";
443
444 private static Log _log = LogFactoryUtil.getLog(ClusterExecutorImpl.class);
445
446 private Map<Address, ClusterNode> _addressMap =
447 new ConcurrentHashMap<Address, ClusterNode>();
448 private CopyOnWriteArrayList<ClusterEventListener> _clusterEventListeners =
449 new CopyOnWriteArrayList<ClusterEventListener>();
450 private Map<String, Address> _clusterNodeIdMap =
451 new ConcurrentHashMap<String, Address>();
452 private JChannel _controlChannel;
453 private Map<String, FutureClusterResponses> _executionResultMap =
454 new WeakValueConcurrentHashMap<String, FutureClusterResponses>();
455 private String _localClusterNodeId;
456 private boolean _shortcutLocalMethod;
457
458 }