001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.search.lucene;
016    
017    import com.liferay.portal.kernel.log.Log;
018    import com.liferay.portal.kernel.log.LogFactoryUtil;
019    import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
020    import com.liferay.portal.kernel.nio.intraband.rpc.IntrabandRPCUtil;
021    import com.liferay.portal.kernel.process.ProcessCallable;
022    import com.liferay.portal.kernel.resiliency.mpi.MPIHelperUtil;
023    import com.liferay.portal.kernel.resiliency.spi.SPI;
024    import com.liferay.portal.kernel.resiliency.spi.SPIUtil;
025    import com.liferay.portal.kernel.search.SearchEngineUtil;
026    import com.liferay.portal.kernel.util.FileUtil;
027    import com.liferay.portal.kernel.util.InstanceFactory;
028    import com.liferay.portal.kernel.util.OSDetector;
029    import com.liferay.portal.kernel.util.StringPool;
030    import com.liferay.portal.search.lucene.dump.DumpIndexDeletionPolicy;
031    import com.liferay.portal.search.lucene.dump.IndexCommitSerializationUtil;
032    import com.liferay.portal.util.ClassLoaderUtil;
033    import com.liferay.portal.util.PropsValues;
034    
035    import java.io.File;
036    import java.io.IOException;
037    import java.io.InputStream;
038    import java.io.OutputStream;
039    import java.io.Serializable;
040    
041    import java.util.Collection;
042    import java.util.Map;
043    import java.util.concurrent.ConcurrentHashMap;
044    import java.util.concurrent.Executors;
045    import java.util.concurrent.ScheduledExecutorService;
046    import java.util.concurrent.TimeUnit;
047    import java.util.concurrent.locks.Lock;
048    import java.util.concurrent.locks.ReentrantLock;
049    
050    import org.apache.lucene.analysis.Analyzer;
051    import org.apache.lucene.analysis.LimitTokenCountAnalyzer;
052    import org.apache.lucene.document.Document;
053    import org.apache.lucene.index.IndexReader;
054    import org.apache.lucene.index.IndexWriter;
055    import org.apache.lucene.index.IndexWriterConfig;
056    import org.apache.lucene.index.LogMergePolicy;
057    import org.apache.lucene.index.MergePolicy;
058    import org.apache.lucene.index.MergeScheduler;
059    import org.apache.lucene.index.NoMergePolicy;
060    import org.apache.lucene.index.NoMergeScheduler;
061    import org.apache.lucene.index.Term;
062    import org.apache.lucene.search.IndexSearcher;
063    import org.apache.lucene.search.MatchAllDocsQuery;
064    import org.apache.lucene.search.ScoreDoc;
065    import org.apache.lucene.search.TopDocs;
066    import org.apache.lucene.store.Directory;
067    import org.apache.lucene.store.FSDirectory;
068    import org.apache.lucene.store.MMapDirectory;
069    import org.apache.lucene.store.RAMDirectory;
070    
071    /**
072     * @author Harry Mark
073     * @author Brian Wing Shun Chan
074     * @author Bruno Farache
075     * @author Shuyang Zhou
076     * @author Mate Thurzo
077     */
078    public class IndexAccessorImpl implements IndexAccessor {
079    
080            public IndexAccessorImpl(long companyId) {
081                    _companyId = companyId;
082    
083                    _path = PropsValues.LUCENE_DIR.concat(
084                            String.valueOf(_companyId)).concat(StringPool.SLASH);
085    
086                    try {
087                            if (!SPIUtil.isSPI()) {
088                                    _checkLuceneDir();
089                                    _initIndexWriter();
090                                    _initCommitScheduler();
091    
092                                    _indexSearcherManager = new IndexSearcherManager(_indexWriter);
093                            }
094                            else {
095                                    _indexSearcherManager = new IndexSearcherManager(
096                                            getLuceneDir());
097                            }
098                    }
099                    catch (IOException ioe) {
100                            _log.error(
101                                    "Unable to initialize index searcher manager for company " +
102                                            _companyId,
103                                    ioe);
104                    }
105            }
106    
107            @Override
108            public IndexSearcher acquireIndexSearcher() throws IOException {
109                    return _indexSearcherManager.acquire();
110            }
111    
112            @Override
113            public void addDocument(Document document) throws IOException {
114                    if (SearchEngineUtil.isIndexReadOnly()) {
115                            return;
116                    }
117    
118                    _write(null, document);
119            }
120    
121            @Override
122            public void addDocuments(Collection<Document> documents)
123                    throws IOException {
124    
125                    try {
126                            for (Document document : documents) {
127                                    _indexWriter.addDocument(document);
128                            }
129    
130                            _batchCount++;
131                    }
132                    finally {
133                            _commit();
134                    }
135            }
136    
137            @Override
138            public void close() {
139                    if (SPIUtil.isSPI()) {
140                            return;
141                    }
142    
143                    try {
144                            _indexSearcherManager.close();
145    
146                            _indexWriter.close();
147    
148                            _directory.close();
149                    }
150                    catch (Exception e) {
151                            _log.error("Closing Lucene writer failed for " + _companyId, e);
152                    }
153    
154                    if (_scheduledExecutorService != null) {
155                            _scheduledExecutorService.shutdownNow();
156                    }
157            }
158    
159            @Override
160            public void delete() {
161                    if (SearchEngineUtil.isIndexReadOnly()) {
162                            return;
163                    }
164    
165                    _deleteDirectory();
166            }
167    
168            @Override
169            public void deleteDocuments(Term term) throws IOException {
170                    if (SearchEngineUtil.isIndexReadOnly()) {
171                            return;
172                    }
173    
174                    try {
175                            _indexWriter.deleteDocuments(term);
176    
177                            _batchCount++;
178                    }
179                    finally {
180                            _commit();
181                    }
182            }
183    
184            @Override
185            public void dumpIndex(OutputStream outputStream) throws IOException {
186                    try {
187                            _dumpIndexDeletionPolicy.dump(
188                                    outputStream, _indexWriter, _commitLock);
189                    }
190                    finally {
191                            _indexSearcherManager.invalidate();
192                    }
193            }
194    
195            @Override
196            public long getCompanyId() {
197                    return _companyId;
198            }
199    
200            @Override
201            public long getLastGeneration() {
202                    return _dumpIndexDeletionPolicy.getLastGeneration();
203            }
204    
205            @Override
206            public Directory getLuceneDir() {
207                    if (_directory != null) {
208                            return _directory;
209                    }
210    
211                    if (_log.isDebugEnabled()) {
212                            _log.debug("Lucene store type " + PropsValues.LUCENE_STORE_TYPE);
213                    }
214    
215                    if (PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_FILE)) {
216                            _directory = _getLuceneDirFile();
217                    }
218                    else if (PropsValues.LUCENE_STORE_TYPE.equals(
219                                            _LUCENE_STORE_TYPE_JDBC)) {
220    
221                            throw new IllegalArgumentException(
222                                    "Store type JDBC is no longer supported in favor of SOLR");
223                    }
224                    else if (PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_RAM)) {
225                            _directory = new RAMDirectory();
226                    }
227                    else {
228                            throw new RuntimeException(
229                                    "Invalid store type " + PropsValues.LUCENE_STORE_TYPE);
230                    }
231    
232                    return _directory;
233            }
234    
235            @Override
236            public void invalidate() {
237                    _indexSearcherManager.invalidate();
238            }
239    
240            @Override
241            public void loadIndex(InputStream inputStream) throws IOException {
242                    File tempFile = FileUtil.createTempFile();
243    
244                    Directory tempDirectory = FSDirectory.open(tempFile);
245    
246                    if (OSDetector.isWindows() &&
247                            PropsValues.INDEX_DUMP_PROCESS_DOCUMENTS_ENABLED) {
248    
249                            IndexCommitSerializationUtil.deserializeIndex(
250                                    inputStream, tempDirectory);
251    
252                            _deleteDirectory();
253    
254                            IndexReader indexReader = IndexReader.open(tempDirectory, false);
255    
256                            IndexSearcher indexSearcher = new IndexSearcher(indexReader);
257    
258                            try {
259                                    TopDocs topDocs = indexSearcher.search(
260                                            new MatchAllDocsQuery(), indexReader.numDocs());
261    
262                                    ScoreDoc[] scoreDocs = topDocs.scoreDocs;
263    
264                                    for (ScoreDoc scoreDoc : scoreDocs) {
265                                            Document document = indexSearcher.doc(scoreDoc.doc);
266    
267                                            addDocument(document);
268                                    }
269                            }
270                            catch (IllegalArgumentException iae) {
271                                    if (_log.isDebugEnabled()) {
272                                            _log.debug(iae.getMessage());
273                                    }
274                            }
275    
276                            indexSearcher.close();
277    
278                            indexReader.flush();
279                            indexReader.close();
280                    }
281                    else {
282                            IndexCommitSerializationUtil.deserializeIndex(
283                                    inputStream, tempDirectory);
284    
285                            _indexSearcherManager.close();
286    
287                            _indexWriter.close();
288    
289                            _deleteDirectory();
290    
291                            for (String file : tempDirectory.listAll()) {
292                                    tempDirectory.copy(getLuceneDir(), file, file);
293                            }
294    
295                            _initIndexWriter();
296    
297                            _indexSearcherManager = new IndexSearcherManager(_indexWriter);
298                    }
299    
300                    tempDirectory.close();
301    
302                    FileUtil.deltree(tempFile);
303            }
304    
305            @Override
306            public void releaseIndexSearcher(IndexSearcher indexSearcher)
307                    throws IOException {
308    
309                    _indexSearcherManager.release(indexSearcher);
310            }
311    
312            @Override
313            public void updateDocument(Term term, Document document)
314                    throws IOException {
315    
316                    if (SearchEngineUtil.isIndexReadOnly()) {
317                            return;
318                    }
319    
320                    if (_log.isDebugEnabled()) {
321                            _log.debug("Indexing " + document);
322                    }
323    
324                    _write(term, document);
325            }
326    
327            private static void _invalidate(long companyId) {
328                    for (SPI spi : MPIHelperUtil.getSPIs()) {
329                            try {
330                                    RegistrationReference registrationReference =
331                                            spi.getRegistrationReference();
332    
333                                    IntrabandRPCUtil.execute(
334                                            registrationReference,
335                                            new InvalidateProcessCallable(companyId));
336                            }
337                            catch (Exception e) {
338                                    _log.error(
339                                            "Unable to invalidate SPI " + spi + " for company " +
340                                                    companyId,
341                                            e);
342                            }
343                    }
344            }
345    
346            private void _checkLuceneDir() {
347                    if (SearchEngineUtil.isIndexReadOnly()) {
348                            return;
349                    }
350    
351                    try {
352                            Directory directory = getLuceneDir();
353    
354                            if (IndexWriter.isLocked(directory)) {
355                                    IndexWriter.unlock(directory);
356                            }
357                    }
358                    catch (Exception e) {
359                            _log.error("Check Lucene directory failed for " + _companyId, e);
360                    }
361            }
362    
363            private void _commit() throws IOException {
364                    if ((PropsValues.LUCENE_COMMIT_BATCH_SIZE == 0) ||
365                            (PropsValues.LUCENE_COMMIT_BATCH_SIZE <= _batchCount)) {
366    
367                            _doCommit();
368                    }
369            }
370    
371            private void _deleteAll() {
372                    try {
373                            _indexWriter.deleteAll();
374    
375                            _doCommit();
376                    }
377                    catch (Exception e) {
378                            if (_log.isWarnEnabled()) {
379                                    _log.warn("Unable to delete index in directory " + _path);
380                            }
381                    }
382            }
383    
384            private void _deleteDirectory() {
385                    if (_log.isDebugEnabled()) {
386                            _log.debug("Lucene store type " + PropsValues.LUCENE_STORE_TYPE);
387                    }
388    
389                    if (PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_FILE) ||
390                            PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_RAM)) {
391    
392                            _deleteAll();
393                    }
394                    else if (PropsValues.LUCENE_STORE_TYPE.equals(
395                                            _LUCENE_STORE_TYPE_JDBC)) {
396    
397                            throw new IllegalArgumentException(
398                                    "Store type JDBC is no longer supported in favor of SOLR");
399                    }
400                    else {
401                            throw new RuntimeException(
402                                    "Invalid store type " + PropsValues.LUCENE_STORE_TYPE);
403                    }
404            }
405    
406            private void _doCommit() throws IOException {
407                    if (_indexWriter != null) {
408                            _commitLock.lock();
409    
410                            try {
411                                    _indexWriter.commit();
412                            }
413                            finally {
414                                    _commitLock.unlock();
415    
416                                    _indexSearcherManager.invalidate();
417    
418                                    _invalidate(_companyId);
419                            }
420                    }
421    
422                    _batchCount = 0;
423            }
424    
425            private Directory _getLuceneDirFile() {
426                    Directory directory = null;
427    
428                    try {
429                            if (PropsValues.LUCENE_STORE_TYPE_FILE_FORCE_MMAP) {
430                                    directory = new MMapDirectory(new File(_path));
431                            }
432                            else {
433                                    directory = FSDirectory.open(new File(_path));
434                            }
435                    }
436                    catch (IOException ioe) {
437                            if (directory != null) {
438                                    try {
439                                            directory.close();
440                                    }
441                                    catch (Exception e) {
442                                    }
443                            }
444                    }
445    
446                    return directory;
447            }
448    
449            private MergePolicy _getMergePolicy() throws Exception {
450                    if (PropsValues.LUCENE_MERGE_POLICY.equals(
451                                    NoMergePolicy.class.getName())) {
452    
453                            return NoMergePolicy.NO_COMPOUND_FILES;
454                    }
455    
456                    ClassLoader classLoader = ClassLoaderUtil.getPortalClassLoader();
457    
458                    MergePolicy mergePolicy = (MergePolicy)InstanceFactory.newInstance(
459                            classLoader, PropsValues.LUCENE_MERGE_POLICY);
460    
461                    if (mergePolicy instanceof LogMergePolicy) {
462                            LogMergePolicy logMergePolicy = (LogMergePolicy)mergePolicy;
463    
464                            logMergePolicy.setMergeFactor(PropsValues.LUCENE_MERGE_FACTOR);
465                    }
466    
467                    return mergePolicy;
468            }
469    
470            private MergeScheduler _getMergeScheduler() throws Exception {
471                    if (PropsValues.LUCENE_MERGE_SCHEDULER.equals(
472                                    NoMergeScheduler.class.getName())) {
473    
474                            return NoMergeScheduler.INSTANCE;
475                    }
476    
477                    ClassLoader classLoader = ClassLoaderUtil.getPortalClassLoader();
478    
479                    return (MergeScheduler)InstanceFactory.newInstance(
480                            classLoader, PropsValues.LUCENE_MERGE_SCHEDULER);
481            }
482    
483            private void _initCommitScheduler() {
484                    if ((PropsValues.LUCENE_COMMIT_BATCH_SIZE <= 0) ||
485                            (PropsValues.LUCENE_COMMIT_TIME_INTERVAL <= 0)) {
486    
487                            return;
488                    }
489    
490                    _scheduledExecutorService =
491                            Executors.newSingleThreadScheduledExecutor();
492    
493                    Runnable runnable = new Runnable() {
494    
495                            @Override
496                            public void run() {
497                                    try {
498                                            if (_batchCount > 0) {
499                                                    _doCommit();
500                                            }
501                                    }
502                                    catch (IOException ioe) {
503                                            _log.error("Could not run scheduled commit", ioe);
504                                    }
505                            }
506    
507                    };
508    
509                    _scheduledExecutorService.scheduleWithFixedDelay(
510                            runnable, 0, PropsValues.LUCENE_COMMIT_TIME_INTERVAL,
511                            TimeUnit.MILLISECONDS);
512            }
513    
514            private void _initIndexWriter() {
515                    try {
516                            Analyzer analyzer = new LimitTokenCountAnalyzer(
517                                    LuceneHelperUtil.getAnalyzer(),
518                                    PropsValues.LUCENE_ANALYZER_MAX_TOKENS);
519    
520                            IndexWriterConfig indexWriterConfig = new IndexWriterConfig(
521                                    LuceneHelperUtil.getVersion(), analyzer);
522    
523                            indexWriterConfig.setIndexDeletionPolicy(_dumpIndexDeletionPolicy);
524                            indexWriterConfig.setMergePolicy(_getMergePolicy());
525                            indexWriterConfig.setMergeScheduler(_getMergeScheduler());
526                            indexWriterConfig.setRAMBufferSizeMB(
527                                    PropsValues.LUCENE_BUFFER_SIZE);
528    
529                            _indexWriter = new IndexWriter(getLuceneDir(), indexWriterConfig);
530    
531                            if (!IndexReader.indexExists(getLuceneDir())) {
532    
533                                    // Workaround for LUCENE-2386
534    
535                                    if (_log.isDebugEnabled()) {
536                                            _log.debug("Creating missing index");
537                                    }
538    
539                                    _indexWriter.commit();
540                            }
541                    }
542                    catch (Exception e) {
543                            _log.error(
544                                    "Initializing Lucene writer failed for " + _companyId, e);
545                    }
546            }
547    
548            private void _write(Term term, Document document) throws IOException {
549                    try {
550                            if (term != null) {
551                                    _indexWriter.updateDocument(term, document);
552                            }
553                            else {
554                                    _indexWriter.addDocument(document);
555                            }
556    
557                            _batchCount++;
558                    }
559                    finally {
560                            _commit();
561                    }
562            }
563    
564            private static final String _LUCENE_STORE_TYPE_FILE = "file";
565    
566            private static final String _LUCENE_STORE_TYPE_JDBC = "jdbc";
567    
568            private static final String _LUCENE_STORE_TYPE_RAM = "ram";
569    
570            private static Log _log = LogFactoryUtil.getLog(IndexAccessorImpl.class);
571    
572            private volatile int _batchCount;
573            private Lock _commitLock = new ReentrantLock();
574            private long _companyId;
575            private Directory _directory;
576            private DumpIndexDeletionPolicy _dumpIndexDeletionPolicy =
577                    new DumpIndexDeletionPolicy();
578            private IndexSearcherManager _indexSearcherManager;
579            private IndexWriter _indexWriter;
580            private String _path;
581            private Map<String, Directory> _ramDirectories =
582                    new ConcurrentHashMap<String, Directory>();
583            private ScheduledExecutorService _scheduledExecutorService;
584    
585            private static class InvalidateProcessCallable
586                    implements ProcessCallable<Serializable> {
587    
588                    public InvalidateProcessCallable(long companyId) {
589                            _companyId = companyId;
590                    }
591    
592                    @Override
593                    public Serializable call() {
594                            IndexAccessor indexAccessor = LuceneHelperUtil.getIndexAccessor(
595                                    _companyId);
596    
597                            indexAccessor.invalidate();
598    
599                            _invalidate(_companyId);
600    
601                            return null;
602                    }
603    
604                    private static final long serialVersionUID = 1L;
605    
606                    private final long _companyId;
607    
608            }
609    
610    }