001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.concurrent;
016    
017    import java.util.concurrent.atomic.AtomicMarkableReference;
018    import java.util.concurrent.atomic.AtomicReference;
019    
020    /**
021     * @author Shuyang Zhou
022     */
023    public class BatchablePipe<K, V> {
024    
025            public BatchablePipe() {
026                    _headEntry = new Entry<K, V>(null);
027                    _lastEntryReference = new AtomicReference<Entry<K, V>>(_headEntry);
028            }
029    
030            public boolean put(IncreasableEntry<K, V> increasableEntry) {
031                    Entry<K, V> newEntry = new Entry<K, V>(increasableEntry);
032    
033                    while (true) {
034                            if (_doIncrease(increasableEntry)) {
035                                    return false;
036                            }
037    
038                            Entry<K, V> lastEntryLink = _lastEntryReference.get();
039                            Entry<K, V> nextEntryLink = lastEntryLink._nextEntry.getReference();
040    
041                            if (nextEntryLink == null) {
042                                    if (lastEntryLink._nextEntry.compareAndSet(
043                                                    null, newEntry, false, false)) {
044    
045                                            _lastEntryReference.set(newEntry);
046    
047                                            return true;
048                                    }
049                            }
050                            else {
051                                    _lastEntryReference.compareAndSet(lastEntryLink, nextEntryLink);
052                            }
053                    }
054            }
055    
056            public IncreasableEntry<K, V> take() {
057                    boolean[] marked = {false};
058    
059                    take:
060                    while (true) {
061                            Entry<K, V> predecessorEntry = _headEntry;
062                            Entry<K, V> currentEntry =
063                                    predecessorEntry._nextEntry.getReference();
064    
065                            while (currentEntry != null) {
066                                    Entry<K, V> successorEntry = currentEntry._nextEntry.get(
067                                            marked);
068    
069                                    if (marked[0]) {
070                                            if (!predecessorEntry._nextEntry.compareAndSet(
071                                                            currentEntry, successorEntry, false, false)) {
072    
073                                                    continue take;
074                                            }
075    
076                                            currentEntry = predecessorEntry._nextEntry.getReference();
077    
078                                            continue;
079                                    }
080    
081                                    if (currentEntry._nextEntry.compareAndSet(
082                                                    successorEntry, successorEntry, false, true)) {
083    
084                                            return currentEntry._increasableEntry;
085                                    }
086    
087                                    continue take;
088                            }
089    
090                            return null;
091                    }
092            }
093    
094            private boolean _doIncrease(IncreasableEntry<K, V> increasableEntry) {
095                    boolean[] marked = {false};
096    
097                    retry:
098                    while (true) {
099                            Entry<K, V> predecessorEntry = _headEntry;
100                            Entry<K, V> currentEntry =
101                                    predecessorEntry._nextEntry.getReference();
102    
103                            while (currentEntry != null) {
104                                    Entry<K, V> successorEntry = currentEntry._nextEntry.get(
105                                            marked);
106    
107                                    if (marked[0]) {
108                                            if (!predecessorEntry._nextEntry.compareAndSet(
109                                                            currentEntry, successorEntry, false, false)) {
110    
111                                                    continue retry;
112                                            }
113    
114                                            currentEntry = predecessorEntry._nextEntry.getReference();
115    
116                                            continue;
117                                    }
118    
119                                    if (currentEntry._increasableEntry.getKey().equals(
120                                                    increasableEntry.getKey())) {
121    
122                                            return currentEntry._increasableEntry.increase(
123                                                    increasableEntry.getValue());
124                                    }
125    
126                                    predecessorEntry = currentEntry;
127                                    currentEntry = successorEntry;
128                            }
129    
130                            _lastEntryReference.set(predecessorEntry);
131    
132                            return false;
133                    }
134            }
135    
136            private final Entry<K, V> _headEntry;
137            private final AtomicReference<Entry<K, V>> _lastEntryReference;
138    
139            private static class Entry<K, V> {
140    
141                    private Entry(IncreasableEntry<K, V> increasableEntry) {
142                            _increasableEntry = increasableEntry;
143                            _nextEntry = new AtomicMarkableReference<Entry<K, V>>(null, false);
144                    }
145    
146                    private IncreasableEntry<K, V> _increasableEntry;
147                    private AtomicMarkableReference<Entry<K, V>> _nextEntry;
148    
149            }
150    
151    }