001
014
015 package com.liferay.portal.kernel.concurrent;
016
017 import java.util.concurrent.atomic.AtomicMarkableReference;
018 import java.util.concurrent.atomic.AtomicReference;
019
020
023 public class BatchablePipe<K, V> {
024
025 public BatchablePipe() {
026 _headEntry = new Entry<K, V>(null);
027 _lastEntryReference = new AtomicReference<Entry<K, V>>(_headEntry);
028 }
029
030 public boolean put(IncreasableEntry<K, V> increasableEntry) {
031 Entry<K, V> newEntry = new Entry<K, V>(increasableEntry);
032
033 while (true) {
034 if (_doIncrease(increasableEntry)) {
035 return false;
036 }
037
038 Entry<K, V> lastEntryLink = _lastEntryReference.get();
039 Entry<K, V> nextEntryLink = lastEntryLink._nextEntry.getReference();
040
041 if (nextEntryLink == null) {
042 if (lastEntryLink._nextEntry.compareAndSet(
043 null, newEntry, false, false)) {
044
045 _lastEntryReference.set(newEntry);
046
047 return true;
048 }
049 }
050 else {
051 _lastEntryReference.compareAndSet(lastEntryLink, nextEntryLink);
052 }
053 }
054 }
055
056 public IncreasableEntry<K, V> take() {
057 boolean[] marked = {false};
058
059 take:
060 while (true) {
061 Entry<K, V> predecessorEntry = _headEntry;
062 Entry<K, V> currentEntry =
063 predecessorEntry._nextEntry.getReference();
064
065 while (currentEntry != null) {
066 Entry<K, V> successorEntry = currentEntry._nextEntry.get(
067 marked);
068
069 if (marked[0]) {
070 if (!predecessorEntry._nextEntry.compareAndSet(
071 currentEntry, successorEntry, false, false)) {
072
073 continue take;
074 }
075
076 currentEntry = predecessorEntry._nextEntry.getReference();
077
078 continue;
079 }
080
081 if (currentEntry._nextEntry.compareAndSet(
082 successorEntry, successorEntry, false, true)) {
083
084 return currentEntry._increasableEntry;
085 }
086
087 continue take;
088 }
089
090 return null;
091 }
092 }
093
094 private boolean _doIncrease(IncreasableEntry<K, V> increasableEntry) {
095 boolean[] marked = {false};
096
097 retry:
098 while (true) {
099 Entry<K, V> predecessorEntry = _headEntry;
100 Entry<K, V> currentEntry =
101 predecessorEntry._nextEntry.getReference();
102
103 while (currentEntry != null) {
104 Entry<K, V> successorEntry = currentEntry._nextEntry.get(
105 marked);
106
107 if (marked[0]) {
108 if (!predecessorEntry._nextEntry.compareAndSet(
109 currentEntry, successorEntry, false, false)) {
110
111 continue retry;
112 }
113
114 currentEntry = predecessorEntry._nextEntry.getReference();
115
116 continue;
117 }
118
119 if (currentEntry._increasableEntry.getKey().equals(
120 increasableEntry.getKey())) {
121
122 return currentEntry._increasableEntry.increase(
123 increasableEntry.getValue());
124 }
125
126 predecessorEntry = currentEntry;
127 currentEntry = successorEntry;
128 }
129
130 _lastEntryReference.set(predecessorEntry);
131
132 return false;
133 }
134 }
135
136 private final Entry<K, V> _headEntry;
137 private final AtomicReference<Entry<K, V>> _lastEntryReference;
138
139 private static class Entry<K, V> {
140
141 private Entry(IncreasableEntry<K, V> increasableEntry) {
142 _increasableEntry = increasableEntry;
143 _nextEntry = new AtomicMarkableReference<Entry<K, V>>(null, false);
144 }
145
146 private IncreasableEntry<K, V> _increasableEntry;
147 private AtomicMarkableReference<Entry<K, V>> _nextEntry;
148
149 }
150
151 }