001    /**
002     * Copyright (c) 2000-2010 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.cluster;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.ClusterEvent;
019    import com.liferay.portal.kernel.cluster.ClusterEventListener;
020    import com.liferay.portal.kernel.cluster.ClusterException;
021    import com.liferay.portal.kernel.cluster.ClusterExecutor;
022    import com.liferay.portal.kernel.cluster.ClusterMessageType;
023    import com.liferay.portal.kernel.cluster.ClusterNode;
024    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025    import com.liferay.portal.kernel.cluster.ClusterRequest;
026    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
027    import com.liferay.portal.kernel.exception.SystemException;
028    import com.liferay.portal.kernel.log.Log;
029    import com.liferay.portal.kernel.log.LogFactoryUtil;
030    import com.liferay.portal.kernel.util.InetAddressUtil;
031    import com.liferay.portal.kernel.util.MethodHandler;
032    import com.liferay.portal.kernel.util.PropsKeys;
033    import com.liferay.portal.kernel.util.PropsUtil;
034    import com.liferay.portal.kernel.util.WeakValueConcurrentHashMap;
035    import com.liferay.portal.kernel.uuid.PortalUUIDUtil;
036    import com.liferay.portal.util.PortalPortEventListener;
037    import com.liferay.portal.util.PortalUtil;
038    import com.liferay.portal.util.PropsValues;
039    
040    import java.io.Serializable;
041    
042    import java.net.InetAddress;
043    
044    import java.util.ArrayList;
045    import java.util.Collection;
046    import java.util.Collections;
047    import java.util.List;
048    import java.util.Map;
049    import java.util.Properties;
050    import java.util.concurrent.ConcurrentHashMap;
051    import java.util.concurrent.CopyOnWriteArrayList;
052    
053    import org.jgroups.ChannelException;
054    import org.jgroups.JChannel;
055    
056    /**
057     * @author Tina Tian
058     * @author Shuyang Zhou
059     */
060    public class ClusterExecutorImpl
061            extends ClusterBase implements ClusterExecutor, PortalPortEventListener {
062    
063            public void addClusterEventListener(
064                    ClusterEventListener clusterEventListener) {
065                    if (!isEnabled()) {
066                            return;
067                    }
068    
069                    _clusterEventListeners.addIfAbsent(clusterEventListener);
070            }
071    
072            public void afterPropertiesSet() {
073                    super.afterPropertiesSet();
074    
075                    if (PropsValues.CLUSTER_EXECUTOR_DEBUG_ENABLED) {
076                            addClusterEventListener(new DebuggingClusterEventListenerImpl());
077                    }
078            }
079    
080            public void destroy() {
081                    if (!isEnabled()) {
082                            return;
083                    }
084    
085                    _controlChannel.close();
086            }
087    
088            public FutureClusterResponses execute(ClusterRequest clusterRequest)
089                    throws SystemException {
090    
091                    if (!isEnabled()) {
092                            return null;
093                    }
094    
095                    List<Address> addresses = prepareAddresses(clusterRequest);
096    
097                    FutureClusterResponses futureClusterResponses =
098                            new FutureClusterResponses(addresses);
099    
100                    if (!clusterRequest.isFireAndForget()) {
101                            String uuid = clusterRequest.getUuid();
102    
103                            _executionResultMap.put(uuid, futureClusterResponses);
104                    }
105    
106                    if (!clusterRequest.isSkipLocal() && _shortcutLocalMethod &&
107                            addresses.remove(getLocalControlAddress())) {
108    
109                            ClusterNodeResponse clusterNodeResponse = runLocalMethod(
110                                    clusterRequest.getMethodHandler());
111    
112                            clusterNodeResponse.setMulticast(clusterRequest.isMulticast());
113                            clusterNodeResponse.setUuid(clusterRequest.getUuid());
114    
115                            futureClusterResponses.addClusterNodeResponse(clusterNodeResponse);
116                    }
117    
118                    if (clusterRequest.isMulticast()) {
119                            sendMulticastRequest(clusterRequest);
120                    }
121                    else {
122                            sendUnicastRequest(clusterRequest, addresses);
123                    }
124    
125                    return futureClusterResponses;
126            }
127    
128            public List<ClusterEventListener> getClusterEventListeners() {
129                    if (!isEnabled()) {
130                            return Collections.EMPTY_LIST;
131                    }
132    
133                    return Collections.unmodifiableList(_clusterEventListeners);
134            }
135    
136            public List<ClusterNode> getClusterNodes() {
137                    if (!isEnabled()) {
138                            return Collections.EMPTY_LIST;
139                    }
140    
141                    return new ArrayList<ClusterNode>(_addressMap.values());
142            }
143    
144            public ClusterNode getLocalClusterNode() throws SystemException {
145                    if (!isEnabled()) {
146                            return null;
147                    }
148    
149                    ClusterNode clusterNode = _addressMap.get(getLocalControlAddress());
150    
151                    if (clusterNode == null) {
152                            _localClusterNodeId = PortalUUIDUtil.generate();
153    
154                            clusterNode = new ClusterNode(_localClusterNodeId);
155    
156                            clusterNode.setPort(PortalUtil.getPortalPort());
157    
158                            try {
159                                    InetAddress inetAddress = bindInetAddress;
160    
161                                    if (inetAddress == null) {
162                                            inetAddress = InetAddressUtil.getLocalInetAddress();
163                                    }
164    
165                                    clusterNode.setInetAddress(inetAddress);
166    
167                                    clusterNode.setHostName(inetAddress.getHostName());
168                            }
169                            catch (Exception e) {
170                                    throw new SystemException(
171                                            "Unable to determine local network address", e);
172                            }
173                    }
174    
175                    return clusterNode;
176            }
177    
178            public void initialize() {
179                    if (!isEnabled()) {
180                            return;
181                    }
182    
183                    try {
184                            PortalUtil.addPortalPortEventListener(this);
185    
186                            ClusterNode clusterNode = getLocalClusterNode();
187    
188                            ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
189                                    ClusterMessageType.NOTIFY, clusterNode);
190    
191                            _controlChannel.send(null, null, clusterRequest);
192                    }
193                    catch (ChannelException ce) {
194                            _log.error("Unable to send multicast message ", ce);
195                    }
196                    catch (SystemException se) {
197                            _log.error("Unable to determine local network address", se);
198                    }
199            }
200    
201            public boolean isClusterNodeAlive(String clusterNodeId) {
202                    if (!isEnabled()) {
203                            return false;
204                    }
205    
206                    return _clusterNodeIdMap.containsKey(clusterNodeId);
207            }
208    
209            public boolean isEnabled() {
210                    return PropsValues.CLUSTER_LINK_ENABLED;
211            }
212    
213            public void portalPortConfigured(int port) {
214                    if (!isEnabled()) {
215                            return;
216                    }
217    
218                    try {
219                            ClusterNode clusterNode = getLocalClusterNode();
220    
221                            clusterNode.setPort(port);
222    
223                            ClusterRequest clusterRequest = ClusterRequest.createClusterRequest(
224                                    ClusterMessageType.UPDATE, clusterNode);
225    
226                            _controlChannel.send(null, null, clusterRequest);
227                    }
228                    catch (Exception e) {
229                            if (_log.isErrorEnabled()) {
230                                    _log.error("Unable to determine configure node port", e);
231                            }
232                    }
233            }
234    
235            public void removeClusterEventListener(
236                    ClusterEventListener clusterEventListener) {
237    
238                    if (!isEnabled()) {
239                            return;
240                    }
241    
242                    _clusterEventListeners.remove(clusterEventListener);
243            }
244    
245            public void setClusterEventListeners(
246                    List<ClusterEventListener> clusterEventListeners) {
247    
248                    if (!isEnabled()) {
249                            return;
250                    }
251    
252                    _clusterEventListeners.addAllAbsent(clusterEventListeners);
253            }
254    
255            public void setShortcutLocalMethod(boolean shortcutLocalMethod) {
256                    if (!isEnabled()) {
257                            return;
258                    }
259    
260                    _shortcutLocalMethod = shortcutLocalMethod;
261            }
262    
263            protected void fireClusterEvent(ClusterEvent clusterEvent) {
264                    for (ClusterEventListener listener : _clusterEventListeners) {
265                            listener.processClusterEvent(clusterEvent);
266                    }
267            }
268    
269            protected JChannel getControlChannel() {
270                    return _controlChannel;
271            }
272    
273            protected FutureClusterResponses getExecutionResults(String uuid) {
274                    return _executionResultMap.get(uuid);
275            }
276    
277            protected Address getLocalControlAddress() {
278                    return new AddressImpl(_controlChannel.getLocalAddress());
279            }
280    
281            protected void initChannels() {
282                    Properties controlProperties = PropsUtil.getProperties(
283                            PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL, false);
284    
285                    String controlProperty = controlProperties.getProperty(
286                            PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_CONTROL);
287    
288                    ClusterRequestReceiver clusterInvokeReceiver =
289                            new ClusterRequestReceiver(this);
290    
291                    try {
292                            _controlChannel = createJChannel(
293                                    controlProperty, clusterInvokeReceiver, _DEFAULT_CLUSTER_NAME);
294                    }
295                    catch (ChannelException ce) {
296                            _log.error(ce, ce);
297                    }
298                    catch (Exception e) {
299                            _log.error(e, e);
300                    }
301            }
302    
303            protected boolean isShortcutLocalMethod() {
304                    return _shortcutLocalMethod;
305            }
306    
307            protected void memberJoined(Address joinAddress, ClusterNode clusterNode) {
308                    _addressMap.put(joinAddress, clusterNode);
309    
310                    Address previousAddress = _clusterNodeIdMap.put(
311                            clusterNode.getClusterNodeId(), joinAddress);
312    
313                    if ((previousAddress == null) &&
314                            !getLocalControlAddress().equals(joinAddress)) {
315    
316                            ClusterEvent clusterEvent = ClusterEvent.join(clusterNode);
317    
318                            fireClusterEvent(clusterEvent);
319                    }
320            }
321    
322            protected void memberRemoved(List<Address> departAddresses) {
323                    List<ClusterNode> departingClusterNodes = new ArrayList<ClusterNode>();
324    
325                    for (Address departAddress : departAddresses) {
326                            ClusterNode departingClusterNode = _addressMap.remove(
327                                    departAddress);
328                            if (departingClusterNode != null) {
329                                    departingClusterNodes.add(departingClusterNode);
330    
331                                    _clusterNodeIdMap.remove(
332                                            departingClusterNode.getClusterNodeId());
333                            }
334                    }
335    
336                    if (departingClusterNodes.isEmpty()) {
337                            return;
338                    }
339    
340                    ClusterEvent clusterEvent = ClusterEvent.depart(departingClusterNodes);
341    
342                    fireClusterEvent(clusterEvent);
343            }
344    
345            protected List<Address> prepareAddresses(ClusterRequest clusterRequest) {
346                    boolean isMulticast = clusterRequest.isMulticast();
347    
348                    List<Address> addresses = null;
349    
350                    if (isMulticast) {
351                            addresses = getAddresses(_controlChannel);
352                    }
353                    else {
354                            Collection<String> clusterNodeIds =
355                                    clusterRequest.getTargetClusterNodeIds();
356    
357                            addresses = new ArrayList<Address>(clusterNodeIds.size());
358    
359                            for (String clusterNodeId : clusterNodeIds) {
360                                    Address address = _clusterNodeIdMap.get(clusterNodeId);
361    
362                                    addresses.add(address);
363                            }
364                    }
365    
366                    return addresses;
367            }
368    
369            protected ClusterNodeResponse runLocalMethod(MethodHandler methodHandler)
370                    throws SystemException {
371    
372                    ClusterNodeResponse clusterNodeResponse = new ClusterNodeResponse();
373    
374                    ClusterNode localClusterNode = getLocalClusterNode();
375    
376                    clusterNodeResponse.setClusterNode(localClusterNode);
377                    clusterNodeResponse.setClusterMessageType(ClusterMessageType.EXECUTE);
378    
379                    if (methodHandler == null) {
380                            clusterNodeResponse.setException(
381                                    new ClusterException(
382                                            "Payload is not of type " + MethodHandler.class.getName()));
383    
384                            return clusterNodeResponse;
385                    }
386    
387                    try {
388                            Object returnValue = methodHandler.invoke(true);
389    
390                            if (returnValue instanceof Serializable) {
391                                    clusterNodeResponse.setResult(returnValue);
392                            }
393                            else if (returnValue != null) {
394                                    clusterNodeResponse.setException(
395                                            new ClusterException("Return value is not serializable"));
396                            }
397                    }
398                    catch (Exception e) {
399                            clusterNodeResponse.setException(e);
400                    }
401    
402                    return clusterNodeResponse;
403            }
404    
405            protected void sendMulticastRequest(ClusterRequest clusterRequest)
406                    throws SystemException {
407    
408                    try {
409                            _controlChannel.send(null, null, clusterRequest);
410                    }
411                    catch (ChannelException ce) {
412                            _log.error(
413                                    "Unable to send multicast message " + clusterRequest, ce);
414    
415                            throw new SystemException(
416                                    "Unable to send multicast request", ce);
417                    }
418            }
419    
420            protected void sendUnicastRequest(
421                            ClusterRequest clusterRequest, List<Address> addresses)
422                    throws SystemException {
423    
424                    for (Address address : addresses) {
425                            org.jgroups.Address jGroupsAddress =
426                                    (org.jgroups.Address)address.getRealAddress();
427    
428                            try {
429                                    _controlChannel.send(jGroupsAddress, null, clusterRequest);
430                            }
431                            catch (ChannelException ce) {
432                                    _log.error(
433                                            "Unable to send unicast message " + clusterRequest, ce);
434    
435                                    throw new SystemException(
436                                            "Unable to send unicast request", ce);
437                            }
438                    }
439            }
440    
441            private static final String _DEFAULT_CLUSTER_NAME =
442                    "LIFERAY-CONTROL-CHANNEL";
443    
444            private static Log _log = LogFactoryUtil.getLog(ClusterExecutorImpl.class);
445    
446            private Map<Address, ClusterNode> _addressMap =
447                    new ConcurrentHashMap<Address, ClusterNode>();
448            private CopyOnWriteArrayList<ClusterEventListener> _clusterEventListeners =
449                    new CopyOnWriteArrayList<ClusterEventListener>();
450            private Map<String, Address> _clusterNodeIdMap =
451                    new ConcurrentHashMap<String, Address>();
452            private JChannel _controlChannel;
453            private Map<String, FutureClusterResponses> _executionResultMap =
454                    new WeakValueConcurrentHashMap<String, FutureClusterResponses>();
455            private String _localClusterNodeId;
456            private boolean _shortcutLocalMethod;
457    
458    }