001
014
015 package com.liferay.portal.increment;
016
017 import com.liferay.portal.kernel.concurrent.BatchablePipe;
018 import com.liferay.portal.kernel.concurrent.RejectedExecutionHandler;
019 import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
020 import com.liferay.portal.kernel.log.Log;
021 import com.liferay.portal.kernel.log.LogFactoryUtil;
022 import com.liferay.portal.kernel.messaging.Message;
023 import com.liferay.portal.kernel.messaging.MessageRunnable;
024
025
028 public class BufferedIncrementDiscardPolicy
029 implements RejectedExecutionHandler {
030
031 @Override
032 @SuppressWarnings("rawtypes")
033 public void rejectedExecution(
034 Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
035
036 MessageRunnable messageRunnable = (MessageRunnable)runnable;
037
038 Message message = messageRunnable.getMessage();
039
040 BatchablePipe<String, BufferedIncreasableEntry> batchablePipe =
041 (BatchablePipe<String, BufferedIncreasableEntry>)
042 message.getPayload();
043
044 for (int i = 0; i < _discardNumber; i++) {
045 BufferedIncreasableEntry bufferedIncreasableEntry =
046 (BufferedIncreasableEntry)batchablePipe.take();
047
048 if (bufferedIncreasableEntry == null) {
049 break;
050 }
051 else if (_log.isInfoEnabled()) {
052 _log.info(
053 "Discarding BufferedIncreasableEntry " +
054 bufferedIncreasableEntry);
055 }
056 }
057 }
058
059 public void setDiscardNumber(int discardNumber) {
060 _discardNumber = discardNumber;
061 }
062
063 private static Log _log = LogFactoryUtil.getLog(
064 BufferedIncrementDiscardPolicy.class);
065
066 private int _discardNumber = 1;
067
068 }