001
014
015 package com.liferay.portal.kernel.messaging.sender;
016
017 import com.liferay.portal.kernel.cache.ThreadLocalCacheManager;
018 import com.liferay.portal.kernel.dao.orm.EntityCacheUtil;
019 import com.liferay.portal.kernel.dao.orm.FinderCacheUtil;
020 import com.liferay.portal.kernel.messaging.Message;
021 import com.liferay.portal.kernel.messaging.MessageBus;
022 import com.liferay.portal.kernel.messaging.MessageBusException;
023 import com.liferay.portal.kernel.messaging.MessageListener;
024
025
028 public class SynchronousMessageListener implements MessageListener {
029
030 public SynchronousMessageListener(
031 MessageBus messageBus, Message message, long timeout) {
032
033 _messageBus = messageBus;
034 _message = message;
035 _timeout = timeout;
036 _responseId = _message.getResponseId();
037 }
038
039 public Object getResults() {
040 return _results;
041 }
042
043 @Override
044 public void receive(Message message) {
045 if (!message.getResponseId().equals(_responseId)) {
046 return;
047 }
048
049 synchronized (this) {
050 _results = message.getPayload();
051
052 notify();
053 }
054 }
055
056 public Object send() throws MessageBusException {
057 String destinationName = _message.getDestinationName();
058 String responseDestinationName = _message.getResponseDestinationName();
059
060 _messageBus.registerMessageListener(responseDestinationName, this);
061
062 try {
063 synchronized (this) {
064 _messageBus.sendMessage(destinationName, _message);
065
066 wait(_timeout);
067
068 if (_results == null) {
069 throw new MessageBusException(
070 "No reply received for message: " + _message);
071 }
072 }
073
074 return _results;
075 }
076 catch (InterruptedException ie) {
077 throw new MessageBusException(
078 "Message sending interrupted for: " + _message, ie);
079 }
080 finally {
081 _messageBus.unregisterMessageListener(
082 responseDestinationName, this);
083
084 EntityCacheUtil.clearLocalCache();
085 FinderCacheUtil.clearLocalCache();
086 ThreadLocalCacheManager.destroy();
087 }
088 }
089
090 private Message _message;
091 private MessageBus _messageBus;
092 private String _responseId;
093 private Object _results;
094 private long _timeout;
095
096 }