001    /**
002     * Copyright (c) 2000-2010 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.concurrent;
016    
017    import java.util.concurrent.atomic.AtomicMarkableReference;
018    import java.util.concurrent.atomic.AtomicReference;
019    
020    /**
021     * @author Shuyang Zhou
022     */
023    public class BatchablePipe<K, V> {
024    
025            public BatchablePipe() {
026                    _headEntry = new Entry<K, V>(null);
027                    _lastEntryReference = new AtomicReference<Entry<K, V>>(_headEntry);
028            }
029    
030            public boolean put(IncreasableEntry<K, V> increasableEntry) {
031                    Entry<K, V> newEntry = new Entry<K, V>(increasableEntry);
032    
033                    while (true) {
034                            if (_doIncrease(increasableEntry)) {
035                                    return false;
036                            }
037    
038                            Entry<K, V> lastEntryLink = _lastEntryReference.get();
039                            Entry<K, V> nextEntryLink = lastEntryLink._nextEntry.getReference();
040    
041                            if (nextEntryLink == null) {
042                                    if (lastEntryLink._nextEntry.compareAndSet(
043                                                    null, newEntry, false, false)) {
044    
045                                            _lastEntryReference.set(newEntry);
046    
047                                            return true;
048                                    }
049                            }
050                            else {
051                                    _lastEntryReference.compareAndSet(lastEntryLink, nextEntryLink);
052                            }
053                    }
054            }
055    
056            public IncreasableEntry<K, V> take() {
057                    boolean[] marked = {false};
058    
059                    Retry:
060    
061                    while (true) {
062                            Entry<K, V> predecessorEntry = _headEntry;
063                            Entry<K, V> currentEntry =
064                                    predecessorEntry._nextEntry.getReference();
065    
066                            while (currentEntry != null) {
067                                    Entry<K, V> successorEntry = currentEntry._nextEntry.get(
068                                            marked);
069    
070                                    if (marked[0]) {
071                                            if (!predecessorEntry._nextEntry.compareAndSet(
072                                                            currentEntry, successorEntry, false, false)) {
073    
074                                                    continue Retry;
075                                            }
076    
077                                            currentEntry = predecessorEntry._nextEntry.getReference();
078    
079                                            continue;
080                                    }
081    
082                                    if (currentEntry._nextEntry.compareAndSet(
083                                                    successorEntry, successorEntry, false, true)) {
084    
085                                            return currentEntry._increasableEntry;
086                                    }
087                                    else {
088                                            continue Retry;
089                                    }
090                            }
091    
092                            return null;
093                    }
094            }
095    
096            private boolean _doIncrease(IncreasableEntry<K, V> increasableEntry) {
097                    boolean[] marked = {false};
098    
099                    Retry:
100    
101                    while (true) {
102                            Entry<K, V> predecessorEntry = _headEntry;
103                            Entry<K, V> currentEntry =
104                                    predecessorEntry._nextEntry.getReference();
105    
106                            while (currentEntry != null) {
107                                    Entry<K, V> successorEntry = currentEntry._nextEntry.get(
108                                            marked);
109    
110                                    if (marked[0]) {
111                                            if (!predecessorEntry._nextEntry.compareAndSet(
112                                                            currentEntry, successorEntry, false, false)) {
113    
114                                                    continue Retry;
115                                            }
116    
117                                            currentEntry = predecessorEntry._nextEntry.getReference();
118    
119                                            continue;
120                                    }
121    
122                                    if (currentEntry._increasableEntry.getKey().equals(
123                                                    increasableEntry.getKey())) {
124    
125                                            return currentEntry._increasableEntry.increase(
126                                                    increasableEntry.getValue());
127                                    }
128    
129                                    predecessorEntry = currentEntry;
130                                    currentEntry = successorEntry;
131                            }
132    
133                            _lastEntryReference.set(predecessorEntry);
134    
135                            return false;
136                    }
137            }
138    
139            private final Entry<K, V> _headEntry;
140            private final AtomicReference<Entry<K, V>> _lastEntryReference;
141    
142            private static class Entry<K, V> {
143    
144                    private Entry(IncreasableEntry<K, V> increasableEntry) {
145                            _increasableEntry = increasableEntry;
146                            _nextEntry = new AtomicMarkableReference<Entry<K, V>>(null, false);
147                    }
148    
149                    private IncreasableEntry<K, V> _increasableEntry;
150                    private AtomicMarkableReference<Entry<K, V>> _nextEntry;
151    
152            }
153    
154    }