001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.cache.ehcache;
016    
017    import com.liferay.portal.cluster.BaseReceiver;
018    import com.liferay.portal.kernel.log.Log;
019    import com.liferay.portal.kernel.log.LogFactoryUtil;
020    
021    import java.io.Serializable;
022    
023    import java.rmi.RemoteException;
024    
025    import java.util.ArrayList;
026    import java.util.List;
027    
028    import net.sf.ehcache.Cache;
029    import net.sf.ehcache.CacheException;
030    import net.sf.ehcache.CacheManager;
031    import net.sf.ehcache.Ehcache;
032    import net.sf.ehcache.Element;
033    import net.sf.ehcache.distribution.CacheManagerPeerProvider;
034    import net.sf.ehcache.distribution.CachePeer;
035    import net.sf.ehcache.distribution.jgroups.JGroupEventMessage;
036    
037    import org.jgroups.Address;
038    import org.jgroups.JChannel;
039    import org.jgroups.Message;
040    import org.jgroups.View;
041    
042    /**
043     * <p>
044     * See http://issues.liferay.com/browse/LPS-11061.
045     * </p>
046     *
047     * @author Tina Tian
048     */
049    public class JGroupsManager implements CacheManagerPeerProvider, CachePeer {
050    
051            public JGroupsManager(
052                    CacheManager cacheManager, String clusterName,
053                    String channelProperties) {
054    
055                    try {
056                            _jChannel = new JChannel(channelProperties);
057    
058                            _jChannel.setReceiver(new EhcacheJGroupsReceiver());
059    
060                            _jChannel.connect(clusterName);
061    
062                            if (_log.isInfoEnabled()) {
063                                    _log.info(
064                                            "Create a new channel with properties " +
065                                                    _jChannel.getProperties());
066                            }
067                    }
068                    catch (Exception e) {
069                            if (_log.isErrorEnabled()) {
070                                    _log.error("Unable to initialize channels", e);
071                            }
072                    }
073    
074                    _cacheManager = cacheManager;
075    
076                    BaseReceiver baseReceiver = (BaseReceiver)_jChannel.getReceiver();
077    
078                    baseReceiver.openLatch();
079            }
080    
081            @Override
082            public void dispose() throws CacheException {
083                    if (_jChannel != null) {
084                            _jChannel.close();
085                    }
086            }
087    
088            public Address getBusLocalAddress() {
089                    return _jChannel.getAddress();
090            }
091    
092            public List<Address> getBusMembership() {
093                    BaseReceiver baseReceiver = (BaseReceiver)_jChannel.getReceiver();
094    
095                    View view = baseReceiver.getView();
096    
097                    return view.getMembers();
098            }
099    
100            @Override
101            @SuppressWarnings("rawtypes")
102            public List getElements(List list) {
103                    return null;
104            }
105    
106            @Override
107            public String getGuid() {
108                    return null;
109            }
110    
111            @Override
112            @SuppressWarnings("rawtypes")
113            public List getKeys() {
114                    return null;
115            }
116    
117            @Override
118            public String getName() {
119                    return null;
120            }
121    
122            @Override
123            public Element getQuiet(Serializable serializable) {
124                    return null;
125            }
126    
127            @Override
128            public String getScheme() {
129                    return _SCHEME;
130            }
131    
132            @Override
133            public long getTimeForClusterToForm() {
134                    return 0;
135            }
136    
137            @Override
138            public String getUrl() {
139                    return null;
140            }
141    
142            @Override
143            public String getUrlBase() {
144                    return null;
145            }
146    
147            public void handleNotification(Serializable serializable) {
148                    if (serializable instanceof JGroupEventMessage) {
149                            handleJGroupsNotification((JGroupEventMessage)serializable);
150                    }
151                    else if (serializable instanceof List<?>) {
152                            List<?> valueList = (List<?>)serializable;
153    
154                            for (Object object : valueList) {
155                                    if (object instanceof JGroupEventMessage) {
156                                            handleJGroupsNotification((JGroupEventMessage)object);
157                                    }
158                            }
159                    }
160            }
161    
162            @Override
163            public void init() {
164            }
165    
166            @Override
167            public List<JGroupsManager> listRemoteCachePeers(Ehcache ehcache) {
168                    List<JGroupsManager> cachePeers = new ArrayList<JGroupsManager>();
169    
170                    cachePeers.add(this);
171    
172                    return cachePeers;
173            }
174    
175            @Override
176            public void put(Element element) {
177            }
178    
179            @Override
180            public void registerPeer(String string) {
181            }
182    
183            @Override
184            public boolean remove(Serializable serializable) {
185                    return false;
186            }
187    
188            @Override
189            public void removeAll() {
190            }
191    
192            @SuppressWarnings("rawtypes")
193            public void send(Address address, List eventMessages)
194                    throws RemoteException {
195    
196                    ArrayList<JGroupEventMessage> jGroupEventMessages =
197                            new ArrayList<JGroupEventMessage>();
198    
199                    for (Object eventMessage : eventMessages) {
200                            if (eventMessage instanceof JGroupEventMessage) {
201                                    JGroupEventMessage jGroupEventMessage =
202                                            (JGroupEventMessage)eventMessage;
203    
204                                    jGroupEventMessages.add(jGroupEventMessage);
205                            }
206                            else {
207                                    if (_log.isDebugEnabled()) {
208                                            _log.debug(
209                                                    eventMessage + "is not a JGroupEventMessage type");
210                                    }
211                            }
212                    }
213    
214                    try {
215                            _jChannel.send(address, jGroupEventMessages);
216                    }
217                    catch (Throwable t) {
218                            throw new RemoteException(t.getMessage());
219                    }
220            }
221    
222            @Override
223            @SuppressWarnings("rawtypes")
224            public void send(List eventMessages) throws RemoteException {
225                    send(null, eventMessages);
226            }
227    
228            @Override
229            public void unregisterPeer(String string) {
230            }
231    
232            protected void handleJGroupsNotification(
233                    JGroupEventMessage jGroupEventMessage) {
234    
235                    Cache cache = _cacheManager.getCache(jGroupEventMessage.getCacheName());
236    
237                    if (cache == null) {
238                            return;
239                    }
240    
241                    int event = jGroupEventMessage.getEvent();
242                    Serializable key = jGroupEventMessage.getSerializableKey();
243    
244                    if ((event == JGroupEventMessage.REMOVE) &&
245                            (cache.getQuiet(key) != null)) {
246    
247                            cache.remove(key, true);
248                    }
249                    else if (event == JGroupEventMessage.REMOVE_ALL) {
250                            cache.removeAll(true);
251                    }
252                    else if (event == JGroupEventMessage.PUT) {
253                            Element element = jGroupEventMessage.getElement();
254    
255                            cache.put(new Element(key, element.getObjectValue()), true);
256                    }
257            }
258    
259            private static final String _SCHEME = "JGroups";
260    
261            private static Log _log = LogFactoryUtil.getLog(JGroupsManager.class);
262    
263            private CacheManager _cacheManager;
264            private JChannel _jChannel;
265    
266            private class EhcacheJGroupsReceiver extends BaseReceiver {
267    
268                    @Override
269                    protected void doReceive(Message message) {
270                            Object object = retrievePayload(message);
271    
272                            if (object == null) {
273                                    return;
274                            }
275    
276                            if (object instanceof Serializable) {
277                                    handleNotification((Serializable)object);
278                            }
279                            else {
280                                    if (_log.isWarnEnabled()) {
281                                            _log.warn(
282                                                    "Unable to process message content of type " +
283                                                            object.getClass().getName());
284                                    }
285                            }
286                    }
287    
288            }
289    
290    }