001
014
015 package com.liferay.portal.increment;
016
017 import com.liferay.portal.kernel.concurrent.BatchablePipe;
018 import com.liferay.portal.kernel.log.Log;
019 import com.liferay.portal.kernel.log.LogFactoryUtil;
020 import com.liferay.portal.kernel.messaging.BaseMessageListener;
021 import com.liferay.portal.kernel.messaging.Message;
022 import com.liferay.portal.security.auth.CompanyThreadLocal;
023
024
027 public class BufferedIncrementMessageListener extends BaseMessageListener {
028
029 @Override
030 @SuppressWarnings("rawtypes")
031 protected void doReceive(Message message) throws Exception {
032 long companyId = message.getLong("companyId");
033
034 CompanyThreadLocal.setCompanyId(companyId);
035
036 BatchablePipe<String, BufferedIncreasableEntry> batchablePipe =
037 (BatchablePipe<String, BufferedIncreasableEntry>)
038 message.getPayload();
039
040 while (true) {
041 BufferedIncreasableEntry bufferedIncreasableEntry =
042 (BufferedIncreasableEntry)batchablePipe.take();
043
044 if (bufferedIncreasableEntry == null) {
045 break;
046 }
047
048 try {
049 bufferedIncreasableEntry.proceed();
050 }
051 catch (Throwable t) {
052 _log.error(
053 "Cannot write buffered increment value to the database", t);
054 }
055 }
056
057 }
058
059 private static Log _log = LogFactoryUtil.getLog(
060 BufferedIncrementMessageListener.class);
061
062 }