001
014
015 package com.liferay.portal.cluster;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.ClusterEvent;
019 import com.liferay.portal.kernel.cluster.ClusterEventListener;
020 import com.liferay.portal.kernel.cluster.ClusterException;
021 import com.liferay.portal.kernel.cluster.ClusterExecutor;
022 import com.liferay.portal.kernel.cluster.ClusterMessageType;
023 import com.liferay.portal.kernel.cluster.ClusterNode;
024 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025 import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
026 import com.liferay.portal.kernel.cluster.ClusterRequest;
027 import com.liferay.portal.kernel.cluster.ClusterResponseCallback;
028 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
029 import com.liferay.portal.kernel.exception.SystemException;
030 import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
031 import com.liferay.portal.kernel.log.Log;
032 import com.liferay.portal.kernel.log.LogFactoryUtil;
033 import com.liferay.portal.kernel.security.pacl.DoPrivileged;
034 import com.liferay.portal.kernel.util.InetAddressUtil;
035 import com.liferay.portal.kernel.util.MethodHandler;
036 import com.liferay.portal.kernel.util.PropsKeys;
037 import com.liferay.portal.kernel.util.WeakValueConcurrentHashMap;
038 import com.liferay.portal.kernel.uuid.PortalUUIDUtil;
039 import com.liferay.portal.util.PortalPortEventListener;
040 import com.liferay.portal.util.PortalUtil;
041 import com.liferay.portal.util.PropsUtil;
042 import com.liferay.portal.util.PropsValues;
043
044 import java.io.Serializable;
045
046 import java.net.InetAddress;
047
048 import java.util.ArrayList;
049 import java.util.Collection;
050 import java.util.Collections;
051 import java.util.List;
052 import java.util.Map;
053 import java.util.Properties;
054 import java.util.concurrent.BlockingQueue;
055 import java.util.concurrent.ConcurrentHashMap;
056 import java.util.concurrent.CopyOnWriteArrayList;
057 import java.util.concurrent.ExecutorService;
058 import java.util.concurrent.TimeUnit;
059 import java.util.concurrent.TimeoutException;
060
061 import org.jgroups.ChannelException;
062 import org.jgroups.JChannel;
063
064
068 @DoPrivileged
069 public class ClusterExecutorImpl
070 extends ClusterBase implements ClusterExecutor, PortalPortEventListener {
071
072 public static final String CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL =
073 "CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL";
074
075 @Override
076 public void addClusterEventListener(
077 ClusterEventListener clusterEventListener) {
078
079 if (!isEnabled()) {
080 return;
081 }
082
083 _clusterEventListeners.addIfAbsent(clusterEventListener);
084 }
085
086 @Override
087 public void afterPropertiesSet() {
088 if (PropsValues.CLUSTER_EXECUTOR_DEBUG_ENABLED) {
089 addClusterEventListener(new DebuggingClusterEventListenerImpl());
090 }
091
092 if (PropsValues.LIVE_USERS_ENABLED) {
093 addClusterEventListener(new LiveUsersClusterEventListenerImpl());
094 }
095
096 super.afterPropertiesSet();
097 }
098
099 @Override
100 public void destroy() {
101 if (!isEnabled()) {
102 return;
103 }
104
105 PortalExecutorManagerUtil.shutdown(
106 CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL, true);
107
108 _controlJChannel.setReceiver(null);
109 _controlJChannel.close();
110
111 _clusterEventListeners.clear();
112 _clusterNodeAddresses.clear();
113 _futureClusterResponses.clear();
114 _liveInstances.clear();
115 _localAddress = null;
116 _localClusterNode = null;
117 }
118
119 @Override
120 public FutureClusterResponses execute(ClusterRequest clusterRequest)
121 throws SystemException {
122
123 if (!isEnabled()) {
124 return null;
125 }
126
127 List<Address> addresses = prepareAddresses(clusterRequest);
128
129 FutureClusterResponses futureClusterResponses =
130 new FutureClusterResponses(addresses);
131
132 if (!clusterRequest.isFireAndForget()) {
133 String uuid = clusterRequest.getUuid();
134
135 _futureClusterResponses.put(uuid, futureClusterResponses);
136 }
137
138 if (_shortcutLocalMethod &&
139 addresses.remove(getLocalClusterNodeAddress())) {
140
141 runLocalMethod(clusterRequest, futureClusterResponses);
142 }
143
144 if (clusterRequest.isMulticast()) {
145 try {
146 _controlJChannel.send(null, null, clusterRequest);
147 }
148 catch (Exception e) {
149 throw new SystemException(
150 "Unable to send multicast request", e);
151 }
152 }
153 else {
154 for (Address address : addresses) {
155 org.jgroups.Address jGroupsAddress =
156 (org.jgroups.Address)address.getRealAddress();
157
158 try {
159 _controlJChannel.send(jGroupsAddress, null, clusterRequest);
160 }
161 catch (Exception e) {
162 throw new SystemException(
163 "Unable to send unicast request", e);
164 }
165 }
166 }
167
168 return futureClusterResponses;
169 }
170
171 @Override
172 public void execute(
173 ClusterRequest clusterRequest,
174 ClusterResponseCallback clusterResponseCallback)
175 throws SystemException {
176
177 FutureClusterResponses futureClusterResponses = execute(clusterRequest);
178
179 ClusterResponseCallbackJob clusterResponseCallbackJob =
180 new ClusterResponseCallbackJob(
181 clusterResponseCallback, futureClusterResponses);
182
183 _executorService.execute(clusterResponseCallbackJob);
184 }
185
186 @Override
187 public void execute(
188 ClusterRequest clusterRequest,
189 ClusterResponseCallback clusterResponseCallback, long timeout,
190 TimeUnit timeUnit)
191 throws SystemException {
192
193 FutureClusterResponses futureClusterResponses = execute(clusterRequest);
194
195 ClusterResponseCallbackJob clusterResponseCallbackJob =
196 new ClusterResponseCallbackJob(
197 clusterResponseCallback, futureClusterResponses, timeout,
198 timeUnit);
199
200 _executorService.execute(clusterResponseCallbackJob);
201 }
202
203 @Override
204 public List<ClusterEventListener> getClusterEventListeners() {
205 if (!isEnabled()) {
206 return Collections.emptyList();
207 }
208
209 return Collections.unmodifiableList(_clusterEventListeners);
210 }
211
212 @Override
213 public List<Address> getClusterNodeAddresses() {
214 if (!isEnabled()) {
215 return Collections.emptyList();
216 }
217
218 return getAddresses(_controlJChannel);
219 }
220
221 @Override
222 public List<ClusterNode> getClusterNodes() {
223 if (!isEnabled()) {
224 return Collections.emptyList();
225 }
226
227 return new ArrayList<ClusterNode>(_liveInstances.values());
228 }
229
230 @Override
231 public ClusterNode getLocalClusterNode() {
232 if (!isEnabled()) {
233 return null;
234 }
235
236 return _localClusterNode;
237 }
238
239 @Override
240 public Address getLocalClusterNodeAddress() {
241 if (!isEnabled()) {
242 return null;
243 }
244
245 return _localAddress;
246 }
247
248 @Override
249 public void initialize() {
250 if (!isEnabled()) {
251 return;
252 }
253
254 _executorService = PortalExecutorManagerUtil.getPortalExecutor(
255 CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL);
256
257 PortalUtil.addPortalPortEventListener(this);
258
259 _localAddress = new AddressImpl(_controlJChannel.getLocalAddress());
260
261 try {
262 initLocalClusterNode();
263
264 memberJoined(_localAddress, _localClusterNode);
265
266 sendNotifyRequest();
267 }
268 catch (Exception e) {
269 _log.error("Unable to determine local network address", e);
270 }
271
272 ClusterRequestReceiver clusterRequestReceiver =
273 (ClusterRequestReceiver)_controlJChannel.getReceiver();
274
275 clusterRequestReceiver.openLatch();
276 }
277
278 @Override
279 public boolean isClusterNodeAlive(Address address) {
280 if (!isEnabled()) {
281 return false;
282 }
283
284 List<Address> addresses = getAddresses(_controlJChannel);
285
286 return addresses.contains(address);
287 }
288
289 @Override
290 public boolean isClusterNodeAlive(String clusterNodeId) {
291 if (!isEnabled()) {
292 return false;
293 }
294
295 return _clusterNodeAddresses.containsKey(clusterNodeId);
296 }
297
298 @Override
299 public void portalPortConfigured(int port) {
300 if (!isEnabled() ||
301 (_localClusterNode.getPort() ==
302 PropsValues.PORTAL_INSTANCE_HTTP_PORT)) {
303
304 return;
305 }
306
307 try {
308 _localClusterNode.setPort(port);
309
310 memberJoined(_localAddress, _localClusterNode);
311
312 ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
313 ClusterMessageType.UPDATE, _localClusterNode);
314
315 _controlJChannel.send(null, null, clusterRequest);
316 }
317 catch (Exception e) {
318 _log.error("Unable to determine configure node port", e);
319 }
320 }
321
322 @Override
323 public void removeClusterEventListener(
324 ClusterEventListener clusterEventListener) {
325
326 if (!isEnabled()) {
327 return;
328 }
329
330 _clusterEventListeners.remove(clusterEventListener);
331 }
332
333 public void setClusterEventListeners(
334 List<ClusterEventListener> clusterEventListeners) {
335
336 if (!isEnabled()) {
337 return;
338 }
339
340 _clusterEventListeners.addAllAbsent(clusterEventListeners);
341 }
342
343 public void setShortcutLocalMethod(boolean shortcutLocalMethod) {
344 if (!isEnabled()) {
345 return;
346 }
347
348 _shortcutLocalMethod = shortcutLocalMethod;
349 }
350
351 protected void fireClusterEvent(ClusterEvent clusterEvent) {
352 for (ClusterEventListener listener : _clusterEventListeners) {
353 listener.processClusterEvent(clusterEvent);
354 }
355 }
356
357 protected ClusterNodeResponse generateClusterNodeResponse(
358 ClusterRequest clusterRequest, Object returnValue,
359 Exception exception) {
360
361 ClusterNodeResponse clusterNodeResponse = new ClusterNodeResponse();
362
363 clusterNodeResponse.setAddress(getLocalClusterNodeAddress());
364 clusterNodeResponse.setClusterNode(getLocalClusterNode());
365 clusterNodeResponse.setClusterMessageType(
366 clusterRequest.getClusterMessageType());
367 clusterNodeResponse.setMulticast(clusterRequest.isMulticast());
368 clusterNodeResponse.setUuid(clusterRequest.getUuid());
369
370 if (exception != null) {
371 clusterNodeResponse.setException(exception);
372 }
373 else {
374 if (returnValue instanceof Serializable) {
375 clusterNodeResponse.setResult(returnValue);
376 }
377 else if (returnValue != null) {
378 clusterNodeResponse.setException(
379 new ClusterException("Return value is not serializable"));
380 }
381 }
382
383 return clusterNodeResponse;
384 }
385
386 protected JChannel getControlChannel() {
387 return _controlJChannel;
388 }
389
390 protected FutureClusterResponses getExecutionResults(String uuid) {
391 return _futureClusterResponses.get(uuid);
392 }
393
394 @Override
395 protected void initChannels() throws ChannelException {
396 Properties controlProperties = PropsUtil.getProperties(
397 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL, false);
398
399 String controlProperty = controlProperties.getProperty(
400 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL);
401
402 ClusterRequestReceiver clusterRequestReceiver =
403 new ClusterRequestReceiver(this);
404
405 _controlJChannel = createJChannel(
406 controlProperty, clusterRequestReceiver, _DEFAULT_CLUSTER_NAME);
407 }
408
409 protected void initLocalClusterNode() throws Exception {
410 InetAddress inetAddress = bindInetAddress;
411
412 if (inetAddress == null) {
413 inetAddress = InetAddressUtil.getLocalInetAddress();
414 }
415
416 ClusterNode localClusterNode = new ClusterNode(
417 PortalUUIDUtil.generate(), inetAddress);
418
419 if (PropsValues.PORTAL_INSTANCE_HTTP_PORT > 0) {
420 localClusterNode.setPort(PropsValues.PORTAL_INSTANCE_HTTP_PORT);
421 }
422 else {
423 localClusterNode.setPort(PortalUtil.getPortalPort(false));
424 }
425
426 _localClusterNode = localClusterNode;
427 }
428
429 protected boolean isShortcutLocalMethod() {
430 return _shortcutLocalMethod;
431 }
432
433 protected void memberJoined(Address joinAddress, ClusterNode clusterNode) {
434 _liveInstances.put(joinAddress, clusterNode);
435
436 Address previousAddress = _clusterNodeAddresses.put(
437 clusterNode.getClusterNodeId(), joinAddress);
438
439 if ((previousAddress == null) && !_localAddress.equals(joinAddress)) {
440 ClusterEvent clusterEvent = ClusterEvent.join(clusterNode);
441
442 fireClusterEvent(clusterEvent);
443 }
444 }
445
446 protected void memberRemoved(List<Address> departAddresses) {
447 List<ClusterNode> departClusterNodes = new ArrayList<ClusterNode>();
448
449 for (Address departAddress : departAddresses) {
450 ClusterNode departClusterNode = _liveInstances.remove(
451 departAddress);
452
453 if (departClusterNode == null) {
454 continue;
455 }
456
457 departClusterNodes.add(departClusterNode);
458
459 _clusterNodeAddresses.remove(departClusterNode.getClusterNodeId());
460 }
461
462 if (departClusterNodes.isEmpty()) {
463 return;
464 }
465
466 ClusterEvent clusterEvent = ClusterEvent.depart(departClusterNodes);
467
468 fireClusterEvent(clusterEvent);
469 }
470
471 protected List<Address> prepareAddresses(ClusterRequest clusterRequest) {
472 boolean isMulticast = clusterRequest.isMulticast();
473
474 List<Address> addresses = null;
475
476 if (isMulticast) {
477 addresses = getAddresses(_controlJChannel);
478 }
479 else {
480 addresses = new ArrayList<Address>();
481
482 Collection<Address> clusterNodeAddresses =
483 clusterRequest.getTargetClusterNodeAddresses();
484
485 if (clusterNodeAddresses != null) {
486 addresses.addAll(clusterNodeAddresses);
487 }
488
489 Collection<String> clusterNodeIds =
490 clusterRequest.getTargetClusterNodeIds();
491
492 if (clusterNodeIds != null) {
493 for (String clusterNodeId : clusterNodeIds) {
494 Address address = _clusterNodeAddresses.get(clusterNodeId);
495
496 addresses.add(address);
497 }
498 }
499 }
500
501 if (clusterRequest.isSkipLocal()) {
502 addresses.remove(getLocalClusterNodeAddress());
503 }
504
505 return addresses;
506 }
507
508 protected void runLocalMethod(
509 ClusterRequest clusterRequest,
510 FutureClusterResponses futureClusterResponses) {
511
512 MethodHandler methodHandler = clusterRequest.getMethodHandler();
513 Object returnValue = null;
514 Exception exception = null;
515
516 if (methodHandler == null) {
517 exception = new ClusterException(
518 "Payload is not of type " + MethodHandler.class.getName());
519 }
520 else {
521 try {
522 returnValue = methodHandler.invoke(true);
523 }
524 catch (Exception e) {
525 exception = e;
526 }
527 }
528
529 if (!clusterRequest.isFireAndForget()) {
530 ClusterNodeResponse clusterNodeResponse =
531 generateClusterNodeResponse(
532 clusterRequest, returnValue, exception);
533
534 futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
535 }
536 }
537
538 protected void sendNotifyRequest() {
539 ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
540 ClusterMessageType.NOTIFY, _localClusterNode);
541
542 try {
543 _controlJChannel.send(null, null, clusterRequest);
544 }
545 catch (Exception e) {
546 _log.error("Unable to send notify message", e);
547 }
548 }
549
550 private static final String _DEFAULT_CLUSTER_NAME =
551 "LIFERAY-CONTROL-CHANNEL";
552
553 private static Log _log = LogFactoryUtil.getLog(ClusterExecutorImpl.class);
554
555 private CopyOnWriteArrayList<ClusterEventListener> _clusterEventListeners =
556 new CopyOnWriteArrayList<ClusterEventListener>();
557 private Map<String, Address> _clusterNodeAddresses =
558 new ConcurrentHashMap<String, Address>();
559 private JChannel _controlJChannel;
560 private ExecutorService _executorService;
561 private Map<String, FutureClusterResponses> _futureClusterResponses =
562 new WeakValueConcurrentHashMap<String, FutureClusterResponses>();
563 private Map<Address, ClusterNode> _liveInstances =
564 new ConcurrentHashMap<Address, ClusterNode>();
565 private Address _localAddress;
566 private ClusterNode _localClusterNode;
567 private boolean _shortcutLocalMethod;
568
569 private class ClusterResponseCallbackJob implements Runnable {
570
571 public ClusterResponseCallbackJob(
572 ClusterResponseCallback clusterResponseCallback,
573 FutureClusterResponses futureClusterResponses) {
574
575 _clusterResponseCallback = clusterResponseCallback;
576 _futureClusterResponses = futureClusterResponses;
577 _timeout = -1;
578 _timeoutGet = false;
579 _timeUnit = TimeUnit.SECONDS;
580 }
581
582 public ClusterResponseCallbackJob(
583 ClusterResponseCallback clusterResponseCallback,
584 FutureClusterResponses futureClusterResponses, long timeout,
585 TimeUnit timeUnit) {
586
587 _clusterResponseCallback = clusterResponseCallback;
588 _futureClusterResponses = futureClusterResponses;
589 _timeout = timeout;
590 _timeoutGet = true;
591 _timeUnit = timeUnit;
592 }
593
594 @Override
595 public void run() {
596 BlockingQueue<ClusterNodeResponse> blockingQueue =
597 _futureClusterResponses.getPartialResults();
598
599 _clusterResponseCallback.callback(blockingQueue);
600
601 ClusterNodeResponses clusterNodeResponses = null;
602
603 try {
604 if (_timeoutGet) {
605 clusterNodeResponses = _futureClusterResponses.get(
606 _timeout, _timeUnit);
607 }
608 else {
609 clusterNodeResponses = _futureClusterResponses.get();
610 }
611
612 _clusterResponseCallback.callback(clusterNodeResponses);
613 }
614 catch (InterruptedException ie) {
615 _clusterResponseCallback.processInterruptedException(ie);
616 }
617 catch (TimeoutException te) {
618 _clusterResponseCallback.processTimeoutException(te);
619 }
620 }
621
622 private final ClusterResponseCallback _clusterResponseCallback;
623 private final FutureClusterResponses _futureClusterResponses;
624 private final long _timeout;
625 private final boolean _timeoutGet;
626 private final TimeUnit _timeUnit;
627
628 }
629
630 }