001
014
015 package com.liferay.portal.cluster;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.ClusterLink;
019 import com.liferay.portal.kernel.cluster.Priority;
020 import com.liferay.portal.kernel.cluster.messaging.ClusterForwardMessageListener;
021 import com.liferay.portal.kernel.log.Log;
022 import com.liferay.portal.kernel.log.LogFactoryUtil;
023 import com.liferay.portal.kernel.messaging.Message;
024 import com.liferay.portal.kernel.security.pacl.DoPrivileged;
025 import com.liferay.portal.kernel.util.PropsKeys;
026 import com.liferay.portal.util.PropsUtil;
027
028 import java.net.InetAddress;
029
030 import java.util.ArrayList;
031 import java.util.Collections;
032 import java.util.List;
033 import java.util.Properties;
034
035 import org.jgroups.ChannelException;
036 import org.jgroups.JChannel;
037
038
041 @DoPrivileged
042 public class ClusterLinkImpl extends ClusterBase implements ClusterLink {
043
044 public static final int MAX_CHANNEL_COUNT = Priority.values().length;
045
046 @Override
047 public void destroy() {
048 if (!isEnabled()) {
049 return;
050 }
051
052 for (JChannel jChannel : _transportChannels) {
053 jChannel.close();
054 }
055 }
056
057 @Override
058 public InetAddress getBindInetAddress() {
059 return bindInetAddress;
060 }
061
062 @Override
063 public List<Address> getLocalTransportAddresses() {
064 if (!isEnabled()) {
065 return Collections.emptyList();
066 }
067
068 List<Address> addresses = new ArrayList<Address>(
069 _localTransportAddresses.size());
070
071 for (org.jgroups.Address address : _localTransportAddresses) {
072 addresses.add(new AddressImpl(address));
073 }
074
075 return addresses;
076 }
077
078 @Override
079 public List<Address> getTransportAddresses(Priority priority) {
080 if (!isEnabled()) {
081 return Collections.emptyList();
082 }
083
084 JChannel jChannel = getChannel(priority);
085
086 return getAddresses(jChannel);
087 }
088
089 @Override
090 public void sendMulticastMessage(Message message, Priority priority) {
091 if (!isEnabled()) {
092 return;
093 }
094
095 JChannel jChannel = getChannel(priority);
096
097 try {
098 jChannel.send(null, null, message);
099 }
100 catch (ChannelException ce) {
101 _log.error("Unable to send multicast message " + message, ce);
102 }
103 }
104
105 @Override
106 public void sendUnicastMessage(
107 Address address, Message message, Priority priority) {
108
109 if (!isEnabled()) {
110 return;
111 }
112
113 org.jgroups.Address jGroupsAddress =
114 (org.jgroups.Address)address.getRealAddress();
115
116 JChannel jChannel = getChannel(priority);
117
118 try {
119 jChannel.send(jGroupsAddress, null, message);
120 }
121 catch (ChannelException ce) {
122 _log.error("Unable to send unicast message " + message, ce);
123 }
124 }
125
126 public void setClusterForwardMessageListener(
127 ClusterForwardMessageListener clusterForwardMessageListener) {
128
129 _clusterForwardMessageListener = clusterForwardMessageListener;
130 }
131
132 protected JChannel getChannel(Priority priority) {
133 int channelIndex =
134 priority.ordinal() * _channelCount / MAX_CHANNEL_COUNT;
135
136 if (_log.isDebugEnabled()) {
137 _log.debug(
138 "Select channel number " + channelIndex + " for priority " +
139 priority);
140 }
141
142 return _transportChannels.get(channelIndex);
143 }
144
145 @Override
146 protected void initChannels() throws ChannelException {
147 Properties transportProperties = PropsUtil.getProperties(
148 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_TRANSPORT, true);
149
150 _channelCount = transportProperties.size();
151
152 if ((_channelCount <= 0) || (_channelCount > MAX_CHANNEL_COUNT)) {
153 throw new IllegalArgumentException(
154 "Channel count must be between 1 and " + MAX_CHANNEL_COUNT);
155 }
156
157 _localTransportAddresses = new ArrayList<org.jgroups.Address>(
158 _channelCount);
159 _transportChannels = new ArrayList<JChannel>(_channelCount);
160
161 List<String> keys = new ArrayList<String>(_channelCount);
162
163 for (Object key : transportProperties.keySet()) {
164 keys.add((String)key);
165 }
166
167 Collections.sort(keys);
168
169 for (int i = 0; i < keys.size(); i++) {
170 String customName = keys.get(i);
171
172 String value = transportProperties.getProperty(customName);
173
174 JChannel jChannel = createJChannel(
175 value,
176 new ClusterForwardReceiver(
177 _localTransportAddresses, _clusterForwardMessageListener),
178 _LIFERAY_TRANSPORT_CHANNEL + i);
179
180 _localTransportAddresses.add(jChannel.getLocalAddress());
181 _transportChannels.add(jChannel);
182 }
183 }
184
185 private static final String _LIFERAY_TRANSPORT_CHANNEL =
186 "LIFERAY-TRANSPORT-CHANNEL-";
187
188 private static Log _log = LogFactoryUtil.getLog(ClusterLinkImpl.class);
189
190 private int _channelCount;
191 private ClusterForwardMessageListener _clusterForwardMessageListener;
192 private List<org.jgroups.Address> _localTransportAddresses;
193 private List<JChannel> _transportChannels;
194
195 }