001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.search.lucene;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.ClusterEvent;
019    import com.liferay.portal.kernel.cluster.ClusterEventListener;
020    import com.liferay.portal.kernel.cluster.ClusterEventType;
021    import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
022    import com.liferay.portal.kernel.cluster.ClusterLink;
023    import com.liferay.portal.kernel.cluster.ClusterNode;
024    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
025    import com.liferay.portal.kernel.cluster.ClusterRequest;
026    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
027    import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
028    import com.liferay.portal.kernel.exception.SystemException;
029    import com.liferay.portal.kernel.executor.PortalExecutorManagerUtil;
030    import com.liferay.portal.kernel.io.unsync.UnsyncPrintWriter;
031    import com.liferay.portal.kernel.io.unsync.UnsyncStringReader;
032    import com.liferay.portal.kernel.log.Log;
033    import com.liferay.portal.kernel.log.LogFactoryUtil;
034    import com.liferay.portal.kernel.messaging.Destination;
035    import com.liferay.portal.kernel.messaging.MessageBus;
036    import com.liferay.portal.kernel.messaging.MessageBusUtil;
037    import com.liferay.portal.kernel.messaging.proxy.MessageValuesThreadLocal;
038    import com.liferay.portal.kernel.search.BooleanClauseOccur;
039    import com.liferay.portal.kernel.search.Field;
040    import com.liferay.portal.kernel.search.SearchEngineUtil;
041    import com.liferay.portal.kernel.util.ArrayUtil;
042    import com.liferay.portal.kernel.util.GetterUtil;
043    import com.liferay.portal.kernel.util.MethodHandler;
044    import com.liferay.portal.kernel.util.MethodKey;
045    import com.liferay.portal.kernel.util.ObjectValuePair;
046    import com.liferay.portal.kernel.util.PropsKeys;
047    import com.liferay.portal.kernel.util.StringBundler;
048    import com.liferay.portal.kernel.util.StringPool;
049    import com.liferay.portal.kernel.util.StringUtil;
050    import com.liferay.portal.kernel.util.UnsyncPrintWriterPool;
051    import com.liferay.portal.kernel.util.Validator;
052    import com.liferay.portal.model.CompanyConstants;
053    import com.liferay.portal.search.lucene.cluster.LuceneClusterUtil;
054    import com.liferay.portal.search.lucene.highlight.QueryTermExtractor;
055    import com.liferay.portal.security.auth.TransientTokenUtil;
056    import com.liferay.portal.util.PortalInstances;
057    import com.liferay.portal.util.PortalUtil;
058    import com.liferay.portal.util.PropsUtil;
059    import com.liferay.portal.util.PropsValues;
060    
061    import java.io.IOException;
062    import java.io.InputStream;
063    import java.io.OutputStream;
064    
065    import java.net.InetAddress;
066    import java.net.URL;
067    import java.net.URLConnection;
068    
069    import java.util.HashSet;
070    import java.util.List;
071    import java.util.Map;
072    import java.util.Set;
073    import java.util.concurrent.BlockingQueue;
074    import java.util.concurrent.ConcurrentHashMap;
075    import java.util.concurrent.CountDownLatch;
076    import java.util.concurrent.TimeUnit;
077    
078    import org.apache.commons.lang.time.StopWatch;
079    import org.apache.lucene.analysis.Analyzer;
080    import org.apache.lucene.analysis.TokenStream;
081    import org.apache.lucene.document.Document;
082    import org.apache.lucene.index.IndexReader;
083    import org.apache.lucene.index.Term;
084    import org.apache.lucene.queryParser.QueryParser;
085    import org.apache.lucene.search.BooleanClause;
086    import org.apache.lucene.search.BooleanQuery;
087    import org.apache.lucene.search.IndexSearcher;
088    import org.apache.lucene.search.NumericRangeQuery;
089    import org.apache.lucene.search.Query;
090    import org.apache.lucene.search.TermQuery;
091    import org.apache.lucene.search.TermRangeQuery;
092    import org.apache.lucene.search.WildcardQuery;
093    import org.apache.lucene.search.highlight.Formatter;
094    import org.apache.lucene.search.highlight.Highlighter;
095    import org.apache.lucene.search.highlight.InvalidTokenOffsetsException;
096    import org.apache.lucene.search.highlight.QueryScorer;
097    import org.apache.lucene.search.highlight.SimpleFragmenter;
098    import org.apache.lucene.search.highlight.WeightedTerm;
099    import org.apache.lucene.util.Version;
100    
101    /**
102     * @author Brian Wing Shun Chan
103     * @author Harry Mark
104     * @author Bruno Farache
105     * @author Shuyang Zhou
106     * @author Tina Tian
107     * @author Hugo Huijser
108     * @author Andrea Di Giorgi
109     */
110    public class LuceneHelperImpl implements LuceneHelper {
111    
112            @Override
113            public void addDocument(long companyId, Document document)
114                    throws IOException {
115    
116                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
117    
118                    indexAccessor.addDocument(document);
119            }
120    
121            @Override
122            public void addExactTerm(
123                    BooleanQuery booleanQuery, String field, String value) {
124    
125                    addTerm(booleanQuery, field, value, false);
126            }
127    
128            @Override
129            public void addNumericRangeTerm(
130                    BooleanQuery booleanQuery, String field, Integer startValue,
131                    Integer endValue) {
132    
133                    NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newIntRange(
134                            field, startValue, endValue, true, true);
135    
136                    booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
137            }
138    
139            @Override
140            public void addNumericRangeTerm(
141                    BooleanQuery booleanQuery, String field, Long startValue,
142                    Long endValue) {
143    
144                    NumericRangeQuery<?> numericRangeQuery = NumericRangeQuery.newLongRange(
145                            field, startValue, endValue, true, true);
146    
147                    booleanQuery.add(numericRangeQuery, BooleanClause.Occur.SHOULD);
148            }
149    
150            /**
151             * @deprecated As of 6.2.0, replaced by {@link
152             *             #addNumericRangeTerm(BooleanQuery, String, Long, Long)}
153             */
154            @Override
155            public void addNumericRangeTerm(
156                    BooleanQuery booleanQuery, String field, String startValue,
157                    String endValue) {
158    
159                    addNumericRangeTerm(
160                            booleanQuery, field, GetterUtil.getLong(startValue),
161                            GetterUtil.getLong(endValue));
162            }
163    
164            @Override
165            public void addRangeTerm(
166                    BooleanQuery booleanQuery, String field, String startValue,
167                    String endValue) {
168    
169                    boolean includesLower = true;
170    
171                    if ((startValue != null) && startValue.equals(StringPool.STAR)) {
172                            includesLower = false;
173                    }
174    
175                    boolean includesUpper = true;
176    
177                    if ((endValue != null) && endValue.equals(StringPool.STAR)) {
178                            includesUpper = false;
179                    }
180    
181                    TermRangeQuery termRangeQuery = new TermRangeQuery(
182                            field, startValue, endValue, includesLower, includesUpper);
183    
184                    booleanQuery.add(termRangeQuery, BooleanClause.Occur.SHOULD);
185            }
186    
187            @Override
188            public void addRequiredTerm(
189                    BooleanQuery booleanQuery, String field, String value, boolean like) {
190    
191                    addRequiredTerm(booleanQuery, field, new String[] {value}, like);
192            }
193    
194            @Override
195            public void addRequiredTerm(
196                    BooleanQuery booleanQuery, String field, String[] values,
197                    boolean like) {
198    
199                    if (values == null) {
200                            return;
201                    }
202    
203                    BooleanQuery query = new BooleanQuery();
204    
205                    for (String value : values) {
206                            addTerm(query, field, value, like);
207                    }
208    
209                    booleanQuery.add(query, BooleanClause.Occur.MUST);
210            }
211    
212            @Override
213            public void addTerm(
214                    BooleanQuery booleanQuery, String field, String value, boolean like) {
215    
216                    addTerm(booleanQuery, field, value, like, BooleanClauseOccur.SHOULD);
217            }
218    
219            @Override
220            public void addTerm(
221                    BooleanQuery booleanQuery, String field, String value, boolean like,
222                    BooleanClauseOccur booleanClauseOccur) {
223    
224                    if (Validator.isNull(value)) {
225                            return;
226                    }
227    
228                    Analyzer analyzer = getAnalyzer();
229    
230                    if (analyzer instanceof PerFieldAnalyzer) {
231                            PerFieldAnalyzer perFieldAnalyzer = (PerFieldAnalyzer)analyzer;
232    
233                            Analyzer fieldAnalyzer = perFieldAnalyzer.getAnalyzer(field);
234    
235                            if (fieldAnalyzer instanceof LikeKeywordAnalyzer) {
236                                    like = true;
237                            }
238                    }
239    
240                    if (like) {
241                            value = StringUtil.replace(
242                                    value, StringPool.PERCENT, StringPool.BLANK);
243                    }
244    
245                    try {
246                            QueryParser queryParser = new QueryParser(
247                                    getVersion(), field, analyzer);
248    
249                            value = StringUtil.replace(
250                                    value, _KEYWORDS_LOWERCASE, _KEYWORDS_UPPERCASE);
251    
252                            Query query = queryParser.parse(value);
253    
254                            BooleanClause.Occur occur = null;
255    
256                            if (booleanClauseOccur.equals(BooleanClauseOccur.MUST)) {
257                                    occur = BooleanClause.Occur.MUST;
258                            }
259                            else if (booleanClauseOccur.equals(BooleanClauseOccur.MUST_NOT)) {
260                                    occur = BooleanClause.Occur.MUST_NOT;
261                            }
262                            else {
263                                    occur = BooleanClause.Occur.SHOULD;
264                            }
265    
266                            _includeIfUnique(booleanQuery, like, queryParser, query, occur);
267                    }
268                    catch (Exception e) {
269                            if (_log.isWarnEnabled()) {
270                                    _log.warn(e, e);
271                            }
272                    }
273            }
274    
275            @Override
276            public void addTerm(
277                    BooleanQuery booleanQuery, String field, String[] values,
278                    boolean like) {
279    
280                    for (String value : values) {
281                            addTerm(booleanQuery, field, value, like);
282                    }
283            }
284    
285            /**
286             * @deprecated As of 7.0.0, replaced by {@link #releaseIndexSearcher(long,
287             *             IndexSearcher)}
288             */
289            @Deprecated
290            @Override
291            public void cleanUp(IndexSearcher indexSearcher) {
292                    if (indexSearcher == null) {
293                            return;
294                    }
295    
296                    try {
297                            indexSearcher.close();
298    
299                            IndexReader indexReader = indexSearcher.getIndexReader();
300    
301                            if (indexReader != null) {
302                                    indexReader.close();
303                            }
304                    }
305                    catch (IOException ioe) {
306                            _log.error(ioe, ioe);
307                    }
308            }
309    
310            @Override
311            public int countScoredFieldNames(Query query, String[] filedNames) {
312                    int count = 0;
313    
314                    for (String fieldName : filedNames) {
315                            WeightedTerm[] weightedTerms = QueryTermExtractor.getTerms(
316                                    query, false, fieldName);
317    
318                            if ((weightedTerms.length > 0) &&
319                                    !ArrayUtil.contains(Field.UNSCORED_FIELD_NAMES, fieldName)) {
320    
321                                    count++;
322                            }
323                    }
324    
325                    return count;
326            }
327    
328            @Override
329            public void delete(long companyId) {
330                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
331    
332                    if (indexAccessor == null) {
333                            return;
334                    }
335    
336                    indexAccessor.delete();
337            }
338    
339            @Override
340            public void deleteDocuments(long companyId, Term term) throws IOException {
341                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
342    
343                    if (indexAccessor == null) {
344                            return;
345                    }
346    
347                    indexAccessor.deleteDocuments(term);
348            }
349    
350            @Override
351            public void dumpIndex(long companyId, OutputStream outputStream)
352                    throws IOException {
353    
354                    long lastGeneration = getLastGeneration(companyId);
355    
356                    if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
357                            if (_log.isDebugEnabled()) {
358                                    _log.debug(
359                                            "Dump index from cluster is not enabled for " + companyId);
360                            }
361    
362                            return;
363                    }
364    
365                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
366    
367                    if (indexAccessor == null) {
368                            return;
369                    }
370    
371                    indexAccessor.dumpIndex(outputStream);
372            }
373    
        /**
         * Returns the analyzer currently held by this helper.
         *
         * @return the analyzer last assigned through {@link
         *         #setAnalyzer(Analyzer)}
         */
        @Override
        public Analyzer getAnalyzer() {
                return _analyzer;
        }
