001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.cache.ehcache;
016    
017    import com.liferay.portal.kernel.bean.PortalBeanLocatorUtil;
018    import com.liferay.portal.kernel.cluster.Address;
019    import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
020    import com.liferay.portal.kernel.cluster.ClusterLinkUtil;
021    import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
022    import com.liferay.portal.kernel.cluster.ClusterRequest;
023    import com.liferay.portal.kernel.cluster.FutureClusterResponses;
024    import com.liferay.portal.kernel.exception.SystemException;
025    import com.liferay.portal.kernel.io.AnnotatedObjectInputStream;
026    import com.liferay.portal.kernel.io.AnnotatedObjectOutputStream;
027    import com.liferay.portal.kernel.log.Log;
028    import com.liferay.portal.kernel.log.LogFactoryUtil;
029    import com.liferay.portal.kernel.util.MethodHandler;
030    import com.liferay.portal.kernel.util.MethodKey;
031    import com.liferay.portal.kernel.util.SocketUtil.ServerSocketConfigurator;
032    import com.liferay.portal.kernel.util.SocketUtil;
033    import com.liferay.portal.util.PropsValues;
034    
035    import java.io.IOException;
036    import java.io.ObjectInputStream;
037    import java.io.ObjectOutputStream;
038    import java.io.Serializable;
039    
040    import java.net.InetAddress;
041    import java.net.ServerSocket;
042    import java.net.Socket;
043    import java.net.SocketAddress;
044    import java.net.SocketException;
045    
046    import java.nio.channels.ServerSocketChannel;
047    
048    import java.util.List;
049    import java.util.concurrent.BlockingQueue;
050    import java.util.concurrent.TimeUnit;
051    
052    import net.sf.ehcache.CacheManager;
053    import net.sf.ehcache.Ehcache;
054    import net.sf.ehcache.Element;
055    
056    /**
057     * @author Shuyang Zhou
058     * @author Sherry Yang
059     */
060    public class EhcacheStreamBootstrapHelpUtil {
061    
062            public static void acquireCachePeers(Ehcache ehcache) throws Exception {
063                    List<Address> clusterNodeAddresses =
064                            ClusterExecutorUtil.getClusterNodeAddresses();
065    
066                    if (_log.isInfoEnabled()) {
067                            _log.info("Cluster node addresses " + clusterNodeAddresses);
068                    }
069    
070                    int clusterNodeAddressesCount = clusterNodeAddresses.size();
071    
072                    if (clusterNodeAddressesCount <= 1) {
073                            if (_log.isDebugEnabled()) {
074                                    _log.debug(
075                                            "Unable to find peers because there is only one portal " +
076                                                    "instance in the cluster");
077                            }
078    
079                            return;
080                    }
081    
082                    loadCachesFromCluster(ehcache);
083            }
084    
085            public static ServerSocket createServerSocket(int startPort)
086                    throws Exception {
087    
088                    InetAddress inetAddress = ClusterLinkUtil.getBindInetAddress();
089    
090                    ServerSocketConfigurator serverSocketConfigurator =
091                            new SocketCacheServerSocketConfiguration();
092    
093                    ServerSocketChannel serverSocketChannel =
094                            SocketUtil.createServerSocketChannel(
095                                    inetAddress, startPort, serverSocketConfigurator);
096    
097                    return serverSocketChannel.socket();
098            }
099    
100            public static SocketAddress createServerSocketFromCluster(String cacheName)
101                    throws Exception {
102    
103                    ServerSocket serverSocket = createServerSocket(
104                            PropsValues.EHCACHE_SOCKET_START_PORT);
105    
106                    EhcacheStreamServerThread ehcacheStreamServerThread =
107                            new EhcacheStreamServerThread(serverSocket, cacheName);
108    
109                    ehcacheStreamServerThread.start();
110    
111                    return serverSocket.getLocalSocketAddress();
112            }
113    
114            protected static void loadCachesFromCluster(Ehcache ehcache)
115                    throws Exception {
116    
117                    ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
118                            new MethodHandler(
119                                    _createServerSocketFromClusterMethodKey, ehcache.getName()),
120                            true);
121    
122                    FutureClusterResponses futureClusterResponses =
123                            ClusterExecutorUtil.execute(clusterRequest);
124    
125                    BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
126                            futureClusterResponses.getPartialResults();
127    
128                    ClusterNodeResponse clusterNodeResponse = null;
129    
130                    try {
131                            clusterNodeResponse = clusterNodeResponses.poll(
132                                    PropsValues.CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
133                                    TimeUnit.MILLISECONDS);
134                    }
135                    catch (InterruptedException ie) {
136                            return;
137                    }
138    
139                    if (clusterNodeResponse == null) {
140                            return;
141                    }
142    
143                    ObjectInputStream objectInputStream = null;
144                    Socket socket = null;
145    
146                    try {
147                            SocketAddress remoteSocketAddress =
148                                    (SocketAddress)clusterNodeResponse.getResult();
149    
150                            if (remoteSocketAddress == null) {
151                                    return;
152                            }
153    
154                            socket = new Socket();
155    
156                            socket.connect(remoteSocketAddress);
157    
158                            socket.shutdownOutput();
159    
160                            objectInputStream = new AnnotatedObjectInputStream(
161                                    socket.getInputStream());
162    
163                            while (true) {
164                                    Object object = objectInputStream.readObject();
165    
166                                    if (object instanceof EhcacheElement) {
167                                            EhcacheElement ehcacheElement = (EhcacheElement)object;
168    
169                                            Element element = ehcacheElement.toElement();
170    
171                                            ehcache.put(element, true);
172                                    }
173                                    else if (object instanceof String) {
174                                            String command = (String)object;
175    
176                                            if (command.equals(_COMMAND_CACHE_TX_START)) {
177                                                    String cacheName =
178                                                            (String)objectInputStream.readObject();
179    
180                                                    if (!cacheName.equals(ehcache.getName())) {
181                                                            break;
182                                                    }
183                                            }
184                                            else if (command.equals(_COMMAND_SOCKET_CLOSE)) {
185                                                    break;
186                                            }
187                                    }
188                                    else {
189                                            throw new SystemException(
190                                                    "Socket input stream returned invalid object " +
191                                                            object);
192                                    }
193                            }
194                    }
195                    finally {
196                            if (objectInputStream != null) {
197                                    objectInputStream.close();
198                            }
199    
200                            if (socket != null) {
201                                    socket.close();
202                            }
203                    }
204            }
205    
206            private static final String _BEAN_NAME_MULTI_VM_PORTAL_CACHE_MANAGER =
207                    "com.liferay.portal.kernel.cache.MultiVMPortalCacheManager";
208    
209            private static final String _COMMAND_CACHE_TX_START = "${CACHE_TX_START}";
210    
211            private static final String _COMMAND_SOCKET_CLOSE = "${SOCKET_CLOSE}";
212    
213            private static Log _log = LogFactoryUtil.getLog(
214                    EhcacheStreamBootstrapHelpUtil.class);
215    
216            private static MethodKey _createServerSocketFromClusterMethodKey =
217                    new MethodKey(
218                            EhcacheStreamBootstrapHelpUtil.class.getName(),
219                            "createServerSocketFromCluster", String.class);
220    
221            private static class EhcacheElement implements Serializable {
222    
223                    public EhcacheElement(Serializable key, Serializable value) {
224                            _key = key;
225                            _value = value;
226                    }
227    
228                    public Element toElement() {
229                            return new Element(_key, _value);
230                    }
231    
232                    private Serializable _key;
233                    private Serializable _value;
234    
235            }
236    
237            private static class EhcacheStreamServerThread extends Thread {
238    
239                    public EhcacheStreamServerThread(
240                            ServerSocket serverSocket, String cacheName) {
241    
242                            _serverSocket = serverSocket;
243                            _cacheName = cacheName;
244    
245                            EhcachePortalCacheManager ehcachePortalCacheManager =
246                                    (EhcachePortalCacheManager)PortalBeanLocatorUtil.locate(
247                                            _BEAN_NAME_MULTI_VM_PORTAL_CACHE_MANAGER);
248    
249                            _portalCacheManager = ehcachePortalCacheManager.getEhcacheManager();
250    
251                            setDaemon(true);
252                            setName(
253                                    EhcacheStreamServerThread.class.getName() + " - " + cacheName);
254                            setPriority(Thread.NORM_PRIORITY);
255                    }
256    
257                    @Override
258                    public void run() {
259                            Socket socket = null;
260    
261                            try {
262                                    socket = _serverSocket.accept();
263    
264                                    _serverSocket.close();
265    
266                                    socket.shutdownInput();
267    
268                                    ObjectOutputStream objectOutputStream =
269                                            new AnnotatedObjectOutputStream(socket.getOutputStream());
270    
271                                    objectOutputStream.writeObject(_COMMAND_CACHE_TX_START);
272    
273                                    Ehcache ehcache = _portalCacheManager.getCache(_cacheName);
274    
275                                    if (ehcache == null) {
276                                            EhcacheStreamBootstrapCacheLoader.setSkip();
277    
278                                            try {
279                                                    _portalCacheManager.addCache(_cacheName);
280                                            }
281                                            finally {
282                                                    EhcacheStreamBootstrapCacheLoader.resetSkip();
283                                            }
284                                    }
285                                    else {
286                                            objectOutputStream.writeObject(_cacheName);
287    
288                                            List<Object> keys = ehcache.getKeys();
289    
290                                            for (Object key : keys) {
291                                                    if (!(key instanceof Serializable)) {
292                                                            if (_log.isWarnEnabled()) {
293                                                                    _log.warn(
294                                                                            "Key " + key + " is not serializable");
295                                                            }
296    
297                                                            continue;
298                                                    }
299    
300                                                    Element element = ehcache.get(key);
301    
302                                                    Object value = element.getObjectValue();
303    
304                                                    if (!(value instanceof Serializable)) {
305                                                            if (_log.isWarnEnabled() && (value != null)) {
306                                                                    _log.warn(
307                                                                            "Value " + value + " is not serializable");
308                                                            }
309    
310                                                            continue;
311                                                    }
312    
313                                                    EhcacheElement ehcacheElement = new EhcacheElement(
314                                                            (Serializable)key, (Serializable)value);
315    
316                                                    objectOutputStream.writeObject(ehcacheElement);
317                                            }
318                                    }
319    
320                                    objectOutputStream.writeObject(_COMMAND_SOCKET_CLOSE);
321    
322                                    objectOutputStream.close();
323                            }
324                            catch (Exception e) {
325                                    throw new RuntimeException(e);
326                            }
327                            finally {
328                                    if (socket != null) {
329                                            try {
330                                                    socket.close();
331                                            }
332                                            catch (IOException ioe) {
333                                                    throw new RuntimeException(ioe);
334                                            }
335                                    }
336                            }
337                    }
338    
339                    private String _cacheName;
340                    private CacheManager _portalCacheManager;
341                    private ServerSocket _serverSocket;
342    
343            }
344    
345            private static class SocketCacheServerSocketConfiguration
346                    implements ServerSocketConfigurator {
347    
348                    @Override
349                    public void configure(ServerSocket serverSocket)
350                            throws SocketException {
351    
352                            serverSocket.setSoTimeout(PropsValues.EHCACHE_SOCKET_SO_TIMEOUT);
353                    }
354    
355            }
356    
357    }