001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.counter.service.persistence;
016    
017    import com.liferay.counter.model.Counter;
018    import com.liferay.counter.model.CounterHolder;
019    import com.liferay.counter.model.CounterRegister;
020    import com.liferay.counter.model.impl.CounterImpl;
021    import com.liferay.portal.kernel.cache.CacheRegistryItem;
022    import com.liferay.portal.kernel.cache.CacheRegistryUtil;
023    import com.liferay.portal.kernel.concurrent.CompeteLatch;
024    import com.liferay.portal.kernel.dao.jdbc.DataAccess;
025    import com.liferay.portal.kernel.dao.orm.LockMode;
026    import com.liferay.portal.kernel.dao.orm.ObjectNotFoundException;
027    import com.liferay.portal.kernel.dao.orm.Session;
028    import com.liferay.portal.kernel.exception.SystemException;
029    import com.liferay.portal.kernel.util.CharPool;
030    import com.liferay.portal.kernel.util.GetterUtil;
031    import com.liferay.portal.kernel.util.PropsKeys;
032    import com.liferay.portal.model.Dummy;
033    import com.liferay.portal.service.persistence.impl.BasePersistenceImpl;
034    import com.liferay.portal.util.PropsUtil;
035    import com.liferay.portal.util.PropsValues;
036    
037    import java.sql.Connection;
038    import java.sql.PreparedStatement;
039    import java.sql.ResultSet;
040    import java.sql.SQLException;
041    
042    import java.util.ArrayList;
043    import java.util.List;
044    import java.util.Map;
045    import java.util.concurrent.ConcurrentHashMap;
046    
047    /**
048     * @author Brian Wing Shun Chan
049     * @author Harry Mark
050     * @author Michael Young
051     * @author Shuyang Zhou
052     * @author Edward Han
053     */
054    public class CounterFinderImpl
055            extends BasePersistenceImpl<Dummy>
056            implements CacheRegistryItem, CounterFinder {
057    
058            @Override
059            public void afterPropertiesSet() {
060                    CacheRegistryUtil.register(this);
061            }
062    
063            @Override
064            public List<String> getNames() throws SystemException {
065                    Connection connection = null;
066                    PreparedStatement preparedStatement = null;
067                    ResultSet resultSet = null;
068    
069                    try {
070                            connection = getConnection();
071    
072                            preparedStatement = connection.prepareStatement(_SQL_SELECT_NAMES);
073    
074                            resultSet = preparedStatement.executeQuery();
075    
076                            List<String> list = new ArrayList<String>();
077    
078                            while (resultSet.next()) {
079                                    list.add(resultSet.getString(1));
080                            }
081    
082                            return list;
083                    }
084                    catch (SQLException sqle) {
085                            throw processException(sqle);
086                    }
087                    finally {
088                            DataAccess.cleanUp(connection, preparedStatement, resultSet);
089                    }
090            }
091    
092            @Override
093            public String getRegistryName() {
094                    return CounterFinderImpl.class.getName();
095            }
096    
097            @Override
098            public long increment() throws SystemException {
099                    return increment(_NAME);
100            }
101    
102            @Override
103            public long increment(String name) throws SystemException {
104                    return increment(name, _MINIMUM_INCREMENT_SIZE);
105            }
106    
107            @Override
108            public long increment(String name, int size) throws SystemException {
109                    if (size < _MINIMUM_INCREMENT_SIZE) {
110                            size = _MINIMUM_INCREMENT_SIZE;
111                    }
112    
113                    CounterRegister counterRegister = getCounterRegister(name);
114    
115                    return _competeIncrement(counterRegister, size);
116            }
117    
118            @Override
119            public void invalidate() {
120                    _counterRegisterMap.clear();
121            }
122    
123            @Override
124            public void rename(String oldName, String newName) throws SystemException {
125                    CounterRegister counterRegister = getCounterRegister(oldName);
126    
127                    synchronized (counterRegister) {
128                            if (_counterRegisterMap.containsKey(newName)) {
129                                    throw new SystemException(
130                                            "Cannot rename " + oldName + " to " + newName);
131                            }
132    
133                            Connection connection = null;
134                            PreparedStatement preparedStatement = null;
135    
136                            try {
137                                    connection = getConnection();
138    
139                                    preparedStatement = connection.prepareStatement(
140                                            _SQL_UPDATE_NAME_BY_NAME);
141    
142                                    preparedStatement.setString(1, newName);
143                                    preparedStatement.setString(2, oldName);
144    
145                                    preparedStatement.executeUpdate();
146                            }
147                            catch (ObjectNotFoundException onfe) {
148                            }
149                            catch (Exception e) {
150                                    throw processException(e);
151                            }
152                            finally {
153                                    DataAccess.cleanUp(connection, preparedStatement);
154                            }
155    
156                            counterRegister.setName(newName);
157    
158                            _counterRegisterMap.put(newName, counterRegister);
159                            _counterRegisterMap.remove(oldName);
160                    }
161            }
162    
163            @Override
164            public void reset(String name) throws SystemException {
165                    CounterRegister counterRegister = getCounterRegister(name);
166    
167                    synchronized (counterRegister) {
168                            Session session = null;
169    
170                            try {
171                                    session = openSession();
172    
173                                    Counter counter = (Counter)session.get(CounterImpl.class, name);
174    
175                                    session.delete(counter);
176    
177                                    session.flush();
178                            }
179                            catch (ObjectNotFoundException onfe) {
180                            }
181                            catch (Exception e) {
182                                    throw processException(e);
183                            }
184                            finally {
185                                    closeSession(session);
186                            }
187    
188                            _counterRegisterMap.remove(name);
189                    }
190            }
191    
192            @Override
193            public void reset(String name, long size) throws SystemException {
194                    CounterRegister counterRegister = createCounterRegister(name, size);
195    
196                    _counterRegisterMap.put(name, counterRegister);
197            }
198    
199            protected CounterRegister createCounterRegister(String name)
200                    throws SystemException {
201    
202                    return createCounterRegister(name, -1);
203            }
204    
205            protected CounterRegister createCounterRegister(String name, long size)
206                    throws SystemException {
207    
208                    long rangeMin = -1;
209                    int rangeSize = getRangeSize(name);
210    
211                    Connection connection = null;
212                    PreparedStatement preparedStatement = null;
213                    ResultSet resultSet = null;
214    
215                    try {
216                            connection = getConnection();
217    
218                            preparedStatement = connection.prepareStatement(
219                                    _SQL_SELECT_ID_BY_NAME);
220    
221                            preparedStatement.setString(1, name);
222    
223                            resultSet = preparedStatement.executeQuery();
224    
225                            if (!resultSet.next()) {
226                                    rangeMin = _DEFAULT_CURRENT_ID;
227    
228                                    if (size > rangeMin) {
229                                            rangeMin = size;
230                                    }
231    
232                                    resultSet.close();
233                                    preparedStatement.close();
234    
235                                    preparedStatement = connection.prepareStatement(_SQL_INSERT);
236    
237                                    preparedStatement.setString(1, name);
238                                    preparedStatement.setLong(2, rangeMin);
239    
240                                    preparedStatement.executeUpdate();
241                            }
242                    }
243                    catch (Exception e) {
244                            throw processException(e);
245                    }
246                    finally {
247                            DataAccess.cleanUp(connection, preparedStatement, resultSet);
248                    }
249    
250                    CounterHolder counterHolder = _obtainIncrement(name, rangeSize, size);
251    
252                    return new CounterRegister(name, counterHolder, rangeSize);
253            }
254    
255            protected Connection getConnection() throws SQLException {
256                    Connection connection = getDataSource().getConnection();
257    
258                    return connection;
259            }
260    
261            protected CounterRegister getCounterRegister(String name)
262                    throws SystemException {
263    
264                    CounterRegister counterRegister = _counterRegisterMap.get(name);
265    
266                    if (counterRegister != null) {
267                            return counterRegister;
268                    }
269                    else {
270                            synchronized (_counterRegisterMap) {
271    
272                                    // Double check
273    
274                                    counterRegister = _counterRegisterMap.get(name);
275    
276                                    if (counterRegister == null) {
277                                            counterRegister = createCounterRegister(name);
278    
279                                            _counterRegisterMap.put(name, counterRegister);
280                                    }
281    
282                                    return counterRegister;
283                            }
284                    }
285            }
286    
287            protected int getRangeSize(String name) {
288                    if (name.equals(_NAME)) {
289                            return PropsValues.COUNTER_INCREMENT;
290                    }
291    
292                    String incrementType = null;
293    
294                    int pos = name.indexOf(CharPool.POUND);
295    
296                    if (pos != -1) {
297                            incrementType = name.substring(0, pos);
298                    }
299                    else {
300                            incrementType = name;
301                    }
302    
303                    Integer rangeSize = _rangeSizeMap.get(incrementType);
304    
305                    if (rangeSize == null) {
306                            rangeSize = GetterUtil.getInteger(
307                                    PropsUtil.get(
308                                            PropsKeys.COUNTER_INCREMENT_PREFIX + incrementType),
309                                    PropsValues.COUNTER_INCREMENT);
310    
311                            _rangeSizeMap.put(incrementType, rangeSize);
312                    }
313    
314                    return rangeSize.intValue();
315            }
316    
317            private long _competeIncrement(CounterRegister counterRegister, int size)
318                    throws SystemException {
319    
320                    CounterHolder counterHolder = counterRegister.getCounterHolder();
321    
322                    // Try to use the fast path
323    
324                    long newValue = counterHolder.addAndGet(size);
325    
326                    if (newValue <= counterHolder.getRangeMax()) {
327                            return newValue;
328                    }
329    
330                    // Use the slow path
331    
332                    CompeteLatch completeLatch = counterRegister.getCompeteLatch();
333    
334                    if (!completeLatch.compete()) {
335    
336                            // Loser thread has to wait for the winner thread to finish its job
337    
338                            try {
339                                    completeLatch.await();
340                            }
341                            catch (InterruptedException ie) {
342                                    throw processException(ie);
343                            }
344    
345                            // Compete again
346    
347                            return _competeIncrement(counterRegister, size);
348                    }
349    
350                    // Winner thread
351    
352                    try {
353    
354                            // Double check
355    
356                            counterHolder = counterRegister.getCounterHolder();
357                            newValue = counterHolder.addAndGet(size);
358    
359                            if (newValue > counterHolder.getRangeMax()) {
360                                    CounterHolder newCounterHolder = _obtainIncrement(
361                                            counterRegister.getName(), counterRegister.getRangeSize(),
362                                            0);
363    
364                                    newValue = newCounterHolder.addAndGet(size);
365    
366                                    counterRegister.setCounterHolder(newCounterHolder);
367                            }
368                    }
369                    catch (Exception e) {
370                            throw processException(e);
371                    }
372                    finally {
373    
374                            // Winner thread opens the latch so that loser threads can continue
375    
376                            completeLatch.done();
377                    }
378    
379                    return newValue;
380            }
381    
382            private CounterHolder _obtainIncrement(
383                            String counterName, long range, long size)
384                    throws SystemException {
385    
386                    Session session = null;
387    
388                    try {
389                            session = openSession();
390    
391                            Counter counter = (Counter)session.get(
392                                    CounterImpl.class, counterName, LockMode.UPGRADE);
393    
394                            long newValue = counter.getCurrentId();
395    
396                            if (size > newValue) {
397                                    newValue = size;
398                            }
399    
400                            long rangeMax = newValue + range;
401    
402                            counter.setCurrentId(rangeMax);
403    
404                            CounterHolder counterHolder = new CounterHolder(newValue, rangeMax);
405    
406                            session.saveOrUpdate(counter);
407    
408                            session.flush();
409    
410                            return counterHolder;
411                    }
412                    catch (Exception e) {
413                            throw processException(e);
414                    }
415                    finally {
416                            closeSession(session);
417                    }
418            }
419    
420            private static final int _DEFAULT_CURRENT_ID = 0;
421    
422            private static final int _MINIMUM_INCREMENT_SIZE = 1;
423    
424            private static final String _NAME = Counter.class.getName();
425    
426            private static final String _SQL_INSERT =
427                    "insert into Counter(name, currentId) values (?, ?)";
428    
429            private static final String _SQL_SELECT_ID_BY_NAME =
430                    "select currentId from Counter where name = ?";
431    
432            private static final String _SQL_SELECT_NAMES =
433                    "select name from Counter order by name asc";
434    
435            private static final String _SQL_UPDATE_NAME_BY_NAME =
436                    "update Counter set name = ? where name = ?";
437    
438            private Map<String, CounterRegister> _counterRegisterMap =
439                    new ConcurrentHashMap<String, CounterRegister>();
440            private Map<String, Integer> _rangeSizeMap =
441                    new ConcurrentHashMap<String, Integer>();
442    
443    }