378    
379            @Override
380            public IndexAccessor getIndexAccessor(long companyId) {
381                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
382    
383                    if (indexAccessor != null) {
384                            return indexAccessor;
385                    }
386    
387                    synchronized (this) {
388                            indexAccessor = _indexAccessors.get(companyId);
389    
390                            if (indexAccessor == null) {
391                                    indexAccessor = new IndexAccessorImpl(companyId);
392    
393                                    if (isLoadIndexFromClusterEnabled()) {
394                                            indexAccessor = new SynchronizedIndexAccessorImpl(
395                                                    indexAccessor);
396    
397                                            boolean clusterForwardMessage = GetterUtil.getBoolean(
398                                                    MessageValuesThreadLocal.getValue(
399                                                            ClusterLink.CLUSTER_FORWARD_MESSAGE));
400    
401                                            if (clusterForwardMessage) {
402                                                    if (_log.isInfoEnabled()) {
403                                                            _log.info(
404                                                                    "Skip Luncene index files cluster loading " +
405                                                                            "since this is a manual reindex request");
406                                                    }
407                                            }
408                                            else {
409                                                    try {
410                                                            _loadIndexFromCluster(
411                                                                    indexAccessor,
412                                                                    indexAccessor.getLastGeneration());
413                                                    }
414                                                    catch (Exception e) {
415                                                            _log.error(
416                                                                    "Unable to load index for company " +
417                                                                            indexAccessor.getCompanyId(),
418                                                                    e);
419                                                    }
420                                            }
421                                    }
422    
423                                    _indexAccessors.put(companyId, indexAccessor);
424                            }
425                    }
426    
427                    return indexAccessor;
428            }
429    
430            @Override
431            public IndexSearcher getIndexSearcher(long companyId) throws IOException {
432                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
433    
434                    return indexAccessor.acquireIndexSearcher();
435            }
436    
437            @Override
438            public long getLastGeneration(long companyId) {
439                    if (!isLoadIndexFromClusterEnabled()) {
440                            return IndexAccessor.DEFAULT_LAST_GENERATION;
441                    }
442    
443                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
444    
445                    if (indexAccessor == null) {
446                            return IndexAccessor.DEFAULT_LAST_GENERATION;
447                    }
448    
449                    return indexAccessor.getLastGeneration();
450            }
451    
452            @Override
453            public InputStream getLoadIndexesInputStreamFromCluster(
454                            long companyId, Address bootupAddress)
455                    throws SystemException {
456    
457                    if (!isLoadIndexFromClusterEnabled()) {
458                            return null;
459                    }
460    
461                    InputStream inputStream = null;
462    
463                    try {
464                            ObjectValuePair<String, URL> bootupClusterNodeObjectValuePair =
465                                    _getBootupClusterNodeObjectValuePair(bootupAddress);
466    
467                            URL url = bootupClusterNodeObjectValuePair.getValue();
468    
469                            URLConnection urlConnection = url.openConnection();
470    
471                            urlConnection.setDoOutput(true);
472    
473                            UnsyncPrintWriter unsyncPrintWriter = UnsyncPrintWriterPool.borrow(
474                                    urlConnection.getOutputStream());
475    
476                            unsyncPrintWriter.write("transientToken=");
477                            unsyncPrintWriter.write(bootupClusterNodeObjectValuePair.getKey());
478                            unsyncPrintWriter.write("&companyId=");
479                            unsyncPrintWriter.write(String.valueOf(companyId));
480    
481                            unsyncPrintWriter.close();
482    
483                            inputStream = urlConnection.getInputStream();
484    
485                            return inputStream;
486                    }
487                    catch (IOException ioe) {
488                            throw new SystemException(ioe);
489                    }
490            }
491    
492            @Override
493            public Set<String> getQueryTerms(Query query) {
494                    String queryString = StringUtil.replace(
495                            query.toString(), StringPool.STAR, StringPool.BLANK);
496    
497                    Query tempQuery = null;
498    
499                    try {
500                            QueryParser queryParser = new QueryParser(
501                                    getVersion(), StringPool.BLANK, getAnalyzer());
502    
503                            tempQuery = queryParser.parse(queryString);
504                    }
505                    catch (Exception e) {
506                            if (_log.isWarnEnabled()) {
507                                    _log.warn("Unable to parse " + queryString);
508                            }
509    
510                            tempQuery = query;
511                    }
512    
513                    WeightedTerm[] weightedTerms = null;
514    
515                    for (String fieldName : Field.KEYWORDS) {
516                            weightedTerms = QueryTermExtractor.getTerms(
517                                    tempQuery, false, fieldName);
518    
519                            if (weightedTerms.length > 0) {
520                                    break;
521                            }
522                    }
523    
524                    Set<String> queryTerms = new HashSet<String>();
525    
526                    for (WeightedTerm weightedTerm : weightedTerms) {
527                            queryTerms.add(weightedTerm.getTerm());
528                    }
529    
530                    return queryTerms;
531            }
532    
533            /**
534             * @deprecated As of 7.0.0, replaced by {@link #getIndexSearcher(long)}
535             */
536            @Deprecated
537            @Override
538            public IndexSearcher getSearcher(long companyId, boolean readOnly)
539                    throws IOException {
540    
541                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
542    
543                    IndexReader indexReader = IndexReader.open(
544                            indexAccessor.getLuceneDir(), readOnly);
545    
546                    IndexSearcher indexSearcher = new IndexSearcher(indexReader);
547    
548                    indexSearcher.setDefaultFieldSortScoring(true, false);
549                    indexSearcher.setSimilarity(new FieldWeightSimilarity());
550    
551                    return indexSearcher;
552            }
553    
554            @Override
555            public String getSnippet(
556                            Query query, String field, String s, int maxNumFragments,
557                            int fragmentLength, String fragmentSuffix, Formatter formatter)
558                    throws IOException {
559    
560                    QueryScorer queryScorer = new QueryScorer(query, field);
561    
562                    Highlighter highlighter = new Highlighter(formatter, queryScorer);
563    
564                    highlighter.setTextFragmenter(new SimpleFragmenter(fragmentLength));
565    
566                    TokenStream tokenStream = getAnalyzer().tokenStream(
567                            field, new UnsyncStringReader(s));
568    
569                    try {
570                            String snippet = highlighter.getBestFragments(
571                                    tokenStream, s, maxNumFragments, fragmentSuffix);
572    
573                            if (Validator.isNotNull(snippet) &&
574                                    !StringUtil.endsWith(snippet, fragmentSuffix) &&
575                                    !s.equals(snippet)) {
576    
577                                    snippet = snippet.concat(fragmentSuffix);
578                            }
579    
580                            return snippet;
581                    }
582                    catch (InvalidTokenOffsetsException itoe) {
583                            throw new IOException(itoe.getMessage());
584                    }
585            }
586    
        /**
         * Returns the Lucene version currently held by this helper.
         *
         * @return the version last assigned through {@link #setVersion(Version)}
         */
        @Override
        public Version getVersion() {
                return _version;
        }
