001
014
015 package com.liferay.portal.cache.ehcache;
016
017 import com.liferay.portal.cluster.BaseReceiver;
018 import com.liferay.portal.kernel.log.Log;
019 import com.liferay.portal.kernel.log.LogFactoryUtil;
020
021 import java.io.Serializable;
022
023 import java.rmi.RemoteException;
024
025 import java.util.ArrayList;
026 import java.util.List;
027
028 import net.sf.ehcache.Cache;
029 import net.sf.ehcache.CacheException;
030 import net.sf.ehcache.CacheManager;
031 import net.sf.ehcache.Ehcache;
032 import net.sf.ehcache.Element;
033 import net.sf.ehcache.distribution.CacheManagerPeerProvider;
034 import net.sf.ehcache.distribution.CachePeer;
035 import net.sf.ehcache.distribution.jgroups.JGroupEventMessage;
036
037 import org.jgroups.Address;
038 import org.jgroups.JChannel;
039 import org.jgroups.Message;
040 import org.jgroups.View;
041
042
049 public class JGroupsManager implements CacheManagerPeerProvider, CachePeer {
050
051 public JGroupsManager(
052 CacheManager cacheManager, String clusterName,
053 String channelProperties) {
054
055 try {
056 _jChannel = new JChannel(channelProperties);
057
058 _jChannel.setReceiver(new EhcacheJGroupsReceiver());
059
060 _jChannel.connect(clusterName);
061
062 if (_log.isInfoEnabled()) {
063 _log.info(
064 "Create a new channel with properties " +
065 _jChannel.getProperties());
066 }
067 }
068 catch (Exception e) {
069 if (_log.isErrorEnabled()) {
070 _log.error("Unable to initialize channels", e);
071 }
072 }
073
074 _cacheManager = cacheManager;
075
076 BaseReceiver baseReceiver = (BaseReceiver)_jChannel.getReceiver();
077
078 baseReceiver.openLatch();
079 }
080
081 @Override
082 public void dispose() throws CacheException {
083 if (_jChannel != null) {
084 _jChannel.close();
085 }
086 }
087
088 public Address getBusLocalAddress() {
089 return _jChannel.getAddress();
090 }
091
092 public List<Address> getBusMembership() {
093 BaseReceiver baseReceiver = (BaseReceiver)_jChannel.getReceiver();
094
095 View view = baseReceiver.getView();
096
097 return view.getMembers();
098 }
099
100 @Override
101 @SuppressWarnings("rawtypes")
102 public List getElements(List list) {
103 return null;
104 }
105
106 @Override
107 public String getGuid() {
108 return null;
109 }
110
111 @Override
112 @SuppressWarnings("rawtypes")
113 public List getKeys() {
114 return null;
115 }
116
117 @Override
118 public String getName() {
119 return null;
120 }
121
122 @Override
123 public Element getQuiet(Serializable serializable) {
124 return null;
125 }
126
127 @Override
128 public String getScheme() {
129 return _SCHEME;
130 }
131
132 @Override
133 public long getTimeForClusterToForm() {
134 return 0;
135 }
136
137 @Override
138 public String getUrl() {
139 return null;
140 }
141
142 @Override
143 public String getUrlBase() {
144 return null;
145 }
146
147 public void handleNotification(Serializable serializable) {
148 if (serializable instanceof JGroupEventMessage) {
149 handleJGroupsNotification((JGroupEventMessage)serializable);
150 }
151 else if (serializable instanceof List<?>) {
152 List<?> valueList = (List<?>)serializable;
153
154 for (Object object : valueList) {
155 if (object instanceof JGroupEventMessage) {
156 handleJGroupsNotification((JGroupEventMessage)object);
157 }
158 }
159 }
160 }
161
162 @Override
163 public void init() {
164 }
165
166 @Override
167 public List<JGroupsManager> listRemoteCachePeers(Ehcache ehcache) {
168 List<JGroupsManager> cachePeers = new ArrayList<JGroupsManager>();
169
170 cachePeers.add(this);
171
172 return cachePeers;
173 }
174
175 @Override
176 public void put(Element element) {
177 }
178
179 @Override
180 public void registerPeer(String string) {
181 }
182
183 @Override
184 public boolean remove(Serializable serializable) {
185 return false;
186 }
187
188 @Override
189 public void removeAll() {
190 }
191
192 @SuppressWarnings("rawtypes")
193 public void send(Address address, List eventMessages)
194 throws RemoteException {
195
196 ArrayList<JGroupEventMessage> jGroupEventMessages =
197 new ArrayList<JGroupEventMessage>();
198
199 for (Object eventMessage : eventMessages) {
200 if (eventMessage instanceof JGroupEventMessage) {
201 JGroupEventMessage jGroupEventMessage =
202 (JGroupEventMessage)eventMessage;
203
204 jGroupEventMessages.add(jGroupEventMessage);
205 }
206 else {
207 if (_log.isDebugEnabled()) {
208 _log.debug(
209 eventMessage + "is not a JGroupEventMessage type");
210 }
211 }
212 }
213
214 try {
215 _jChannel.send(address, jGroupEventMessages);
216 }
217 catch (Throwable t) {
218 throw new RemoteException(t.getMessage());
219 }
220 }
221
222 @Override
223 @SuppressWarnings("rawtypes")
224 public void send(List eventMessages) throws RemoteException {
225 send(null, eventMessages);
226 }
227
228 @Override
229 public void unregisterPeer(String string) {
230 }
231
232 protected void handleJGroupsNotification(
233 JGroupEventMessage jGroupEventMessage) {
234
235 Cache cache = _cacheManager.getCache(jGroupEventMessage.getCacheName());
236
237 if (cache == null) {
238 return;
239 }
240
241 int event = jGroupEventMessage.getEvent();
242 Serializable key = jGroupEventMessage.getSerializableKey();
243
244 if ((event == JGroupEventMessage.REMOVE) &&
245 (cache.getQuiet(key) != null)) {
246
247 cache.remove(key, true);
248 }
249 else if (event == JGroupEventMessage.REMOVE_ALL) {
250 cache.removeAll(true);
251 }
252 else if (event == JGroupEventMessage.PUT) {
253 Element element = jGroupEventMessage.getElement();
254
255 cache.put(new Element(key, element.getObjectValue()), true);
256 }
257 }
258
259 private static final String _SCHEME = "JGroups";
260
261 private static Log _log = LogFactoryUtil.getLog(JGroupsManager.class);
262
263 private CacheManager _cacheManager;
264 private JChannel _jChannel;
265
266 private class EhcacheJGroupsReceiver extends BaseReceiver {
267
268 @Override
269 protected void doReceive(Message message) {
270 Object object = retrievePayload(message);
271
272 if (object == null) {
273 return;
274 }
275
276 if (object instanceof Serializable) {
277 handleNotification((Serializable)object);
278 }
279 else {
280 if (_log.isWarnEnabled()) {
281 _log.warn(
282 "Unable to process message content of type " +
283 object.getClass().getName());
284 }
285 }
286 }
287
288 }
289
290 }