001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.cluster;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.ClusterLink;
019    import com.liferay.portal.kernel.cluster.Priority;
020    import com.liferay.portal.kernel.cluster.messaging.ClusterForwardMessageListener;
021    import com.liferay.portal.kernel.log.Log;
022    import com.liferay.portal.kernel.log.LogFactoryUtil;
023    import com.liferay.portal.kernel.messaging.Message;
024    import com.liferay.portal.kernel.security.pacl.DoPrivileged;
025    import com.liferay.portal.kernel.util.PropsKeys;
026    import com.liferay.portal.util.PropsUtil;
027    
028    import java.net.InetAddress;
029    
030    import java.util.ArrayList;
031    import java.util.Collections;
032    import java.util.List;
033    import java.util.Properties;
034    
035    import org.jgroups.ChannelException;
036    import org.jgroups.JChannel;
037    
038    /**
039     * @author Shuyang Zhou
040     */
041    @DoPrivileged
042    public class ClusterLinkImpl extends ClusterBase implements ClusterLink {
043    
044            public static final int MAX_CHANNEL_COUNT = Priority.values().length;
045    
046            @Override
047            public void destroy() {
048                    if (!isEnabled()) {
049                            return;
050                    }
051    
052                    for (JChannel jChannel : _transportChannels) {
053                            jChannel.close();
054                    }
055            }
056    
057            @Override
058            public InetAddress getBindInetAddress() {
059                    return bindInetAddress;
060            }
061    
062            @Override
063            public List<Address> getLocalTransportAddresses() {
064                    if (!isEnabled()) {
065                            return Collections.emptyList();
066                    }
067    
068                    List<Address> addresses = new ArrayList<Address>(
069                            _localTransportAddresses.size());
070    
071                    for (org.jgroups.Address address : _localTransportAddresses) {
072                            addresses.add(new AddressImpl(address));
073                    }
074    
075                    return addresses;
076            }
077    
078            @Override
079            public List<Address> getTransportAddresses(Priority priority) {
080                    if (!isEnabled()) {
081                            return Collections.emptyList();
082                    }
083    
084                    JChannel jChannel = getChannel(priority);
085    
086                    return getAddresses(jChannel);
087            }
088    
089            @Override
090            public void sendMulticastMessage(Message message, Priority priority) {
091                    if (!isEnabled()) {
092                            return;
093                    }
094    
095                    JChannel jChannel = getChannel(priority);
096    
097                    try {
098                            jChannel.send(null, null, message);
099                    }
100                    catch (ChannelException ce) {
101                            _log.error("Unable to send multicast message " + message, ce);
102                    }
103            }
104    
105            @Override
106            public void sendUnicastMessage(
107                    Address address, Message message, Priority priority) {
108    
109                    if (!isEnabled()) {
110                            return;
111                    }
112    
113                    org.jgroups.Address jGroupsAddress =
114                            (org.jgroups.Address)address.getRealAddress();
115    
116                    JChannel jChannel = getChannel(priority);
117    
118                    try {
119                            jChannel.send(jGroupsAddress, null, message);
120                    }
121                    catch (ChannelException ce) {
122                            _log.error("Unable to send unicast message " + message, ce);
123                    }
124            }
125    
126            public void setClusterForwardMessageListener(
127                    ClusterForwardMessageListener clusterForwardMessageListener) {
128    
129                    _clusterForwardMessageListener = clusterForwardMessageListener;
130            }
131    
132            protected JChannel getChannel(Priority priority) {
133                    int channelIndex =
134                            priority.ordinal() * _channelCount / MAX_CHANNEL_COUNT;
135    
136                    if (_log.isDebugEnabled()) {
137                            _log.debug(
138                                    "Select channel number " + channelIndex + " for priority " +
139                                            priority);
140                    }
141    
142                    return _transportChannels.get(channelIndex);
143            }
144    
145            @Override
146            protected void initChannels() throws ChannelException {
147                    Properties transportProperties = PropsUtil.getProperties(
148                            PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_TRANSPORT, true);
149    
150                    _channelCount = transportProperties.size();
151    
152                    if ((_channelCount <= 0) || (_channelCount > MAX_CHANNEL_COUNT)) {
153                            throw new IllegalArgumentException(
154                                    "Channel count must be between 1 and " + MAX_CHANNEL_COUNT);
155                    }
156    
157                    _localTransportAddresses = new ArrayList<org.jgroups.Address>(
158                            _channelCount);
159                    _transportChannels = new ArrayList<JChannel>(_channelCount);
160    
161                    List<String> keys = new ArrayList<String>(_channelCount);
162    
163                    for (Object key : transportProperties.keySet()) {
164                            keys.add((String)key);
165                    }
166    
167                    Collections.sort(keys);
168    
169                    for (int i = 0; i < keys.size(); i++) {
170                            String customName = keys.get(i);
171    
172                            String value = transportProperties.getProperty(customName);
173    
174                            JChannel jChannel = createJChannel(
175                                    value,
176                                    new ClusterForwardReceiver(
177                                            _localTransportAddresses, _clusterForwardMessageListener),
178                                            _LIFERAY_TRANSPORT_CHANNEL + i);
179    
180                            _localTransportAddresses.add(jChannel.getLocalAddress());
181                            _transportChannels.add(jChannel);
182                    }
183            }
184    
185            private static final String _LIFERAY_TRANSPORT_CHANNEL =
186                    "LIFERAY-TRANSPORT-CHANNEL-";
187    
188            private static Log _log = LogFactoryUtil.getLog(ClusterLinkImpl.class);
189    
190            private int _channelCount;
191            private ClusterForwardMessageListener _clusterForwardMessageListener;
192            private List<org.jgroups.Address> _localTransportAddresses;
193            private List<JChannel> _transportChannels;
194    
195    }