591    
592            @Override
593            public boolean isLoadIndexFromClusterEnabled() {
594                    if (PropsValues.CLUSTER_LINK_ENABLED &&
595                            PropsValues.LUCENE_REPLICATE_WRITE) {
596    
597                            return true;
598                    }
599    
600                    if (_log.isDebugEnabled()) {
601                            _log.debug("Load index from cluster is not enabled");
602                    }
603    
604                    return false;
605            }
606    
607            @Override
608            public void loadIndex(long companyId, InputStream inputStream)
609                    throws IOException {
610    
611                    if (!isLoadIndexFromClusterEnabled()) {
612                            return;
613                    }
614    
615                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
616    
617                    if (indexAccessor == null) {
618                            if (_log.isInfoEnabled()) {
619                                    _log.info(
620                                            "Skip loading Lucene index files for company " + companyId +
621                                                    " in favor of lazy loading");
622                            }
623    
624                            return;
625                    }
626    
627                    StopWatch stopWatch = new StopWatch();
628    
629                    stopWatch.start();
630    
631                    if (_log.isInfoEnabled()) {
632                            _log.info(
633                                    "Start loading Lucene index files for company " + companyId);
634                    }
635    
636                    indexAccessor.loadIndex(inputStream);
637    
638                    if (_log.isInfoEnabled()) {
639                            _log.info(
640                                    "Finished loading index files for company " + companyId +
641                                            " in " + stopWatch.getTime() + " ms");
642                    }
643            }
644    
645            @Override
646            public void loadIndexesFromCluster(long companyId) throws SystemException {
647                    if (!isLoadIndexFromClusterEnabled()) {
648                            return;
649                    }
650    
651                    IndexAccessor indexAccessor = _indexAccessors.get(companyId);
652    
653                    if (indexAccessor == null) {
654                            return;
655                    }
656    
657                    long localLastGeneration = getLastGeneration(companyId);
658    
659                    _loadIndexFromCluster(indexAccessor, localLastGeneration);
660            }
661    
662            @Override
663            public void releaseIndexSearcher(
664                            long companyId, IndexSearcher indexSearcher)
665                    throws IOException {
666    
667                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
668    
669                    indexAccessor.releaseIndexSearcher(indexSearcher);
670            }
671    
        /**
         * Sets the analyzer returned by {@link #getAnalyzer()}.
         *
         * @param analyzer the analyzer to store
         */
        public void setAnalyzer(Analyzer analyzer) {
                _analyzer = analyzer;
        }
