001
014
015 package com.liferay.portal.cache.ehcache;
016
017 import com.liferay.portal.kernel.bean.PortalBeanLocatorUtil;
018 import com.liferay.portal.kernel.cluster.Address;
019 import com.liferay.portal.kernel.cluster.ClusterExecutorUtil;
020 import com.liferay.portal.kernel.cluster.ClusterLinkUtil;
021 import com.liferay.portal.kernel.cluster.ClusterNodeResponse;
022 import com.liferay.portal.kernel.cluster.ClusterRequest;
023 import com.liferay.portal.kernel.cluster.FutureClusterResponses;
024 import com.liferay.portal.kernel.exception.SystemException;
025 import com.liferay.portal.kernel.io.AnnotatedObjectInputStream;
026 import com.liferay.portal.kernel.io.AnnotatedObjectOutputStream;
027 import com.liferay.portal.kernel.log.Log;
028 import com.liferay.portal.kernel.log.LogFactoryUtil;
029 import com.liferay.portal.kernel.util.MethodHandler;
030 import com.liferay.portal.kernel.util.MethodKey;
031 import com.liferay.portal.kernel.util.SocketUtil.ServerSocketConfigurator;
032 import com.liferay.portal.kernel.util.SocketUtil;
033 import com.liferay.portal.util.PropsValues;
034
035 import java.io.IOException;
036 import java.io.ObjectInputStream;
037 import java.io.ObjectOutputStream;
038 import java.io.Serializable;
039
040 import java.net.InetAddress;
041 import java.net.ServerSocket;
042 import java.net.Socket;
043 import java.net.SocketAddress;
044 import java.net.SocketException;
045
046 import java.nio.channels.ServerSocketChannel;
047
048 import java.util.List;
049 import java.util.concurrent.BlockingQueue;
050 import java.util.concurrent.TimeUnit;
051
052 import net.sf.ehcache.CacheManager;
053 import net.sf.ehcache.Ehcache;
054 import net.sf.ehcache.Element;
055
056
060 public class EhcacheStreamBootstrapHelpUtil {
061
062 public static void acquireCachePeers(Ehcache ehcache) throws Exception {
063 List<Address> clusterNodeAddresses =
064 ClusterExecutorUtil.getClusterNodeAddresses();
065
066 if (_log.isInfoEnabled()) {
067 _log.info("Cluster node addresses " + clusterNodeAddresses);
068 }
069
070 int clusterNodeAddressesCount = clusterNodeAddresses.size();
071
072 if (clusterNodeAddressesCount <= 1) {
073 if (_log.isDebugEnabled()) {
074 _log.debug(
075 "Unable to find peers because there is only one portal " +
076 "instance in the cluster");
077 }
078
079 return;
080 }
081
082 loadCachesFromCluster(ehcache);
083 }
084
085 public static ServerSocket createServerSocket(int startPort)
086 throws Exception {
087
088 InetAddress inetAddress = ClusterLinkUtil.getBindInetAddress();
089
090 ServerSocketConfigurator serverSocketConfigurator =
091 new SocketCacheServerSocketConfiguration();
092
093 ServerSocketChannel serverSocketChannel =
094 SocketUtil.createServerSocketChannel(
095 inetAddress, startPort, serverSocketConfigurator);
096
097 return serverSocketChannel.socket();
098 }
099
100 public static SocketAddress createServerSocketFromCluster(String cacheName)
101 throws Exception {
102
103 ServerSocket serverSocket = createServerSocket(
104 PropsValues.EHCACHE_SOCKET_START_PORT);
105
106 EhcacheStreamServerThread ehcacheStreamServerThread =
107 new EhcacheStreamServerThread(serverSocket, cacheName);
108
109 ehcacheStreamServerThread.start();
110
111 return serverSocket.getLocalSocketAddress();
112 }
113
114 protected static void loadCachesFromCluster(Ehcache ehcache)
115 throws Exception {
116
117 ClusterRequest clusterRequest = ClusterRequest.createMulticastRequest(
118 new MethodHandler(
119 _createServerSocketFromClusterMethodKey, ehcache.getName()),
120 true);
121
122 FutureClusterResponses futureClusterResponses =
123 ClusterExecutorUtil.execute(clusterRequest);
124
125 BlockingQueue<ClusterNodeResponse> clusterNodeResponses =
126 futureClusterResponses.getPartialResults();
127
128 ClusterNodeResponse clusterNodeResponse = null;
129
130 try {
131 clusterNodeResponse = clusterNodeResponses.poll(
132 PropsValues.CLUSTER_LINK_NODE_BOOTUP_RESPONSE_TIMEOUT,
133 TimeUnit.MILLISECONDS);
134 }
135 catch (InterruptedException ie) {
136 return;
137 }
138
139 if (clusterNodeResponse == null) {
140 return;
141 }
142
143 ObjectInputStream objectInputStream = null;
144 Socket socket = null;
145
146 try {
147 SocketAddress remoteSocketAddress =
148 (SocketAddress)clusterNodeResponse.getResult();
149
150 if (remoteSocketAddress == null) {
151 return;
152 }
153
154 socket = new Socket();
155
156 socket.connect(remoteSocketAddress);
157
158 socket.shutdownOutput();
159
160 objectInputStream = new AnnotatedObjectInputStream(
161 socket.getInputStream());
162
163 while (true) {
164 Object object = objectInputStream.readObject();
165
166 if (object instanceof EhcacheElement) {
167 EhcacheElement ehcacheElement = (EhcacheElement)object;
168
169 Element element = ehcacheElement.toElement();
170
171 ehcache.put(element, true);
172 }
173 else if (object instanceof String) {
174 String command = (String)object;
175
176 if (command.equals(_COMMAND_CACHE_TX_START)) {
177 String cacheName =
178 (String)objectInputStream.readObject();
179
180 if (!cacheName.equals(ehcache.getName())) {
181 break;
182 }
183 }
184 else if (command.equals(_COMMAND_SOCKET_CLOSE)) {
185 break;
186 }
187 }
188 else {
189 throw new SystemException(
190 "Socket input stream returned invalid object " +
191 object);
192 }
193 }
194 }
195 finally {
196 if (objectInputStream != null) {
197 objectInputStream.close();
198 }
199
200 if (socket != null) {
201 socket.close();
202 }
203 }
204 }
205
206 private static final String _BEAN_NAME_MULTI_VM_PORTAL_CACHE_MANAGER =
207 "com.liferay.portal.kernel.cache.MultiVMPortalCacheManager";
208
209 private static final String _COMMAND_CACHE_TX_START = "${CACHE_TX_START}";
210
211 private static final String _COMMAND_SOCKET_CLOSE = "${SOCKET_CLOSE}";
212
213 private static Log _log = LogFactoryUtil.getLog(
214 EhcacheStreamBootstrapHelpUtil.class);
215
216 private static MethodKey _createServerSocketFromClusterMethodKey =
217 new MethodKey(
218 EhcacheStreamBootstrapHelpUtil.class.getName(),
219 "createServerSocketFromCluster", String.class);
220
221 private static class EhcacheElement implements Serializable {
222
223 public EhcacheElement(Serializable key, Serializable value) {
224 _key = key;
225 _value = value;
226 }
227
228 public Element toElement() {
229 return new Element(_key, _value);
230 }
231
232 private Serializable _key;
233 private Serializable _value;
234
235 }
236
237 private static class EhcacheStreamServerThread extends Thread {
238
239 public EhcacheStreamServerThread(
240 ServerSocket serverSocket, String cacheName) {
241
242 _serverSocket = serverSocket;
243 _cacheName = cacheName;
244
245 EhcachePortalCacheManager ehcachePortalCacheManager =
246 (EhcachePortalCacheManager)PortalBeanLocatorUtil.locate(
247 _BEAN_NAME_MULTI_VM_PORTAL_CACHE_MANAGER);
248
249 _portalCacheManager = ehcachePortalCacheManager.getEhcacheManager();
250
251 setDaemon(true);
252 setName(
253 EhcacheStreamServerThread.class.getName() + " - " + cacheName);
254 setPriority(Thread.NORM_PRIORITY);
255 }
256
257 @Override
258 public void run() {
259 Socket socket = null;
260
261 try {
262 socket = _serverSocket.accept();
263
264 _serverSocket.close();
265
266 socket.shutdownInput();
267
268 ObjectOutputStream objectOutputStream =
269 new AnnotatedObjectOutputStream(socket.getOutputStream());
270
271 objectOutputStream.writeObject(_COMMAND_CACHE_TX_START);
272
273 Ehcache ehcache = _portalCacheManager.getCache(_cacheName);
274
275 if (ehcache == null) {
276 EhcacheStreamBootstrapCacheLoader.setSkip();
277
278 try {
279 _portalCacheManager.addCache(_cacheName);
280 }
281 finally {
282 EhcacheStreamBootstrapCacheLoader.resetSkip();
283 }
284 }
285 else {
286 objectOutputStream.writeObject(_cacheName);
287
288 List<Object> keys = ehcache.getKeys();
289
290 for (Object key : keys) {
291 if (!(key instanceof Serializable)) {
292 if (_log.isWarnEnabled()) {
293 _log.warn(
294 "Key " + key + " is not serializable");
295 }
296
297 continue;
298 }
299
300 Element element = ehcache.get(key);
301
302 Object value = element.getObjectValue();
303
304 if (!(value instanceof Serializable)) {
305 if (_log.isWarnEnabled() && (value != null)) {
306 _log.warn(
307 "Value " + value + " is not serializable");
308 }
309
310 continue;
311 }
312
313 EhcacheElement ehcacheElement = new EhcacheElement(
314 (Serializable)key, (Serializable)value);
315
316 objectOutputStream.writeObject(ehcacheElement);
317 }
318 }
319
320 objectOutputStream.writeObject(_COMMAND_SOCKET_CLOSE);
321
322 objectOutputStream.close();
323 }
324 catch (Exception e) {
325 throw new RuntimeException(e);
326 }
327 finally {
328 if (socket != null) {
329 try {
330 socket.close();
331 }
332 catch (IOException ioe) {
333 throw new RuntimeException(ioe);
334 }
335 }
336 }
337 }
338
339 private String _cacheName;
340 private CacheManager _portalCacheManager;
341 private ServerSocket _serverSocket;
342
343 }
344
345 private static class SocketCacheServerSocketConfiguration
346 implements ServerSocketConfigurator {
347
348 @Override
349 public void configure(ServerSocket serverSocket)
350 throws SocketException {
351
352 serverSocket.setSoTimeout(PropsValues.EHCACHE_SOCKET_SO_TIMEOUT);
353 }
354
355 }
356
357 }