001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.messaging.sender;
016    
017    import com.liferay.portal.kernel.cache.ThreadLocalCacheManager;
018    import com.liferay.portal.kernel.dao.orm.EntityCacheUtil;
019    import com.liferay.portal.kernel.dao.orm.FinderCacheUtil;
020    import com.liferay.portal.kernel.messaging.Message;
021    import com.liferay.portal.kernel.messaging.MessageBus;
022    import com.liferay.portal.kernel.messaging.MessageBusException;
023    import com.liferay.portal.kernel.messaging.MessageListener;
024    
025    /**
026     * @author Michael C. Han
027     */
028    public class SynchronousMessageListener implements MessageListener {
029    
030            public SynchronousMessageListener(
031                    MessageBus messageBus, Message message, long timeout) {
032    
033                    _messageBus = messageBus;
034                    _message = message;
035                    _timeout = timeout;
036                    _responseId = _message.getResponseId();
037            }
038    
039            public Object getResults() {
040                    return _results;
041            }
042    
043            @Override
044            public void receive(Message message) {
045                    if (!message.getResponseId().equals(_responseId)) {
046                            return;
047                    }
048    
049                    synchronized (this) {
050                            _results = message.getPayload();
051    
052                            notify();
053                    }
054            }
055    
056            public Object send() throws MessageBusException {
057                    String destinationName = _message.getDestinationName();
058                    String responseDestinationName = _message.getResponseDestinationName();
059    
060                    _messageBus.registerMessageListener(responseDestinationName, this);
061    
062                    try {
063                            synchronized (this) {
064                                    _messageBus.sendMessage(destinationName, _message);
065    
066                                    wait(_timeout);
067    
068                                    if (_results == null) {
069                                            throw new MessageBusException(
070                                                    "No reply received for message: " + _message);
071                                    }
072                            }
073    
074                            return _results;
075                    }
076                    catch (InterruptedException ie) {
077                            throw new MessageBusException(
078                                    "Message sending interrupted for: " + _message, ie);
079                    }
080                    finally {
081                            _messageBus.unregisterMessageListener(
082                                    responseDestinationName, this);
083    
084                            EntityCacheUtil.clearLocalCache();
085                            FinderCacheUtil.clearLocalCache();
086                            ThreadLocalCacheManager.destroy();
087                    }
088            }
089    
090            private Message _message;
091            private MessageBus _messageBus;
092            private String _responseId;
093            private Object _results;
094            private long _timeout;
095    
096    }