001
014
015 package com.liferay.portal.kernel.cache.cluster;
016
017 import com.liferay.portal.kernel.concurrent.CoalescedPipe;
018 import com.liferay.portal.kernel.log.Log;
019 import com.liferay.portal.kernel.log.LogFactoryUtil;
020
021 import java.util.concurrent.atomic.AtomicInteger;
022 import java.util.concurrent.atomic.AtomicLong;
023
024
027 public abstract class BasePortalCacheClusterChannel
028 implements PortalCacheClusterChannel, Runnable {
029
030 public BasePortalCacheClusterChannel() {
031 _dispatchThread = new Thread(
032 this,
033 "PortalCacheClusterChannel dispatch thread-" +
034 _dispatchThreadCounter.getAndIncrement());
035 _eventQueue = new CoalescedPipe<PortalCacheClusterEvent>(
036 new PortalCacheClusterEventCoalesceComparator());
037 }
038
039 public void destroy() {
040 _destroy = true;
041
042 _dispatchThread.interrupt();
043 }
044
045 public abstract void dispatchEvent(PortalCacheClusterEvent event);
046
047 public long getCoalescedEventNumber() {
048 return _eventQueue.coalescedCount();
049 }
050
051 public int getPendingEventNumber() {
052 return _eventQueue.pendingCount();
053 }
054
055 public long getSentEventNumber() {
056 return _sentEventCounter.get();
057 }
058
059 public void run() {
060 while (true) {
061 try {
062 if (_destroy) {
063 Object[] events = _eventQueue.takeSnapshot();
064
065 for (Object event : events) {
066 dispatchEvent((PortalCacheClusterEvent)event);
067
068 _sentEventCounter.incrementAndGet();
069 }
070
071 break;
072 }
073 else {
074 try {
075 PortalCacheClusterEvent portalCacheClusterEvent =
076 _eventQueue.take();
077
078 dispatchEvent(portalCacheClusterEvent);
079
080 _sentEventCounter.incrementAndGet();
081 }
082 catch (InterruptedException ie) {
083 }
084 }
085 }
086 catch (Throwable t) {
087 if (_log.isWarnEnabled()) {
088 _log.warn("Please fix the unexpected throwable", t);
089 }
090 }
091 }
092 }
093
094 public void sendEvent(PortalCacheClusterEvent portalCacheClusterEvent) {
095 if (_started == false) {
096 synchronized (this) {
097 if (_started == false) {
098 _dispatchThread.start();
099
100 _started = true;
101 }
102 }
103 }
104
105 try {
106 _eventQueue.put(portalCacheClusterEvent);
107 }
108 catch (InterruptedException ie) {
109 }
110 }
111
112 private static Log _log = LogFactoryUtil.getLog(
113 BasePortalCacheClusterChannel.class);
114
115 private static AtomicInteger _dispatchThreadCounter = new AtomicInteger(0);
116
117 private volatile boolean _destroy = false;
118 private final Thread _dispatchThread;
119 private final CoalescedPipe<PortalCacheClusterEvent> _eventQueue;
120 private final AtomicLong _sentEventCounter = new AtomicLong(0);
121 private volatile boolean _started = false;
122
123 }