001
014
015 package com.liferay.portal.cache.ehcache;
016
017 import com.liferay.portal.kernel.bean.PortalBeanLocatorUtil;
018 import com.liferay.portal.kernel.cluster.Address;
019 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
020 import com.liferay.portal.kernel.cluster.ClusterLinkUtil;
021 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
022 import com.liferay.portal.kernel.cluster.ClusterRequest;
023 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
024 import com.liferay.portal.kernel.exception.SystemException;
025 import com.liferay.portal.kernel.io.AnnotatedObjectInputStream;
026 import com.liferay.portal.kernel.io.AnnotatedObjectOutputStream;
027 import com.liferay.portal.kernel.log.Log;
028 import com.liferay.portal.kernel.log.LogFactoryUtil;
029 import com.liferay.portal.kernel.util.MethodHandler;
030 import com.liferay.portal.kernel.util.MethodKey;
031 import com.liferay.portal.kernel.util.SocketUtil;
032 import com.liferay.portal.kernel.util.SocketUtil.ServerSocketConfigurator;
033 import com.liferay.portal.util.PropsValues;
034
035 import java.io.IOException;
036 import java.io.ObjectInputStream;
037 import java.io.ObjectOutputStream;
038 import java.io.Serializable;
039
040 import java.net.ServerSocket;
041 import java.net.Socket;
042 import java.net.SocketAddress;
043 import java.net.SocketException;
044 import java.net.SocketTimeoutException;
045
046 import java.nio.channels.ServerSocketChannel;
047
048 import java.util.ArrayList;
049 import java.util.List;
050 import java.util.concurrent.BlockingQueue;
051 import java.util.concurrent.TimeUnit;
052
053 import net.sf.ehcache.CacheManager;
054 import net.sf.ehcache.Ehcache;
055 import net.sf.ehcache.Element;
056
057
061 public class EhcacheStreamBootstrapHelpUtil {
062
063 public static SocketAddress createServerSocketFromCluster(
064 List<String> cacheNames)
065 throws Exception {
066
067 ServerSocketChannel serverSocketChannel =
068 SocketUtil.createServerSocketChannel(
069 ClusterLinkUtil.getBindInetAddress(),
070 PropsValues.EHCACHE_SOCKET_START_PORT,
071 _serverSocketConfigurator);
072
073 ServerSocket serverSocket = serverSocketChannel.socket();
074
075 EhcachePortalCacheManager<?, ?> ehcachePortalCacheManager =
076 (EhcachePortalCacheManager<?, ?>)PortalBeanLocatorUtil.locate(
077 _BEAN_NAME_MULTI_VM_PORTAL_CACHE_MANAGER);
078
079 CacheManager cacheManager =
080 ehcachePortalCacheManager.getEhcacheManager();
081
082 EhcacheStreamServerThread ehcacheStreamServerThread =
083 new EhcacheStreamServerThread(
084 serverSocket, cacheManager, cacheNames);
085
086 ehcacheStreamServerThread.start();
087
088 return serverSocket.getLocalSocketAddress();
089 }
090
091 protected static void loadCachesFromCluster(Ehcache... ehcaches)
092 throws Exception {
093
094 List<Address> clusterNodeAddresses =
095 ClusterExecutorUtil.getClusterNodeAddresses();
096
097 if (_log.isInfoEnabled()) {
098 _log.info("Cluster node addresses " + clusterNodeAddresses);
099 }
100
101 if (clusterNodeAddresses.size() <= 1) {
102 if (_log.isDebugEnabled()) {
103 _log.debug(
104 "Not loading cache from cluster because a cluster peer " +
105 "was not found");
106 }
107
108 return;
109 }
110
111 EhcachePortalCacheManager<?, ?> ehcachePortalCacheManager =
112 (EhcachePortalCacheManager<?, ?>)PortalBeanLocatorUtil.locate(
113 _BEAN_NAME_MULTI_VM_PORTAL_CACHE_MANAGER);
114
115 CacheManager cacheManager =
116 ehcachePortalCacheManager.getEhcacheManager();
117
118 List<String> cacheNames = new ArrayList<String>();
119
120 for (Ehcache ehcache : ehcaches) {
121 if (cacheManager == ehcache.getCacheManager()) {
122 cacheNames.add(ehcache.getName());
123 }
124 }
125
126 ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
127 new MethodHandler(
128 _createServerSocketFromClusterMethodKey, cacheNames),
129 true);
130
131 FutureClusterResponses futureClusterResponses =
132 ClusterExecutorUtil.execute(clusterRequest);
133
134 BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
135 futureClusterResponses.getPartialResults();
136
137 ClusterNodeResponse clusterNodeResponse = null;
138
139 try {
140 clusterNodeResponse = clusterNodeResponses.poll(
141 PropsValues.CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
142 TimeUnit.MILLISECONDS);
143 }
144 catch (InterruptedException ie) {
145 return;
146 }
147
148 if (clusterNodeResponse == null) {
149 if (_log.isWarnEnabled()) {
150 _log.warn(
151 "Unable to load cache from the cluster because there " +
152 "was no peer response");
153 }
154
155 return;
156 }
157
158 Socket socket = null;
159
160 try {
161 SocketAddress remoteSocketAddress =
162 (SocketAddress)clusterNodeResponse.getResult();
163
164 if (remoteSocketAddress == null) {
165 _log.error(
166 "Cluster peer " + clusterNodeResponse.getClusterNode() +
167 " responded with a null socket address");
168
169 return;
170 }
171
172 socket = new Socket();
173
174 socket.connect(remoteSocketAddress);
175
176 socket.shutdownOutput();
177
178 ObjectInputStream objectInputStream =
179 new AnnotatedObjectInputStream(socket.getInputStream());
180
181 Ehcache ehcache = null;
182
183 try {
184 while (true) {
185 Object object = objectInputStream.readObject();
186
187 if (object instanceof EhcacheElement) {
188 EhcacheElement ehcacheElement = (EhcacheElement)object;
189
190 Element element = ehcacheElement.toElement();
191
192 ehcache.put(element, true);
193 }
194 else if (object instanceof String) {
195 if (_COMMAND_SOCKET_CLOSE.equals(object)) {
196 break;
197 }
198
199 EhcacheStreamBootstrapCacheLoader.setSkip();
200
201 try {
202 ehcache = cacheManager.addCacheIfAbsent(
203 (String)object);
204 }
205 finally {
206 EhcacheStreamBootstrapCacheLoader.resetSkip();
207 }
208 }
209 else {
210 throw new SystemException(
211 "Socket input stream returned invalid object " +
212 object);
213 }
214 }
215 }
216 finally {
217 if (objectInputStream != null) {
218 objectInputStream.close();
219 }
220 }
221 }
222 finally {
223 if (socket != null) {
224 socket.close();
225 }
226 }
227 }
228
229 private static final String _BEAN_NAME_MULTI_VM_PORTAL_CACHE_MANAGER =
230 "com.liferay.portal.kernel.cache.MultiVMPortalCacheManager";
231
232 private static final String _COMMAND_SOCKET_CLOSE = "${SOCKET_CLOSE}";
233
234 private static Log _log = LogFactoryUtil.getLog(
235 EhcacheStreamBootstrapHelpUtil.class);
236
237 private static MethodKey _createServerSocketFromClusterMethodKey =
238 new MethodKey(
239 EhcacheStreamBootstrapHelpUtil.class,
240 "createServerSocketFromCluster", List.class);
241 private static ServerSocketConfigurator _serverSocketConfigurator =
242 new SocketCacheServerSocketConfiguration();
243
244 private static class EhcacheElement implements Serializable {
245
246 public EhcacheElement(Serializable key, Serializable value) {
247 _key = key;
248 _value = value;
249 }
250
251 public Element toElement() {
252 return new Element(_key, _value);
253 }
254
255 private Serializable _key;
256 private Serializable _value;
257
258 }
259
260 private static class EhcacheStreamServerThread extends Thread {
261
262 public EhcacheStreamServerThread(
263 ServerSocket serverSocket, CacheManager cacheManager,
264 List<String> cacheNames) {
265
266 _serverSocket = serverSocket;
267 _cacheManager = cacheManager;
268 _cacheNames = cacheNames;
269
270 setDaemon(true);
271 setName(
272 EhcacheStreamServerThread.class.getName() + " - " + cacheNames);
273 setPriority(Thread.NORM_PRIORITY);
274 }
275
276 @Override
277 public void run() {
278 Socket socket = null;
279
280 try {
281 try {
282 socket = _serverSocket.accept();
283 }
284 catch (SocketTimeoutException ste) {
285 if (_log.isDebugEnabled()) {
286 _log.debug(
287 "Terminating the socket thread " + getName() +
288 " that the client requested but never used");
289 }
290
291 return;
292 }
293 finally {
294 _serverSocket.close();
295 }
296
297 socket.shutdownInput();
298
299 ObjectOutputStream objectOutputStream =
300 new AnnotatedObjectOutputStream(socket.getOutputStream());
301
302 for (String cacheName : _cacheNames) {
303 Ehcache ehcache = _cacheManager.getCache(cacheName);
304
305 if (ehcache == null) {
306 EhcacheStreamBootstrapCacheLoader.setSkip();
307
308 try {
309 _cacheManager.addCache(cacheName);
310 }
311 finally {
312 EhcacheStreamBootstrapCacheLoader.resetSkip();
313 }
314
315 continue;
316 }
317
318 objectOutputStream.writeObject(cacheName);
319
320 List<Object> keys = ehcache.getKeys();
321
322 for (Object key : keys) {
323 if (!(key instanceof Serializable)) {
324 if (_log.isWarnEnabled()) {
325 _log.warn(
326 "Key " + key + " is not serializable");
327 }
328
329 continue;
330 }
331
332 Element element = ehcache.get(key);
333
334 if (element == null) {
335 continue;
336 }
337
338 Object value = element.getObjectValue();
339
340 if (!(value instanceof Serializable)) {
341 if (_log.isWarnEnabled() && (value != null)) {
342 _log.warn(
343 "Value " + value + " is not serializable");
344 }
345
346 continue;
347 }
348
349 EhcacheElement ehcacheElement = new EhcacheElement(
350 (Serializable)key, (Serializable)value);
351
352 objectOutputStream.writeObject(ehcacheElement);
353 }
354 }
355
356 objectOutputStream.writeObject(_COMMAND_SOCKET_CLOSE);
357
358 objectOutputStream.close();
359 }
360 catch (Exception e) {
361 throw new RuntimeException(e);
362 }
363 finally {
364 if (socket != null) {
365 try {
366 socket.close();
367 }
368 catch (IOException ioe) {
369 throw new RuntimeException(ioe);
370 }
371 }
372 }
373 }
374
375 private CacheManager _cacheManager;
376 private List<String> _cacheNames;
377 private ServerSocket _serverSocket;
378
379 }
380
381 private static class SocketCacheServerSocketConfiguration
382 implements ServerSocketConfigurator {
383
384 @Override
385 public void configure(ServerSocket serverSocket)
386 throws SocketException {
387
388 serverSocket.setSoTimeout(PropsValues.EHCACHE_SOCKET_SO_TIMEOUT);
389 }
390
391 }
392
393 }