001
014
015 package com.liferay.portal.search.lucene;
016
017 import com.liferay.portal.kernel.log.Log;
018 import com.liferay.portal.kernel.log.LogFactoryUtil;
019 import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
020 import com.liferay.portal.kernel.nio.intraband.rpc.IntrabandRPCUtil;
021 import com.liferay.portal.kernel.process.ProcessCallable;
022 import com.liferay.portal.kernel.resiliency.mpi.MPIHelperUtil;
023 import com.liferay.portal.kernel.resiliency.spi.SPI;
024 import com.liferay.portal.kernel.resiliency.spi.SPIUtil;
025 import com.liferay.portal.kernel.search.SearchEngineUtil;
026 import com.liferay.portal.kernel.util.FileUtil;
027 import com.liferay.portal.kernel.util.InstanceFactory;
028 import com.liferay.portal.kernel.util.OSDetector;
029 import com.liferay.portal.kernel.util.StringPool;
030 import com.liferay.portal.search.lucene.dump.DumpIndexDeletionPolicy;
031 import com.liferay.portal.search.lucene.dump.IndexCommitSerializationUtil;
032 import com.liferay.portal.util.ClassLoaderUtil;
033 import com.liferay.portal.util.PropsValues;
034
035 import java.io.File;
036 import java.io.IOException;
037 import java.io.InputStream;
038 import java.io.OutputStream;
039 import java.io.Serializable;
040
041 import java.util.Collection;
042 import java.util.Map;
043 import java.util.concurrent.ConcurrentHashMap;
044 import java.util.concurrent.Executors;
045 import java.util.concurrent.ScheduledExecutorService;
046 import java.util.concurrent.TimeUnit;
047 import java.util.concurrent.locks.Lock;
048 import java.util.concurrent.locks.ReentrantLock;
049
050 import org.apache.lucene.analysis.Analyzer;
051 import org.apache.lucene.analysis.LimitTokenCountAnalyzer;
052 import org.apache.lucene.document.Document;
053 import org.apache.lucene.index.IndexReader;
054 import org.apache.lucene.index.IndexWriter;
055 import org.apache.lucene.index.IndexWriterConfig;
056 import org.apache.lucene.index.LogMergePolicy;
057 import org.apache.lucene.index.MergePolicy;
058 import org.apache.lucene.index.MergeScheduler;
059 import org.apache.lucene.index.NoMergePolicy;
060 import org.apache.lucene.index.NoMergeScheduler;
061 import org.apache.lucene.index.Term;
062 import org.apache.lucene.search.IndexSearcher;
063 import org.apache.lucene.search.MatchAllDocsQuery;
064 import org.apache.lucene.search.ScoreDoc;
065 import org.apache.lucene.search.TopDocs;
066 import org.apache.lucene.store.Directory;
067 import org.apache.lucene.store.FSDirectory;
068 import org.apache.lucene.store.MMapDirectory;
069 import org.apache.lucene.store.RAMDirectory;
070
071
078 public class IndexAccessorImpl implements IndexAccessor {
079
080 public IndexAccessorImpl(long companyId) {
081 _companyId = companyId;
082
083 _path = PropsValues.LUCENE_DIR.concat(
084 String.valueOf(_companyId)).concat(StringPool.SLASH);
085
086 try {
087 if (!SPIUtil.isSPI()) {
088 _checkLuceneDir();
089 _initIndexWriter();
090 _initCommitScheduler();
091
092 _indexSearcherManager = new IndexSearcherManager(_indexWriter);
093 }
094 else {
095 _indexSearcherManager = new IndexSearcherManager(
096 getLuceneDir());
097 }
098 }
099 catch (IOException ioe) {
100 _log.error(
101 "Unable to initialize index searcher manager for company " +
102 _companyId,
103 ioe);
104 }
105 }
106
107 @Override
108 public IndexSearcher acquireIndexSearcher() throws IOException {
109 return _indexSearcherManager.acquire();
110 }
111
112 @Override
113 public void addDocument(Document document) throws IOException {
114 if (SearchEngineUtil.isIndexReadOnly()) {
115 return;
116 }
117
118 _write(null, document);
119 }
120
121 @Override
122 public void addDocuments(Collection<Document> documents)
123 throws IOException {
124
125 try {
126 for (Document document : documents) {
127 _indexWriter.addDocument(document);
128 }
129
130 _batchCount++;
131 }
132 finally {
133 _commit();
134 }
135 }
136
137 @Override
138 public void close() {
139 if (SPIUtil.isSPI()) {
140 return;
141 }
142
143 try {
144 _indexSearcherManager.close();
145
146 _indexWriter.close();
147
148 _directory.close();
149 }
150 catch (Exception e) {
151 _log.error("Closing Lucene writer failed for " + _companyId, e);
152 }
153
154 if (_scheduledExecutorService != null) {
155 _scheduledExecutorService.shutdownNow();
156 }
157 }
158
159 @Override
160 public void delete() {
161 if (SearchEngineUtil.isIndexReadOnly()) {
162 return;
163 }
164
165 _deleteDirectory();
166 }
167
168 @Override
169 public void deleteDocuments(Term term) throws IOException {
170 if (SearchEngineUtil.isIndexReadOnly()) {
171 return;
172 }
173
174 try {
175 _indexWriter.deleteDocuments(term);
176
177 _batchCount++;
178 }
179 finally {
180 _commit();
181 }
182 }
183
184 @Override
185 public void dumpIndex(OutputStream outputStream) throws IOException {
186 try {
187 _dumpIndexDeletionPolicy.dump(
188 outputStream, _indexWriter, _commitLock);
189 }
190 finally {
191 _indexSearcherManager.invalidate();
192 }
193 }
194
195 @Override
196 public long getCompanyId() {
197 return _companyId;
198 }
199
200 @Override
201 public long getLastGeneration() {
202 return _dumpIndexDeletionPolicy.getLastGeneration();
203 }
204
205 @Override
206 public Directory getLuceneDir() {
207 if (_directory != null) {
208 return _directory;
209 }
210
211 if (_log.isDebugEnabled()) {
212 _log.debug("Lucene store type " + PropsValues.LUCENE_STORE_TYPE);
213 }
214
215 if (PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_FILE)) {
216 _directory = _getLuceneDirFile();
217 }
218 else if (PropsValues.LUCENE_STORE_TYPE.equals(
219 _LUCENE_STORE_TYPE_JDBC)) {
220
221 throw new IllegalArgumentException(
222 "Store type JDBC is no longer supported in favor of SOLR");
223 }
224 else if (PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_RAM)) {
225 _directory = new RAMDirectory();
226 }
227 else {
228 throw new RuntimeException(
229 "Invalid store type " + PropsValues.LUCENE_STORE_TYPE);
230 }
231
232 return _directory;
233 }
234
235 @Override
236 public void invalidate() {
237 _indexSearcherManager.invalidate();
238 }
239
240 @Override
241 public void loadIndex(InputStream inputStream) throws IOException {
242 File tempFile = FileUtil.createTempFile();
243
244 Directory tempDirectory = FSDirectory.open(tempFile);
245
246 if (OSDetector.isWindows() &&
247 PropsValues.INDEX_DUMP_PROCESS_DOCUMENTS_ENABLED) {
248
249 IndexCommitSerializationUtil.deserializeIndex(
250 inputStream, tempDirectory);
251
252 _deleteDirectory();
253
254 IndexReader indexReader = IndexReader.open(tempDirectory, false);
255
256 IndexSearcher indexSearcher = new IndexSearcher(indexReader);
257
258 try {
259 TopDocs topDocs = indexSearcher.search(
260 new MatchAllDocsQuery(), indexReader.numDocs());
261
262 ScoreDoc[] scoreDocs = topDocs.scoreDocs;
263
264 for (ScoreDoc scoreDoc : scoreDocs) {
265 Document document = indexSearcher.doc(scoreDoc.doc);
266
267 addDocument(document);
268 }
269 }
270 catch (IllegalArgumentException iae) {
271 if (_log.isDebugEnabled()) {
272 _log.debug(iae.getMessage());
273 }
274 }
275
276 indexSearcher.close();
277
278 indexReader.flush();
279 indexReader.close();
280 }
281 else {
282 IndexCommitSerializationUtil.deserializeIndex(
283 inputStream, tempDirectory);
284
285 _indexSearcherManager.close();
286
287 _indexWriter.close();
288
289 _deleteDirectory();
290
291 for (String file : tempDirectory.listAll()) {
292 tempDirectory.copy(getLuceneDir(), file, file);
293 }
294
295 _initIndexWriter();
296
297 _indexSearcherManager = new IndexSearcherManager(_indexWriter);
298 }
299
300 tempDirectory.close();
301
302 FileUtil.deltree(tempFile);
303 }
304
305 @Override
306 public void releaseIndexSearcher(IndexSearcher indexSearcher)
307 throws IOException {
308
309 _indexSearcherManager.release(indexSearcher);
310 }
311
312 @Override
313 public void updateDocument(Term term, Document document)
314 throws IOException {
315
316 if (SearchEngineUtil.isIndexReadOnly()) {
317 return;
318 }
319
320 if (_log.isDebugEnabled()) {
321 _log.debug("Indexing " + document);
322 }
323
324 _write(term, document);
325 }
326
327 private static void _invalidate(long companyId) {
328 for (SPI spi : MPIHelperUtil.getSPIs()) {
329 try {
330 RegistrationReference registrationReference =
331 spi.getRegistrationReference();
332
333 IntrabandRPCUtil.execute(
334 registrationReference,
335 new InvalidateProcessCallable(companyId));
336 }
337 catch (Exception e) {
338 _log.error(
339 "Unable to invalidate SPI " + spi + " for company " +
340 companyId,
341 e);
342 }
343 }
344 }
345
346 private void _checkLuceneDir() {
347 if (SearchEngineUtil.isIndexReadOnly()) {
348 return;
349 }
350
351 try {
352 Directory directory = getLuceneDir();
353
354 if (IndexWriter.isLocked(directory)) {
355 IndexWriter.unlock(directory);
356 }
357 }
358 catch (Exception e) {
359 _log.error("Check Lucene directory failed for " + _companyId, e);
360 }
361 }
362
363 private void _commit() throws IOException {
364 if ((PropsValues.LUCENE_COMMIT_BATCH_SIZE == 0) ||
365 (PropsValues.LUCENE_COMMIT_BATCH_SIZE <= _batchCount)) {
366
367 _doCommit();
368 }
369 }
370
371 private void _deleteAll() {
372 try {
373 _indexWriter.deleteAll();
374
375 _doCommit();
376 }
377 catch (Exception e) {
378 if (_log.isWarnEnabled()) {
379 _log.warn("Unable to delete index in directory " + _path);
380 }
381 }
382 }
383
384 private void _deleteDirectory() {
385 if (_log.isDebugEnabled()) {
386 _log.debug("Lucene store type " + PropsValues.LUCENE_STORE_TYPE);
387 }
388
389 if (PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_FILE) ||
390 PropsValues.LUCENE_STORE_TYPE.equals(_LUCENE_STORE_TYPE_RAM)) {
391
392 _deleteAll();
393 }
394 else if (PropsValues.LUCENE_STORE_TYPE.equals(
395 _LUCENE_STORE_TYPE_JDBC)) {
396
397 throw new IllegalArgumentException(
398 "Store type JDBC is no longer supported in favor of SOLR");
399 }
400 else {
401 throw new RuntimeException(
402 "Invalid store type " + PropsValues.LUCENE_STORE_TYPE);
403 }
404 }
405
406 private void _doCommit() throws IOException {
407 if (_indexWriter != null) {
408 _commitLock.lock();
409
410 try {
411 _indexWriter.commit();
412 }
413 finally {
414 _commitLock.unlock();
415
416 _indexSearcherManager.invalidate();
417
418 _invalidate(_companyId);
419 }
420 }
421
422 _batchCount = 0;
423 }
424
425 private Directory _getLuceneDirFile() {
426 Directory directory = null;
427
428 try {
429 if (PropsValues.LUCENE_STORE_TYPE_FILE_FORCE_MMAP) {
430 directory = new MMapDirectory(new File(_path));
431 }
432 else {
433 directory = FSDirectory.open(new File(_path));
434 }
435 }
436 catch (IOException ioe) {
437 if (directory != null) {
438 try {
439 directory.close();
440 }
441 catch (Exception e) {
442 }
443 }
444 }
445
446 return directory;
447 }
448
449 private MergePolicy _getMergePolicy() throws Exception {
450 if (PropsValues.LUCENE_MERGE_POLICY.equals(
451 NoMergePolicy.class.getName())) {
452
453 return NoMergePolicy.NO_COMPOUND_FILES;
454 }
455
456 ClassLoader classLoader = ClassLoaderUtil.getPortalClassLoader();
457
458 MergePolicy mergePolicy = (MergePolicy)InstanceFactory.newInstance(
459 classLoader, PropsValues.LUCENE_MERGE_POLICY);
460
461 if (mergePolicy instanceof LogMergePolicy) {
462 LogMergePolicy logMergePolicy = (LogMergePolicy)mergePolicy;
463
464 logMergePolicy.setMergeFactor(PropsValues.LUCENE_MERGE_FACTOR);
465 }
466
467 return mergePolicy;
468 }
469
470 private MergeScheduler _getMergeScheduler() throws Exception {
471 if (PropsValues.LUCENE_MERGE_SCHEDULER.equals(
472 NoMergeScheduler.class.getName())) {
473
474 return NoMergeScheduler.INSTANCE;
475 }
476
477 ClassLoader classLoader = ClassLoaderUtil.getPortalClassLoader();
478
479 return (MergeScheduler)InstanceFactory.newInstance(
480 classLoader, PropsValues.LUCENE_MERGE_SCHEDULER);
481 }
482
483 private void _initCommitScheduler() {
484 if ((PropsValues.LUCENE_COMMIT_BATCH_SIZE <= 0) ||
485 (PropsValues.LUCENE_COMMIT_TIME_INTERVAL <= 0)) {
486
487 return;
488 }
489
490 _scheduledExecutorService =
491 Executors.newSingleThreadScheduledExecutor();
492
493 Runnable runnable = new Runnable() {
494
495 @Override
496 public void run() {
497 try {
498 if (_batchCount > 0) {
499 _doCommit();
500 }
501 }
502 catch (IOException ioe) {
503 _log.error("Could not run scheduled commit", ioe);
504 }
505 }
506
507 };
508
509 _scheduledExecutorService.scheduleWithFixedDelay(
510 runnable, 0, PropsValues.LUCENE_COMMIT_TIME_INTERVAL,
511 TimeUnit.MILLISECONDS);
512 }
513
514 private void _initIndexWriter() {
515 try {
516 Analyzer analyzer = new LimitTokenCountAnalyzer(
517 LuceneHelperUtil.getAnalyzer(),
518 PropsValues.LUCENE_ANALYZER_MAX_TOKENS);
519
520 IndexWriterConfig indexWriterConfig = new IndexWriterConfig(
521 LuceneHelperUtil.getVersion(), analyzer);
522
523 indexWriterConfig.setIndexDeletionPolicy(_dumpIndexDeletionPolicy);
524 indexWriterConfig.setMergePolicy(_getMergePolicy());
525 indexWriterConfig.setMergeScheduler(_getMergeScheduler());
526 indexWriterConfig.setRAMBufferSizeMB(
527 PropsValues.LUCENE_BUFFER_SIZE);
528
529 _indexWriter = new IndexWriter(getLuceneDir(), indexWriterConfig);
530
531 if (!IndexReader.indexExists(getLuceneDir())) {
532
533
534
535 if (_log.isDebugEnabled()) {
536 _log.debug("Creating missing index");
537 }
538
539 _indexWriter.commit();
540 }
541 }
542 catch (Exception e) {
543 _log.error(
544 "Initializing Lucene writer failed for " + _companyId, e);
545 }
546 }
547
548 private void _write(Term term, Document document) throws IOException {
549 try {
550 if (term != null) {
551 _indexWriter.updateDocument(term, document);
552 }
553 else {
554 _indexWriter.addDocument(document);
555 }
556
557 _batchCount++;
558 }
559 finally {
560 _commit();
561 }
562 }
563
564 private static final String _LUCENE_STORE_TYPE_FILE = "file";
565
566 private static final String _LUCENE_STORE_TYPE_JDBC = "jdbc";
567
568 private static final String _LUCENE_STORE_TYPE_RAM = "ram";
569
570 private static Log _log = LogFactoryUtil.getLog(IndexAccessorImpl.class);
571
572 private volatile int _batchCount;
573 private Lock _commitLock = new ReentrantLock();
574 private long _companyId;
575 private Directory _directory;
576 private DumpIndexDeletionPolicy _dumpIndexDeletionPolicy =
577 new DumpIndexDeletionPolicy();
578 private IndexSearcherManager _indexSearcherManager;
579 private IndexWriter _indexWriter;
580 private String _path;
581 private Map<String, Directory> _ramDirectories =
582 new ConcurrentHashMap<String, Directory>();
583 private ScheduledExecutorService _scheduledExecutorService;
584
585 private static class InvalidateProcessCallable
586 implements ProcessCallable<Serializable> {
587
588 public InvalidateProcessCallable(long companyId) {
589 _companyId = companyId;
590 }
591
592 @Override
593 public Serializable call() {
594 IndexAccessor indexAccessor = LuceneHelperUtil.getIndexAccessor(
595 _companyId);
596
597 indexAccessor.invalidate();
598
599 _invalidate(_companyId);
600
601 return null;
602 }
603
604 private static final long serialVersionUID = 1L;
605
606 private final long _companyId;
607
608 }
609
610 }