001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.counter.service.persistence.impl;
016    
017    import com.liferay.counter.kernel.model.Counter;
018    import com.liferay.counter.kernel.service.persistence.CounterFinder;
019    import com.liferay.counter.model.CounterHolder;
020    import com.liferay.counter.model.CounterRegister;
021    import com.liferay.counter.model.impl.CounterImpl;
022    import com.liferay.portal.kernel.cache.CacheRegistryItem;
023    import com.liferay.portal.kernel.concurrent.CompeteLatch;
024    import com.liferay.portal.kernel.dao.jdbc.DataAccess;
025    import com.liferay.portal.kernel.dao.orm.LockMode;
026    import com.liferay.portal.kernel.dao.orm.ObjectNotFoundException;
027    import com.liferay.portal.kernel.dao.orm.Session;
028    import com.liferay.portal.kernel.exception.SystemException;
029    import com.liferay.portal.kernel.model.Dummy;
030    import com.liferay.portal.kernel.service.persistence.impl.BasePersistenceImpl;
031    import com.liferay.portal.kernel.util.CharPool;
032    import com.liferay.portal.kernel.util.GetterUtil;
033    import com.liferay.portal.kernel.util.PropsKeys;
034    import com.liferay.portal.util.PropsUtil;
035    import com.liferay.portal.util.PropsValues;
036    
037    import java.sql.Connection;
038    import java.sql.PreparedStatement;
039    import java.sql.ResultSet;
040    import java.sql.SQLException;
041    
042    import java.util.ArrayList;
043    import java.util.List;
044    import java.util.Map;
045    import java.util.concurrent.ConcurrentHashMap;
046    
047    /**
048     * @author Brian Wing Shun Chan
049     * @author Harry Mark
050     * @author Michael Young
051     * @author Shuyang Zhou
052     * @author Edward Han
053     */
054    public class CounterFinderImpl
055            extends BasePersistenceImpl<Dummy>
056            implements CacheRegistryItem, CounterFinder {
057    
058            @Override
059            public List<String> getNames() {
060                    Connection connection = null;
061                    PreparedStatement preparedStatement = null;
062                    ResultSet resultSet = null;
063    
064                    try {
065                            connection = getConnection();
066    
067                            preparedStatement = connection.prepareStatement(_SQL_SELECT_NAMES);
068    
069                            resultSet = preparedStatement.executeQuery();
070    
071                            List<String> list = new ArrayList<>();
072    
073                            while (resultSet.next()) {
074                                    list.add(resultSet.getString(1));
075                            }
076    
077                            return list;
078                    }
079                    catch (SQLException sqle) {
080                            throw processException(sqle);
081                    }
082                    finally {
083                            DataAccess.cleanUp(connection, preparedStatement, resultSet);
084                    }
085            }
086    
087            @Override
088            public String getRegistryName() {
089                    return CounterFinderImpl.class.getName();
090            }
091    
092            @Override
093            public long increment() {
094                    return increment(_NAME);
095            }
096    
097            @Override
098            public long increment(String name) {
099                    return increment(name, _MINIMUM_INCREMENT_SIZE);
100            }
101    
102            @Override
103            public long increment(String name, int size) {
104                    if (size < _MINIMUM_INCREMENT_SIZE) {
105                            size = _MINIMUM_INCREMENT_SIZE;
106                    }
107    
108                    CounterRegister counterRegister = getCounterRegister(name);
109    
110                    return _competeIncrement(counterRegister, size);
111            }
112    
113            @Override
114            public void invalidate() {
115                    _counterRegisterMap.clear();
116            }
117    
118            @Override
119            public void rename(String oldName, String newName) {
120                    CounterRegister counterRegister = getCounterRegister(oldName);
121    
122                    synchronized (counterRegister) {
123                            if (_counterRegisterMap.containsKey(newName)) {
124                                    throw new SystemException(
125                                            "Cannot rename " + oldName + " to " + newName);
126                            }
127    
128                            Connection connection = null;
129                            PreparedStatement preparedStatement = null;
130    
131                            try {
132                                    connection = getConnection();
133    
134                                    preparedStatement = connection.prepareStatement(
135                                            _SQL_UPDATE_NAME_BY_NAME);
136    
137                                    preparedStatement.setString(1, newName);
138                                    preparedStatement.setString(2, oldName);
139    
140                                    preparedStatement.executeUpdate();
141                            }
142                            catch (ObjectNotFoundException onfe) {
143                            }
144                            catch (Exception e) {
145                                    throw processException(e);
146                            }
147                            finally {
148                                    DataAccess.cleanUp(connection, preparedStatement);
149                            }
150    
151                            counterRegister.setName(newName);
152    
153                            _counterRegisterMap.put(newName, counterRegister);
154                            _counterRegisterMap.remove(oldName);
155                    }
156            }
157    
158            @Override
159            public void reset(String name) {
160                    CounterRegister counterRegister = getCounterRegister(name);
161    
162                    synchronized (counterRegister) {
163                            Session session = null;
164    
165                            try {
166                                    session = openSession();
167    
168                                    Counter counter = (Counter)session.get(CounterImpl.class, name);
169    
170                                    session.delete(counter);
171    
172                                    session.flush();
173                            }
174                            catch (ObjectNotFoundException onfe) {
175                            }
176                            catch (Exception e) {
177                                    throw processException(e);
178                            }
179                            finally {
180                                    closeSession(session);
181                            }
182    
183                            _counterRegisterMap.remove(name);
184                    }
185            }
186    
187            @Override
188            public void reset(String name, long size) {
189                    CounterRegister counterRegister = createCounterRegister(name, size);
190    
191                    _counterRegisterMap.put(name, counterRegister);
192            }
193    
194            protected CounterRegister createCounterRegister(String name) {
195                    return createCounterRegister(name, -1);
196            }
197    
198            protected CounterRegister createCounterRegister(String name, long size) {
199                    long rangeMin = -1;
200                    int rangeSize = getRangeSize(name);
201    
202                    Connection connection = null;
203                    PreparedStatement preparedStatement = null;
204                    ResultSet resultSet = null;
205    
206                    try {
207                            connection = getConnection();
208    
209                            preparedStatement = connection.prepareStatement(
210                                    _SQL_SELECT_ID_BY_NAME);
211    
212                            preparedStatement.setString(1, name);
213    
214                            resultSet = preparedStatement.executeQuery();
215    
216                            if (!resultSet.next()) {
217                                    rangeMin = _DEFAULT_CURRENT_ID;
218    
219                                    if (size > rangeMin) {
220                                            rangeMin = size;
221                                    }
222    
223                                    resultSet.close();
224                                    preparedStatement.close();
225    
226                                    preparedStatement = connection.prepareStatement(_SQL_INSERT);
227    
228                                    preparedStatement.setString(1, name);
229                                    preparedStatement.setLong(2, rangeMin);
230    
231                                    preparedStatement.executeUpdate();
232                            }
233                    }
234                    catch (Exception e) {
235                            throw processException(e);
236                    }
237                    finally {
238                            DataAccess.cleanUp(connection, preparedStatement, resultSet);
239                    }
240    
241                    CounterHolder counterHolder = _obtainIncrement(name, rangeSize, size);
242    
243                    return new CounterRegister(name, counterHolder, rangeSize);
244            }
245    
246            protected Connection getConnection() throws SQLException {
247                    Connection connection = getDataSource().getConnection();
248    
249                    return connection;
250            }
251    
252            protected CounterRegister getCounterRegister(String name) {
253                    CounterRegister counterRegister = _counterRegisterMap.get(name);
254    
255                    if (counterRegister != null) {
256                            return counterRegister;
257                    }
258    
259                    synchronized (_counterRegisterMap) {
260    
261                            // Double check
262    
263                            counterRegister = _counterRegisterMap.get(name);
264    
265                            if (counterRegister == null) {
266                                    counterRegister = createCounterRegister(name);
267    
268                                    _counterRegisterMap.put(name, counterRegister);
269                            }
270    
271                            return counterRegister;
272                    }
273            }
274    
275            protected int getRangeSize(String name) {
276                    if (name.equals(_NAME)) {
277                            return PropsValues.COUNTER_INCREMENT;
278                    }
279    
280                    String incrementType = null;
281    
282                    int pos = name.indexOf(CharPool.POUND);
283    
284                    if (pos != -1) {
285                            incrementType = name.substring(0, pos);
286                    }
287                    else {
288                            incrementType = name;
289                    }
290    
291                    Integer rangeSize = _rangeSizeMap.get(incrementType);
292    
293                    if (rangeSize == null) {
294                            rangeSize = GetterUtil.getInteger(
295                                    PropsUtil.get(
296                                            PropsKeys.COUNTER_INCREMENT_PREFIX + incrementType),
297                                    PropsValues.COUNTER_INCREMENT);
298    
299                            _rangeSizeMap.put(incrementType, rangeSize);
300                    }
301    
302                    return rangeSize.intValue();
303            }
304    
305            private long _competeIncrement(CounterRegister counterRegister, int size) {
306                    CounterHolder counterHolder = counterRegister.getCounterHolder();
307    
308                    // Try to use the fast path
309    
310                    long newValue = counterHolder.addAndGet(size);
311    
312                    if (newValue <= counterHolder.getRangeMax()) {
313                            return newValue;
314                    }
315    
316                    // Use the slow path
317    
318                    CompeteLatch competeLatch = counterRegister.getCompeteLatch();
319    
320                    if (!competeLatch.compete()) {
321    
322                            // Loser thread has to wait for the winner thread to finish its job
323    
324                            try {
325                                    competeLatch.await();
326                            }
327                            catch (InterruptedException ie) {
328                                    throw processException(ie);
329                            }
330    
331                            // Compete again
332    
333                            return _competeIncrement(counterRegister, size);
334                    }
335    
336                    // Winner thread
337    
338                    try {
339    
340                            // Double check
341    
342                            counterHolder = counterRegister.getCounterHolder();
343                            newValue = counterHolder.addAndGet(size);
344    
345                            if (newValue > counterHolder.getRangeMax()) {
346                                    CounterHolder newCounterHolder = _obtainIncrement(
347                                            counterRegister.getName(), counterRegister.getRangeSize(),
348                                            0);
349    
350                                    newValue = newCounterHolder.addAndGet(size);
351    
352                                    counterRegister.setCounterHolder(newCounterHolder);
353                            }
354                    }
355                    catch (Exception e) {
356                            throw processException(e);
357                    }
358                    finally {
359    
360                            // Winner thread opens the latch so that loser threads can continue
361    
362                            competeLatch.done();
363                    }
364    
365                    return newValue;
366            }
367    
368            private CounterHolder _obtainIncrement(
369                    String counterName, long range, long size) {
370    
371                    Session session = null;
372    
373                    try {
374                            session = openSession();
375    
376                            Counter counter = (Counter)session.get(
377                                    CounterImpl.class, counterName, LockMode.UPGRADE);
378    
379                            long newValue = counter.getCurrentId();
380    
381                            if (size > newValue) {
382                                    newValue = size;
383                            }
384    
385                            long rangeMax = newValue + range;
386    
387                            counter.setCurrentId(rangeMax);
388    
389                            CounterHolder counterHolder = new CounterHolder(newValue, rangeMax);
390    
391                            session.saveOrUpdate(counter);
392    
393                            session.flush();
394    
395                            return counterHolder;
396                    }
397                    catch (Exception e) {
398                            throw processException(e);
399                    }
400                    finally {
401                            closeSession(session);
402                    }
403            }
404    
405            private static final int _DEFAULT_CURRENT_ID = 0;
406    
407            private static final int _MINIMUM_INCREMENT_SIZE = 1;
408    
409            private static final String _NAME = Counter.class.getName();
410    
411            private static final String _SQL_INSERT =
412                    "insert into Counter(name, currentId) values (?, ?)";
413    
414            private static final String _SQL_SELECT_ID_BY_NAME =
415                    "select currentId from Counter where name = ?";
416    
417            private static final String _SQL_SELECT_NAMES =
418                    "select name from Counter order by name asc";
419    
420            private static final String _SQL_UPDATE_NAME_BY_NAME =
421                    "update Counter set name = ? where name = ?";
422    
423            private final Map<String, CounterRegister> _counterRegisterMap =
424                    new ConcurrentHashMap<>();
425            private final Map<String, Integer> _rangeSizeMap =
426                    new ConcurrentHashMap<>();
427    
428    }