001
014
015 package com.liferay.portal.kernel.messaging.sender;
016
017 import com.liferay.portal.kernel.messaging.Message;
018 import com.liferay.portal.kernel.messaging.MessageBus;
019 import com.liferay.portal.kernel.messaging.MessageBusException;
020 import com.liferay.portal.kernel.messaging.MessageListener;
021
022
025 public class SynchronousMessageListener implements MessageListener {
026
027 public SynchronousMessageListener(
028 MessageBus messageBus, Message message, long timeout) {
029
030 _messageBus = messageBus;
031 _message = message;
032 _timeout = timeout;
033 _responseId = _message.getResponseId();
034 }
035
036 public Object getResults() {
037 return _results;
038 }
039
040 public void receive(Message message) {
041 if (!message.getResponseId().equals(_responseId)) {
042 return;
043 }
044
045 synchronized (this) {
046 _results = message.getPayload();
047
048 notify();
049 }
050 }
051
052 public Object send() throws MessageBusException {
053 String destinationName = _message.getDestinationName();
054 String responseDestinationName = _message.getResponseDestinationName();
055
056 _messageBus.registerMessageListener(responseDestinationName, this);
057
058 try {
059 synchronized (this) {
060 _messageBus.sendMessage(destinationName, _message);
061
062 wait(_timeout);
063
064 if (_results == null) {
065 throw new MessageBusException(
066 "No reply received for message: " + _message);
067 }
068 }
069
070 return _results;
071 }
072 catch (InterruptedException ie) {
073 throw new MessageBusException(
074 "Message sending interrupted for: " + _message, ie);
075 }
076 finally {
077 _messageBus.unregisterMessageListener(
078 responseDestinationName, this);
079 }
080 }
081
082 private MessageBus _messageBus;
083 private Message _message;
084 private long _timeout;
085 private String _responseId;
086 private Object _results;
087
088 }