001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.concurrent;
016    
017    import java.util.concurrent.atomic.AtomicMarkableReference;
018    import java.util.concurrent.atomic.AtomicReference;
019    
020    /**
021     * @author Shuyang Zhou
022     */
023    public class BatchablePipe<K, V> {
024    
025            public BatchablePipe() {
026                    _headEntry = new Entry<K, V>(null);
027                    _lastEntryReference = new AtomicReference<Entry<K, V>>(_headEntry);
028            }
029    
030            public boolean put(IncreasableEntry<K, V> increasableEntry) {
031                    Entry<K, V> newEntry = new Entry<K, V>(increasableEntry);
032    
033                    while (true) {
034                            if (_doIncrease(increasableEntry)) {
035                                    return false;
036                            }
037    
038                            Entry<K, V> lastEntryLink = _lastEntryReference.get();
039                            Entry<K, V> nextEntryLink = lastEntryLink._nextEntry.getReference();
040    
041                            if (nextEntryLink == null) {
042                                    if (lastEntryLink._nextEntry.compareAndSet(
043                                                    null, newEntry, false, false)) {
044    
045                                            _lastEntryReference.set(newEntry);
046    
047                                            return true;
048                                    }
049                            }
050                            else {
051                                    _lastEntryReference.compareAndSet(lastEntryLink, nextEntryLink);
052                            }
053                    }
054            }
055    
056            public IncreasableEntry<K, V> take() {
057                    boolean[] marked = {false};
058    
059                    take:
060                    while (true) {
061                            Entry<K, V> predecessorEntry = _headEntry;
062                            Entry<K, V> currentEntry =
063                                    predecessorEntry._nextEntry.getReference();
064    
065                            while (currentEntry != null) {
066                                    Entry<K, V> successorEntry = currentEntry._nextEntry.get(
067                                            marked);
068    
069                                    if (marked[0]) {
070                                            if (!predecessorEntry._nextEntry.compareAndSet(
071                                                            currentEntry, successorEntry, false, false)) {
072    
073                                                    continue take;
074                                            }
075    
076                                            currentEntry = predecessorEntry._nextEntry.getReference();
077    
078                                            continue;
079                                    }
080    
081                                    if (currentEntry._nextEntry.compareAndSet(
082                                                    successorEntry, successorEntry, false, true)) {
083    
084                                            return currentEntry._increasableEntry;
085                                    }
086                                    else {
087                                            continue take;
088                                    }
089                            }
090    
091                            return null;
092                    }
093            }
094    
095            private boolean _doIncrease(IncreasableEntry<K, V> increasableEntry) {
096                    boolean[] marked = {false};
097    
098                    Retry:
099    
100                    while (true) {
101                            Entry<K, V> predecessorEntry = _headEntry;
102                            Entry<K, V> currentEntry =
103                                    predecessorEntry._nextEntry.getReference();
104    
105                            while (currentEntry != null) {
106                                    Entry<K, V> successorEntry = currentEntry._nextEntry.get(
107                                            marked);
108    
109                                    if (marked[0]) {
110                                            if (!predecessorEntry._nextEntry.compareAndSet(
111                                                            currentEntry, successorEntry, false, false)) {
112    
113                                                    continue Retry;
114                                            }
115    
116                                            currentEntry = predecessorEntry._nextEntry.getReference();
117    
118                                            continue;
119                                    }
120    
121                                    if (currentEntry._increasableEntry.getKey().equals(
122                                                    increasableEntry.getKey())) {
123    
124                                            return currentEntry._increasableEntry.increase(
125                                                    increasableEntry.getValue());
126                                    }
127    
128                                    predecessorEntry = currentEntry;
129                                    currentEntry = successorEntry;
130                            }
131    
132                            _lastEntryReference.set(predecessorEntry);
133    
134                            return false;
135                    }
136            }
137    
138            private final Entry<K, V> _headEntry;
139            private final AtomicReference<Entry<K, V>> _lastEntryReference;
140    
141            private static class Entry<K, V> {
142    
143                    private Entry(IncreasableEntry<K, V> increasableEntry) {
144                            _increasableEntry = increasableEntry;
145                            _nextEntry = new AtomicMarkableReference<Entry<K, V>>(null, false);
146                    }
147    
148                    private IncreasableEntry<K, V> _increasableEntry;
149                    private AtomicMarkableReference<Entry<K, V>> _nextEntry;
150    
151            }
152    
153    }