001
014
015 package com.liferay.portal.kernel.nio.intraband.messaging;
016
017 import com.liferay.portal.kernel.messaging.Destination;
018 import com.liferay.portal.kernel.messaging.MessageBus;
019 import com.liferay.portal.kernel.messaging.MessageBusException;
020 import com.liferay.portal.kernel.messaging.MessageListener;
021 import com.liferay.portal.kernel.messaging.MessageListenerException;
022 import com.liferay.portal.kernel.nio.intraband.BaseAsyncDatagramReceiveHandler;
023 import com.liferay.portal.kernel.nio.intraband.Datagram;
024 import com.liferay.portal.kernel.nio.intraband.Intraband;
025 import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
026
027 import java.nio.ByteBuffer;
028
029 import java.util.Set;
030
031
034 public class MessageDatagramReceiveHandler
035 extends BaseAsyncDatagramReceiveHandler {
036
037 public MessageDatagramReceiveHandler(MessageBus messageBus) {
038 _messageBus = messageBus;
039 }
040
041 @Override
042 protected void doReceive(
043 RegistrationReference registrationReference, Datagram datagram)
044 throws Exception {
045
046 ByteBuffer byteBuffer = datagram.getDataByteBuffer();
047
048 MessageRoutingBag messageRoutingBag = MessageRoutingBag.fromByteArray(
049 byteBuffer.array());
050
051 Destination destination = _messageBus.getDestination(
052 messageRoutingBag.getDestinationName());
053
054 if (destination != null) {
055 Set<MessageListener> messageListeners =
056 destination.getMessageListeners();
057
058 if (destination instanceof IntrabandBridgeDestination) {
059 if (messageListeners.isEmpty()) {
060 IntrabandBridgeDestination intrabandBridgeDestination =
061 (IntrabandBridgeDestination)destination;
062
063 intrabandBridgeDestination.sendMessageRoutingBag(
064 messageRoutingBag);
065 }
066 else {
067 destination.send(messageRoutingBag.getMessage());
068 }
069 }
070 else {
071 if (!messageListeners.isEmpty()) {
072 for (MessageListener messageListener : messageListeners) {
073 try {
074 messageListener.receive(
075 messageRoutingBag.getMessage());
076 }
077 catch (MessageListenerException mle) {
078 throw new MessageBusException(mle);
079 }
080 }
081 }
082 }
083 }
084
085 if (messageRoutingBag.isSynchronizedBridge()) {
086 Intraband intraband = registrationReference.getIntraband();
087
088 intraband.sendDatagram(
089 registrationReference,
090 Datagram.createResponseDatagram(
091 datagram, messageRoutingBag.toByteArray()));
092 }
093 }
094
095 private MessageBus _messageBus;
096
097 }