001
014
015 package com.liferay.portal.cluster;
016
017 import com.liferay.portal.kernel.cluster.Address;
018 import com.liferay.portal.kernel.cluster.ClusterLink;
019 import com.liferay.portal.kernel.cluster.Priority;
020 import com.liferay.portal.kernel.cluster.messaging.ClusterForwardMessageListener;
021 import com.liferay.portal.kernel.log.Log;
022 import com.liferay.portal.kernel.log.LogFactoryUtil;
023 import com.liferay.portal.kernel.messaging.Message;
024 import com.liferay.portal.kernel.security.pacl.DoPrivileged;
025 import com.liferay.portal.kernel.util.PropsKeys;
026 import com.liferay.portal.kernel.util.Validator;
027 import com.liferay.portal.util.PropsUtil;
028
029 import java.net.InetAddress;
030
031 import java.util.ArrayList;
032 import java.util.Collections;
033 import java.util.List;
034 import java.util.Properties;
035
036 import org.jgroups.JChannel;
037
038
041 @DoPrivileged
042 public class ClusterLinkImpl extends ClusterBase implements ClusterLink {
043
044 @Override
045 public void destroy() {
046 if (!isEnabled()) {
047 return;
048 }
049
050 for (JChannel jChannel : _transportChannels) {
051 jChannel.close();
052 }
053 }
054
055 @Override
056 public InetAddress getBindInetAddress() {
057 return bindInetAddress;
058 }
059
060 @Override
061 public List<Address> getLocalTransportAddresses() {
062 if (!isEnabled()) {
063 return Collections.emptyList();
064 }
065
066 List<Address> addresses = new ArrayList<Address>(
067 _localTransportAddresses.size());
068
069 for (org.jgroups.Address address : _localTransportAddresses) {
070 addresses.add(new AddressImpl(address));
071 }
072
073 return addresses;
074 }
075
076 @Override
077 public List<Address> getTransportAddresses(Priority priority) {
078 if (!isEnabled()) {
079 return Collections.emptyList();
080 }
081
082 JChannel jChannel = getChannel(priority);
083
084 return getAddresses(jChannel);
085 }
086
087 @Override
088 public void initialize() {
089 if (!isEnabled()) {
090 return;
091 }
092
093 for (JChannel jChannel : _transportChannels) {
094 BaseReceiver baseReceiver = (BaseReceiver)jChannel.getReceiver();
095
096 baseReceiver.openLatch();
097 }
098 }
099
100 @Override
101 public void sendMulticastMessage(Message message, Priority priority) {
102 if (!isEnabled()) {
103 return;
104 }
105
106 JChannel jChannel = getChannel(priority);
107
108 try {
109 sendJGroupsMessage(jChannel, null, message);
110 }
111 catch (Exception e) {
112 _log.error("Unable to send multicast message " + message, e);
113 }
114 }
115
116 @Override
117 public void sendUnicastMessage(
118 Address address, Message message, Priority priority) {
119
120 if (!isEnabled()) {
121 return;
122 }
123
124 org.jgroups.Address jGroupsAddress =
125 (org.jgroups.Address)address.getRealAddress();
126
127 JChannel jChannel = getChannel(priority);
128
129 try {
130 sendJGroupsMessage(jChannel, jGroupsAddress, message);
131 }
132 catch (Exception e) {
133 _log.error("Unable to send unicast message " + message, e);
134 }
135 }
136
137 public void setClusterForwardMessageListener(
138 ClusterForwardMessageListener clusterForwardMessageListener) {
139
140 _clusterForwardMessageListener = clusterForwardMessageListener;
141 }
142
143 protected JChannel getChannel(Priority priority) {
144 int channelIndex =
145 priority.ordinal() * _channelCount / MAX_CHANNEL_COUNT;
146
147 if (_log.isDebugEnabled()) {
148 _log.debug(
149 "Select channel number " + channelIndex + " for priority " +
150 priority);
151 }
152
153 return _transportChannels.get(channelIndex);
154 }
155
156 @Override
157 protected void initChannels() throws Exception {
158 Properties channelNameProperties = PropsUtil.getProperties(
159 PropsKeys.CLUSTER_LINK_CHANNEL_NAME_TRANSPORT, true);
160 Properties transportProperties = PropsUtil.getProperties(
161 PropsKeys.CLUSTER_LINK_CHANNEL_PROPERTIES_TRANSPORT, true);
162
163 _channelCount = transportProperties.size();
164
165 if ((_channelCount <= 0) || (_channelCount > MAX_CHANNEL_COUNT)) {
166 throw new IllegalArgumentException(
167 "Channel count must be between 1 and " + MAX_CHANNEL_COUNT);
168 }
169
170 _localTransportAddresses = new ArrayList<org.jgroups.Address>(
171 _channelCount);
172 _transportChannels = new ArrayList<JChannel>(_channelCount);
173
174 List<String> keys = new ArrayList<String>(_channelCount);
175
176 for (Object key : transportProperties.keySet()) {
177 keys.add((String)key);
178 }
179
180 Collections.sort(keys);
181
182 for (String customName : keys) {
183 String channelName = channelNameProperties.getProperty(customName);
184 String value = transportProperties.getProperty(customName);
185
186 if (Validator.isNull(value) || Validator.isNull(channelName)) {
187 continue;
188 }
189
190 JChannel jChannel = createJChannel(
191 value,
192 new ClusterForwardReceiver(
193 _localTransportAddresses, _clusterForwardMessageListener),
194 channelName);
195
196 _localTransportAddresses.add(jChannel.getAddress());
197 _transportChannels.add(jChannel);
198 }
199 }
200
201 private static Log _log = LogFactoryUtil.getLog(ClusterLinkImpl.class);
202
203 private int _channelCount;
204 private ClusterForwardMessageListener _clusterForwardMessageListener;
205 private List<org.jgroups.Address> _localTransportAddresses;
206 private List<JChannel> _transportChannels;
207
208 }