001
014
015 package com.liferay.portal.increment;
016
017 import com.liferay.portal.kernel.concurrent.BatchablePipe;
018 import com.liferay.portal.kernel.log.Log;
019 import com.liferay.portal.kernel.log.LogFactoryUtil;
020 import com.liferay.portal.kernel.messaging.Message;
021 import com.liferay.portal.kernel.messaging.MessageListener;
022
023
026 public class BufferedIncrementMessageListener implements MessageListener {
027
028 public void receive(Message message) {
029 try {
030 doReceive(message);
031 }
032 catch (Exception e) {
033 _log.error("Unable to process message " + message, e);
034 }
035
036 }
037
038 @SuppressWarnings("rawtypes")
039 protected void doReceive(Message message) throws Exception {
040 BatchablePipe<String, BufferedIncreasableEntry> batchablePipe =
041 (BatchablePipe<String, BufferedIncreasableEntry>)
042 message.getPayload();
043
044 while (true) {
045 BufferedIncreasableEntry bufferedIncreasableEntry =
046 (BufferedIncreasableEntry)batchablePipe.take();
047
048 if (bufferedIncreasableEntry == null) {
049 break;
050 }
051
052 try {
053 bufferedIncreasableEntry.proceed();
054 }
055 catch (Throwable t) {
056 _log.error(
057 "Cannot write buffered increment value to the database", t);
058 }
059 }
060
061 }
062
063 private static Log _log = LogFactoryUtil.getLog(
064 BufferedIncrementMessageListener.class);
065
066 }