001
014
015 package com.liferay.portal.cluster;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.ClusterEvent;
019 import com.liferay.portal.kernel.cluster.ClusterEventListener;
020 import com.liferay.portal.kernel.cluster.ClusterException;
021 import com.liferay.portal.kernel.cluster.ClusterExecutor;
022 import com.liferay.portal.kernel.cluster.ClusterMessageType;
023 import com.liferay.portal.kernel.cluster.ClusterNode;
024 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025 import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
026 import com.liferay.portal.kernel.cluster.ClusterRequest;
027 import com.liferay.portal.kernel.cluster.ClusterResponseCallback;
028 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
029 import com.liferay.portal.kernel.exception.SystemException;
030 import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
031 import com.liferay.portal.kernel.log.Log;
032 import com.liferay.portal.kernel.log.LogFactoryUtil;
033 import com.liferay.portal.kernel.security.pacl.DoPrivileged;
034 import com.liferay.portal.kernel.util.Http;
035 import com.liferay.portal.kernel.util.InetAddressUtil;
036 import com.liferay.portal.kernel.util.MethodHandler;
037 import com.liferay.portal.kernel.util.PropsKeys;
038 import com.liferay.portal.kernel.util.StringBundler;
039 import com.liferay.portal.kernel.util.StringUtil;
040 import com.liferay.portal.kernel.util.Validator;
041 import com.liferay.portal.kernel.util.WeakValueConcurrentHashMap;
042 import com.liferay.portal.kernel.uuid.PortalUUIDUtil;
043 import com.liferay.portal.util.PortalPortEventListener;
044 import com.liferay.portal.util.PortalPortProtocolEventListener;
045 import com.liferay.portal.util.PortalUtil;
046 import com.liferay.portal.util.PropsUtil;
047 import com.liferay.portal.util.PropsValues;
048
049 import java.io.Serializable;
050
051 import java.net.InetAddress;
052
053 import java.util.ArrayList;
054 import java.util.Collection;
055 import java.util.Collections;
056 import java.util.List;
057 import java.util.Map;
058 import java.util.concurrent.BlockingQueue;
059 import java.util.concurrent.ConcurrentHashMap;
060 import java.util.concurrent.CopyOnWriteArrayList;
061 import java.util.concurrent.ExecutorService;
062 import java.util.concurrent.TimeUnit;
063 import java.util.concurrent.TimeoutException;
064
065 import org.jgroups.JChannel;
066
067
071 @DoPrivileged
072 public class ClusterExecutorImpl
073 extends ClusterBase
074 implements ClusterExecutor, PortalPortEventListener,
075 PortalPortProtocolEventListener {
076
077 public static final String CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL =
078 "CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL";
079
080 @Override
081 public void addClusterEventListener(
082 ClusterEventListener clusterEventListener) {
083
084 if (!isEnabled()) {
085 return;
086 }
087
088 _clusterEventListeners.addIfAbsent(clusterEventListener);
089 }
090
091 @Override
092 public void afterPropertiesSet() {
093 if (PropsValues.CLUSTER_EXECUTOR_DEBUG_ENABLED) {
094 addClusterEventListener(new DebuggingClusterEventListenerImpl());
095 }
096
097 if (PropsValues.LIVE_USERS_ENABLED) {
098 addClusterEventListener(new LiveUsersClusterEventListenerImpl());
099 }
100
101 super.afterPropertiesSet();
102 }
103
104 @Override
105 public void destroy() {
106 if (!isEnabled()) {
107 return;
108 }
109
110 PortalExecutorManagerUtil.shutdown(
111 CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL, true);
112
113 _controlJChannel.setReceiver(null);
114
115 _controlJChannel.close();
116
117 _clusterEventListeners.clear();
118 _clusterNodeAddresses.clear();
119 _futureClusterResponses.clear();
120 _liveInstances.clear();
121 _localAddress = null;
122 _localClusterNode = null;
123 }
124
125 @Override
126 public FutureClusterResponses execute(ClusterRequest clusterRequest)
127 throws SystemException {
128
129 if (!isEnabled()) {
130 return null;
131 }
132
133 List<Address> addresses = prepareAddresses(clusterRequest);
134
135 FutureClusterResponses futureClusterResponses =
136 new FutureClusterResponses(addresses);
137
138 if (!clusterRequest.isFireAndForget()) {
139 String uuid = clusterRequest.getUuid();
140
141 _futureClusterResponses.put(uuid, futureClusterResponses);
142 }
143
144 if (_shortcutLocalMethod &&
145 addresses.remove(getLocalClusterNodeAddress())) {
146
147 runLocalMethod(clusterRequest, futureClusterResponses);
148 }
149
150 if (clusterRequest.isMulticast()) {
151 try {
152 sendJGroupsMessage(_controlJChannel, null, clusterRequest);
153 }
154 catch (Exception e) {
155 throw new SystemException(
156 "Unable to send multicast request", e);
157 }
158 }
159 else {
160 for (Address address : addresses) {
161 org.jgroups.Address jGroupsAddress =
162 (org.jgroups.Address)address.getRealAddress();
163
164 try {
165 sendJGroupsMessage(
166 _controlJChannel, jGroupsAddress, clusterRequest);
167 }
168 catch (Exception e) {
169 throw new SystemException(
170 "Unable to send unicast request", e);
171 }
172 }
173 }
174
175 return futureClusterResponses;
176 }
177
178 @Override
179 public void execute(
180 ClusterRequest clusterRequest,
181 ClusterResponseCallback clusterResponseCallback)
182 throws SystemException {
183
184 FutureClusterResponses futureClusterResponses = execute(clusterRequest);
185
186 ClusterResponseCallbackJob clusterResponseCallbackJob =
187 new ClusterResponseCallbackJob(
188 clusterResponseCallback, futureClusterResponses);
189
190 _executorService.execute(clusterResponseCallbackJob);
191 }
192
193 @Override
194 public void execute(
195 ClusterRequest clusterRequest,
196 ClusterResponseCallback clusterResponseCallback, long timeout,
197 TimeUnit timeUnit)
198 throws SystemException {
199
200 FutureClusterResponses futureClusterResponses = execute(clusterRequest);
201
202 ClusterResponseCallbackJob clusterResponseCallbackJob =
203 new ClusterResponseCallbackJob(
204 clusterResponseCallback, futureClusterResponses, timeout,
205 timeUnit);
206
207 _executorService.execute(clusterResponseCallbackJob);
208 }
209
210 @Override
211 public List<ClusterEventListener> getClusterEventListeners() {
212 if (!isEnabled()) {
213 return Collections.emptyList();
214 }
215
216 return Collections.unmodifiableList(_clusterEventListeners);
217 }
218
219 @Override
220 public List<Address> getClusterNodeAddresses() {
221 if (!isEnabled()) {
222 return Collections.emptyList();
223 }
224
225 return getAddresses(_controlJChannel);
226 }
227
228 @Override
229 public List<ClusterNode> getClusterNodes() {
230 if (!isEnabled()) {
231 return Collections.emptyList();
232 }
233
234 return new ArrayList<ClusterNode>(_liveInstances.values());
235 }
236
237 @Override
238 public ClusterNode getLocalClusterNode() {
239 if (!isEnabled()) {
240 return null;
241 }
242
243 return _localClusterNode;
244 }
245
246 @Override
247 public Address getLocalClusterNodeAddress() {
248 if (!isEnabled()) {
249 return null;
250 }
251
252 return _localAddress;
253 }
254
255 @Override
256 public void initialize() {
257 if (!isEnabled()) {
258 return;
259 }
260
261 _executorService = PortalExecutorManagerUtil.getPortalExecutor(
262 CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL);
263
264 PortalUtil.addPortalPortProtocolEventListener(this);
265
266 _localAddress = new AddressImpl(_controlJChannel.getAddress());
267
268 try {
269 initLocalClusterNode();
270
271 memberJoined(_localAddress, _localClusterNode);
272
273 sendNotifyRequest();
274 }
275 catch (Exception e) {
276 _log.error("Unable to determine local network address", e);
277 }
278
279 ClusterRequestReceiver clusterRequestReceiver =
280 (ClusterRequestReceiver)_controlJChannel.getReceiver();
281
282 clusterRequestReceiver.openLatch();
283 }
284
285 @Override
286 public boolean isClusterNodeAlive(Address address) {
287 if (!isEnabled()) {
288 return false;
289 }
290
291 List<Address> addresses = getAddresses(_controlJChannel);
292
293 return addresses.contains(address);
294 }
295
296 @Override
297 public boolean isClusterNodeAlive(String clusterNodeId) {
298 if (!isEnabled()) {
299 return false;
300 }
301
302 return _clusterNodeAddresses.containsKey(clusterNodeId);
303 }
304
305
309 @Override
310 public void portalPortConfigured(int port) {
311 portalPortProtocolConfigured(port, null);
312 }
313
314 @Override
315 public void portalPortProtocolConfigured(int port, Boolean secure) {
316 if (!isEnabled() || (port <= 0) || (secure == null)) {
317 return;
318 }
319
320 if (Validator.isNotNull(_localClusterNode.getPortalProtocol())) {
321 return;
322 }
323
324 if (secure) {
325 _localClusterNode.setPortalProtocol(Http.HTTPS);
326 }
327 else {
328 _localClusterNode.setPortalProtocol(Http.HTTP);
329 }
330
331 try {
332 _localClusterNode.setPort(port);
333
334 memberJoined(_localAddress, _localClusterNode);
335
336 ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
337 ClusterMessageType.UPDATE, _localClusterNode);
338
339 sendJGroupsMessage(_controlJChannel, null, clusterRequest);
340 }
341 catch (Exception e) {
342 _log.error("Unable to determine configure node port", e);
343 }
344 }
345
346 @Override
347 public void removeClusterEventListener(
348 ClusterEventListener clusterEventListener) {
349
350 if (!isEnabled()) {
351 return;
352 }
353
354 _clusterEventListeners.remove(clusterEventListener);
355 }
356
357 public void setClusterEventListeners(
358 List<ClusterEventListener> clusterEventListeners) {
359
360 if (!isEnabled()) {
361 return;
362 }
363
364 _clusterEventListeners.addAllAbsent(clusterEventListeners);
365 }
366
367 public void setShortcutLocalMethod(boolean shortcutLocalMethod) {
368 if (!isEnabled()) {
369 return;
370 }
371
372 _shortcutLocalMethod = shortcutLocalMethod;
373 }
374
375 protected void fireClusterEvent(ClusterEvent clusterEvent) {
376 for (ClusterEventListener listener : _clusterEventListeners) {
377 listener.processClusterEvent(clusterEvent);
378 }
379 }
380
381 protected ClusterNodeResponse generateClusterNodeResponse(
382 ClusterRequest clusterRequest, Object returnValue,
383 Exception exception) {
384
385 ClusterNodeResponse clusterNodeResponse = new ClusterNodeResponse();
386
387 clusterNodeResponse.setAddress(getLocalClusterNodeAddress());
388 clusterNodeResponse.setClusterNode(getLocalClusterNode());
389 clusterNodeResponse.setClusterMessageType(
390 clusterRequest.getClusterMessageType());
391 clusterNodeResponse.setMulticast(clusterRequest.isMulticast());
392 clusterNodeResponse.setUuid(clusterRequest.getUuid());
393
394 if (exception != null) {
395 clusterNodeResponse.setException(exception);
396 }
397 else {
398 if (returnValue instanceof Serializable) {
399 clusterNodeResponse.setResult(returnValue);
400 }
401 else if (returnValue != null) {
402 clusterNodeResponse.setException(
403 new ClusterException("Return value is not serializable"));
404 }
405 }
406
407 return clusterNodeResponse;
408 }
409
410 protected JChannel getControlChannel() {
411 return _controlJChannel;
412 }
413
414 protected FutureClusterResponses getExecutionResults(String uuid) {
415 return _futureClusterResponses.get(uuid);
416 }
417
418 @Override
419 protected void initChannels() throws Exception {
420 String channelName = PropsUtil.get(
421 PropsKeys.CLUSTER_LINK_CHANNEL_NAME_CONTROL);
422
423 if (Validator.isNull(channelName)) {
424 throw new IllegalStateException(
425 "Set \"" + PropsKeys.CLUSTER_LINK_CHANNEL_NAME_CONTROL +
426 "\"");
427 }
428
429 String controlProperty = PropsUtil.get(
430 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL);
431
432 if (Validator.isNull(controlProperty)) {
433 throw new IllegalStateException(
434 "Set \"" + PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL +
435 "\"");
436 }
437
438 ClusterRequestReceiver clusterRequestReceiver =
439 new ClusterRequestReceiver(this);
440
441 _controlJChannel = createJChannel(
442 controlProperty, clusterRequestReceiver, channelName);
443 }
444
445 protected void initLocalClusterNode() throws Exception {
446 InetAddress inetAddress = bindInetAddress;
447
448 if (inetAddress == null) {
449 inetAddress = InetAddressUtil.getLocalInetAddress();
450 }
451
452 ClusterNode localClusterNode = new ClusterNode(
453 PortalUUIDUtil.generate(), inetAddress);
454
455 if (StringUtil.equalsIgnoreCase(
456 Http.HTTPS, PropsValues.PORTAL_INSTANCE_PROTOCOL) &&
457 (PropsValues.PORTAL_INSTANCE_HTTPS_PORT > 0)) {
458
459 localClusterNode.setPortalProtocol(Http.HTTPS);
460 localClusterNode.setPort(PropsValues.PORTAL_INSTANCE_HTTPS_PORT);
461 }
462 else if (StringUtil.equalsIgnoreCase(
463 Http.HTTP, PropsValues.PORTAL_INSTANCE_PROTOCOL) &&
464 (PropsValues.PORTAL_INSTANCE_HTTP_PORT > 0)) {
465
466 localClusterNode.setPortalProtocol(Http.HTTP);
467 localClusterNode.setPort(PropsValues.PORTAL_INSTANCE_HTTP_PORT);
468 }
469 else {
470 if (_log.isWarnEnabled()) {
471 StringBundler sb = new StringBundler(8);
472
473 sb.append("Unable to configure node protocol and port. ");
474 sb.append("This configuration will be inferred dynamically ");
475 sb.append("from the first request but static configuration ");
476 sb.append("is recomended to avoid comunications problems ");
477 sb.append("between nodes. Please set the right values for ");
478 sb.append("\"portal.instance.protocol\" and ");
479 sb.append("\"portal.instance.http.port\" or ");
480 sb.append("\"portal.instance.https.port\".");
481
482 _log.warn(sb.toString());
483 }
484 }
485
486 _localClusterNode = localClusterNode;
487
488 if (_log.isDebugEnabled()) {
489 _log.debug(
490 "Initialized cluster node: " + localClusterNode.toString());
491 }
492 }
493
494 protected boolean isShortcutLocalMethod() {
495 return _shortcutLocalMethod;
496 }
497
498 protected void memberJoined(Address joinAddress, ClusterNode clusterNode) {
499 _liveInstances.put(joinAddress, clusterNode);
500
501 Address previousAddress = _clusterNodeAddresses.put(
502 clusterNode.getClusterNodeId(), joinAddress);
503
504 if ((previousAddress == null) && !_localAddress.equals(joinAddress)) {
505 ClusterEvent clusterEvent = ClusterEvent.join(clusterNode);
506
507
508
509 fireClusterEvent(clusterEvent);
510 }
511 }
512
513 protected void memberRemoved(List<Address> departAddresses) {
514 List<ClusterNode> departClusterNodes = new ArrayList<ClusterNode>();
515
516 for (Address departAddress : departAddresses) {
517 ClusterNode departClusterNode = _liveInstances.remove(
518 departAddress);
519
520 if (departClusterNode == null) {
521 continue;
522 }
523
524 departClusterNodes.add(departClusterNode);
525
526 _clusterNodeAddresses.remove(departClusterNode.getClusterNodeId());
527 }
528
529 if (departClusterNodes.isEmpty()) {
530 return;
531 }
532
533 ClusterEvent clusterEvent = ClusterEvent.depart(departClusterNodes);
534
535 fireClusterEvent(clusterEvent);
536 }
537
538 protected List<Address> prepareAddresses(ClusterRequest clusterRequest) {
539 boolean isMulticast = clusterRequest.isMulticast();
540
541 List<Address> addresses = null;
542
543 if (isMulticast) {
544 addresses = getAddresses(_controlJChannel);
545 }
546 else {
547 addresses = new ArrayList<Address>();
548
549 Collection<Address> clusterNodeAddresses =
550 clusterRequest.getTargetClusterNodeAddresses();
551
552 if (clusterNodeAddresses != null) {
553 addresses.addAll(clusterNodeAddresses);
554 }
555
556 Collection<String> clusterNodeIds =
557 clusterRequest.getTargetClusterNodeIds();
558
559 if (clusterNodeIds != null) {
560 for (String clusterNodeId : clusterNodeIds) {
561 Address address = _clusterNodeAddresses.get(clusterNodeId);
562
563 addresses.add(address);
564 }
565 }
566 }
567
568 if (clusterRequest.isSkipLocal()) {
569 addresses.remove(getLocalClusterNodeAddress());
570 }
571
572 return addresses;
573 }
574
575 protected void runLocalMethod(
576 ClusterRequest clusterRequest,
577 FutureClusterResponses futureClusterResponses) {
578
579 MethodHandler methodHandler = clusterRequest.getMethodHandler();
580
581 Object returnValue = null;
582 Exception exception = null;
583
584 if (methodHandler == null) {
585 exception = new ClusterException(
586 "Payload is not of type " + MethodHandler.class.getName());
587 }
588 else {
589 try {
590 returnValue = methodHandler.invoke(true);
591 }
592 catch (Exception e) {
593 exception = e;
594 }
595 }
596
597 if (!clusterRequest.isFireAndForget()) {
598 ClusterNodeResponse clusterNodeResponse =
599 generateClusterNodeResponse(
600 clusterRequest, returnValue, exception);
601
602 futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
603 }
604 }
605
606 protected void sendNotifyRequest() {
607 ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
608 ClusterMessageType.NOTIFY, _localClusterNode);
609
610 try {
611 sendJGroupsMessage(_controlJChannel, null, clusterRequest);
612 }
613 catch (Exception e) {
614 _log.error("Unable to send notify message", e);
615 }
616 }
617
618 private static Log _log = LogFactoryUtil.getLog(ClusterExecutorImpl.class);
619
620 private CopyOnWriteArrayList<ClusterEventListener> _clusterEventListeners =
621 new CopyOnWriteArrayList<ClusterEventListener>();
622 private Map<String, Address> _clusterNodeAddresses =
623 new ConcurrentHashMap<String, Address>();
624 private JChannel _controlJChannel;
625 private ExecutorService _executorService;
626 private Map<String, FutureClusterResponses> _futureClusterResponses =
627 new WeakValueConcurrentHashMap<String, FutureClusterResponses>();
628 private Map<Address, ClusterNode> _liveInstances =
629 new ConcurrentHashMap<Address, ClusterNode>();
630 private Address _localAddress;
631 private ClusterNode _localClusterNode;
632 private boolean _shortcutLocalMethod;
633
634 private class ClusterResponseCallbackJob implements Runnable {
635
636 public ClusterResponseCallbackJob(
637 ClusterResponseCallback clusterResponseCallback,
638 FutureClusterResponses futureClusterResponses) {
639
640 _clusterResponseCallback = clusterResponseCallback;
641 _futureClusterResponses = futureClusterResponses;
642 _timeout = -1;
643 _timeoutGet = false;
644 _timeUnit = TimeUnit.SECONDS;
645 }
646
647 public ClusterResponseCallbackJob(
648 ClusterResponseCallback clusterResponseCallback,
649 FutureClusterResponses futureClusterResponses, long timeout,
650 TimeUnit timeUnit) {
651
652 _clusterResponseCallback = clusterResponseCallback;
653 _futureClusterResponses = futureClusterResponses;
654 _timeout = timeout;
655 _timeoutGet = true;
656 _timeUnit = timeUnit;
657 }
658
659 @Override
660 public void run() {
661 BlockingQueue<ClusterNodeResponse> blockingQueue =
662 _futureClusterResponses.getPartialResults();
663
664 _clusterResponseCallback.callback(blockingQueue);
665
666 ClusterNodeResponses clusterNodeResponses = null;
667
668 try {
669 if (_timeoutGet) {
670 clusterNodeResponses = _futureClusterResponses.get(
671 _timeout, _timeUnit);
672 }
673 else {
674 clusterNodeResponses = _futureClusterResponses.get();
675 }
676
677 _clusterResponseCallback.callback(clusterNodeResponses);
678 }
679 catch (InterruptedException ie) {
680 _clusterResponseCallback.processInterruptedException(ie);
681 }
682 catch (TimeoutException te) {
683 _clusterResponseCallback.processTimeoutException(te);
684 }
685 }
686
687 private final ClusterResponseCallback _clusterResponseCallback;
688 private final FutureClusterResponses _futureClusterResponses;
689 private final long _timeout;
690 private final boolean _timeoutGet;
691 private final TimeUnit _timeUnit;
692
693 }
694
695 }