001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.cluster;
016    
017    import com.liferay.portal.kernel.cluster.Address;
018    import com.liferay.portal.kernel.cluster.ClusterLink;
019    import com.liferay.portal.kernel.cluster.Priority;
020    import com.liferay.portal.kernel.cluster.messaging.ClusterForwardMessageListener;
021    import com.liferay.portal.kernel.log.Log;
022    import com.liferay.portal.kernel.log.LogFactoryUtil;
023    import com.liferay.portal.kernel.messaging.Message;
024    import com.liferay.portal.kernel.security.pacl.DoPrivileged;
025    import com.liferay.portal.kernel.util.PropsKeys;
026    import com.liferay.portal.kernel.util.Validator;
027    import com.liferay.portal.util.PropsUtil;
028    
029    import java.net.InetAddress;
030    
031    import java.util.ArrayList;
032    import java.util.Collections;
033    import java.util.List;
034    import java.util.Properties;
035    
036    import org.jgroups.JChannel;
037    
038    /**
039     * @author Shuyang Zhou
040     */
041    @DoPrivileged
042    public class ClusterLinkImpl extends ClusterBase implements ClusterLink {
043    
044            @Override
045            public void destroy() {
046                    if (!isEnabled()) {
047                            return;
048                    }
049    
050                    for (JChannel jChannel : _transportChannels) {
051                            jChannel.close();
052                    }
053            }
054    
055            @Override
056            public InetAddress getBindInetAddress() {
057                    return bindInetAddress;
058            }
059    
060            @Override
061            public List<Address> getLocalTransportAddresses() {
062                    if (!isEnabled()) {
063                            return Collections.emptyList();
064                    }
065    
066                    List<Address> addresses = new ArrayList<Address>(
067                            _localTransportAddresses.size());
068    
069                    for (org.jgroups.Address address : _localTransportAddresses) {
070                            addresses.add(new AddressImpl(address));
071                    }
072    
073                    return addresses;
074            }
075    
076            @Override
077            public List<Address> getTransportAddresses(Priority priority) {
078                    if (!isEnabled()) {
079                            return Collections.emptyList();
080                    }
081    
082                    JChannel jChannel = getChannel(priority);
083    
084                    return getAddresses(jChannel);
085            }
086    
087            @Override
088            public void initialize() {
089                    if (!isEnabled()) {
090                            return;
091                    }
092    
093                    for (JChannel jChannel : _transportChannels) {
094                            BaseReceiver baseReceiver = (BaseReceiver)jChannel.getReceiver();
095    
096                            baseReceiver.openLatch();
097                    }
098            }
099    
100            @Override
101            public void sendMulticastMessage(Message message, Priority priority) {
102                    if (!isEnabled()) {
103                            return;
104                    }
105    
106                    JChannel jChannel = getChannel(priority);
107    
108                    try {
109                            sendJGroupsMessage(jChannel, null, message);
110                    }
111                    catch (Exception e) {
112                            _log.error("Unable to send multicast message " + message, e);
113                    }
114            }
115    
116            @Override
117            public void sendUnicastMessage(
118                    Address address, Message message, Priority priority) {
119    
120                    if (!isEnabled()) {
121                            return;
122                    }
123    
124                    org.jgroups.Address jGroupsAddress =
125                            (org.jgroups.Address)address.getRealAddress();
126    
127                    JChannel jChannel = getChannel(priority);
128    
129                    try {
130                            sendJGroupsMessage(jChannel, jGroupsAddress, message);
131                    }
132                    catch (Exception e) {
133                            _log.error("Unable to send unicast message " + message, e);
134                    }
135            }
136    
137            public void setClusterForwardMessageListener(
138                    ClusterForwardMessageListener clusterForwardMessageListener) {
139    
140                    _clusterForwardMessageListener = clusterForwardMessageListener;
141            }
142    
143            protected JChannel getChannel(Priority priority) {
144                    int channelIndex =
145                            priority.ordinal() * _channelCount / MAX_CHANNEL_COUNT;
146    
147                    if (_log.isDebugEnabled()) {
148                            _log.debug(
149                                    "Select channel number " + channelIndex + " for priority " +
150                                            priority);
151                    }
152    
153                    return _transportChannels.get(channelIndex);
154            }
155    
156            @Override
157            protected void initChannels() throws Exception {
158                    Properties channelNameProperties = PropsUtil.getProperties(
159                            PropsKeys.CLUSTER_LINK_CHANNEL_NAME_TRANSPORT, true);
160                    Properties transportProperties = PropsUtil.getProperties(
161                            PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_TRANSPORT, true);
162    
163                    _channelCount = transportProperties.size();
164    
165                    if ((_channelCount <= 0) || (_channelCount > MAX_CHANNEL_COUNT)) {
166                            throw new IllegalArgumentException(
167                                    "Channel count must be between 1 and " + MAX_CHANNEL_COUNT);
168                    }
169    
170                    _localTransportAddresses = new ArrayList<org.jgroups.Address>(
171                            _channelCount);
172                    _transportChannels = new ArrayList<JChannel>(_channelCount);
173    
174                    List<String> keys = new ArrayList<String>(_channelCount);
175    
176                    for (Object key : transportProperties.keySet()) {
177                            keys.add((String)key);
178                    }
179    
180                    Collections.sort(keys);
181    
182                    for (String customName : keys) {
183                            String channelName = channelNameProperties.getProperty(customName);
184                            String value = transportProperties.getProperty(customName);
185    
186                            if (Validator.isNull(value) || Validator.isNull(channelName)) {
187                                    continue;
188                            }
189    
190                            JChannel jChannel = createJChannel(
191                                    value,
192                                    new ClusterForwardReceiver(
193                                            _localTransportAddresses, _clusterForwardMessageListener),
194                                    channelName);
195    
196                            _localTransportAddresses.add(jChannel.getAddress());
197                            _transportChannels.add(jChannel);
198                    }
199            }
200    
201            private static Log _log = LogFactoryUtil.getLog(ClusterLinkImpl.class);
202    
203            private int _channelCount;
204            private ClusterForwardMessageListener _clusterForwardMessageListener;
205            private List<org.jgroups.Address> _localTransportAddresses;
206            private List<JChannel> _transportChannels;
207    
208    }