001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.cluster;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.ClusterEvent;
019    import com.liferay.portal.kernel.cluster.ClusterEventListener;
020    import com.liferay.portal.kernel.cluster.ClusterException;
021    import com.liferay.portal.kernel.cluster.ClusterExecutor;
022    import com.liferay.portal.kernel.cluster.ClusterMessageType;
023    import com.liferay.portal.kernel.cluster.ClusterNode;
024    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025    import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
026    import com.liferay.portal.kernel.cluster.ClusterRequest;
027    import com.liferay.portal.kernel.cluster.ClusterResponseCallback;
028    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
029    import com.liferay.portal.kernel.exception.SystemException;
030    import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
031    import com.liferay.portal.kernel.log.Log;
032    import com.liferay.portal.kernel.log.LogFactoryUtil;
033    import com.liferay.portal.kernel.security.pacl.DoPrivileged;
034    import com.liferay.portal.kernel.util.Http;
035    import com.liferay.portal.kernel.util.InetAddressUtil;
036    import com.liferay.portal.kernel.util.MethodHandler;
037    import com.liferay.portal.kernel.util.PropsKeys;
038    import com.liferay.portal.kernel.util.StringBundler;
039    import com.liferay.portal.kernel.util.StringUtil;
040    import com.liferay.portal.kernel.util.Validator;
041    import com.liferay.portal.kernel.util.WeakValueConcurrentHashMap;
042    import com.liferay.portal.kernel.uuid.PortalUUIDUtil;
043    import com.liferay.portal.util.PortalPortEventListener;
044    import com.liferay.portal.util.PortalPortProtocolEventListener;
045    import com.liferay.portal.util.PortalUtil;
046    import com.liferay.portal.util.PropsUtil;
047    import com.liferay.portal.util.PropsValues;
048    
049    import java.io.Serializable;
050    
051    import java.net.InetAddress;
052    
053    import java.util.ArrayList;
054    import java.util.Collection;
055    import java.util.Collections;
056    import java.util.List;
057    import java.util.Map;
058    import java.util.concurrent.BlockingQueue;
059    import java.util.concurrent.ConcurrentHashMap;
060    import java.util.concurrent.CopyOnWriteArrayList;
061    import java.util.concurrent.ExecutorService;
062    import java.util.concurrent.TimeUnit;
063    import java.util.concurrent.TimeoutException;
064    
065    import org.jgroups.JChannel;
066    
067    /**
068     * @author Tina Tian
069     * @author Shuyang Zhou
070     */
071    @DoPrivileged
072    public class ClusterExecutorImpl
073            extends ClusterBase
074            implements ClusterExecutor, PortalPortEventListener,
075                               PortalPortProtocolEventListener {
076    
077            public static final String CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL =
078                    "CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL";
079    
080            @Override
081            public void addClusterEventListener(
082                    ClusterEventListener clusterEventListener) {
083    
084                    if (!isEnabled()) {
085                            return;
086                    }
087    
088                    _clusterEventListeners.addIfAbsent(clusterEventListener);
089            }
090    
091            @Override
092            public void afterPropertiesSet() {
093                    if (PropsValues.CLUSTER_EXECUTOR_DEBUG_ENABLED) {
094                            addClusterEventListener(new DebuggingClusterEventListenerImpl());
095                    }
096    
097                    if (PropsValues.LIVE_USERS_ENABLED) {
098                            addClusterEventListener(new LiveUsersClusterEventListenerImpl());
099                    }
100    
101                    super.afterPropertiesSet();
102            }
103    
104            @Override
105            public void destroy() {
106                    if (!isEnabled()) {
107                            return;
108                    }
109    
110                    PortalExecutorManagerUtil.shutdown(
111                            CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL, true);
112    
113                    _controlJChannel.setReceiver(null);
114    
115                    _controlJChannel.close();
116    
117                    _clusterEventListeners.clear();
118                    _clusterNodeAddresses.clear();
119                    _futureClusterResponses.clear();
120                    _liveInstances.clear();
121                    _localAddress = null;
122                    _localClusterNode = null;
123            }
124    
125            @Override
126            public FutureClusterResponses execute(ClusterRequest clusterRequest)
127                    throws SystemException {
128    
129                    if (!isEnabled()) {
130                            return null;
131                    }
132    
133                    List<Address> addresses = prepareAddresses(clusterRequest);
134    
135                    FutureClusterResponses futureClusterResponses =
136                            new FutureClusterResponses(addresses);
137    
138                    if (!clusterRequest.isFireAndForget()) {
139                            String uuid = clusterRequest.getUuid();
140    
141                            _futureClusterResponses.put(uuid, futureClusterResponses);
142                    }
143    
144                    if (_shortcutLocalMethod &&
145                            addresses.remove(getLocalClusterNodeAddress())) {
146    
147                            runLocalMethod(clusterRequest, futureClusterResponses);
148                    }
149    
150                    if (clusterRequest.isMulticast()) {
151                            try {
152                                    sendJGroupsMessage(_controlJChannel, null, clusterRequest);
153                            }
154                            catch (Exception e) {
155                                    throw new SystemException(
156                                            "Unable to send multicast request", e);
157                            }
158                    }
159                    else {
160                            for (Address address : addresses) {
161                                    org.jgroups.Address jGroupsAddress =
162                                            (org.jgroups.Address)address.getRealAddress();
163    
164                                    try {
165                                            sendJGroupsMessage(
166                                                    _controlJChannel, jGroupsAddress, clusterRequest);
167                                    }
168                                    catch (Exception e) {
169                                            throw new SystemException(
170                                                    "Unable to send unicast request", e);
171                                    }
172                            }
173                    }
174    
175                    return futureClusterResponses;
176            }
177    
178            @Override
179            public void execute(
180                            ClusterRequest clusterRequest,
181                            ClusterResponseCallback clusterResponseCallback)
182                    throws SystemException {
183    
184                    FutureClusterResponses futureClusterResponses = execute(clusterRequest);
185    
186                    ClusterResponseCallbackJob clusterResponseCallbackJob =
187                            new ClusterResponseCallbackJob(
188                                    clusterResponseCallback, futureClusterResponses);
189    
190                    _executorService.execute(clusterResponseCallbackJob);
191            }
192    
193            @Override
194            public void execute(
195                            ClusterRequest clusterRequest,
196                            ClusterResponseCallback clusterResponseCallback, long timeout,
197                            TimeUnit timeUnit)
198                    throws SystemException {
199    
200                    FutureClusterResponses futureClusterResponses = execute(clusterRequest);
201    
202                    ClusterResponseCallbackJob clusterResponseCallbackJob =
203                            new ClusterResponseCallbackJob(
204                                    clusterResponseCallback, futureClusterResponses, timeout,
205                                    timeUnit);
206    
207                    _executorService.execute(clusterResponseCallbackJob);
208            }
209    
210            @Override
211            public List<ClusterEventListener> getClusterEventListeners() {
212                    if (!isEnabled()) {
213                            return Collections.emptyList();
214                    }
215    
216                    return Collections.unmodifiableList(_clusterEventListeners);
217            }
218    
219            @Override
220            public List<Address> getClusterNodeAddresses() {
221                    if (!isEnabled()) {
222                            return Collections.emptyList();
223                    }
224    
225                    return getAddresses(_controlJChannel);
226            }
227    
228            @Override
229            public List<ClusterNode> getClusterNodes() {
230                    if (!isEnabled()) {
231                            return Collections.emptyList();
232                    }
233    
234                    return new ArrayList<ClusterNode>(_liveInstances.values());
235            }
236    
237            @Override
238            public ClusterNode getLocalClusterNode() {
239                    if (!isEnabled()) {
240                            return null;
241                    }
242    
243                    return _localClusterNode;
244            }
245    
246            @Override
247            public Address getLocalClusterNodeAddress() {
248                    if (!isEnabled()) {
249                            return null;
250                    }
251    
252                    return _localAddress;
253            }
254    
255            @Override
256            public void initialize() {
257                    if (!isEnabled()) {
258                            return;
259                    }
260    
261                    _executorService = PortalExecutorManagerUtil.getPortalExecutor(
262                            CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL);
263    
264                    PortalUtil.addPortalPortProtocolEventListener(this);
265    
266                    _localAddress = new AddressImpl(_controlJChannel.getAddress());
267    
268                    try {
269                            initLocalClusterNode();
270    
271                            memberJoined(_localAddress, _localClusterNode);
272    
273                            sendNotifyRequest();
274                    }
275                    catch (Exception e) {
276                            _log.error("Unable to determine local network address", e);
277                    }
278    
279                    ClusterRequestReceiver clusterRequestReceiver =
280                            (ClusterRequestReceiver)_controlJChannel.getReceiver();
281    
282                    clusterRequestReceiver.openLatch();
283            }
284    
285            @Override
286            public boolean isClusterNodeAlive(Address address) {
287                    if (!isEnabled()) {
288                            return false;
289                    }
290    
291                    List<Address> addresses = getAddresses(_controlJChannel);
292    
293                    return addresses.contains(address);
294            }
295    
296            @Override
297            public boolean isClusterNodeAlive(String clusterNodeId) {
298                    if (!isEnabled()) {
299                            return false;
300                    }
301    
302                    return _clusterNodeAddresses.containsKey(clusterNodeId);
303            }
304    
305            /**
306             * @deprecated As of 6.2.0, replaced by {@link
307             *             #portalPortProtocolConfigured(int, Boolean)}
308             */
309            @Override
310            public void portalPortConfigured(int port) {
311                    portalPortProtocolConfigured(port, null);
312            }
313    
314            @Override
315            public void portalPortProtocolConfigured(int port, Boolean secure) {
316                    if (!isEnabled() || (port <= 0) || (secure == null)) {
317                            return;
318                    }
319    
320                    if (Validator.isNotNull(_localClusterNode.getPortalProtocol())) {
321                            return;
322                    }
323    
324                    if (secure) {
325                            _localClusterNode.setPortalProtocol(Http.HTTPS);
326                    }
327                    else {
328                            _localClusterNode.setPortalProtocol(Http.HTTP);
329                    }
330    
331                    try {
332                            _localClusterNode.setPort(port);
333    
334                            memberJoined(_localAddress, _localClusterNode);
335    
336                            ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
337                                    ClusterMessageType.UPDATE, _localClusterNode);
338    
339                            sendJGroupsMessage(_controlJChannel, null, clusterRequest);
340                    }
341                    catch (Exception e) {
342                            _log.error("Unable to determine configure node port", e);
343                    }
344            }
345    
346            @Override
347            public void removeClusterEventListener(
348                    ClusterEventListener clusterEventListener) {
349    
350                    if (!isEnabled()) {
351                            return;
352                    }
353    
354                    _clusterEventListeners.remove(clusterEventListener);
355            }
356    
357            public void setClusterEventListeners(
358                    List<ClusterEventListener> clusterEventListeners) {
359    
360                    if (!isEnabled()) {
361                            return;
362                    }
363    
364                    _clusterEventListeners.addAllAbsent(clusterEventListeners);
365            }
366    
367            public void setShortcutLocalMethod(boolean shortcutLocalMethod) {
368                    if (!isEnabled()) {
369                            return;
370                    }
371    
372                    _shortcutLocalMethod = shortcutLocalMethod;
373            }
374    
375            protected void fireClusterEvent(ClusterEvent clusterEvent) {
376                    for (ClusterEventListener listener : _clusterEventListeners) {
377                            listener.processClusterEvent(clusterEvent);
378                    }
379            }
380    
381            protected ClusterNodeResponse generateClusterNodeResponse(
382                    ClusterRequest clusterRequest, Object returnValue,
383                    Exception exception) {
384    
385                    ClusterNodeResponse clusterNodeResponse = new ClusterNodeResponse();
386    
387                    clusterNodeResponse.setAddress(getLocalClusterNodeAddress());
388                    clusterNodeResponse.setClusterNode(getLocalClusterNode());
389                    clusterNodeResponse.setClusterMessageType(
390                            clusterRequest.getClusterMessageType());
391                    clusterNodeResponse.setMulticast(clusterRequest.isMulticast());
392                    clusterNodeResponse.setUuid(clusterRequest.getUuid());
393    
394                    if (exception != null) {
395                            clusterNodeResponse.setException(exception);
396                    }
397                    else {
398                            if (returnValue instanceof Serializable) {
399                                    clusterNodeResponse.setResult(returnValue);
400                            }
401                            else if (returnValue != null) {
402                                    clusterNodeResponse.setException(
403                                            new ClusterException("Return value is not serializable"));
404                            }
405                    }
406    
407                    return clusterNodeResponse;
408            }
409    
410            protected JChannel getControlChannel() {
411                    return _controlJChannel;
412            }
413    
414            protected FutureClusterResponses getExecutionResults(String uuid) {
415                    return _futureClusterResponses.get(uuid);
416            }
417    
418            @Override
419            protected void initChannels() throws Exception {
420                    String channelName = PropsUtil.get(
421                            PropsKeys.CLUSTER_LINK_CHANNEL_NAME_CONTROL);
422    
423                    if (Validator.isNull(channelName)) {
424                            throw new IllegalStateException(
425                                    "Set \"" + PropsKeys.CLUSTER_LINK_CHANNEL_NAME_CONTROL +
426                                            "\"");
427                    }
428    
429                    String controlProperty = PropsUtil.get(
430                            PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL);
431    
432                    if (Validator.isNull(controlProperty)) {
433                            throw new IllegalStateException(
434                                    "Set \"" + PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL +
435                                            "\"");
436                    }
437    
438                    ClusterRequestReceiver clusterRequestReceiver =
439                            new ClusterRequestReceiver(this);
440    
441                    _controlJChannel = createJChannel(
442                            controlProperty, clusterRequestReceiver, channelName);
443            }
444    
445            protected void initLocalClusterNode() throws Exception {
446                    InetAddress inetAddress = bindInetAddress;
447    
448                    if (inetAddress == null) {
449                            inetAddress = InetAddressUtil.getLocalInetAddress();
450                    }
451    
452                    ClusterNode localClusterNode = new ClusterNode(
453                            PortalUUIDUtil.generate(), inetAddress);
454    
455                    if (StringUtil.equalsIgnoreCase(
456                                    Http.HTTPS, PropsValues.PORTAL_INSTANCE_PROTOCOL) &&
457                            (PropsValues.PORTAL_INSTANCE_HTTPS_PORT > 0)) {
458    
459                            localClusterNode.setPortalProtocol(Http.HTTPS);
460                            localClusterNode.setPort(PropsValues.PORTAL_INSTANCE_HTTPS_PORT);
461                    }
462                    else if (StringUtil.equalsIgnoreCase(
463                                            Http.HTTP, PropsValues.PORTAL_INSTANCE_PROTOCOL) &&
464                                     (PropsValues.PORTAL_INSTANCE_HTTP_PORT > 0)) {
465    
466                            localClusterNode.setPortalProtocol(Http.HTTP);
467                            localClusterNode.setPort(PropsValues.PORTAL_INSTANCE_HTTP_PORT);
468                    }
469                    else {
470                            if (_log.isWarnEnabled()) {
471                                    StringBundler sb = new StringBundler(8);
472    
473                                    sb.append("Unable to configure node protocol and port. ");
474                                    sb.append("This configuration will be inferred dynamically ");
475                                    sb.append("from the first request but static configuration ");
476                                    sb.append("is recomended to avoid comunications problems ");
477                                    sb.append("between nodes. Please set the right values for ");
478                                    sb.append("\"portal.instance.protocol\" and ");
479                                    sb.append("\"portal.instance.http.port\" or ");
480                                    sb.append("\"portal.instance.https.port\".");
481    
482                                    _log.warn(sb.toString());
483                            }
484                    }
485    
486                    _localClusterNode = localClusterNode;
487    
488                    if (_log.isDebugEnabled()) {
489                            _log.debug(
490                                    "Initialized cluster node: " + localClusterNode.toString());
491                    }
492            }
493    
494            protected boolean isShortcutLocalMethod() {
495                    return _shortcutLocalMethod;
496            }
497    
498            protected void memberJoined(Address joinAddress, ClusterNode clusterNode) {
499                    _liveInstances.put(joinAddress, clusterNode);
500    
501                    Address previousAddress = _clusterNodeAddresses.put(
502                            clusterNode.getClusterNodeId(), joinAddress);
503    
504                    if ((previousAddress == null) && !_localAddress.equals(joinAddress)) {
505                            ClusterEvent clusterEvent = ClusterEvent.join(clusterNode);
506    
507                            // PLACEHOLDER
508    
509                            fireClusterEvent(clusterEvent);
510                    }
511            }
512    
513            protected void memberRemoved(List<Address> departAddresses) {
514                    List<ClusterNode> departClusterNodes = new ArrayList<ClusterNode>();
515    
516                    for (Address departAddress : departAddresses) {
517                            ClusterNode departClusterNode = _liveInstances.remove(
518                                    departAddress);
519    
520                            if (departClusterNode == null) {
521                                    continue;
522                            }
523    
524                            departClusterNodes.add(departClusterNode);
525    
526                            _clusterNodeAddresses.remove(departClusterNode.getClusterNodeId());
527                    }
528    
529                    if (departClusterNodes.isEmpty()) {
530                            return;
531                    }
532    
533                    ClusterEvent clusterEvent = ClusterEvent.depart(departClusterNodes);
534    
535                    fireClusterEvent(clusterEvent);
536            }
537    
538            protected List<Address> prepareAddresses(ClusterRequest clusterRequest) {
539                    boolean isMulticast = clusterRequest.isMulticast();
540    
541                    List<Address> addresses = null;
542    
543                    if (isMulticast) {
544                            addresses = getAddresses(_controlJChannel);
545                    }
546                    else {
547                            addresses = new ArrayList<Address>();
548    
549                            Collection<Address> clusterNodeAddresses =
550                                    clusterRequest.getTargetClusterNodeAddresses();
551    
552                            if (clusterNodeAddresses != null) {
553                                    addresses.addAll(clusterNodeAddresses);
554                            }
555    
556                            Collection<String> clusterNodeIds =
557                                    clusterRequest.getTargetClusterNodeIds();
558    
559                            if (clusterNodeIds != null) {
560                                    for (String clusterNodeId : clusterNodeIds) {
561                                            Address address = _clusterNodeAddresses.get(clusterNodeId);
562    
563                                            addresses.add(address);
564                                    }
565                            }
566                    }
567    
568                    if (clusterRequest.isSkipLocal()) {
569                            addresses.remove(getLocalClusterNodeAddress());
570                    }
571    
572                    return addresses;
573            }
574    
575            protected void runLocalMethod(
576                    ClusterRequest clusterRequest,
577                    FutureClusterResponses futureClusterResponses) {
578    
579                    MethodHandler methodHandler = clusterRequest.getMethodHandler();
580    
581                    Object returnValue = null;
582                    Exception exception = null;
583    
584                    if (methodHandler == null) {
585                            exception = new ClusterException(
586                                    "Payload is not of type " + MethodHandler.class.getName());
587                    }
588                    else {
589                            try {
590                                    returnValue = methodHandler.invoke(true);
591                            }
592                            catch (Exception e) {
593                                    exception = e;
594                            }
595                    }
596    
597                    if (!clusterRequest.isFireAndForget()) {
598                            ClusterNodeResponse clusterNodeResponse =
599                                    generateClusterNodeResponse(
600                                            clusterRequest, returnValue, exception);
601    
602                            futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
603                    }
604            }
605    
606            protected void sendNotifyRequest() {
607                    ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
608                            ClusterMessageType.NOTIFY, _localClusterNode);
609    
610                    try {
611                            sendJGroupsMessage(_controlJChannel, null, clusterRequest);
612                    }
613                    catch (Exception e) {
614                            _log.error("Unable to send notify message", e);
615                    }
616            }
617    
618            private static Log _log = LogFactoryUtil.getLog(ClusterExecutorImpl.class);
619    
620            private CopyOnWriteArrayList<ClusterEventListener> _clusterEventListeners =
621                    new CopyOnWriteArrayList<ClusterEventListener>();
622            private Map<String, Address> _clusterNodeAddresses =
623                    new ConcurrentHashMap<String, Address>();
624            private JChannel _controlJChannel;
625            private ExecutorService _executorService;
626            private Map<String, FutureClusterResponses> _futureClusterResponses =
627                    new WeakValueConcurrentHashMap<String, FutureClusterResponses>();
628            private Map<Address, ClusterNode> _liveInstances =
629                    new ConcurrentHashMap<Address, ClusterNode>();
630            private Address _localAddress;
631            private ClusterNode _localClusterNode;
632            private boolean _shortcutLocalMethod;
633    
634            private class ClusterResponseCallbackJob implements Runnable {
635    
636                    public ClusterResponseCallbackJob(
637                            ClusterResponseCallback clusterResponseCallback,
638                            FutureClusterResponses futureClusterResponses) {
639    
640                            _clusterResponseCallback = clusterResponseCallback;
641                            _futureClusterResponses = futureClusterResponses;
642                            _timeout = -1;
643                            _timeoutGet = false;
644                            _timeUnit = TimeUnit.SECONDS;
645                    }
646    
647                    public ClusterResponseCallbackJob(
648                            ClusterResponseCallback clusterResponseCallback,
649                            FutureClusterResponses futureClusterResponses, long timeout,
650                            TimeUnit timeUnit) {
651    
652                            _clusterResponseCallback = clusterResponseCallback;
653                            _futureClusterResponses = futureClusterResponses;
654                            _timeout = timeout;
655                            _timeoutGet = true;
656                            _timeUnit = timeUnit;
657                    }
658    
659                    @Override
660                    public void run() {
661                            BlockingQueue<ClusterNodeResponse> blockingQueue =
662                                    _futureClusterResponses.getPartialResults();
663    
664                            _clusterResponseCallback.callback(blockingQueue);
665    
666                            ClusterNodeResponses clusterNodeResponses = null;
667    
668                            try {
669                                    if (_timeoutGet) {
670                                            clusterNodeResponses = _futureClusterResponses.get(
671                                                    _timeout, _timeUnit);
672                                    }
673                                    else {
674                                            clusterNodeResponses = _futureClusterResponses.get();
675                                    }
676    
677                                    _clusterResponseCallback.callback(clusterNodeResponses);
678                            }
679                            catch (InterruptedException ie) {
680                                    _clusterResponseCallback.processInterruptedException(ie);
681                            }
682                            catch (TimeoutException te) {
683                                    _clusterResponseCallback.processTimeoutException(te);
684                            }
685                    }
686    
687                    private final ClusterResponseCallback _clusterResponseCallback;
688                    private final FutureClusterResponses _futureClusterResponses;
689                    private final long _timeout;
690                    private final boolean _timeoutGet;
691                    private final TimeUnit _timeUnit;
692    
693            }
694    
695    }