001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.cluster;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.ClusterEvent;
019    import com.liferay.portal.kernel.cluster.ClusterEventListener;
020    import com.liferay.portal.kernel.cluster.ClusterException;
021    import com.liferay.portal.kernel.cluster.ClusterExecutor;
022    import com.liferay.portal.kernel.cluster.ClusterMessageType;
023    import com.liferay.portal.kernel.cluster.ClusterNode;
024    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025    import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
026    import com.liferay.portal.kernel.cluster.ClusterRequest;
027    import com.liferay.portal.kernel.cluster.ClusterResponseCallback;
028    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
029    import com.liferay.portal.kernel.exception.SystemException;
030    import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
031    import com.liferay.portal.kernel.log.Log;
032    import com.liferay.portal.kernel.log.LogFactoryUtil;
033    import com.liferay.portal.kernel.security.pacl.DoPrivileged;
034    import com.liferay.portal.kernel.util.InetAddressUtil;
035    import com.liferay.portal.kernel.util.MethodHandler;
036    import com.liferay.portal.kernel.util.PropsKeys;
037    import com.liferay.portal.kernel.util.WeakValueConcurrentHashMap;
038    import com.liferay.portal.kernel.uuid.PortalUUIDUtil;
039    import com.liferay.portal.util.PortalPortEventListener;
040    import com.liferay.portal.util.PortalUtil;
041    import com.liferay.portal.util.PropsUtil;
042    import com.liferay.portal.util.PropsValues;
043    
044    import java.io.Serializable;
045    
046    import java.net.InetAddress;
047    
048    import java.util.ArrayList;
049    import java.util.Collection;
050    import java.util.Collections;
051    import java.util.List;
052    import java.util.Map;
053    import java.util.Properties;
054    import java.util.concurrent.BlockingQueue;
055    import java.util.concurrent.ConcurrentHashMap;
056    import java.util.concurrent.CopyOnWriteArrayList;
057    import java.util.concurrent.ExecutorService;
058    import java.util.concurrent.TimeUnit;
059    import java.util.concurrent.TimeoutException;
060    
061    import org.jgroups.ChannelException;
062    import org.jgroups.JChannel;
063    
064    /**
065     * @author Tina Tian
066     * @author Shuyang Zhou
067     */
068    @DoPrivileged
069    public class ClusterExecutorImpl
070            extends ClusterBase implements ClusterExecutor, PortalPortEventListener {
071    
        /**
         * Name of the portal executor used for response callback jobs. The
         * executor is looked up under this name in {@link #initialize()} and shut
         * down under the same name in {@link #destroy()}.
         */
        public static final String CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL =
                "CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL";
074    
075            @Override
076            public void addClusterEventListener(
077                    ClusterEventListener clusterEventListener) {
078    
079                    if (!isEnabled()) {
080                            return;
081                    }
082    
083                    _clusterEventListeners.addIfAbsent(clusterEventListener);
084            }
085    
086            @Override
087            public void afterPropertiesSet() {
088                    if (PropsValues.CLUSTER_EXECUTOR_DEBUG_ENABLED) {
089                            addClusterEventListener(new DebuggingClusterEventListenerImpl());
090                    }
091    
092                    if (PropsValues.LIVE_USERS_ENABLED) {
093                            addClusterEventListener(new LiveUsersClusterEventListenerImpl());
094                    }
095    
096                    super.afterPropertiesSet();
097            }
098    
099            @Override
100            public void destroy() {
101                    if (!isEnabled()) {
102                            return;
103                    }
104    
105                    PortalExecutorManagerUtil.shutdown(
106                            CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL, true);
107    
108                    _controlJChannel.setReceiver(null);
109                    _controlJChannel.close();
110    
111                    _clusterEventListeners.clear();
112                    _clusterNodeAddresses.clear();
113                    _futureClusterResponses.clear();
114                    _liveInstances.clear();
115                    _localAddress = null;
116                    _localClusterNode = null;
117            }
118    
119            @Override
120            public FutureClusterResponses execute(ClusterRequest clusterRequest)
121                    throws SystemException {
122    
123                    if (!isEnabled()) {
124                            return null;
125                    }
126    
127                    List<Address> addresses = prepareAddresses(clusterRequest);
128    
129                    FutureClusterResponses futureClusterResponses =
130                            new FutureClusterResponses(addresses);
131    
132                    if (!clusterRequest.isFireAndForget()) {
133                            String uuid = clusterRequest.getUuid();
134    
135                            _futureClusterResponses.put(uuid, futureClusterResponses);
136                    }
137    
138                    if (_shortcutLocalMethod &&
139                            addresses.remove(getLocalClusterNodeAddress())) {
140    
141                            runLocalMethod(clusterRequest, futureClusterResponses);
142                    }
143    
144                    if (clusterRequest.isMulticast()) {
145                            try {
146                                    _controlJChannel.send(null, null, clusterRequest);
147                            }
148                            catch (Exception e) {
149                                    throw new SystemException(
150                                            "Unable to send multicast request", e);
151                            }
152                    }
153                    else {
154                            for (Address address : addresses) {
155                                    org.jgroups.Address jGroupsAddress =
156                                            (org.jgroups.Address)address.getRealAddress();
157    
158                                    try {
159                                            _controlJChannel.send(jGroupsAddress, null, clusterRequest);
160                                    }
161                                    catch (Exception e) {
162                                            throw new SystemException(
163                                                    "Unable to send unicast request", e);
164                                    }
165                            }
166                    }
167    
168                    return futureClusterResponses;
169            }
170    
171            @Override
172            public void execute(
173                            ClusterRequest clusterRequest,
174                            ClusterResponseCallback clusterResponseCallback)
175                    throws SystemException {
176    
177                    FutureClusterResponses futureClusterResponses = execute(clusterRequest);
178    
179                    ClusterResponseCallbackJob clusterResponseCallbackJob =
180                            new ClusterResponseCallbackJob(
181                                    clusterResponseCallback, futureClusterResponses);
182    
183                    _executorService.execute(clusterResponseCallbackJob);
184            }
185    
186            @Override
187            public void execute(
188                            ClusterRequest clusterRequest,
189                            ClusterResponseCallback clusterResponseCallback, long timeout,
190                            TimeUnit timeUnit)
191                    throws SystemException {
192    
193                    FutureClusterResponses futureClusterResponses = execute(clusterRequest);
194    
195                    ClusterResponseCallbackJob clusterResponseCallbackJob =
196                            new ClusterResponseCallbackJob(
197                                    clusterResponseCallback, futureClusterResponses, timeout,
198                                    timeUnit);
199    
200                    _executorService.execute(clusterResponseCallbackJob);
201            }
202    
203            @Override
204            public List<ClusterEventListener> getClusterEventListeners() {
205                    if (!isEnabled()) {
206                            return Collections.emptyList();
207                    }
208    
209                    return Collections.unmodifiableList(_clusterEventListeners);
210            }
211    
212            @Override
213            public List<Address> getClusterNodeAddresses() {
214                    if (!isEnabled()) {
215                            return Collections.emptyList();
216                    }
217    
218                    return getAddresses(_controlJChannel);
219            }
220    
221            @Override
222            public List<ClusterNode> getClusterNodes() {
223                    if (!isEnabled()) {
224                            return Collections.emptyList();
225                    }
226    
227                    return new ArrayList<ClusterNode>(_liveInstances.values());
228            }
229    
230            @Override
231            public ClusterNode getLocalClusterNode() {
232                    if (!isEnabled()) {
233                            return null;
234                    }
235    
236                    return _localClusterNode;
237            }
238    
239            @Override
240            public Address getLocalClusterNodeAddress() {
241                    if (!isEnabled()) {
242                            return null;
243                    }
244    
245                    return _localAddress;
246            }
247    
248            @Override
249            public void initialize() {
250                    if (!isEnabled()) {
251                            return;
252                    }
253    
254                    _executorService = PortalExecutorManagerUtil.getPortalExecutor(
255                            CLUSTER_EXECUTOR_CALLBACK_THREAD_POOL);
256    
257                    PortalUtil.addPortalPortEventListener(this);
258    
259                    _localAddress = new AddressImpl(_controlJChannel.getLocalAddress());
260    
261                    try {
262                            initLocalClusterNode();
263    
264                            memberJoined(_localAddress, _localClusterNode);
265    
266                            sendNotifyRequest();
267                    }
268                    catch (Exception e) {
269                            _log.error("Unable to determine local network address", e);
270                    }
271    
272                    ClusterRequestReceiver clusterRequestReceiver =
273                            (ClusterRequestReceiver)_controlJChannel.getReceiver();
274    
275                    clusterRequestReceiver.openLatch();
276            }
277    
278            @Override
279            public boolean isClusterNodeAlive(Address address) {
280                    if (!isEnabled()) {
281                            return false;
282                    }
283    
284                    List<Address> addresses = getAddresses(_controlJChannel);
285    
286                    return addresses.contains(address);
287            }
288    
289            @Override
290            public boolean isClusterNodeAlive(String clusterNodeId) {
291                    if (!isEnabled()) {
292                            return false;
293                    }
294    
295                    return _clusterNodeAddresses.containsKey(clusterNodeId);
296            }
297    
298            @Override
299            public void portalPortConfigured(int port) {
300                    if (!isEnabled() ||
301                            (_localClusterNode.getPort() ==
302                                    PropsValues.PORTAL_INSTANCE_HTTP_PORT)) {
303    
304                            return;
305                    }
306    
307                    try {
308                            _localClusterNode.setPort(port);
309    
310                            memberJoined(_localAddress, _localClusterNode);
311    
312                            ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
313                                    ClusterMessageType.UPDATE, _localClusterNode);
314    
315                            _controlJChannel.send(null, null, clusterRequest);
316                    }
317                    catch (Exception e) {
318                            _log.error("Unable to determine configure node port", e);
319                    }
320            }
321    
322            @Override
323            public void removeClusterEventListener(
324                    ClusterEventListener clusterEventListener) {
325    
326                    if (!isEnabled()) {
327                            return;
328                    }
329    
330                    _clusterEventListeners.remove(clusterEventListener);
331            }
332    
333            public void setClusterEventListeners(
334                    List<ClusterEventListener> clusterEventListeners) {
335    
336                    if (!isEnabled()) {
337                            return;
338                    }
339    
340                    _clusterEventListeners.addAllAbsent(clusterEventListeners);
341            }
342    
343            public void setShortcutLocalMethod(boolean shortcutLocalMethod) {
344                    if (!isEnabled()) {
345                            return;
346                    }
347    
348                    _shortcutLocalMethod = shortcutLocalMethod;
349            }
350    
351            protected void fireClusterEvent(ClusterEvent clusterEvent) {
352                    for (ClusterEventListener listener : _clusterEventListeners) {
353                            listener.processClusterEvent(clusterEvent);
354                    }
355            }
356    
357            protected ClusterNodeResponse generateClusterNodeResponse(
358                    ClusterRequest clusterRequest, Object returnValue,
359                    Exception exception) {
360    
361                    ClusterNodeResponse clusterNodeResponse = new ClusterNodeResponse();
362    
363                    clusterNodeResponse.setAddress(getLocalClusterNodeAddress());
364                    clusterNodeResponse.setClusterNode(getLocalClusterNode());
365                    clusterNodeResponse.setClusterMessageType(
366                            clusterRequest.getClusterMessageType());
367                    clusterNodeResponse.setMulticast(clusterRequest.isMulticast());
368                    clusterNodeResponse.setUuid(clusterRequest.getUuid());
369    
370                    if (exception != null) {
371                            clusterNodeResponse.setException(exception);
372                    }
373                    else {
374                            if (returnValue instanceof Serializable) {
375                                    clusterNodeResponse.setResult(returnValue);
376                            }
377                            else if (returnValue != null) {
378                                    clusterNodeResponse.setException(
379                                            new ClusterException("Return value is not serializable"));
380                            }
381                    }
382    
383                    return clusterNodeResponse;
384            }
385    
386            protected JChannel getControlChannel() {
387                    return _controlJChannel;
388            }
389    
390            protected FutureClusterResponses getExecutionResults(String uuid) {
391                    return _futureClusterResponses.get(uuid);
392            }
393    
394            @Override
395            protected void initChannels() throws ChannelException {
396                    Properties controlProperties = PropsUtil.getProperties(
397                            PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL, false);
398    
399                    String controlProperty = controlProperties.getProperty(
400                            PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL);
401    
402                    ClusterRequestReceiver clusterRequestReceiver =
403                            new ClusterRequestReceiver(this);
404    
405                    _controlJChannel = createJChannel(
406                            controlProperty, clusterRequestReceiver, _DEFAULT_CLUSTER_NAME);
407            }
408    
409            protected void initLocalClusterNode() throws Exception {
410                    InetAddress inetAddress = bindInetAddress;
411    
412                    if (inetAddress == null) {
413                            inetAddress = InetAddressUtil.getLocalInetAddress();
414                    }
415    
416                    ClusterNode localClusterNode = new ClusterNode(
417                            PortalUUIDUtil.generate(), inetAddress);
418    
419                    if (PropsValues.PORTAL_INSTANCE_HTTP_PORT > 0) {
420                            localClusterNode.setPort(PropsValues.PORTAL_INSTANCE_HTTP_PORT);
421                    }
422                    else {
423                            localClusterNode.setPort(PortalUtil.getPortalPort(false));
424                    }
425    
426                    _localClusterNode = localClusterNode;
427            }
428    
429            protected boolean isShortcutLocalMethod() {
430                    return _shortcutLocalMethod;
431            }
432    
433            protected void memberJoined(Address joinAddress, ClusterNode clusterNode) {
434                    _liveInstances.put(joinAddress, clusterNode);
435    
436                    Address previousAddress = _clusterNodeAddresses.put(
437                            clusterNode.getClusterNodeId(), joinAddress);
438    
439                    if ((previousAddress == null) && !_localAddress.equals(joinAddress)) {
440                            ClusterEvent clusterEvent = ClusterEvent.join(clusterNode);
441    
442                            fireClusterEvent(clusterEvent);
443                    }
444            }
445    
446            protected void memberRemoved(List<Address> departAddresses) {
447                    List<ClusterNode> departClusterNodes = new ArrayList<ClusterNode>();
448    
449                    for (Address departAddress : departAddresses) {
450                            ClusterNode departClusterNode = _liveInstances.remove(
451                                    departAddress);
452    
453                            if (departClusterNode == null) {
454                                    continue;
455                            }
456    
457                            departClusterNodes.add(departClusterNode);
458    
459                            _clusterNodeAddresses.remove(departClusterNode.getClusterNodeId());
460                    }
461    
462                    if (departClusterNodes.isEmpty()) {
463                            return;
464                    }
465    
466                    ClusterEvent clusterEvent = ClusterEvent.depart(departClusterNodes);
467    
468                    fireClusterEvent(clusterEvent);
469            }
470    
471            protected List<Address> prepareAddresses(ClusterRequest clusterRequest) {
472                    boolean isMulticast = clusterRequest.isMulticast();
473    
474                    List<Address> addresses = null;
475    
476                    if (isMulticast) {
477                            addresses = getAddresses(_controlJChannel);
478                    }
479                    else {
480                            addresses = new ArrayList<Address>();
481    
482                            Collection<Address> clusterNodeAddresses =
483                                    clusterRequest.getTargetClusterNodeAddresses();
484    
485                            if (clusterNodeAddresses != null) {
486                                    addresses.addAll(clusterNodeAddresses);
487                            }
488    
489                            Collection<String> clusterNodeIds =
490                                    clusterRequest.getTargetClusterNodeIds();
491    
492                            if (clusterNodeIds != null) {
493                                    for (String clusterNodeId : clusterNodeIds) {
494                                            Address address = _clusterNodeAddresses.get(clusterNodeId);
495    
496                                            addresses.add(address);
497                                    }
498                            }
499                    }
500    
501                    if (clusterRequest.isSkipLocal()) {
502                            addresses.remove(getLocalClusterNodeAddress());
503                    }
504    
505                    return addresses;
506            }
507    
508            protected void runLocalMethod(
509                    ClusterRequest clusterRequest,
510                    FutureClusterResponses futureClusterResponses) {
511    
512                    MethodHandler methodHandler = clusterRequest.getMethodHandler();
513                    Object returnValue = null;
514                    Exception exception = null;
515    
516                    if (methodHandler == null) {
517                            exception = new ClusterException(
518                                    "Payload is not of type " + MethodHandler.class.getName());
519                    }
520                    else {
521                            try {
522                                    returnValue = methodHandler.invoke(true);
523                            }
524                            catch (Exception e) {
525                                    exception = e;
526                            }
527                    }
528    
529                    if (!clusterRequest.isFireAndForget()) {
530                            ClusterNodeResponse clusterNodeResponse =
531                                    generateClusterNodeResponse(
532                                            clusterRequest, returnValue, exception);
533    
534                            futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
535                    }
536            }
537    
538            protected void sendNotifyRequest() {
539                    ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
540                            ClusterMessageType.NOTIFY, _localClusterNode);
541    
542                    try {
543                            _controlJChannel.send(null, null, clusterRequest);
544                    }
545                    catch (Exception e) {
546                            _log.error("Unable to send notify message", e);
547                    }
548            }
549    
550            private static final String _DEFAULT_CLUSTER_NAME =
551                    "LIFERAY-CONTROL-CHANNEL";
552    
553            private static Log _log = LogFactoryUtil.getLog(ClusterExecutorImpl.class);
554    
555            private CopyOnWriteArrayList<ClusterEventListener> _clusterEventListeners =
556                    new CopyOnWriteArrayList<ClusterEventListener>();
557            private Map<String, Address> _clusterNodeAddresses =
558                    new ConcurrentHashMap<String, Address>();
559            private JChannel _controlJChannel;
560            private ExecutorService _executorService;
561            private Map<String, FutureClusterResponses> _futureClusterResponses =
562                    new WeakValueConcurrentHashMap<String, FutureClusterResponses>();
563            private Map<Address, ClusterNode> _liveInstances =
564                    new ConcurrentHashMap<Address, ClusterNode>();
565            private Address _localAddress;
566            private ClusterNode _localClusterNode;
567            private boolean _shortcutLocalMethod;
568    
569            private class ClusterResponseCallbackJob implements Runnable {
570    
571                    public ClusterResponseCallbackJob(
572                            ClusterResponseCallback clusterResponseCallback,
573                            FutureClusterResponses futureClusterResponses) {
574    
575                            _clusterResponseCallback = clusterResponseCallback;
576                            _futureClusterResponses = futureClusterResponses;
577                            _timeout = -1;
578                            _timeoutGet = false;
579                            _timeUnit = TimeUnit.SECONDS;
580                    }
581    
582                    public ClusterResponseCallbackJob(
583                            ClusterResponseCallback clusterResponseCallback,
584                            FutureClusterResponses futureClusterResponses, long timeout,
585                            TimeUnit timeUnit) {
586    
587                            _clusterResponseCallback = clusterResponseCallback;
588                            _futureClusterResponses = futureClusterResponses;
589                            _timeout = timeout;
590                            _timeoutGet = true;
591                            _timeUnit = timeUnit;
592                    }
593    
594                    @Override
595                    public void run() {
596                            BlockingQueue<ClusterNodeResponse> blockingQueue =
597                                    _futureClusterResponses.getPartialResults();
598    
599                            _clusterResponseCallback.callback(blockingQueue);
600    
601                            ClusterNodeResponses clusterNodeResponses = null;
602    
603                            try {
604                                    if (_timeoutGet) {
605                                            clusterNodeResponses = _futureClusterResponses.get(
606                                                    _timeout, _timeUnit);
607                                    }
608                                    else {
609                                            clusterNodeResponses = _futureClusterResponses.get();
610                                    }
611    
612                                    _clusterResponseCallback.callback(clusterNodeResponses);
613                            }
614                            catch (InterruptedException ie) {
615                                    _clusterResponseCallback.processInterruptedException(ie);
616                            }
617                            catch (TimeoutException te) {
618                                    _clusterResponseCallback.processTimeoutException(te);
619                            }
620                    }
621    
622                    private final ClusterResponseCallback _clusterResponseCallback;
623                    private final FutureClusterResponses _futureClusterResponses;
624                    private final long _timeout;
625                    private final boolean _timeoutGet;
626                    private final TimeUnit _timeUnit;
627    
628            }
629    
630    }