675    
        /**
         * Sets the Lucene version returned by {@link #getVersion()}.
         *
         * @param version the version to store
         */
        public void setVersion(Version version) {
                _version = version;
        }
679    
680            @Override
681            public void shutdown() {
682                    if (_luceneIndexThreadPoolExecutor != null) {
683                            _luceneIndexThreadPoolExecutor.shutdownNow();
684    
685                            try {
686                                    _luceneIndexThreadPoolExecutor.awaitTermination(
687                                            60, TimeUnit.SECONDS);
688                            }
689                            catch (InterruptedException ie) {
690                                    _log.error("Lucene indexer shutdown interrupted", ie);
691                            }
692                    }
693    
694                    if (isLoadIndexFromClusterEnabled()) {
695                            ClusterExecutorUtil.removeClusterEventListener(
696                                    _loadIndexClusterEventListener);
697                    }
698    
699                    MessageBus messageBus = MessageBusUtil.getMessageBus();
700    
701                    for (String searchEngineId : SearchEngineUtil.getSearchEngineIds()) {
702                            String searchWriterDestinationName =
703                                    SearchEngineUtil.getSearchWriterDestinationName(searchEngineId);
704    
705                            Destination searchWriteDestination = messageBus.getDestination(
706                                    searchWriterDestinationName);
707    
708                            if (searchWriteDestination != null) {
709                                    ThreadPoolExecutor threadPoolExecutor =
710                                            PortalExecutorManagerUtil.getPortalExecutor(
711                                                    searchWriterDestinationName);
712    
713                                    int maxPoolSize = threadPoolExecutor.getMaxPoolSize();
714    
715                                    CountDownLatch countDownLatch = new CountDownLatch(maxPoolSize);
716    
717                                    ShutdownSyncJob shutdownSyncJob = new ShutdownSyncJob(
718                                            countDownLatch);
719    
720                                    for (int i = 0; i < maxPoolSize; i++) {
721                                            threadPoolExecutor.submit(shutdownSyncJob);
722                                    }
723    
724                                    try {
725                                            countDownLatch.await();
726                                    }
727                                    catch (InterruptedException ie) {
728                                            _log.error("Shutdown waiting interrupted", ie);
729                                    }
730    
731                                    List<Runnable> runnables = threadPoolExecutor.shutdownNow();
732    
733                                    if (_log.isDebugEnabled()) {
734                                            _log.debug(
735                                                    "Cancelled appending indexing jobs: " + runnables);
736                                    }
737    
738                                    searchWriteDestination.close(true);
739                            }
740                    }
741    
742                    for (IndexAccessor indexAccessor : _indexAccessors.values()) {
743                            indexAccessor.close();
744                    }
745            }
746    
747            @Override
748            public void shutdown(long companyId) {
749                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
750    
751                    _indexAccessors.remove(indexAccessor);
752    
753                    indexAccessor.close();
754            }
755    
756            @Override
757            public void startup(long companyId) {
758                    if (!PropsValues.INDEX_ON_STARTUP) {
759                            return;
760                    }
761    
762                    if (_log.isInfoEnabled()) {
763                            _log.info("Indexing Lucene on startup");
764                    }
765    
766                    LuceneIndexer luceneIndexer = new LuceneIndexer(companyId);
767    
768                    if (PropsValues.INDEX_WITH_THREAD) {
769                            if (_luceneIndexThreadPoolExecutor == null) {
770    
771                                    // This should never be null except for the case where
772                                    // VerifyProcessUtil#_verifyProcess(boolean) sets
773                                    // PropsValues#INDEX_ON_STARTUP to true.
774    
775                                    _luceneIndexThreadPoolExecutor =
776                                            PortalExecutorManagerUtil.getPortalExecutor(
777                                                    LuceneHelperImpl.class.getName());
778                            }
779    
780                            _luceneIndexThreadPoolExecutor.execute(luceneIndexer);
781                    }
782                    else {
783                            luceneIndexer.reindex();
784                    }
785            }
786    
787            @Override
788            public void updateDocument(long companyId, Term term, Document document)
789                    throws IOException {
790    
791                    IndexAccessor indexAccessor = getIndexAccessor(companyId);
792    
793                    indexAccessor.updateDocument(term, document);
794            }
795    
796            private LuceneHelperImpl() {
797                    if (PropsValues.INDEX_ON_STARTUP && PropsValues.INDEX_WITH_THREAD) {
798                            _luceneIndexThreadPoolExecutor =
799                                    PortalExecutorManagerUtil.getPortalExecutor(
800                                            LuceneHelperImpl.class.getName());
801                    }
802    
803                    if (isLoadIndexFromClusterEnabled()) {
804                            _loadIndexClusterEventListener =
805                                    new LoadIndexClusterEventListener();
806    
807                            ClusterExecutorUtil.addClusterEventListener(
808                                    _loadIndexClusterEventListener);
809                    }
810    
811                    BooleanQuery.setMaxClauseCount(_LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE);
812            }
813    
814            private ObjectValuePair<String, URL>
815                            _getBootupClusterNodeObjectValuePair(Address bootupAddress)
816                    throws SystemException {
817    
818                    ClusterRequest clusterRequest = ClusterRequest.createUnicastRequest(
819                            new MethodHandler(
820                                    _createTokenMethodKey,
821                                    _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT),
822                            bootupAddress);
823    
824                    FutureClusterResponses futureClusterResponses =
825                            ClusterExecutorUtil.execute(clusterRequest);
826    
827                    BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
828                            futureClusterResponses.getPartialResults();
829    
830                    try {
831                            ClusterNodeResponse clusterNodeResponse = clusterNodeResponses.poll(
832                                    _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
833                                    TimeUnit.MILLISECONDS);
834    
835                            ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
836    
837                            int port = clusterNode.getPort();
838    
839                            if (port <= 0) {
840                                    StringBundler sb = new StringBundler(6);
841    
842                                    sb.append("Invalid cluster node port ");
843                                    sb.append(port);
844                                    sb.append(". The port is set by the first request or ");
845                                    sb.append("configured in portal.properties by the properties ");
846                                    sb.append("\"portal.instance.http.port\" and ");
847                                    sb.append("\"portal.instance.https.port\".");
848    
849                                    throw new Exception(sb.toString());
850                            }
851    
852                            String protocol = clusterNode.getPortalProtocol();
853    
854                            if (Validator.isNull(protocol)) {
855                                    StringBundler sb = new StringBundler(4);
856    
857                                    sb.append("Cluster node protocol is empty. The protocol is ");
858                                    sb.append("set by the first request or configured in ");
859                                    sb.append("portal.properties by the property ");
860                                    sb.append("\"portal.instance.protocol\"");
861    
862                                    throw new Exception(sb.toString());
863                            }
864    
865                            InetAddress inetAddress = clusterNode.getInetAddress();
866    
867                            String hostName = null;
868    
869                            if (PropsValues.LUCENE_CLUSTER_INDEX_USE_CANONICAL_HOST_NAME) {
870                                    hostName = inetAddress.getCanonicalHostName();
871                            }
872                            else {
873                                    hostName = inetAddress.getHostAddress();
874                            }
875    
876                            String fileName = PortalUtil.getPathContext();
877    
878                            if (!fileName.endsWith(StringPool.SLASH)) {
879                                    fileName = fileName.concat(StringPool.SLASH);
880                            }
881    
882                            fileName = fileName.concat("lucene/dump");
883    
884                            URL url = new URL(protocol, hostName, port, fileName);
885    
886                            String transientToken = (String)clusterNodeResponse.getResult();
887    
888                            return new ObjectValuePair<String, URL>(transientToken, url);
889                    }
890                    catch (Exception e) {
891                            throw new SystemException(e);
892                    }
893            }
894    
895            private void _handleFutureClusterResponses(
896                    FutureClusterResponses futureClusterResponses,
897                    IndexAccessor indexAccessor, int clusterNodeAddressesCount,
898                    long localLastGeneration) {
899    
900                    BlockingQueue<ClusterNodeResponse> blockingQueue =
901                            futureClusterResponses.getPartialResults();
902    
903                    long companyId = indexAccessor.getCompanyId();
904    
905                    Address bootupAddress = null;
906    
907                    do {
908                            clusterNodeAddressesCount--;
909    
910                            ClusterNodeResponse clusterNodeResponse = null;
911    
912                            try {
913                                    clusterNodeResponse = blockingQueue.poll(
914                                            _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
915                                            TimeUnit.MILLISECONDS);
916                            }
917                            catch (Exception e) {
918                                    _log.error("Unable to get cluster node response", e);
919                            }
920    
921                            if (clusterNodeResponse == null) {
922                                    if (_log.isDebugEnabled()) {
923                                            _log.debug(
924                                                    "Unable to get cluster node response in " +
925                                                            _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT +
926                                                                    TimeUnit.MILLISECONDS);
927                                    }
928    
929                                    continue;
930                            }
931    
932                            ClusterNode clusterNode = clusterNodeResponse.getClusterNode();
933    
934                            if (clusterNode.getPort() > 0) {
935                                    try {
936                                            long remoteLastGeneration =
937                                                    (Long)clusterNodeResponse.getResult();
938    
939                                            if (remoteLastGeneration > localLastGeneration) {
940                                                    bootupAddress = clusterNodeResponse.getAddress();
941    
942                                                    break;
943                                            }
944                                    }
945                                    catch (Exception e) {
946                                            if (_log.isDebugEnabled()) {
947                                                    _log.debug(
948                                                            "Suppress exception caused by remote method " +
949                                                                    "invocation",
950                                                            e);
951                                            }
952    
953                                            continue;
954                                    }
955                            }
956                            else {
957                                    if (_log.isDebugEnabled()) {
958                                            _log.debug(
959                                                    "Cluster node " + clusterNode +
960                                                            " has invalid port");
961                                    }
962                            }
963                    }
964                    while ((bootupAddress == null) && (clusterNodeAddressesCount > 1));
965    
966                    if (bootupAddress == null) {
967                            return;
968                    }
969    
970                    if (_log.isInfoEnabled()) {
971                            _log.info(
972                                    "Start loading lucene index files from cluster node " +
973                                            bootupAddress);
974                    }
975    
976                    InputStream inputStream = null;
977    
978                    try {
979                            inputStream = getLoadIndexesInputStreamFromCluster(
980                                    companyId, bootupAddress);
981    
982                            indexAccessor.loadIndex(inputStream);
983    
984                            if (_log.isInfoEnabled()) {
985                                    _log.info("Lucene index files loaded successfully");
986                            }
987                    }
988                    catch (Exception e) {
989                            _log.error("Unable to load index for company " + companyId, e);
990                    }
991                    finally {
992                            if (inputStream != null) {
993                                    try {
994                                            inputStream.close();
995                                    }
996                                    catch (IOException ioe) {
997                                            _log.error(
998                                                    "Unable to close input stream for company " +
999                                                            companyId,
1000                                                    ioe);
1001                                    }
1002                            }
1003                    }
1004            }
1005    
1006            private void _includeIfUnique(
1007                    BooleanQuery booleanQuery, boolean like, QueryParser queryParser,
1008                    Query query, BooleanClause.Occur occur) {
1009    
1010                    if (query instanceof TermQuery) {
1011                            Set<Term> terms = new HashSet<Term>();
1012    
1013                            TermQuery termQuery = (TermQuery)query;
1014    
1015                            termQuery.extractTerms(terms);
1016    
1017                            float boost = termQuery.getBoost();
1018    
1019                            for (Term term : terms) {
1020                                    String termValue = term.text();
1021    
1022                                    if (like) {
1023                                            termValue = termValue.toLowerCase(queryParser.getLocale());
1024    
1025                                            term = term.createTerm(
1026                                                    StringPool.STAR.concat(termValue).concat(
1027                                                            StringPool.STAR));
1028    
1029                                            query = new WildcardQuery(term);
1030                                    }
1031                                    else {
1032                                            query = new TermQuery(term);
1033                                    }
1034    
1035                                    query.setBoost(boost);
1036    
1037                                    boolean included = false;
1038    
1039                                    for (BooleanClause booleanClause : booleanQuery.getClauses()) {
1040                                            if (query.equals(booleanClause.getQuery())) {
1041                                                    included = true;
1042                                            }
1043                                    }
1044    
1045                                    if (!included) {
1046                                            booleanQuery.add(query, occur);
1047                                    }
1048                            }
1049                    }
1050                    else if (query instanceof BooleanQuery) {
1051                            BooleanQuery curBooleanQuery = (BooleanQuery)query;
1052    
1053                            BooleanQuery containerBooleanQuery = new BooleanQuery();
1054    
1055                            for (BooleanClause booleanClause : curBooleanQuery.getClauses()) {
1056                                    _includeIfUnique(
1057                                            containerBooleanQuery, like, queryParser,
1058                                            booleanClause.getQuery(), booleanClause.getOccur());
1059                            }
1060    
1061                            if (containerBooleanQuery.getClauses().length > 0) {
1062                                    booleanQuery.add(containerBooleanQuery, occur);
1063                            }
1064                    }
1065                    else {
1066                            boolean included = false;
1067    
1068                            for (BooleanClause booleanClause : booleanQuery.getClauses()) {
1069                                    if (query.equals(booleanClause.getQuery())) {
1070                                            included = true;
1071                                    }
1072                            }
1073    
1074                            if (!included) {
1075                                    booleanQuery.add(query, occur);
1076                            }
1077                    }
1078            }
1079    
1080            private void _loadIndexFromCluster(
1081                            IndexAccessor indexAccessor, long localLastGeneration)
1082                    throws SystemException {
1083    
1084                    List<Address> clusterNodeAddresses =
1085                            ClusterExecutorUtil.getClusterNodeAddresses();
1086    
1087                    int clusterNodeAddressesCount = clusterNodeAddresses.size();
1088    
1089                    if (clusterNodeAddressesCount <= 1) {
1090                            if (_log.isDebugEnabled()) {
1091                                    _log.debug(
1092                                            "Do not load indexes because there is either one portal " +
1093                                                    "instance or no portal instances in the cluster");
1094                            }
1095    
1096                            return;
1097                    }
1098    
1099                    ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
1100                            new MethodHandler(
1101                                    _getLastGenerationMethodKey, indexAccessor.getCompanyId()),
1102                            true);
1103    
1104                    FutureClusterResponses futureClusterResponses =
1105                            ClusterExecutorUtil.execute(clusterRequest);
1106    
1107                    _handleFutureClusterResponses(
1108                            futureClusterResponses, indexAccessor, clusterNodeAddressesCount,
1109                            localLastGeneration);
1110            }
1111    
        // Used in this class as the timeout, in milliseconds, when polling for
        // cluster node responses. It is also passed as the argument of the remote
        // createToken invocation.

        private static final long _CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT =
                PropsValues.CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT;

        // Applied through BooleanQuery#setMaxClauseCount in the constructor.
        // Lucene's current maximum is passed as the second argument, presumably
        // the fallback used when the property is not set.

        private static final int _LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE =
                GetterUtil.getInteger(
                        PropsUtil.get(PropsKeys.LUCENE_BOOLEAN_QUERY_CLAUSE_MAX_SIZE),
                        BooleanQuery.getMaxClauseCount());

        // Boolean operator keywords surrounded by single spaces, in lower case

        private static final String[] _KEYWORDS_LOWERCASE = {
                " and ", " not ", " or "
        };

        // The same keywords in the same order, in upper case

        private static final String[] _KEYWORDS_UPPERCASE = {
                " AND ", " NOT ", " OR "
        };

        private static Log _log = LogFactoryUtil.getLog(LuceneHelperImpl.class);

        // Method keys for TransientTokenUtil#createToken(long) and
        // LuceneHelperUtil#getLastGeneration(long). This class wraps them in
        // method handlers that are sent to other nodes with cluster requests.

        private static MethodKey _createTokenMethodKey = new MethodKey(
                TransientTokenUtil.class, "createToken", long.class);
        private static MethodKey _getLastGenerationMethodKey = new MethodKey(
                LuceneHelperUtil.class, "getLastGeneration", long.class);

        // _indexAccessors: cached index accessors, presumably keyed by company
        // ID (verify against getIndexAccessor).
        // _loadIndexClusterEventListener: created and registered by the
        // constructor only when isLoadIndexFromClusterEnabled() returns true.
        // _luceneIndexThreadPoolExecutor: obtained by the constructor when both
        // INDEX_ON_STARTUP and INDEX_WITH_THREAD are set, otherwise lazily by
        // startup(long).

        private Analyzer _analyzer;
        private Map<Long, IndexAccessor> _indexAccessors =
                new ConcurrentHashMap<Long, IndexAccessor>();
        private LoadIndexClusterEventListener _loadIndexClusterEventListener;
        private ThreadPoolExecutor _luceneIndexThreadPoolExecutor;
        private Version _version;
