001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.search;
016    
017    import com.liferay.portal.kernel.cluster.messaging.ClusterBridgeMessageListener;
018    import com.liferay.portal.kernel.concurrent.CallerRunsPolicy;
019    import com.liferay.portal.kernel.concurrent.RejectedExecutionHandler;
020    import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
021    import com.liferay.portal.kernel.log.Log;
022    import com.liferay.portal.kernel.log.LogFactoryUtil;
023    import com.liferay.portal.kernel.messaging.Destination;
024    import com.liferay.portal.kernel.messaging.InvokerMessageListener;
025    import com.liferay.portal.kernel.messaging.MessageBus;
026    import com.liferay.portal.kernel.messaging.MessageListener;
027    import com.liferay.portal.kernel.messaging.ParallelDestination;
028    import com.liferay.portal.kernel.messaging.SynchronousDestination;
029    import com.liferay.portal.kernel.search.messaging.BaseSearchEngineMessageListener;
030    import com.liferay.portal.kernel.search.messaging.SearchReaderMessageListener;
031    import com.liferay.portal.kernel.search.messaging.SearchWriterMessageListener;
032    import com.liferay.portal.kernel.util.GetterUtil;
033    import com.liferay.portal.kernel.util.PropsKeys;
034    import com.liferay.portal.kernel.util.PropsUtil;
035    import com.liferay.portal.kernel.util.StringBundler;
036    import com.liferay.portal.kernel.util.Validator;
037    
038    import java.util.ArrayList;
039    import java.util.List;
040    import java.util.Map;
041    import java.util.Map.Entry;
042    import java.util.Set;
043    
044    /**
045     * @author Michael C. Han
046     */
047    public abstract class AbstractSearchEngineConfigurator {
048    
049            public void afterPropertiesSet() {
050                    Set<Entry<String, SearchEngine>> entrySet = _searchEngines.entrySet();
051    
052                    for (Entry<String, SearchEngine> entry : entrySet) {
053                            initSearchEngine(entry.getKey(), entry.getValue());
054                    }
055    
056                    String defaultSearchEngineId = getDefaultSearchEngineId();
057    
058                    if (Validator.isNotNull(defaultSearchEngineId)) {
059                            _originalSearchEngineId =
060                                    SearchEngineUtil.getDefaultSearchEngineId();
061    
062                            SearchEngineUtil.setDefaultSearchEngineId(defaultSearchEngineId);
063                    }
064    
065                    _searchEngines.clear();
066            }
067    
068            public void destroy() {
069                    for (SearchEngineRegistration searchEngineRegistration :
070                                    _searchEngineRegistrations) {
071    
072                            destroySearchEngine(searchEngineRegistration);
073                    }
074    
075                    _searchEngineRegistrations.clear();
076    
077                    if (Validator.isNotNull(_originalSearchEngineId)) {
078                            SearchEngineUtil.setDefaultSearchEngineId(_originalSearchEngineId);
079    
080                            _originalSearchEngineId = null;
081                    }
082            }
083    
084            public void setSearchEngines(Map<String, SearchEngine> searchEngines) {
085                    _searchEngines = searchEngines;
086            }
087    
088            protected void createSearchEngineListeners(
089                    String searchEngineId, SearchEngine searchEngine,
090                    Destination searchReaderDestination,
091                    Destination searchWriterDestination) {
092    
093                    registerSearchEngineMessageListener(
094                            searchEngineId, searchEngine, searchReaderDestination,
095                            new SearchReaderMessageListener(), searchEngine.getIndexSearcher());
096    
097                    registerSearchEngineMessageListener(
098                            searchEngineId, searchEngine, searchWriterDestination,
099                            new SearchWriterMessageListener(), searchEngine.getIndexWriter());
100    
101                    if (searchEngine.isClusteredWrite()) {
102                            ClusterBridgeMessageListener clusterBridgeMessageListener =
103                                    new ClusterBridgeMessageListener();
104    
105                            clusterBridgeMessageListener.setPriority(
106                                    searchEngine.getClusteredWritePriority());
107    
108                            searchWriterDestination.register(clusterBridgeMessageListener);
109                    }
110            }
111    
112            protected void destroySearchEngine(
113                    SearchEngineRegistration searchEngineRegistration) {
114    
115                    MessageBus messageBus = getMessageBus();
116    
117                    Destination searchReaderDestination = messageBus.removeDestination(
118                            searchEngineRegistration.getSearchReaderDestinationName());
119    
120                    searchReaderDestination.close(true);
121    
122                    Destination searchWriterDestination = messageBus.removeDestination(
123                            searchEngineRegistration.getSearchWriterDestinationName());
124    
125                    searchWriterDestination.close(true);
126    
127                    SearchEngineUtil.removeSearchEngine(
128                            searchEngineRegistration.getSearchEngineId());
129    
130                    if (!searchEngineRegistration.isOverride()) {
131                            return;
132                    }
133    
134                    SearchEngineProxyWrapper originalSearchEngineProxy =
135                            searchEngineRegistration.getOriginalSearchEngineProxyWrapper();
136    
137                    SearchEngine originalSearchEngine =
138                            originalSearchEngineProxy.getSearchEngine();
139    
140                    searchReaderDestination = getSearchReaderDestination(
141                            messageBus, searchEngineRegistration.getSearchEngineId(),
142                            originalSearchEngine);
143    
144                    registerInvokerMessageListener(
145                            searchReaderDestination,
146                            searchEngineRegistration.getOriginalSearchReaderMessageListeners());
147    
148                    searchWriterDestination = getSearchWriterDestination(
149                            messageBus, searchEngineRegistration.getSearchEngineId(),
150                            originalSearchEngine);
151    
152                    registerInvokerMessageListener(
153                            searchWriterDestination,
154                            searchEngineRegistration.getOriginalSearchWriterMessageListeners());
155    
156                    SearchEngineUtil.setSearchEngine(
157                            searchEngineRegistration.getSearchEngineId(),
158                            originalSearchEngineProxy);
159            }
160    
161            protected abstract String getDefaultSearchEngineId();
162    
163            protected abstract IndexSearcher getIndexSearcher();
164    
165            protected abstract IndexWriter getIndexWriter();
166    
167            protected abstract MessageBus getMessageBus();
168    
169            protected abstract ClassLoader getOperatingClassloader();
170    
171            protected Destination getSearchReaderDestination(
172                    MessageBus messageBus, String searchEngineId,
173                    SearchEngine searchEngine) {
174    
175                    String searchReaderDestinationName =
176                            SearchEngineUtil.getSearchReaderDestinationName(searchEngineId);
177    
178                    Destination searchReaderDestination = messageBus.getDestination(
179                            searchReaderDestinationName);
180    
181                    if (searchReaderDestination == null) {
182                            SynchronousDestination synchronousDestination =
183                                    new SynchronousDestination();
184    
185                            synchronousDestination.setName(searchReaderDestinationName);
186    
187                            synchronousDestination.open();
188    
189                            searchReaderDestination = synchronousDestination;
190    
191                            messageBus.addDestination(searchReaderDestination);
192                    }
193    
194                    return searchReaderDestination;
195            }
196    
197            protected Destination getSearchWriterDestination(
198                    MessageBus messageBus, String searchEngineId,
199                    SearchEngine searchEngine) {
200    
201                    String searchWriterDestinationName =
202                            SearchEngineUtil.getSearchWriterDestinationName(searchEngineId);
203    
204                    Destination searchWriterDestination = messageBus.getDestination(
205                            searchWriterDestinationName);
206    
207                    if (searchWriterDestination == null) {
208                            ParallelDestination parallelDestination = new ParallelDestination();
209    
210                            parallelDestination.setName(searchWriterDestinationName);
211    
212                            if (_INDEX_SEARCH_WRITER_MAX_QUEUE_SIZE > 0) {
213                                    parallelDestination.setMaximumQueueSize(
214                                            _INDEX_SEARCH_WRITER_MAX_QUEUE_SIZE);
215    
216                                    RejectedExecutionHandler rejectedExecutionHandler =
217                                            new CallerRunsPolicy() {
218    
219                                                    @Override
220                                                    public void rejectedExecution(
221                                                            Runnable runnable,
222                                                            ThreadPoolExecutor threadPoolExecutor) {
223    
224                                                            if (_log.isWarnEnabled()) {
225                                                                    StringBundler sb = new StringBundler(4);
226    
227                                                                    sb.append(
228                                                                            "The search index writer's task queue ");
229                                                                    sb.append("is at its maximum capacity. The ");
230                                                                    sb.append("current thread will handle the ");
231                                                                    sb.append("request.");
232    
233                                                                    _log.warn(sb.toString());
234                                                            }
235    
236                                                            super.rejectedExecution(
237                                                                    runnable, threadPoolExecutor);
238                                                    }
239    
240                                            };
241    
242                                    parallelDestination.setRejectedExecutionHandler(
243                                            rejectedExecutionHandler);
244                            }
245    
246                            parallelDestination.open();
247    
248                            searchWriterDestination = parallelDestination;
249    
250                            messageBus.addDestination(searchWriterDestination);
251                    }
252    
253                    return searchWriterDestination;
254            }
255    
256            protected void initSearchEngine(
257                    String searchEngineId, SearchEngine searchEngine) {
258    
259                    SearchEngineRegistration searchEngineRegistration =
260                            new SearchEngineRegistration(searchEngineId);
261    
262                    _searchEngineRegistrations.add(searchEngineRegistration);
263    
264                    MessageBus messageBus = getMessageBus();
265    
266                    Destination searchReaderDestination = getSearchReaderDestination(
267                            messageBus, searchEngineId, searchEngine);
268    
269                    searchEngineRegistration.setSearchReaderDestinationName(
270                            searchReaderDestination.getName());
271    
272                    Destination searchWriterDestination = getSearchWriterDestination(
273                            messageBus, searchEngineId, searchEngine);
274    
275                    searchEngineRegistration.setSearchWriterDestinationName(
276                            searchWriterDestination.getName());
277    
278                    SearchEngine originalSearchEngine =
279                            SearchEngineUtil.getSearchEngineSilent(searchEngineId);
280    
281                    if (originalSearchEngine != null) {
282                            searchEngineRegistration.setOverride(true);
283    
284                            searchEngineRegistration.setOriginalSearchEngineProxyWrapper(
285                                    (SearchEngineProxyWrapper)originalSearchEngine);
286    
287                            savePreviousSearchEngineListeners(
288                                    searchReaderDestination, searchWriterDestination,
289                                    searchEngineRegistration);
290    
291                            messageBus.removeDestination(searchReaderDestination.getName());
292    
293                            searchReaderDestination = getSearchReaderDestination(
294                                    messageBus, searchEngineId, originalSearchEngine);
295    
296                            messageBus.removeDestination(searchWriterDestination.getName());
297    
298                            searchWriterDestination = getSearchWriterDestination(
299                                    messageBus, searchEngineId, originalSearchEngine);
300                    }
301    
302                    createSearchEngineListeners(
303                            searchEngineId, searchEngine, searchReaderDestination,
304                            searchWriterDestination);
305    
306                    SearchEngineProxyWrapper searchEngineProxyWrapper =
307                            new SearchEngineProxyWrapper(
308                                    searchEngine, getIndexSearcher(), getIndexWriter());
309    
310                    SearchEngineUtil.setSearchEngine(
311                            searchEngineId, searchEngineProxyWrapper);
312            }
313    
314            protected void registerInvokerMessageListener(
315                    Destination destination,
316                    List<InvokerMessageListener> invokerMessageListeners) {
317    
318                    for (InvokerMessageListener invokerMessageListener :
319                                    invokerMessageListeners) {
320    
321                            destination.register(
322                                    invokerMessageListener.getMessageListener(),
323                                    invokerMessageListener.getClassLoader());
324                    }
325            }
326    
327            protected void registerSearchEngineMessageListener(
328                    String searchEngineId, SearchEngine searchEngine,
329                    Destination destination,
330                    BaseSearchEngineMessageListener baseSearchEngineMessageListener,
331                    Object manager) {
332    
333                    baseSearchEngineMessageListener.setManager(manager);
334                    baseSearchEngineMessageListener.setMessageBus(getMessageBus());
335                    baseSearchEngineMessageListener.setSearchEngine(searchEngine);
336                    baseSearchEngineMessageListener.setSearchEngineId(searchEngineId);
337    
338                    destination.register(
339                            baseSearchEngineMessageListener, getOperatingClassloader());
340            }
341    
342            protected void savePreviousSearchEngineListeners(
343                    Destination searchReaderDestination,
344                    Destination searchWriterDestination,
345                    SearchEngineRegistration searchEngineRegistration) {
346    
347                    Set<MessageListener> searchReaderMessageListeners =
348                            searchReaderDestination.getMessageListeners();
349    
350                    for (MessageListener searchReaderMessageListener :
351                                    searchReaderMessageListeners) {
352    
353                            InvokerMessageListener invokerMessageListener =
354                                    (InvokerMessageListener)searchReaderMessageListener;
355    
356                            searchEngineRegistration.addOriginalSearchReaderMessageListener(
357                                    invokerMessageListener);
358                    }
359    
360                    Set<MessageListener> searchWriterMessageListeners =
361                            searchWriterDestination.getMessageListeners();
362    
363                    for (MessageListener searchWriterMessageListener :
364                                    searchWriterMessageListeners) {
365    
366                            InvokerMessageListener invokerMessageListener =
367                                    (InvokerMessageListener)searchWriterMessageListener;
368    
369                            searchEngineRegistration.addOriginalSearchWriterMessageListener(
370                                    invokerMessageListener);
371                    }
372            }
373    
374            private static final int _INDEX_SEARCH_WRITER_MAX_QUEUE_SIZE =
375                    GetterUtil.getInteger(
376                            PropsUtil.get(PropsKeys.INDEX_SEARCH_WRITER_MAX_QUEUE_SIZE));
377    
378            private static Log _log = LogFactoryUtil.getLog(
379                    AbstractSearchEngineConfigurator.class);
380    
381            private String _originalSearchEngineId;
382            private List<SearchEngineRegistration> _searchEngineRegistrations =
383                    new ArrayList<SearchEngineRegistration>();
384            private Map<String, SearchEngine> _searchEngines;
385    
386            private class SearchEngineRegistration {
387    
388                    private SearchEngineRegistration(String searchEngineId) {
389                            _searchEngineId = searchEngineId;
390                    }
391    
392                    public void addOriginalSearchReaderMessageListener(
393                            InvokerMessageListener messageListener) {
394    
395                            _originalSearchReaderMessageListeners.add(messageListener);
396                    }
397    
398                    public void addOriginalSearchWriterMessageListener(
399                            InvokerMessageListener messageListener) {
400    
401                            _originalSearchWriterMessageListeners.add(messageListener);
402                    }
403    
404                    public SearchEngineProxyWrapper getOriginalSearchEngineProxyWrapper() {
405                            return _originalSearchEngineProxyWrapper;
406                    }
407    
408                    public List<InvokerMessageListener>
409                            getOriginalSearchReaderMessageListeners() {
410    
411                            return _originalSearchReaderMessageListeners;
412                    }
413    
414                    public List<InvokerMessageListener>
415                            getOriginalSearchWriterMessageListeners() {
416    
417                            return _originalSearchWriterMessageListeners;
418                    }
419    
420                    public String getSearchEngineId() {
421                            return _searchEngineId;
422                    }
423    
424                    public String getSearchReaderDestinationName() {
425                            return _searchReaderDestinationName;
426                    }
427    
428                    public String getSearchWriterDestinationName() {
429                            return _searchWriterDestinationName;
430                    }
431    
432                    public boolean isOverride() {
433                            return _override;
434                    }
435    
436                    public void setOriginalSearchEngineProxyWrapper(
437                            SearchEngineProxyWrapper searchEngineProxyWrapper) {
438    
439                            _originalSearchEngineProxyWrapper = searchEngineProxyWrapper;
440                    }
441    
442                    public void setOverride(boolean override) {
443                            _override = override;
444                    }
445    
446                    public void setSearchReaderDestinationName(
447                            String searchReaderDestinationName) {
448    
449                            _searchReaderDestinationName = searchReaderDestinationName;
450                    }
451    
452                    public void setSearchWriterDestinationName(
453                            String searchWriterDestinationName) {
454    
455                            _searchWriterDestinationName = searchWriterDestinationName;
456                    }
457    
458                    private SearchEngineProxyWrapper _originalSearchEngineProxyWrapper;
459                    private List<InvokerMessageListener>
460                            _originalSearchReaderMessageListeners =
461                                    new ArrayList<InvokerMessageListener>();
462                    private List<InvokerMessageListener>
463                            _originalSearchWriterMessageListeners =
464                                    new ArrayList<InvokerMessageListener>();
465                    private boolean _override;
466                    private String _searchEngineId;
467                    private String _searchReaderDestinationName;
468                    private String _searchWriterDestinationName;
469    
470            }
471    
472    }