001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.nio.intraband.messaging;
016    
017    import com.liferay.portal.kernel.messaging.Destination;
018    import com.liferay.portal.kernel.messaging.DestinationWrapper;
019    import com.liferay.portal.kernel.messaging.Message;
020    import com.liferay.portal.kernel.messaging.MessageBusUtil;
021    import com.liferay.portal.kernel.messaging.MessageListener;
022    import com.liferay.portal.kernel.messaging.proxy.MessagingProxy;
023    import com.liferay.portal.kernel.nio.intraband.Datagram;
024    import com.liferay.portal.kernel.nio.intraband.Intraband;
025    import com.liferay.portal.kernel.nio.intraband.RegistrationReference;
026    import com.liferay.portal.kernel.nio.intraband.SystemDataType;
027    import com.liferay.portal.kernel.resiliency.mpi.MPIHelperUtil;
028    import com.liferay.portal.kernel.resiliency.spi.SPI;
029    import com.liferay.portal.kernel.resiliency.spi.SPIConfiguration;
030    import com.liferay.portal.kernel.resiliency.spi.SPIUtil;
031    import com.liferay.portal.kernel.util.StringPool;
032    
033    import java.nio.ByteBuffer;
034    
035    import java.rmi.RemoteException;
036    
037    import java.util.List;
038    import java.util.Set;
039    
040    /**
041     * @author Shuyang Zhou
042     */
043    public class IntrabandBridgeDestination extends DestinationWrapper {
044    
045            public IntrabandBridgeDestination(Destination destination) {
046                    super(destination);
047            }
048    
049            @Override
050            public void send(Message message) {
051                    if (message.getBoolean(MessagingProxy.LOCAL_MESSAGE)) {
052                            destination.send(message);
053    
054                            return;
055                    }
056    
057                    message.setDestinationName(getName());
058    
059                    MessageRoutingBag messageRoutingBag =
060                            (MessageRoutingBag)message.get(
061                                    MessageRoutingBag.MESSAGE_ROUTING_BAG);
062    
063                    if (messageRoutingBag == null) {
064                            messageRoutingBag = new MessageRoutingBag(message, true);
065    
066                            message.put(
067                                    MessageRoutingBag.MESSAGE_ROUTING_BAG, messageRoutingBag);
068                    }
069    
070                    sendMessageRoutingBag(messageRoutingBag);
071    
072                    try {
073                            Message responseMessage = messageRoutingBag.getMessage();
074    
075                            responseMessage.copyTo(message);
076    
077                            messageRoutingBag.setMessage(message);
078                    }
079                    catch (ClassNotFoundException cnfe) {
080                            throw new RuntimeException(cnfe);
081                    }
082    
083                    Set<MessageListener> messageListeners =
084                            destination.getMessageListeners();
085    
086                    for (MessageListener messageListener : messageListeners) {
087                            try {
088                                    messageListener.receive(message);
089                            }
090                            catch (Exception e) {
091                                    throw new RuntimeException(e);
092                            }
093                    }
094            }
095    
096            public void sendMessageRoutingBag(MessageRoutingBag messageRoutingBag) {
097                    if (SPIUtil.isSPI()) {
098                            SPI spi = SPIUtil.getSPI();
099    
100                            try {
101                                    String routingId = toRoutingId(spi);
102    
103                                    messageRoutingBag.appendRoutingId(routingId);
104    
105                                    if (!messageRoutingBag.isRoutingDowncast()) {
106                                            RegistrationReference registrationReference =
107                                                    spi.getRegistrationReference();
108    
109                                            sendMessageRoutingBag(
110                                                    registrationReference, messageRoutingBag);
111                                    }
112                            }
113                            catch (Exception e) {
114                                    throw new RuntimeException(e);
115                            }
116                    }
117    
118                    List<SPI> spis = MPIHelperUtil.getSPIs();
119    
120                    if (spis.isEmpty() && !SPIUtil.isSPI()) {
121                            MessageBusUtil.addDestination(destination);
122                    }
123                    else {
124                            messageRoutingBag.setRoutingDowncast(true);
125    
126                            try {
127                                    for (SPI spi : spis) {
128                                            String routingId = toRoutingId(spi);
129    
130                                            if (!messageRoutingBag.isVisited(routingId)) {
131                                                    RegistrationReference registrationReference =
132                                                            spi.getRegistrationReference();
133    
134                                                    sendMessageRoutingBag(
135                                                            registrationReference, messageRoutingBag);
136                                            }
137                                    }
138                            }
139                            catch (Exception e) {
140                                    throw new RuntimeException(e);
141                            }
142                    }
143            }
144    
145            protected void sendMessageRoutingBag(
146                    RegistrationReference registrationReference,
147                    MessageRoutingBag messageRoutingBag) {
148    
149                    try {
150                            Intraband intraband = registrationReference.getIntraband();
151    
152                            Datagram datagram = intraband.sendSyncDatagram(
153                                    registrationReference,
154                                    Datagram.createRequestDatagram(
155                                            SystemDataType.MESSAGE.getValue(),
156                                            messageRoutingBag.toByteArray()));
157    
158                            ByteBuffer byteBuffer = datagram.getDataByteBuffer();
159    
160                            MessageRoutingBag receivedMessageRoutingBag =
161                                    MessageRoutingBag.fromByteArray(byteBuffer.array());
162    
163                            Message receivedMessage = receivedMessageRoutingBag.getMessage();
164    
165                            Message message = messageRoutingBag.getMessage();
166    
167                            receivedMessage.copyTo(message);
168    
169                            message.put(
170                                    MessageRoutingBag.MESSAGE_ROUTING_BAG, messageRoutingBag);
171                    }
172                    catch (Exception e) {
173                            throw new RuntimeException(e);
174                    }
175            }
176    
177            protected String toRoutingId(SPI spi) throws RemoteException {
178                    String spiProviderName = spi.getSPIProviderName();
179    
180                    SPIConfiguration spiConfiguration = spi.getSPIConfiguration();
181    
182                    String spiId = spiConfiguration.getSPIId();
183    
184                    return spiProviderName.concat(StringPool.POUND).concat(spiId);
185            }
186    
187    }