1141    
1142            private static class ShutdownSyncJob implements Runnable {
1143    
1144                    public ShutdownSyncJob(CountDownLatch countDownLatch) {
1145                            _countDownLatch = countDownLatch;
1146                    }
1147    
1148                    @Override
1149                    public void run() {
1150                            _countDownLatch.countDown();
1151    
1152                            try {
1153                                    synchronized (this) {
1154                                            wait();
1155                                    }
1156                            }
1157                            catch (InterruptedException ie) {
1158                            }
1159                    }
1160    
1161                    private final CountDownLatch _countDownLatch;
1162    
1163            }
1164    
1165            private class LoadIndexClusterEventListener
1166                    implements ClusterEventListener {
1167    
1168                    @Override
1169                    public void processClusterEvent(ClusterEvent clusterEvent) {
1170                            ClusterEventType clusterEventType =
1171                                    clusterEvent.getClusterEventType();
1172    
1173                            if (!clusterEventType.equals(ClusterEventType.JOIN)) {
1174                                    return;
1175                            }
1176    
1177                            List<Address> clusterNodeAddresses =
1178                                    ClusterExecutorUtil.getClusterNodeAddresses();
1179                            List<ClusterNode> clusterNodes = clusterEvent.getClusterNodes();
1180    
1181                            if ((clusterNodeAddresses.size() - clusterNodes.size()) > 1) {
1182                                    if (_log.isDebugEnabled()) {
1183                                            _log.debug(
1184                                                    "Number of original cluster members is greater than " +
1185                                                            "one");
1186                                    }
1187    
1188                                    return;
1189                            }
1190    
1191                            long[] companyIds = PortalInstances.getCompanyIds();
1192    
1193                            for (long companyId : companyIds) {
1194                                    loadIndexes(companyId);
1195                            }
1196    
1197                            loadIndexes(CompanyConstants.SYSTEM);
1198                    }
1199    
1200                    private void loadIndexes(long companyId) {
1201                            long lastGeneration = getLastGeneration(companyId);
1202    
1203                            if (lastGeneration == IndexAccessor.DEFAULT_LAST_GENERATION) {
1204                                    return;
1205                            }
1206    
1207                            try {
1208                                    LuceneClusterUtil.loadIndexesFromCluster(companyId);
1209                            }
1210                            catch (Exception e) {
1211                                    _log.error(
1212                                            "Unable to load indexes for company " + companyId, e);
1213                            }
1214                    }
1215    
1216            }
1217    
1218    }