001
014
015 package com.liferay.portal.kernel.search;
016
017 import com.liferay.portal.kernel.cluster.messaging.ClusterBridgeMessageListener;
018 import com.liferay.portal.kernel.concurrent.CallerRunsPolicy;
019 import com.liferay.portal.kernel.concurrent.RejectedExecutionHandler;
020 import com.liferay.portal.kernel.concurrent.ThreadPoolExecutor;
021 import com.liferay.portal.kernel.log.Log;
022 import com.liferay.portal.kernel.log.LogFactoryUtil;
023 import com.liferay.portal.kernel.messaging.Destination;
024 import com.liferay.portal.kernel.messaging.InvokerMessageListener;
025 import com.liferay.portal.kernel.messaging.MessageBus;
026 import com.liferay.portal.kernel.messaging.MessageListener;
027 import com.liferay.portal.kernel.messaging.ParallelDestination;
028 import com.liferay.portal.kernel.messaging.SynchronousDestination;
029 import com.liferay.portal.kernel.search.messaging.BaseSearchEngineMessageListener;
030 import com.liferay.portal.kernel.search.messaging.SearchReaderMessageListener;
031 import com.liferay.portal.kernel.search.messaging.SearchWriterMessageListener;
032 import com.liferay.portal.kernel.util.GetterUtil;
033 import com.liferay.portal.kernel.util.PropsKeys;
034 import com.liferay.portal.kernel.util.PropsUtil;
035 import com.liferay.portal.kernel.util.StringBundler;
036 import com.liferay.portal.kernel.util.Validator;
037
038 import java.util.ArrayList;
039 import java.util.List;
040 import java.util.Map;
041 import java.util.Map.Entry;
042 import java.util.Set;
043
044
047 public abstract class AbstractSearchEngineConfigurator {
048
049 public void afterPropertiesSet() {
050 Set<Entry<String, SearchEngine>> entrySet = _searchEngines.entrySet();
051
052 for (Entry<String, SearchEngine> entry : entrySet) {
053 initSearchEngine(entry.getKey(), entry.getValue());
054 }
055
056 String defaultSearchEngineId = getDefaultSearchEngineId();
057
058 if (Validator.isNotNull(defaultSearchEngineId)) {
059 _originalSearchEngineId =
060 SearchEngineUtil.getDefaultSearchEngineId();
061
062 SearchEngineUtil.setDefaultSearchEngineId(defaultSearchEngineId);
063 }
064
065 _searchEngines.clear();
066 }
067
068 public void destroy() {
069 for (SearchEngineRegistration searchEngineRegistration :
070 _searchEngineRegistrations) {
071
072 destroySearchEngine(searchEngineRegistration);
073 }
074
075 _searchEngineRegistrations.clear();
076
077 if (Validator.isNotNull(_originalSearchEngineId)) {
078 SearchEngineUtil.setDefaultSearchEngineId(_originalSearchEngineId);
079
080 _originalSearchEngineId = null;
081 }
082 }
083
084 public void setSearchEngines(Map<String, SearchEngine> searchEngines) {
085 _searchEngines = searchEngines;
086 }
087
088 protected void createSearchEngineListeners(
089 String searchEngineId, SearchEngine searchEngine,
090 Destination searchReaderDestination,
091 Destination searchWriterDestination) {
092
093 registerSearchEngineMessageListener(
094 searchEngineId, searchEngine, searchReaderDestination,
095 new SearchReaderMessageListener(), searchEngine.getIndexSearcher());
096
097 registerSearchEngineMessageListener(
098 searchEngineId, searchEngine, searchWriterDestination,
099 new SearchWriterMessageListener(), searchEngine.getIndexWriter());
100
101 if (searchEngine.isClusteredWrite()) {
102 ClusterBridgeMessageListener clusterBridgeMessageListener =
103 new ClusterBridgeMessageListener();
104
105 clusterBridgeMessageListener.setPriority(
106 searchEngine.getClusteredWritePriority());
107
108 searchWriterDestination.register(clusterBridgeMessageListener);
109 }
110 }
111
112 protected void destroySearchEngine(
113 SearchEngineRegistration searchEngineRegistration) {
114
115 MessageBus messageBus = getMessageBus();
116
117 Destination searchReaderDestination = messageBus.removeDestination(
118 searchEngineRegistration.getSearchReaderDestinationName());
119
120 searchReaderDestination.close(true);
121
122 Destination searchWriterDestination = messageBus.removeDestination(
123 searchEngineRegistration.getSearchWriterDestinationName());
124
125 searchWriterDestination.close(true);
126
127 SearchEngineUtil.removeSearchEngine(
128 searchEngineRegistration.getSearchEngineId());
129
130 if (!searchEngineRegistration.isOverride()) {
131 return;
132 }
133
134 SearchEngineProxyWrapper originalSearchEngineProxy =
135 searchEngineRegistration.getOriginalSearchEngineProxyWrapper();
136
137 SearchEngine originalSearchEngine =
138 originalSearchEngineProxy.getSearchEngine();
139
140 searchReaderDestination = getSearchReaderDestination(
141 messageBus, searchEngineRegistration.getSearchEngineId(),
142 originalSearchEngine);
143
144 registerInvokerMessageListener(
145 searchReaderDestination,
146 searchEngineRegistration.getOriginalSearchReaderMessageListeners());
147
148 searchWriterDestination = getSearchWriterDestination(
149 messageBus, searchEngineRegistration.getSearchEngineId(),
150 originalSearchEngine);
151
152 registerInvokerMessageListener(
153 searchWriterDestination,
154 searchEngineRegistration.getOriginalSearchWriterMessageListeners());
155
156 SearchEngineUtil.setSearchEngine(
157 searchEngineRegistration.getSearchEngineId(),
158 originalSearchEngineProxy);
159 }
160
/**
 * Returns the search engine ID that {@link #afterPropertiesSet()} installs
 * as the default in {@link SearchEngineUtil}. The default is left untouched
 * when <code>Validator.isNotNull</code> rejects the returned value.
 */
161 protected abstract String getDefaultSearchEngineId();
162
/**
 * Returns the index searcher handed to the {@link SearchEngineProxyWrapper}
 * created for each search engine registered by this configurator.
 */
163 protected abstract IndexSearcher getIndexSearcher();
164
/**
 * Returns the index writer handed to the {@link SearchEngineProxyWrapper}
 * created for each search engine registered by this configurator.
 */
165 protected abstract IndexWriter getIndexWriter();
166
/**
 * Returns the message bus on which search reader and writer destinations
 * are looked up, added and removed, and which is set on every registered
 * search engine message listener.
 */
167 protected abstract MessageBus getMessageBus();
168
/**
 * Returns the class loader passed along with each search engine message
 * listener when it is registered on a destination.
 */
169 protected abstract ClassLoader getOperatingClassloader();
170
171 protected Destination getSearchReaderDestination(
172 MessageBus messageBus, String searchEngineId,
173 SearchEngine searchEngine) {
174
175 String searchReaderDestinationName =
176 SearchEngineUtil.getSearchReaderDestinationName(searchEngineId);
177
178 Destination searchReaderDestination = messageBus.getDestination(
179 searchReaderDestinationName);
180
181 if (searchReaderDestination == null) {
182 SynchronousDestination synchronousDestination =
183 new SynchronousDestination();
184
185 synchronousDestination.setName(searchReaderDestinationName);
186
187 synchronousDestination.open();
188
189 searchReaderDestination = synchronousDestination;
190
191 messageBus.addDestination(searchReaderDestination);
192 }
193
194 return searchReaderDestination;
195 }
196
197 protected Destination getSearchWriterDestination(
198 MessageBus messageBus, String searchEngineId,
199 SearchEngine searchEngine) {
200
201 String searchWriterDestinationName =
202 SearchEngineUtil.getSearchWriterDestinationName(searchEngineId);
203
204 Destination searchWriterDestination = messageBus.getDestination(
205 searchWriterDestinationName);
206
207 if (searchWriterDestination == null) {
208 ParallelDestination parallelDestination = new ParallelDestination();
209
210 parallelDestination.setName(searchWriterDestinationName);
211
212 if (_INDEX_SEARCH_WRITER_MAX_QUEUE_SIZE > 0) {
213 parallelDestination.setMaximumQueueSize(
214 _INDEX_SEARCH_WRITER_MAX_QUEUE_SIZE);
215
216 RejectedExecutionHandler rejectedExecutionHandler =
217 new CallerRunsPolicy() {
218
219 @Override
220 public void rejectedExecution(
221 Runnable runnable,
222 ThreadPoolExecutor threadPoolExecutor) {
223
224 if (_log.isWarnEnabled()) {
225 StringBundler sb = new StringBundler(4);
226
227 sb.append(
228 "The search index writer's task queue ");
229 sb.append("is at its maximum capacity. The ");
230 sb.append("current thread will handle the ");
231 sb.append("request.");
232
233 _log.warn(sb.toString());
234 }
235
236 super.rejectedExecution(
237 runnable, threadPoolExecutor);
238 }
239
240 };
241
242 parallelDestination.setRejectedExecutionHandler(
243 rejectedExecutionHandler);
244 }
245
246 parallelDestination.open();
247
248 searchWriterDestination = parallelDestination;
249
250 messageBus.addDestination(searchWriterDestination);
251 }
252
253 return searchWriterDestination;
254 }
255
256 protected void initSearchEngine(
257 String searchEngineId, SearchEngine searchEngine) {
258
259 SearchEngineRegistration searchEngineRegistration =
260 new SearchEngineRegistration(searchEngineId);
261
262 _searchEngineRegistrations.add(searchEngineRegistration);
263
264 MessageBus messageBus = getMessageBus();
265
266 Destination searchReaderDestination = getSearchReaderDestination(
267 messageBus, searchEngineId, searchEngine);
268
269 searchEngineRegistration.setSearchReaderDestinationName(
270 searchReaderDestination.getName());
271
272 Destination searchWriterDestination = getSearchWriterDestination(
273 messageBus, searchEngineId, searchEngine);
274
275 searchEngineRegistration.setSearchWriterDestinationName(
276 searchWriterDestination.getName());
277
278 SearchEngine originalSearchEngine =
279 SearchEngineUtil.getSearchEngineSilent(searchEngineId);
280
281 if (originalSearchEngine != null) {
282 searchEngineRegistration.setOverride(true);
283
284 searchEngineRegistration.setOriginalSearchEngineProxyWrapper(
285 (SearchEngineProxyWrapper)originalSearchEngine);
286
287 savePreviousSearchEngineListeners(
288 searchReaderDestination, searchWriterDestination,
289 searchEngineRegistration);
290
291 messageBus.removeDestination(searchReaderDestination.getName());
292
293 searchReaderDestination = getSearchReaderDestination(
294 messageBus, searchEngineId, originalSearchEngine);
295
296 messageBus.removeDestination(searchWriterDestination.getName());
297
298 searchWriterDestination = getSearchWriterDestination(
299 messageBus, searchEngineId, originalSearchEngine);
300 }
301
302 createSearchEngineListeners(
303 searchEngineId, searchEngine, searchReaderDestination,
304 searchWriterDestination);
305
306 SearchEngineProxyWrapper searchEngineProxyWrapper =
307 new SearchEngineProxyWrapper(
308 searchEngine, getIndexSearcher(), getIndexWriter());
309
310 SearchEngineUtil.setSearchEngine(
311 searchEngineId, searchEngineProxyWrapper);
312 }
313
314 protected void registerInvokerMessageListener(
315 Destination destination,
316 List<InvokerMessageListener> invokerMessageListeners) {
317
318 for (InvokerMessageListener invokerMessageListener :
319 invokerMessageListeners) {
320
321 destination.register(
322 invokerMessageListener.getMessageListener(),
323 invokerMessageListener.getClassLoader());
324 }
325 }
326
327 protected void registerSearchEngineMessageListener(
328 String searchEngineId, SearchEngine searchEngine,
329 Destination destination,
330 BaseSearchEngineMessageListener baseSearchEngineMessageListener,
331 Object manager) {
332
333 baseSearchEngineMessageListener.setManager(manager);
334 baseSearchEngineMessageListener.setMessageBus(getMessageBus());
335 baseSearchEngineMessageListener.setSearchEngine(searchEngine);
336 baseSearchEngineMessageListener.setSearchEngineId(searchEngineId);
337
338 destination.register(
339 baseSearchEngineMessageListener, getOperatingClassloader());
340 }
341
342 protected void savePreviousSearchEngineListeners(
343 Destination searchReaderDestination,
344 Destination searchWriterDestination,
345 SearchEngineRegistration searchEngineRegistration) {
346
347 Set<MessageListener> searchReaderMessageListeners =
348 searchReaderDestination.getMessageListeners();
349
350 for (MessageListener searchReaderMessageListener :
351 searchReaderMessageListeners) {
352
353 InvokerMessageListener invokerMessageListener =
354 (InvokerMessageListener)searchReaderMessageListener;
355
356 searchEngineRegistration.addOriginalSearchReaderMessageListener(
357 invokerMessageListener);
358 }
359
360 Set<MessageListener> searchWriterMessageListeners =
361 searchWriterDestination.getMessageListeners();
362
363 for (MessageListener searchWriterMessageListener :
364 searchWriterMessageListeners) {
365
366 InvokerMessageListener invokerMessageListener =
367 (InvokerMessageListener)searchWriterMessageListener;
368
369 searchEngineRegistration.addOriginalSearchWriterMessageListener(
370 invokerMessageListener);
371 }
372 }
373
374 private static final int _INDEX_SEARCH_WRITER_MAX_QUEUE_SIZE =
375 GetterUtil.getInteger(
376 PropsUtil.get(PropsKeys.INDEX_SEARCH_WRITER_MAX_QUEUE_SIZE));
377
378 private static Log _log = LogFactoryUtil.getLog(
379 AbstractSearchEngineConfigurator.class);
380
381 private String _originalSearchEngineId;
382 private List<SearchEngineRegistration> _searchEngineRegistrations =
383 new ArrayList<SearchEngineRegistration>();
384 private Map<String, SearchEngine> _searchEngines;
385
386 private class SearchEngineRegistration {
387
388 private SearchEngineRegistration(String searchEngineId) {
389 _searchEngineId = searchEngineId;
390 }
391
392 public void addOriginalSearchReaderMessageListener(
393 InvokerMessageListener messageListener) {
394
395 _originalSearchReaderMessageListeners.add(messageListener);
396 }
397
398 public void addOriginalSearchWriterMessageListener(
399 InvokerMessageListener messageListener) {
400
401 _originalSearchWriterMessageListeners.add(messageListener);
402 }
403
404 public SearchEngineProxyWrapper getOriginalSearchEngineProxyWrapper() {
405 return _originalSearchEngineProxyWrapper;
406 }
407
408 public List<InvokerMessageListener>
409 getOriginalSearchReaderMessageListeners() {
410
411 return _originalSearchReaderMessageListeners;
412 }
413
414 public List<InvokerMessageListener>
415 getOriginalSearchWriterMessageListeners() {
416
417 return _originalSearchWriterMessageListeners;
418 }
419
420 public String getSearchEngineId() {
421 return _searchEngineId;
422 }
423
424 public String getSearchReaderDestinationName() {
425 return _searchReaderDestinationName;
426 }
427
428 public String getSearchWriterDestinationName() {
429 return _searchWriterDestinationName;
430 }
431
432 public boolean isOverride() {
433 return _override;
434 }
435
436 public void setOriginalSearchEngineProxyWrapper(
437 SearchEngineProxyWrapper searchEngineProxyWrapper) {
438
439 _originalSearchEngineProxyWrapper = searchEngineProxyWrapper;
440 }
441
442 public void setOverride(boolean override) {
443 _override = override;
444 }
445
446 public void setSearchReaderDestinationName(
447 String searchReaderDestinationName) {
448
449 _searchReaderDestinationName = searchReaderDestinationName;
450 }
451
452 public void setSearchWriterDestinationName(
453 String searchWriterDestinationName) {
454
455 _searchWriterDestinationName = searchWriterDestinationName;
456 }
457
458 private SearchEngineProxyWrapper _originalSearchEngineProxyWrapper;
459 private List<InvokerMessageListener>
460 _originalSearchReaderMessageListeners =
461 new ArrayList<InvokerMessageListener>();
462 private List<InvokerMessageListener>
463 _originalSearchWriterMessageListeners =
464 new ArrayList<InvokerMessageListener>();
465 private boolean _override;
466 private String _searchEngineId;
467 private String _searchReaderDestinationName;
468 private String _searchWriterDestinationName;
469
470 }
471
472 }