001
014
015 package com.liferay.portal.kernel.concurrent;
016
017 import java.util.concurrent.atomic.AtomicMarkableReference;
018 import java.util.concurrent.atomic.AtomicReference;
019
020
023 public class BatchablePipe<K, V> {
024
025 public BatchablePipe() {
026 _headEntry = new Entry<K, V>(null);
027 _lastEntryReference = new AtomicReference<Entry<K, V>>(_headEntry);
028 }
029
030 public boolean put(IncreasableEntry<K, V> increasableEntry) {
031 Entry<K, V> newEntry = new Entry<K, V>(increasableEntry);
032
033 while (true) {
034 if (_doIncrease(increasableEntry)) {
035 return false;
036 }
037
038 Entry<K, V> lastEntryLink = _lastEntryReference.get();
039 Entry<K, V> nextEntryLink = lastEntryLink._nextEntry.getReference();
040
041 if (nextEntryLink == null) {
042 if (lastEntryLink._nextEntry.compareAndSet(
043 null, newEntry, false, false)) {
044
045 _lastEntryReference.set(newEntry);
046
047 return true;
048 }
049 }
050 else {
051 _lastEntryReference.compareAndSet(lastEntryLink, nextEntryLink);
052 }
053 }
054 }
055
056 public IncreasableEntry<K, V> take() {
057 boolean[] marked = {false};
058
059 Retry:
060
061 while (true) {
062 Entry<K, V> predecessorEntry = _headEntry;
063 Entry<K, V> currentEntry =
064 predecessorEntry._nextEntry.getReference();
065
066 while (currentEntry != null) {
067 Entry<K, V> successorEntry = currentEntry._nextEntry.get(
068 marked);
069
070 if (marked[0]) {
071 if (!predecessorEntry._nextEntry.compareAndSet(
072 currentEntry, successorEntry, false, false)) {
073
074 continue Retry;
075 }
076
077 currentEntry = predecessorEntry._nextEntry.getReference();
078
079 continue;
080 }
081
082 if (currentEntry._nextEntry.compareAndSet(
083 successorEntry, successorEntry, false, true)) {
084
085 return currentEntry._increasableEntry;
086 }
087 else {
088 continue Retry;
089 }
090 }
091
092 return null;
093 }
094 }
095
096 private boolean _doIncrease(IncreasableEntry<K, V> increasableEntry) {
097 boolean[] marked = {false};
098
099 Retry:
100
101 while (true) {
102 Entry<K, V> predecessorEntry = _headEntry;
103 Entry<K, V> currentEntry =
104 predecessorEntry._nextEntry.getReference();
105
106 while (currentEntry != null) {
107 Entry<K, V> successorEntry = currentEntry._nextEntry.get(
108 marked);
109
110 if (marked[0]) {
111 if (!predecessorEntry._nextEntry.compareAndSet(
112 currentEntry, successorEntry, false, false)) {
113
114 continue Retry;
115 }
116
117 currentEntry = predecessorEntry._nextEntry.getReference();
118
119 continue;
120 }
121
122 if (currentEntry._increasableEntry.getKey().equals(
123 increasableEntry.getKey())) {
124
125 return currentEntry._increasableEntry.increase(
126 increasableEntry.getValue());
127 }
128
129 predecessorEntry = currentEntry;
130 currentEntry = successorEntry;
131 }
132
133 _lastEntryReference.set(predecessorEntry);
134
135 return false;
136 }
137 }
138
139 private final Entry<K, V> _headEntry;
140 private final AtomicReference<Entry<K, V>> _lastEntryReference;
141
142 private static class Entry<K, V> {
143
144 private Entry(IncreasableEntry<K, V> increasableEntry) {
145 _increasableEntry = increasableEntry;
146 _nextEntry = new AtomicMarkableReference<Entry<K, V>>(null, false);
147 }
148
149 private IncreasableEntry<K, V> _increasableEntry;
150 private AtomicMarkableReference<Entry<K, V>> _nextEntry;
151
152 }
153
154 }