001
014
015 package com.liferay.portal.kernel.cache.cluster;
016
017 import com.liferay.portal.kernel.concurrent.CoalescedPipe;
018 import com.liferay.portal.kernel.log.Log;
019 import com.liferay.portal.kernel.log.LogFactoryUtil;
020
021 import java.util.concurrent.atomic.AtomicInteger;
022 import java.util.concurrent.atomic.AtomicLong;
023
024
027 public abstract class BasePortalCacheClusterChannel
028 implements PortalCacheClusterChannel, Runnable {
029
030 public BasePortalCacheClusterChannel() {
031 _dispatchThread = new Thread(
032 this,
033 "PortalCacheClusterChannel dispatch thread-" +
034 _dispatchThreadCounter.getAndIncrement());
035 _eventQueue = new CoalescedPipe<PortalCacheClusterEvent>(
036 new PortalCacheClusterEventCoalesceComparator());
037 }
038
039 @Override
040 public void destroy() {
041 _destroy = true;
042
043 _dispatchThread.interrupt();
044 }
045
046 public abstract void dispatchEvent(PortalCacheClusterEvent event);
047
048 @Override
049 public long getCoalescedEventNumber() {
050 return _eventQueue.coalescedCount();
051 }
052
053 @Override
054 public int getPendingEventNumber() {
055 return _eventQueue.pendingCount();
056 }
057
058 @Override
059 public long getSentEventNumber() {
060 return _sentEventCounter.get();
061 }
062
063 @Override
064 public void run() {
065 while (true) {
066 try {
067 if (_destroy) {
068 Object[] events = _eventQueue.takeSnapshot();
069
070 for (Object event : events) {
071 dispatchEvent((PortalCacheClusterEvent)event);
072
073 _sentEventCounter.incrementAndGet();
074 }
075
076 break;
077 }
078 else {
079 try {
080 PortalCacheClusterEvent portalCacheClusterEvent =
081 _eventQueue.take();
082
083 dispatchEvent(portalCacheClusterEvent);
084
085 _sentEventCounter.incrementAndGet();
086 }
087 catch (InterruptedException ie) {
088 }
089 }
090 }
091 catch (Throwable t) {
092 if (_log.isWarnEnabled()) {
093 _log.warn("Please fix the unexpected throwable", t);
094 }
095 }
096 }
097 }
098
099 @Override
100 public void sendEvent(PortalCacheClusterEvent portalCacheClusterEvent) {
101 if (_started == false) {
102 synchronized (this) {
103 if (_started == false) {
104 _dispatchThread.start();
105
106 _started = true;
107 }
108 }
109 }
110
111 if (_destroy) {
112 dispatchEvent(portalCacheClusterEvent);
113 }
114 else {
115 try {
116 _eventQueue.put(portalCacheClusterEvent);
117 }
118 catch (InterruptedException ie) {
119 }
120 }
121 }
122
123 private static Log _log = LogFactoryUtil.getLog(
124 BasePortalCacheClusterChannel.class);
125
126 private static AtomicInteger _dispatchThreadCounter = new AtomicInteger(0);
127
128 private volatile boolean _destroy = false;
129 private final Thread _dispatchThread;
130 private final CoalescedPipe<PortalCacheClusterEvent> _eventQueue;
131 private final AtomicLong _sentEventCounter = new AtomicLong(0);
132 private volatile boolean _started = false;
133
134 }