001
014
015 package com.liferay.portal.kernel.concurrent;
016
017 import java.util.concurrent.atomic.AtomicMarkableReference;
018 import java.util.concurrent.atomic.AtomicReference;
019
020
023 public class BatchablePipe<K, V> {
024
025 public BatchablePipe() {
026 _headEntry = new Entry<K, V>(null);
027 _lastEntryReference = new AtomicReference<Entry<K, V>>(_headEntry);
028 }
029
030 public boolean put(IncreasableEntry<K, V> increasableEntry) {
031 Entry<K, V> newEntry = new Entry<K, V>(increasableEntry);
032
033 while (true) {
034 if (_doIncrease(increasableEntry)) {
035 return false;
036 }
037
038 Entry<K, V> lastEntryLink = _lastEntryReference.get();
039 Entry<K, V> nextEntryLink = lastEntryLink._nextEntry.getReference();
040
041 if (nextEntryLink == null) {
042 if (lastEntryLink._nextEntry.compareAndSet(
043 null, newEntry, false, false)) {
044
045 _lastEntryReference.set(newEntry);
046
047 return true;
048 }
049 }
050 else {
051 _lastEntryReference.compareAndSet(lastEntryLink, nextEntryLink);
052 }
053 }
054 }
055
056 public IncreasableEntry<K, V> take() {
057 boolean[] marked = {false};
058
059 take:
060 while (true) {
061 Entry<K, V> predecessorEntry = _headEntry;
062 Entry<K, V> currentEntry =
063 predecessorEntry._nextEntry.getReference();
064
065 while (currentEntry != null) {
066 Entry<K, V> successorEntry = currentEntry._nextEntry.get(
067 marked);
068
069 if (marked[0]) {
070 if (!predecessorEntry._nextEntry.compareAndSet(
071 currentEntry, successorEntry, false, false)) {
072
073 continue take;
074 }
075
076 currentEntry = predecessorEntry._nextEntry.getReference();
077
078 continue;
079 }
080
081 if (currentEntry._nextEntry.compareAndSet(
082 successorEntry, successorEntry, false, true)) {
083
084 return currentEntry._increasableEntry;
085 }
086 else {
087 continue take;
088 }
089 }
090
091 return null;
092 }
093 }
094
095 private boolean _doIncrease(IncreasableEntry<K, V> increasableEntry) {
096 boolean[] marked = {false};
097
098 Retry:
099
100 while (true) {
101 Entry<K, V> predecessorEntry = _headEntry;
102 Entry<K, V> currentEntry =
103 predecessorEntry._nextEntry.getReference();
104
105 while (currentEntry != null) {
106 Entry<K, V> successorEntry = currentEntry._nextEntry.get(
107 marked);
108
109 if (marked[0]) {
110 if (!predecessorEntry._nextEntry.compareAndSet(
111 currentEntry, successorEntry, false, false)) {
112
113 continue Retry;
114 }
115
116 currentEntry = predecessorEntry._nextEntry.getReference();
117
118 continue;
119 }
120
121 if (currentEntry._increasableEntry.getKey().equals(
122 increasableEntry.getKey())) {
123
124 return currentEntry._increasableEntry.increase(
125 increasableEntry.getValue());
126 }
127
128 predecessorEntry = currentEntry;
129 currentEntry = successorEntry;
130 }
131
132 _lastEntryReference.set(predecessorEntry);
133
134 return false;
135 }
136 }
137
138 private final Entry<K, V> _headEntry;
139 private final AtomicReference<Entry<K, V>> _lastEntryReference;
140
141 private static class Entry<K, V> {
142
143 private Entry(IncreasableEntry<K, V> increasableEntry) {
144 _increasableEntry = increasableEntry;
145 _nextEntry = new AtomicMarkableReference<Entry<K, V>>(null, false);
146 }
147
148 private IncreasableEntry<K, V> _increasableEntry;
149 private AtomicMarkableReference<Entry<K, V>> _nextEntry;
150
151 }
152
153 }