001
014
015 package com.liferay.portal.kernel.messaging.sender;
016
017 import com.liferay.portal.kernel.log.Log;
018 import com.liferay.portal.kernel.log.LogFactoryUtil;
019 import com.liferay.portal.kernel.messaging.Destination;
020 import com.liferay.portal.kernel.messaging.Message;
021 import com.liferay.portal.kernel.messaging.MessageBus;
022 import com.liferay.portal.kernel.messaging.MessageBusException;
023 import com.liferay.portal.kernel.messaging.MessageListener;
024 import com.liferay.portal.kernel.messaging.MessageListenerException;
025 import com.liferay.portal.kernel.messaging.SynchronousDestination;
026 import com.liferay.portal.kernel.nio.intraband.messaging.IntrabandBridgeDestination;
027
028 import java.util.Set;
029
030
033 public class DirectSynchronousMessageSender
034 implements SynchronousMessageSender {
035
036 @Override
037 public Object send(String destinationName, Message message)
038 throws MessageBusException {
039
040 Destination destination = _messageBus.getDestination(destinationName);
041
042 if (destination == null) {
043 if (_log.isInfoEnabled()) {
044 _log.info(
045 "Destination " + destinationName + " is not configured");
046 }
047
048 return null;
049 }
050
051 if ((destination instanceof IntrabandBridgeDestination) ||
052 (destination instanceof SynchronousDestination)) {
053
054 destination.send(message);
055 }
056 else {
057 Set<MessageListener> messageListeners =
058 destination.getMessageListeners();
059
060 for (MessageListener messageListener : messageListeners) {
061 try {
062 messageListener.receive(message);
063 }
064 catch (MessageListenerException mle) {
065 throw new MessageBusException(mle);
066 }
067 }
068 }
069
070 return message.getResponse();
071 }
072
073 @Override
074 public Object send(String destinationName, Message message, long timeout)
075 throws MessageBusException {
076
077 if (_log.isWarnEnabled()) {
078 _log.warn(
079 DirectSynchronousMessageSender.class.getName() +
080 " does not support timeout");
081 }
082
083 return send(destinationName, message);
084 }
085
086 public void setMessageBus(MessageBus messageBus) {
087 _messageBus = messageBus;
088 }
089
090 private static Log _log = LogFactoryUtil.getLog(
091 DirectSynchronousMessageSender.class);
092
093 private MessageBus _messageBus;
094
095 }