001
014
015 package com.liferay.portal.kernel.nio.intraband.messaging;
016
017 import com.liferay.portal.kernel.messaging.Destination;
018 import com.liferay.portal.kernel.messaging.DestinationWrapper;
019 import com.liferay.portal.kernel.messaging.Message;
020 import com.liferay.portal.kernel.messaging.MessageBusUtil;
021 import com.liferay.portal.kernel.messaging.MessageListener;
022 import com.liferay.portal.kernel.messaging.proxy.MessagingProxy;
023 import com.liferay.portal.kernel.nio.intraband.Datagram;
024 import com.liferay.portal.kernel.nio.intraband.Intraband;
025 import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
026 import com.liferay.portal.kernel.nio.intraband.SystemDataType;
027 import com.liferay.portal.kernel.resiliency.mpi.MPIHelperUtil;
028 import com.liferay.portal.kernel.resiliency.spi.SPI;
029 import com.liferay.portal.kernel.resiliency.spi.SPIConfiguration;
030 import com.liferay.portal.kernel.resiliency.spi.SPIUtil;
031 import com.liferay.portal.kernel.util.StringPool;
032
033 import java.nio.ByteBuffer;
034
035 import java.rmi.RemoteException;
036
037 import java.util.List;
038 import java.util.Set;
039
040
043 public class IntrabandBridgeDestination extends DestinationWrapper {
044
045 public IntrabandBridgeDestination(Destination destination) {
046 super(destination);
047 }
048
049 @Override
050 public void send(Message message) {
051 if (message.getBoolean(MessagingProxy.LOCAL_MESSAGE)) {
052 destination.send(message);
053
054 return;
055 }
056
057 message.setDestinationName(getName());
058
059 MessageRoutingBag messageRoutingBag =
060 (MessageRoutingBag)message.get(
061 MessageRoutingBag.MESSAGE_ROUTING_BAG);
062
063 if (messageRoutingBag == null) {
064 messageRoutingBag = new MessageRoutingBag(message, true);
065
066 message.put(
067 MessageRoutingBag.MESSAGE_ROUTING_BAG, messageRoutingBag);
068 }
069
070 sendMessageRoutingBag(messageRoutingBag);
071
072 try {
073 Message responseMessage = messageRoutingBag.getMessage();
074
075 responseMessage.copyTo(message);
076
077 messageRoutingBag.setMessage(message);
078 }
079 catch (ClassNotFoundException cnfe) {
080 throw new RuntimeException(cnfe);
081 }
082
083 Set<MessageListener> messageListeners =
084 destination.getMessageListeners();
085
086 for (MessageListener messageListener : messageListeners) {
087 try {
088 messageListener.receive(message);
089 }
090 catch (Exception e) {
091 throw new RuntimeException(e);
092 }
093 }
094 }
095
096 public void sendMessageRoutingBag(MessageRoutingBag messageRoutingBag) {
097 if (SPIUtil.isSPI()) {
098 SPI spi = SPIUtil.getSPI();
099
100 try {
101 String routingId = toRoutingId(spi);
102
103 messageRoutingBag.appendRoutingId(routingId);
104
105 if (!messageRoutingBag.isRoutingDowncast()) {
106 RegistrationReference registrationReference =
107 spi.getRegistrationReference();
108
109 sendMessageRoutingBag(
110 registrationReference, messageRoutingBag);
111 }
112 }
113 catch (Exception e) {
114 throw new RuntimeException(e);
115 }
116 }
117
118 List<SPI> spis = MPIHelperUtil.getSPIs();
119
120 if (spis.isEmpty() && !SPIUtil.isSPI()) {
121 MessageBusUtil.addDestination(destination);
122 }
123 else {
124 messageRoutingBag.setRoutingDowncast(true);
125
126 try {
127 for (SPI spi : spis) {
128 String routingId = toRoutingId(spi);
129
130 if (!messageRoutingBag.isVisited(routingId)) {
131 RegistrationReference registrationReference =
132 spi.getRegistrationReference();
133
134 sendMessageRoutingBag(
135 registrationReference, messageRoutingBag);
136 }
137 }
138 }
139 catch (Exception e) {
140 throw new RuntimeException(e);
141 }
142 }
143 }
144
145 protected void sendMessageRoutingBag(
146 RegistrationReference registrationReference,
147 MessageRoutingBag messageRoutingBag) {
148
149 try {
150 Intraband intraband = registrationReference.getIntraband();
151
152 Datagram datagram = intraband.sendSyncDatagram(
153 registrationReference,
154 Datagram.createRequestDatagram(
155 SystemDataType.MESSAGE.getValue(),
156 messageRoutingBag.toByteArray()));
157
158 ByteBuffer byteBuffer = datagram.getDataByteBuffer();
159
160 MessageRoutingBag receivedMessageRoutingBag =
161 MessageRoutingBag.fromByteArray(byteBuffer.array());
162
163 Message receivedMessage = receivedMessageRoutingBag.getMessage();
164
165 Message message = messageRoutingBag.getMessage();
166
167 receivedMessage.copyTo(message);
168
169 message.put(
170 MessageRoutingBag.MESSAGE_ROUTING_BAG, messageRoutingBag);
171 }
172 catch (Exception e) {
173 throw new RuntimeException(e);
174 }
175 }
176
177 protected String toRoutingId(SPI spi) throws RemoteException {
178 String spiProviderName = spi.getSPIProviderName();
179
180 SPIConfiguration spiConfiguration = spi.getSPIConfiguration();
181
182 String spiId = spiConfiguration.getSPIId();
183
184 return spiProviderName.concat(StringPool.POUND).concat(spiId);
185 }
186
187 }