001
014
015 package com.liferay.portal.cluster;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.AddressSerializerUtil;
019 import com.liferay.portal.kernel.cluster.ClusterEvent;
020 import com.liferay.portal.kernel.cluster.ClusterEventListener;
021 import com.liferay.portal.kernel.cluster.ClusterExecutor;
022 import com.liferay.portal.kernel.cluster.ClusterMasterExecutor;
023 import com.liferay.portal.kernel.cluster.ClusterMasterTokenTransitionListener;
024 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025 import com.liferay.portal.kernel.cluster.ClusterNodeResponses;
026 import com.liferay.portal.kernel.cluster.ClusterRequest;
027 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
028 import com.liferay.portal.kernel.exception.SystemException;
029 import com.liferay.portal.kernel.log.Log;
030 import com.liferay.portal.kernel.log.LogFactoryUtil;
031 import com.liferay.portal.kernel.util.MethodHandler;
032 import com.liferay.portal.kernel.util.Validator;
033 import com.liferay.portal.model.Lock;
034 import com.liferay.portal.service.LockLocalServiceUtil;
035
036 import java.util.HashSet;
037 import java.util.Set;
038 import java.util.concurrent.ExecutionException;
039 import java.util.concurrent.Future;
040 import java.util.concurrent.TimeUnit;
041 import java.util.concurrent.TimeoutException;
042
043
046 public class ClusterMasterExecutorImpl implements ClusterMasterExecutor {
047
048 public void destroy() {
049 if (!_enabled) {
050 return;
051 }
052
053 try {
054 _clusterExecutor.removeClusterEventListener(_clusterEventListener);
055
056 LockLocalServiceUtil.unlock(
057 _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, _localClusterNodeAddress);
058 }
059 catch (SystemException se) {
060 if (_log.isWarnEnabled()) {
061 _log.warn("Unable to destroy the cluster master executor", se);
062 }
063 }
064 }
065
066 @Override
067 public <T> Future<T> executeOnMaster(MethodHandler methodHandler)
068 throws SystemException {
069
070 if (!_enabled) {
071 if (_log.isWarnEnabled()) {
072 _log.warn(
073 "Executing on the local node because the cluster master " +
074 "executor is disabled");
075 }
076
077 try {
078 return new LocalFuture<T>((T)methodHandler.invoke(true));
079 }
080 catch (Exception e) {
081 throw new SystemException(e);
082 }
083 }
084
085 String masterAddressString = getMasterAddressString();
086
087 Address address = AddressSerializerUtil.deserialize(
088 masterAddressString);
089
090 ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
091 methodHandler, address);
092
093 try {
094 FutureClusterResponses futureClusterResponses =
095 _clusterExecutor.execute(clusterRequest);
096
097 return new RemoteFuture<T>(address, futureClusterResponses);
098 }
099 catch (Exception e) {
100 throw new SystemException(
101 "Unable to execute on master " + address.getDescription(), e);
102 }
103 }
104
105 @Override
106 public void initialize() {
107 if (!_clusterExecutor.isEnabled()) {
108 return;
109 }
110
111 try {
112 _localClusterNodeAddress = AddressSerializerUtil.serialize(
113 _clusterExecutor.getLocalClusterNodeAddress());
114
115 _clusterEventListener =
116 new ClusterMasterTokenClusterEventListener();
117
118 _clusterExecutor.addClusterEventListener(_clusterEventListener);
119
120 String masterAddressString = getMasterAddressString();
121
122 _enabled = true;
123
124 notifyMasterTokenTransitionListeners(
125 _localClusterNodeAddress.equals(masterAddressString));
126 }
127 catch (Exception e) {
128 throw new RuntimeException(
129 "Unable to initialize cluster master executor", e);
130 }
131 }
132
133 @Override
134 public boolean isEnabled() {
135 return _enabled;
136 }
137
138 @Override
139 public boolean isMaster() {
140 if (isEnabled()) {
141 return _master;
142 }
143
144 return true;
145 }
146
147 @Override
148 public void registerClusterMasterTokenTransitionListener(
149 ClusterMasterTokenTransitionListener
150 clusterMasterTokenTransitionListener) {
151
152 _clusterMasterTokenTransitionListeners.add(
153 clusterMasterTokenTransitionListener);
154 }
155
156 public void setClusterExecutor(ClusterExecutor clusterExecutor) {
157 _clusterExecutor = clusterExecutor;
158 }
159
160 public void setClusterMasterTokenTransitionListeners(
161 Set<ClusterMasterTokenTransitionListener>
162 clusterMasterTokenTransitionListeners) {
163
164 _clusterMasterTokenTransitionListeners.addAll(
165 clusterMasterTokenTransitionListeners);
166 }
167
168 @Override
169 public void unregisterClusterMasterTokenTransitionListener(
170 ClusterMasterTokenTransitionListener
171 clusterMasterTokenTransitionListener) {
172
173 _clusterMasterTokenTransitionListeners.remove(
174 clusterMasterTokenTransitionListener);
175 }
176
177 protected String getMasterAddressString() {
178 String owner = null;
179
180 while (true) {
181 try {
182 Lock lock = null;
183
184 if (owner == null) {
185 lock = LockLocalServiceUtil.lock(
186 _LOCK_CLASS_NAME, _LOCK_CLASS_NAME,
187 _localClusterNodeAddress);
188 }
189 else {
190 lock = LockLocalServiceUtil.lock(
191 _LOCK_CLASS_NAME, _LOCK_CLASS_NAME, owner,
192 _localClusterNodeAddress);
193 }
194
195 owner = lock.getOwner();
196
197 Address address = AddressSerializerUtil.deserialize(owner);
198
199 if (_clusterExecutor.isClusterNodeAlive(address)) {
200 break;
201 }
202 }
203 catch (Exception e) {
204 if (_log.isWarnEnabled()) {
205 _log.warn(
206 "Unable to acquire memory scheduler cluster lock", e);
207 }
208 }
209
210 if (_log.isInfoEnabled()) {
211 if (Validator.isNotNull(owner)) {
212 _log.info("Lock currently held by " + owner);
213 }
214
215 _log.info(
216 "Reattempting to acquire memory scheduler cluster lock");
217 }
218 }
219
220 boolean master = _localClusterNodeAddress.equals(owner);
221
222 if (master == _master) {
223 return owner;
224 }
225
226 _master = master;
227
228 if (_enabled) {
229 notifyMasterTokenTransitionListeners(master);
230 }
231
232 return owner;
233 }
234
235 protected void notifyMasterTokenTransitionListeners(
236 boolean masterTokenAcquired) {
237
238 for (ClusterMasterTokenTransitionListener
239 clusterMasterTokenTransitionListener :
240 _clusterMasterTokenTransitionListeners) {
241
242 if (masterTokenAcquired) {
243 clusterMasterTokenTransitionListener.masterTokenAcquired();
244 }
245 else {
246 clusterMasterTokenTransitionListener.masterTokenReleased();
247 }
248 }
249 }
250
251 private static final String _LOCK_CLASS_NAME =
252 ClusterMasterExecutorImpl.class.getName();
253
254 private static Log _log = LogFactoryUtil.getLog(
255 ClusterMasterExecutorImpl.class);
256
257 private static volatile boolean _master;
258
259 private ClusterEventListener _clusterEventListener;
260 private ClusterExecutor _clusterExecutor;
261 private Set<ClusterMasterTokenTransitionListener>
262 _clusterMasterTokenTransitionListeners =
263 new HashSet<ClusterMasterTokenTransitionListener>();
264 private volatile boolean _enabled;
265 private volatile String _localClusterNodeAddress;
266
267 private class ClusterMasterTokenClusterEventListener
268 implements ClusterEventListener {
269
270 @Override
271 public void processClusterEvent(ClusterEvent clusterEvent) {
272 try {
273 getMasterAddressString();
274 }
275 catch (Exception e) {
276 _log.error("Unable to update the cluster master lock", e);
277 }
278 }
279 }
280
281 private class LocalFuture<T> implements Future<T> {
282
283 public LocalFuture(T result) {
284 _result = result;
285 }
286
287 @Override
288 public boolean cancel(boolean mayInterruptIfRunning) {
289 return false;
290 }
291
292 @Override
293 public boolean isCancelled() {
294 return false;
295 }
296
297 @Override
298 public boolean isDone() {
299 return true;
300 }
301
302 @Override
303 public T get() {
304 return _result;
305 }
306
307 @Override
308 public T get(long timeout, TimeUnit unit) {
309 return _result;
310 }
311
312 private final T _result;
313
314 }
315
316 private class RemoteFuture<T> implements Future<T> {
317
318 public RemoteFuture(
319 Address address, FutureClusterResponses futureClusterResponses) {
320
321 _address = address;
322 _futureClusterResponses = futureClusterResponses;
323 }
324
325 @Override
326 public boolean cancel(boolean mayInterruptIfRunning) {
327 return _futureClusterResponses.cancel(mayInterruptIfRunning);
328 }
329
330 @Override
331 public boolean isCancelled() {
332 return _futureClusterResponses.isCancelled();
333 }
334
335 @Override
336 public boolean isDone() {
337 return _futureClusterResponses.isDone();
338 }
339
340 @Override
341 public T get() throws ExecutionException, InterruptedException {
342 ClusterNodeResponses clusterNodeResponses =
343 _futureClusterResponses.get();
344
345 ClusterNodeResponse clusterNodeResponse =
346 clusterNodeResponses.getClusterResponse(_address);
347
348 try {
349 return (T)clusterNodeResponse.getResult();
350 }
351 catch (Exception e) {
352 throw new ExecutionException(e);
353 }
354 }
355
356 @Override
357 public T get(long timeout, TimeUnit unit)
358 throws ExecutionException, InterruptedException, TimeoutException {
359
360 ClusterNodeResponses clusterNodeResponses =
361 _futureClusterResponses.get(timeout, unit);
362
363 ClusterNodeResponse clusterNodeResponse =
364 clusterNodeResponses.getClusterResponse(_address);
365
366 try {
367 return (T)clusterNodeResponse.getResult();
368 }
369 catch (Exception e) {
370 throw new ExecutionException(e);
371 }
372 }
373
374 private final Address _address;
375 private final FutureClusterResponses _futureClusterResponses;
376
377 }
378
379 }