001
014
015 package com.liferay.portal.kernel.concurrent;
016
017 import java.util.Comparator;
018 import java.util.concurrent.atomic.AtomicInteger;
019 import java.util.concurrent.atomic.AtomicLong;
020 import java.util.concurrent.locks.Condition;
021 import java.util.concurrent.locks.ReentrantLock;
022
023
026 public class CoalescedPipe<E> {
027
028 public CoalescedPipe() {
029 this(null);
030 }
031
032 public CoalescedPipe(Comparator<E> comparator) {
033 _comparator = comparator;
034 _notEmptyCondition = _takeLock.newCondition();
035
036 _headElementLink = new ElementLink<E>(null);
037 _lastElementLink = _headElementLink;
038 }
039
040 public long coalescedCount() {
041 return _coalescedCount.get();
042 }
043
044 public int pendingCount() {
045 return _pendingCount.get();
046 }
047
048 public void put(E e) throws InterruptedException {
049 if (e == null) {
050 throw new NullPointerException();
051 }
052
053 int pendingElements = -1;
054
055 _putLock.lockInterruptibly();
056
057 try {
058 if (_coalesceElement(e)) {
059 return;
060 }
061
062 _lastElementLink._nextElementLink = new ElementLink<E>(e);
063
064 _lastElementLink = _lastElementLink._nextElementLink;
065
066 pendingElements = _pendingCount.getAndIncrement();
067 }
068 finally {
069 _putLock.unlock();
070 }
071
072 if (pendingElements == 0) {
073 _takeLock.lock();
074
075 try {
076 _notEmptyCondition.signal();
077 }
078 finally {
079 _takeLock.unlock();
080 }
081 }
082 }
083
084 public E take() throws InterruptedException {
085 E element = null;
086
087 _takeLock.lockInterruptibly();
088
089 try {
090 while (_pendingCount.get() == 0) {
091 _notEmptyCondition.await();
092 }
093
094 ElementLink<E> garbageELementLink = _headElementLink;
095
096 _headElementLink = _headElementLink._nextElementLink;
097
098 garbageELementLink._nextElementLink = null;
099
100 element = _headElementLink._element;
101
102 _headElementLink._element = null;
103
104 int pendingElements = _pendingCount.getAndDecrement();
105
106 if (pendingElements > 1) {
107 _notEmptyCondition.signal();
108 }
109 }
110 finally {
111 _takeLock.unlock();
112 }
113
114 return element;
115 }
116
117 public Object[] takeSnapshot() {
118 _putLock.lock();
119 _takeLock.lock();
120
121 try {
122 Object[] pendingElements = new Object[_pendingCount.get()];
123
124 ElementLink<E> currentElementLink =
125 _headElementLink._nextElementLink;
126
127 for (int i = 0; currentElementLink != null; i++) {
128 pendingElements[i] = currentElementLink._element;
129
130 currentElementLink = currentElementLink._nextElementLink;
131 }
132
133 return pendingElements;
134 }
135 finally {
136 _putLock.unlock();
137 _takeLock.unlock();
138 }
139 }
140
141 private boolean _coalesceElement(E e) {
142 try {
143 _takeLock.lockInterruptibly();
144
145 try {
146 ElementLink<E> currentElementLink =
147 _headElementLink._nextElementLink;
148
149 if (_comparator != null) {
150 while (currentElementLink != null) {
151 if (_comparator.compare(
152 currentElementLink._element, e) == 0) {
153
154 _coalescedCount.incrementAndGet();
155
156 return true;
157 }
158
159 currentElementLink =
160 currentElementLink._nextElementLink;
161 }
162 }
163 else {
164 while (currentElementLink != null) {
165 if (currentElementLink._element.equals(e)) {
166 _coalescedCount.incrementAndGet();
167
168 return true;
169 }
170
171 currentElementLink =
172 currentElementLink._nextElementLink;
173 }
174 }
175 }
176 finally {
177 _takeLock.unlock();
178 }
179 }
180 catch (InterruptedException ie) {
181
182
183
184 }
185
186 return false;
187 }
188
189 private final AtomicLong _coalescedCount = new AtomicLong(0);
190 private final Comparator<E> _comparator;
191 private ElementLink<E> _headElementLink;
192 private ElementLink<E> _lastElementLink;
193 private final Condition _notEmptyCondition;
194 private final AtomicInteger _pendingCount = new AtomicInteger(0);
195 private final ReentrantLock _putLock = new ReentrantLock();
196 private final ReentrantLock _takeLock = new ReentrantLock();
197
198 private static class ElementLink<E> {
199
200 private ElementLink(E element) {
201 _element = element;
202 }
203
204 private E _element;
205 private ElementLink<E> _nextElementLink;
206
207 }
208
209 }