001
014
015 package com.liferay.portal.increment;
016
017 import com.liferay.portal.kernel.cache.Lifecycle;
018 import com.liferay.portal.kernel.cache.ThreadLocalCacheManager;
019 import com.liferay.portal.kernel.concurrent.BatchablePipe;
020 import com.liferay.portal.kernel.increment.Increment;
021 import com.liferay.portal.kernel.log.Log;
022 import com.liferay.portal.kernel.log.LogFactoryUtil;
023 import com.liferay.portal.kernel.util.CentralizedThreadLocal;
024 import com.liferay.portal.security.auth.CompanyThreadLocal;
025
026 import java.io.Serializable;
027
028 import java.util.concurrent.atomic.AtomicInteger;
029
030
033 public class BufferedIncrementRunnable implements Runnable {
034
035 public BufferedIncrementRunnable(
036 BufferedIncrementConfiguration bufferedIncrementConfiguration,
037 BatchablePipe<Serializable, Increment<?>> batchablePipe,
038 AtomicInteger queueLengthTracker) {
039
040 _bufferedIncrementConfiguration = bufferedIncrementConfiguration;
041 _batchablePipe = batchablePipe;
042 _queueLengthTracker = queueLengthTracker;
043
044 if (_bufferedIncrementConfiguration.isStandbyEnabled()) {
045 _queueLengthTracker.incrementAndGet();
046 }
047
048 _companyId = CompanyThreadLocal.getCompanyId();
049 }
050
051 @Override
052 @SuppressWarnings("rawtypes")
053 public void run() {
054 CompanyThreadLocal.setCompanyId(_companyId);
055
056 while (true) {
057 BufferedIncreasableEntry bufferedIncreasableEntry =
058 (BufferedIncreasableEntry)_batchablePipe.take();
059
060 if (bufferedIncreasableEntry == null) {
061 break;
062 }
063
064 try {
065 bufferedIncreasableEntry.proceed();
066 }
067 catch (Throwable t) {
068 _log.error(
069 "Unable to write buffered increment value to the database",
070 t);
071 }
072
073 if (_bufferedIncrementConfiguration.isStandbyEnabled()) {
074 int queueLength = _queueLengthTracker.decrementAndGet();
075
076 long standbyTime =
077 _bufferedIncrementConfiguration.calculateStandbyTime(
078 queueLength);
079
080 try {
081 Thread.sleep(standbyTime);
082 }
083 catch (InterruptedException ie) {
084 break;
085 }
086 }
087 }
088
089 ThreadLocalCacheManager.clearAll(Lifecycle.REQUEST);
090
091 CentralizedThreadLocal.clearShortLivedThreadLocals();
092 }
093
094 private static Log _log = LogFactoryUtil.getLog(
095 BufferedIncrementRunnable.class);
096
097 private final BatchablePipe<Serializable, Increment<?>> _batchablePipe;
098 private final BufferedIncrementConfiguration
099 _bufferedIncrementConfiguration;
100 private final long _companyId;
101 private final AtomicInteger _queueLengthTracker;
102
103 }