001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.cluster;
016    
import com.liferay.portal.kernel.cluster.Address;
import com.liferay.portal.kernel.cluster.AddressSerializerUtil;
import com.liferay.portal.kernel.cluster.ClusterEvent;
import com.liferay.portal.kernel.cluster.ClusterEventListener;
import com.liferay.portal.kernel.cluster.ClusterExecutor;
import com.liferay.portal.kernel.cluster.ClusterMasterExecutor;
import com.liferay.portal.kernel.cluster.ClusterMasterTokenTransitionListener;
import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
import com.liferay.portal.kernel.cluster.ClusterRequest;
import com.liferay.portal.kernel.cluster.FutureClusterResponses;
import com.liferay.portal.kernel.exception.SystemException;
import com.liferay.portal.kernel.log.Log;
import com.liferay.portal.kernel.log.LogFactoryUtil;
import com.liferay.portal.kernel.util.MethodHandler;
import com.liferay.portal.kernel.util.Validator;
import com.liferay.portal.model.Lock;
import com.liferay.portal.service.LockLocalServiceUtil;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
042    
043    /**
044     * @author Michael C. Han
045     */
046    public class ClusterMasterExecutorImpl implements ClusterMasterExecutor {
047    
048            public void destroy() {
049                    if (!_enabled) {
050                            return;
051                    }
052    
053                    try {
054                            _clusterExecutor.removeClusterEventListener(_clusterEventListener);
055    
056                            LockLocalServiceUtil.unlock(
057                                    _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, _localClusterNodeAddress);
058                    }
059                    catch (SystemException se) {
060                            if (_log.isWarnEnabled()) {
061                                    _log.warn("Unable to destroy the cluster master executor", se);
062                            }
063                    }
064            }
065    
066            @Override
067            public <T> Future<T> executeOnMaster(MethodHandler methodHandler)
068                    throws SystemException {
069    
070                    if (!_enabled) {
071                            if (_log.isWarnEnabled()) {
072                                    _log.warn(
073                                            "Executing on the local node because the cluster master " +
074                                                    "executor is disabled");
075                            }
076    
077                            try {
078                                    return new LocalFuture<T>((T)methodHandler.invoke(true));
079                            }
080                            catch (Exception e) {
081                                    throw new SystemException(e);
082                            }
083                    }
084    
085                    String masterAddressString = getMasterAddressString();
086    
087                    Address address = AddressSerializerUtil.deserialize(
088                            masterAddressString);
089    
090                    ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
091                            methodHandler, address);
092    
093                    try {
094                            FutureClusterResponses futureClusterResponses =
095                                    _clusterExecutor.execute(clusterRequest);
096    
097                            return new RemoteFuture<T>(address, futureClusterResponses);
098                    }
099                    catch (Exception e) {
100                            throw new SystemException(
101                                    "Unable to execute on master " + address.getDescription(), e);
102                    }
103            }
104    
105            @Override
106            public void initialize() {
107                    if (!_clusterExecutor.isEnabled()) {
108                            return;
109                    }
110    
111                    try {
112                            _localClusterNodeAddress = AddressSerializerUtil.serialize(
113                                    _clusterExecutor.getLocalClusterNodeAddress());
114    
115                            _clusterEventListener =
116                                    new ClusterMasterTokenClusterEventListener();
117    
118                            _clusterExecutor.addClusterEventListener(_clusterEventListener);
119    
120                            String masterAddressString = getMasterAddressString();
121    
122                            _enabled = true;
123    
124                            notifyMasterTokenTransitionListeners(
125                                    _localClusterNodeAddress.equals(masterAddressString));
126                    }
127                    catch (Exception e) {
128                            throw new RuntimeException(
129                                    "Unable to initialize cluster master executor", e);
130                    }
131            }
132    
133            @Override
134            public boolean isEnabled() {
135                    return _enabled;
136            }
137    
138            @Override
139            public boolean isMaster() {
140                    if (isEnabled()) {
141                            return _master;
142                    }
143    
144                    return true;
145            }
146    
147            @Override
148            public void registerClusterMasterTokenTransitionListener(
149                    ClusterMasterTokenTransitionListener
150                            clusterMasterTokenTransitionListener) {
151    
152                    _clusterMasterTokenTransitionListeners.add(
153                            clusterMasterTokenTransitionListener);
154            }
155    
156            public void setClusterExecutor(ClusterExecutor clusterExecutor) {
157                    _clusterExecutor = clusterExecutor;
158            }
159    
160            public void setClusterMasterTokenTransitionListeners(
161                    Set<ClusterMasterTokenTransitionListener>
162                            clusterMasterTokenTransitionListeners) {
163    
164                    _clusterMasterTokenTransitionListeners.addAll(
165                            clusterMasterTokenTransitionListeners);
166            }
167    
168            @Override
169            public void unregisterClusterMasterTokenTransitionListener(
170                    ClusterMasterTokenTransitionListener
171                            clusterMasterTokenTransitionListener) {
172    
173                    _clusterMasterTokenTransitionListeners.remove(
174                            clusterMasterTokenTransitionListener);
175            }
176    
177            protected String getMasterAddressString() {
178                    String owner = null;
179    
180                    while (true) {
181                            try {
182                                    Lock lock = null;
183    
184                                    if (owner == null) {
185                                            lock = LockLocalServiceUtil.lock(
186                                                    _LOCK_CLASS_NAME, _LOCK_CLASS_NAME,
187                                                    _localClusterNodeAddress);
188                                    }
189                                    else {
190                                            lock = LockLocalServiceUtil.lock(
191                                                    _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, owner,
192                                                    _localClusterNodeAddress);
193                                    }
194    
195                                    owner = lock.getOwner();
196    
197                                    Address address = AddressSerializerUtil.deserialize(owner);
198    
199                                    if (_clusterExecutor.isClusterNodeAlive(address)) {
200                                            break;
201                                    }
202                            }
203                            catch (Exception e) {
204                                    if (_log.isWarnEnabled()) {
205                                            _log.warn(
206                                                    "Unable to acquire memory scheduler cluster lock", e);
207                                    }
208                            }
209    
210                            if (_log.isInfoEnabled()) {
211                                    if (Validator.isNotNull(owner)) {
212                                            _log.info("Lock currently held by " + owner);
213                                    }
214    
215                                    _log.info(
216                                            "Reattempting to acquire memory scheduler cluster lock");
217                            }
218                    }
219    
220                    boolean master = _localClusterNodeAddress.equals(owner);
221    
222                    if (master == _master) {
223                            return owner;
224                    }
225    
226                    _master = master;
227    
228                    if (_enabled) {
229                            notifyMasterTokenTransitionListeners(master);
230                    }
231    
232                    return owner;
233            }
234    
235            protected void notifyMasterTokenTransitionListeners(
236                    boolean masterTokenAcquired) {
237    
238                    for (ClusterMasterTokenTransitionListener
239                                    clusterMasterTokenTransitionListener :
240                                            _clusterMasterTokenTransitionListeners) {
241    
242                            if (masterTokenAcquired) {
243                                    clusterMasterTokenTransitionListener.masterTokenAcquired();
244                            }
245                            else {
246                                    clusterMasterTokenTransitionListener.masterTokenReleased();
247                            }
248                    }
249            }
250    
251            private static final String _LOCK_CLASS_NAME =
252                    ClusterMasterExecutorImpl.class.getName();
253    
254            private static Log _log = LogFactoryUtil.getLog(
255                    ClusterMasterExecutorImpl.class);
256    
257            private static volatile boolean _master;
258    
259            private ClusterEventListener _clusterEventListener;
260            private ClusterExecutor _clusterExecutor;
261            private Set<ClusterMasterTokenTransitionListener>
262                    _clusterMasterTokenTransitionListeners =
263                    new HashSet<ClusterMasterTokenTransitionListener>();
264            private volatile boolean _enabled;
265            private volatile String _localClusterNodeAddress;
266    
267            private class ClusterMasterTokenClusterEventListener
268                    implements ClusterEventListener {
269    
270                    @Override
271                    public void processClusterEvent(ClusterEvent clusterEvent) {
272                            try {
273                                    getMasterAddressString();
274                            }
275                            catch (Exception e) {
276                                    _log.error("Unable to update the cluster master lock", e);
277                            }
278                    }
279            }
280    
281            private class LocalFuture<T> implements Future<T> {
282    
283                    public LocalFuture(T result) {
284                            _result = result;
285                    }
286    
287                    @Override
288                    public boolean cancel(boolean mayInterruptIfRunning) {
289                            return false;
290                    }
291    
292                    @Override
293                    public boolean isCancelled() {
294                            return false;
295                    }
296    
297                    @Override
298                    public boolean isDone() {
299                            return true;
300                    }
301    
302                    @Override
303                    public T get() {
304                            return _result;
305                    }
306    
307                    @Override
308                    public T get(long timeout, TimeUnit unit) {
309                            return _result;
310                    }
311    
312                    private final T _result;
313    
314            }
315    
316            private class RemoteFuture<T> implements Future<T> {
317    
318                    public RemoteFuture(
319                            Address address, FutureClusterResponses futureClusterResponses) {
320    
321                            _address = address;
322                            _futureClusterResponses = futureClusterResponses;
323                    }
324    
325                    @Override
326                    public boolean cancel(boolean mayInterruptIfRunning) {
327                            return _futureClusterResponses.cancel(mayInterruptIfRunning);
328                    }
329    
330                    @Override
331                    public boolean isCancelled() {
332                            return _futureClusterResponses.isCancelled();
333                    }
334    
335                    @Override
336                    public boolean isDone() {
337                            return _futureClusterResponses.isDone();
338                    }
339    
340                    @Override
341                    public T get() throws ExecutionException, InterruptedException {
342                            ClusterNodeResponses clusterNodeResponses =
343                                    _futureClusterResponses.get();
344    
345                            ClusterNodeResponse clusterNodeResponse =
346                                    clusterNodeResponses.getClusterResponse(_address);
347    
348                            try {
349                                    return (T)clusterNodeResponse.getResult();
350                            }
351                            catch (Exception e) {
352                                    throw new ExecutionException(e);
353                            }
354                    }
355    
356                    @Override
357                    public T get(long timeout, TimeUnit unit)
358                            throws ExecutionException, InterruptedException, TimeoutException {
359    
360                            ClusterNodeResponses clusterNodeResponses =
361                                    _futureClusterResponses.get(timeout, unit);
362    
363                            ClusterNodeResponse clusterNodeResponse =
364                                    clusterNodeResponses.getClusterResponse(_address);
365    
366                            try {
367                                    return (T)clusterNodeResponse.getResult();
368                            }
369                            catch (Exception e) {
370                                    throw new ExecutionException(e);
371                            }
372                    }
373    
374                    private final Address _address;
375                    private final FutureClusterResponses _futureClusterResponses;
376    
377            }
378    
379    }