001
014
015 package com.liferay.portal.fabric.netty.handlers;
016
017 import com.liferay.portal.fabric.agent.FabricAgent;
018 import com.liferay.portal.fabric.netty.agent.NettyFabricAgentStub;
019 import com.liferay.portal.fabric.netty.fileserver.FileHelperUtil;
020 import com.liferay.portal.fabric.netty.rpc.ChannelThreadLocal;
021 import com.liferay.portal.fabric.netty.rpc.RPCUtil;
022 import com.liferay.portal.fabric.netty.rpc.SyncProcessRPCCallable;
023 import com.liferay.portal.fabric.netty.util.NettyUtil;
024 import com.liferay.portal.fabric.netty.worker.NettyFabricWorkerConfig;
025 import com.liferay.portal.fabric.netty.worker.NettyFabricWorkerStub;
026 import com.liferay.portal.fabric.repository.Repository;
027 import com.liferay.portal.fabric.worker.FabricWorker;
028 import com.liferay.portal.kernel.concurrent.BaseFutureListener;
029 import com.liferay.portal.kernel.concurrent.FutureListener;
030 import com.liferay.portal.kernel.concurrent.NoticeableFuture;
031 import com.liferay.portal.kernel.concurrent.NoticeableFutureConverter;
032 import com.liferay.portal.kernel.log.Log;
033 import com.liferay.portal.kernel.log.LogFactoryUtil;
034 import com.liferay.portal.kernel.process.ClassPathUtil;
035 import com.liferay.portal.kernel.process.ProcessCallable;
036 import com.liferay.portal.kernel.process.ProcessConfig;
037 import com.liferay.portal.kernel.process.ProcessConfig.Builder;
038 import com.liferay.portal.kernel.process.ProcessException;
039 import com.liferay.portal.kernel.util.ArrayUtil;
040 import com.liferay.portal.kernel.util.StringBundler;
041 import com.liferay.portal.kernel.util.StringUtil;
042
043 import io.netty.channel.Channel;
044 import io.netty.channel.ChannelFuture;
045 import io.netty.channel.ChannelFutureListener;
046 import io.netty.channel.ChannelHandlerContext;
047 import io.netty.channel.SimpleChannelInboundHandler;
048 import io.netty.util.concurrent.EventExecutor;
049 import io.netty.util.concurrent.GenericFutureListener;
050
051 import java.io.File;
052 import java.io.IOException;
053 import java.io.Serializable;
054
055 import java.net.MalformedURLException;
056 import java.net.URLClassLoader;
057
058 import java.nio.file.Path;
059 import java.nio.file.Paths;
060
061 import java.util.ArrayList;
062 import java.util.HashMap;
063 import java.util.LinkedHashMap;
064 import java.util.List;
065 import java.util.Map;
066 import java.util.concurrent.Callable;
067 import java.util.concurrent.ExecutionException;
068 import java.util.concurrent.Future;
069
070
073 public class NettyFabricWorkerExecutionChannelHandler
074 extends SimpleChannelInboundHandler<NettyFabricWorkerConfig<Serializable>> {
075
/**
 * Creates a handler that loads worker files through the given repository
 * and starts workers on the given fabric agent.
 *
 * @param  repository the repository used by {@code loadPaths} to fetch
 *         files for a channel, must not be <code>null</code>
 * @param  fabricAgent the agent used to execute the received process
 *         callables, must not be <code>null</code>
 * @param  executionTimeout the timeout handed to
 *         {@code NettyUtil.scheduleCancellation} for the RPC calls this
 *         handler issues (unit is not visible in this file)
 * @throws NullPointerException if the repository or the fabric agent is
 *         <code>null</code>
 */
public NettyFabricWorkerExecutionChannelHandler(
	Repository<Channel> repository, FabricAgent fabricAgent,
	long executionTimeout) {

	// The repository is validated first, then the fabric agent, so the
	// reported message is the same as with explicit null checks

	_repository = java.util.Objects.requireNonNull(
		repository, "Repository is null");
	_fabricAgent = java.util.Objects.requireNonNull(
		fabricAgent, "Fabric agent is null");
	_executionTimeout = executionTimeout;
}
092
093 @Override
094 public void exceptionCaught(
095 ChannelHandlerContext channelHandlerContext, Throwable throwable) {
096
097 final Channel channel = channelHandlerContext.channel();
098
099 _log.error("Closing " + channel + " due to:", throwable);
100
101 ChannelFuture channelFuture = channel.close();
102
103 channelFuture.addListener(
104 new ChannelFutureListener() {
105
106 @Override
107 public void operationComplete(ChannelFuture channelFuture) {
108 if (_log.isInfoEnabled()) {
109 _log.info(channel + " is closed");
110 }
111 }
112
113 });
114 }
115
116 @Override
117 protected void channelRead0(
118 ChannelHandlerContext channelHandlerContext,
119 NettyFabricWorkerConfig<Serializable> nettyFabricWorkerConfig) {
120
121 NoticeableFuture<LoadedPaths> noticeableFuture = loadPaths(
122 channelHandlerContext.channel(), nettyFabricWorkerConfig);
123
124 noticeableFuture.addFutureListener(
125 new PostLoadPathsFutureListener(
126 channelHandlerContext, nettyFabricWorkerConfig));
127 }
128
129 protected NoticeableFuture<LoadedPaths> loadPaths(
130 Channel channel,
131 NettyFabricWorkerConfig<Serializable> nettyFabricWorkerConfig) {
132
133 Map<Path, Path> mergedPaths = new HashMap<>();
134
135 ProcessConfig processConfig =
136 nettyFabricWorkerConfig.getProcessConfig();
137
138 final Map<Path, Path> bootstrapPaths = new LinkedHashMap<>();
139
140 for (String pathString :
141 processConfig.getBootstrapClassPathElements()) {
142
143 bootstrapPaths.put(Paths.get(pathString), null);
144 }
145
146 mergedPaths.putAll(bootstrapPaths);
147
148 final Map<Path, Path> runtimePaths = new LinkedHashMap<>();
149
150 for (String pathString : processConfig.getRuntimeClassPathElements()) {
151 runtimePaths.put(Paths.get(pathString), null);
152 }
153
154 mergedPaths.putAll(runtimePaths);
155
156 final Map<Path, Path> inputPaths =
157 nettyFabricWorkerConfig.getInputPathMap();
158
159 mergedPaths.putAll(inputPaths);
160
161 return new NoticeableFutureConverter<LoadedPaths, Map<Path, Path>>(
162 _repository.getFiles(channel, mergedPaths, false)) {
163
164 @Override
165 protected LoadedPaths convert(Map<Path, Path> mergedPaths)
166 throws IOException {
167
168 Map<Path, Path> loadedInputPaths = new HashMap<>();
169
170 List<Path> missedInputPaths = new ArrayList<>();
171
172 for (Path path : inputPaths.keySet()) {
173 Path loadedInputPath = mergedPaths.get(path);
174
175 if (loadedInputPath == null) {
176 missedInputPaths.add(path);
177 }
178 else {
179 loadedInputPaths.put(path, loadedInputPath);
180 }
181 }
182
183 if (!missedInputPaths.isEmpty()) {
184 throw new IOException(
185 "Unable to get input paths: " + missedInputPaths);
186 }
187
188 List<Path> loadedBootstrapPaths = new ArrayList<>();
189
190 List<Path> missedBootstrapPaths = new ArrayList<>();
191
192 for (Path path : bootstrapPaths.keySet()) {
193 Path loadedBootstrapPath = mergedPaths.get(path);
194
195 if (loadedBootstrapPath == null) {
196 missedBootstrapPaths.add(path);
197 }
198 else {
199 loadedBootstrapPaths.add(loadedBootstrapPath);
200 }
201 }
202
203 if (!missedBootstrapPaths.isEmpty() && _log.isWarnEnabled()) {
204 _log.warn(
205 "Incomplete bootstrap classpath loaded, missed: " +
206 missedBootstrapPaths);
207 }
208
209 List<Path> loadedRuntimePaths = new ArrayList<>();
210
211 List<Path> missedRuntimePaths = new ArrayList<>();
212
213 for (Path path : runtimePaths.keySet()) {
214 Path loadedRuntimePath = mergedPaths.get(path);
215
216 if (loadedRuntimePath == null) {
217 missedRuntimePaths.add(path);
218 }
219 else {
220 loadedRuntimePaths.add(loadedRuntimePath);
221 }
222 }
223
224 if (!missedRuntimePaths.isEmpty() && _log.isWarnEnabled()) {
225 _log.warn(
226 "Incomplete runtime classpath loaded, missed: " +
227 missedRuntimePaths);
228 }
229
230 return new LoadedPaths(
231 loadedInputPaths,
232 StringUtil.merge(loadedBootstrapPaths, File.pathSeparator),
233 StringUtil.merge(loadedRuntimePaths, File.pathSeparator));
234 }
235
236 };
237 }
238
239 protected void sendResult(
240 Channel channel, long fabricWorkerId, Serializable result,
241 Throwable t) {
242
243 final FabricWorkerResultProcessCallable
244 fabricWorkerResultProcessCallable =
245 new FabricWorkerResultProcessCallable(
246 fabricWorkerId, result, t);
247
248 NoticeableFuture<Serializable> noticeableFuture = RPCUtil.execute(
249 channel,
250 new SyncProcessRPCCallable<Serializable>(
251 fabricWorkerResultProcessCallable));
252
253 NettyUtil.scheduleCancellation(
254 channel, noticeableFuture, _executionTimeout);
255
256 noticeableFuture.addFutureListener(
257 new BaseFutureListener<Serializable>() {
258
259 @Override
260 public void completeWithException(
261 Future<Serializable> future, Throwable throwable) {
262
263 _log.error(
264 "Unable to send back fabric worker result " +
265 fabricWorkerResultProcessCallable,
266 throwable);
267 }
268
269 });
270 }
271
272 protected static class FabricAgentFinishStartupProcessCallable
273 implements ProcessCallable<Serializable> {
274
275 @Override
276 public Serializable call() throws ProcessException {
277 Channel channel = ChannelThreadLocal.getChannel();
278
279 NettyFabricAgentStub nettyStubFabricAgent =
280 NettyChannelAttributes.getNettyFabricAgentStub(channel);
281
282 if (nettyStubFabricAgent == null) {
283 throw new ProcessException(
284 "Unable to locate fabric agent on channel " + channel);
285 }
286
287 nettyStubFabricAgent.finishStartup(_id);
288
289 return null;
290 }
291
292 protected FabricAgentFinishStartupProcessCallable(long id) {
293 _id = id;
294 }
295
296 private static final long serialVersionUID = 1L;
297
298 private final long _id;
299
300 }
301
302 protected static class FabricWorkerResultProcessCallable
303 implements ProcessCallable<Serializable> {
304
305 @Override
306 public Serializable call() throws ProcessException {
307 Channel channel = ChannelThreadLocal.getChannel();
308
309 NettyFabricAgentStub nettyStubFabricAgent =
310 NettyChannelAttributes.getNettyFabricAgentStub(channel);
311
312 if (nettyStubFabricAgent == null) {
313 throw new ProcessException(
314 "Unable to locate fabric agent on channel " + channel);
315 }
316
317 NettyFabricWorkerStub<Serializable> nettyStubFabricWorker =
318 (NettyFabricWorkerStub<Serializable>)
319 nettyStubFabricAgent.takeNettyStubFabricWorker(_id);
320
321 if (nettyStubFabricWorker == null) {
322 throw new ProcessException(
323 "Unable to locate fabric worker on channel " + channel +
324 ", with fabric worker id " + _id);
325 }
326
327 if (_throwable != null) {
328 nettyStubFabricWorker.setException(_throwable);
329 }
330 else {
331 nettyStubFabricWorker.setResult(_result);
332 }
333
334 return null;
335 }
336
337 @Override
338 public String toString() {
339 StringBundler sb = new StringBundler(7);
340
341 sb.append("{id=");
342 sb.append(_id);
343 sb.append(", result=");
344 sb.append(_result);
345 sb.append(", throwable=");
346 sb.append(_throwable);
347 sb.append("}");
348
349 return sb.toString();
350 }
351
352 protected FabricWorkerResultProcessCallable(
353 long id, Serializable result, Throwable throwable) {
354
355 _id = id;
356 _result = result;
357 _throwable = throwable;
358 }
359
360 private static final long serialVersionUID = 1L;
361
362 private final long _id;
363 private final Serializable _result;
364 private final Throwable _throwable;
365
366 }
367
368 protected static class LoadedPaths {
369
370 public LoadedPaths(
371 Map<Path, Path> inputPaths, String bootstrapClassPath,
372 String runtimeClassPath) {
373
374 _inputPaths = inputPaths;
375 _bootstrapClassPath = bootstrapClassPath;
376 _runtimeClassPath = runtimeClassPath;
377 }
378
379 public Map<Path, Path> getInputPaths() {
380 return _inputPaths;
381 }
382
383 public ProcessConfig toProcessConfig(ProcessConfig processConfig)
384 throws ProcessException {
385
386 Builder builder = new Builder();
387
388 builder.setArguments(processConfig.getArguments());
389 builder.setBootstrapClassPath(_bootstrapClassPath);
390 builder.setJavaExecutable(processConfig.getJavaExecutable());
391 builder.setRuntimeClassPath(_runtimeClassPath);
392
393 try {
394 builder.setReactClassLoader(
395 new URLClassLoader(
396 ArrayUtil.append(
397 ClassPathUtil.getClassPathURLs(_bootstrapClassPath),
398 ClassPathUtil.getClassPathURLs(
399 _runtimeClassPath))));
400 }
401 catch (MalformedURLException murle) {
402 throw new ProcessException(murle);
403 }
404
405 return builder.build();
406 }
407
408 private final String _bootstrapClassPath;
409 private final Map<Path, Path> _inputPaths;
410 private final String _runtimeClassPath;
411
412 }
413
414 protected class PostFabricWorkerExecutionFutureListener
415 implements GenericFutureListener
416 <io.netty.util.concurrent.Future<FabricWorker<Serializable>>> {
417
418 public PostFabricWorkerExecutionFutureListener(
419 Channel channel, LoadedPaths loadedPaths,
420 NettyFabricWorkerConfig<Serializable> nettyFabricWorkerConfig) {
421
422 _channel = channel;
423 _loadedPaths = loadedPaths;
424 _nettyFabricWorkerConfig = nettyFabricWorkerConfig;
425 }
426
427 @Override
428 public void operationComplete(
429 io.netty.util.concurrent.Future<FabricWorker<Serializable>>
430 future)
431 throws Exception {
432
433 Throwable throwable = future.cause();
434
435 if (throwable != null) {
436 sendResult(
437 _channel, _nettyFabricWorkerConfig.getId(), null,
438 throwable);
439
440 return;
441 }
442
443 FabricWorker<Serializable> fabricWorker = future.get();
444
445 NettyChannelAttributes.putFabricWorker(
446 _channel, _nettyFabricWorkerConfig.getId(), fabricWorker);
447
448 NoticeableFuture<Serializable> noticeableFuture = RPCUtil.execute(
449 _channel,
450 new SyncProcessRPCCallable<Serializable>(
451 new FabricAgentFinishStartupProcessCallable(
452 _nettyFabricWorkerConfig.getId())));
453
454 NettyUtil.scheduleCancellation(
455 _channel, noticeableFuture, _executionTimeout);
456
457 noticeableFuture.addFutureListener(
458 new BaseFutureListener<Serializable>() {
459
460 @Override
461 public void completeWithException(
462 Future<Serializable> future, Throwable throwable) {
463
464 _log.error(
465 "Unable to finish fabric worker startup",
466 throwable);
467 }
468
469 });
470
471 NoticeableFuture<Serializable> processNoticeableFuture =
472 fabricWorker.getProcessNoticeableFuture();
473
474 processNoticeableFuture.addFutureListener(
475 new PostFabricWorkerFinishFutureListener(
476 _channel, _nettyFabricWorkerConfig, _loadedPaths));
477 }
478
479 private final Channel _channel;
480 private final LoadedPaths _loadedPaths;
481 private final NettyFabricWorkerConfig<Serializable>
482 _nettyFabricWorkerConfig;
483
484 }
485
486 protected class PostFabricWorkerFinishFutureListener
487 implements FutureListener<Serializable> {
488
489 public PostFabricWorkerFinishFutureListener(
490 Channel channel,
491 NettyFabricWorkerConfig<Serializable> nettyFabricWorkerConfig,
492 LoadedPaths loadedPaths) {
493
494 _channel = channel;
495 _nettyFabricWorkerConfig = nettyFabricWorkerConfig;
496 _loadedPaths = loadedPaths;
497 }
498
499 @Override
500 public void complete(Future<Serializable> future) {
501 Map<Path, Path> inputPaths = _loadedPaths.getInputPaths();
502
503 for (Path path : inputPaths.values()) {
504 FileHelperUtil.delete(true, path);
505 }
506
507 try {
508 sendResult(
509 _channel, _nettyFabricWorkerConfig.getId(), future.get(),
510 null);
511 }
512 catch (Throwable t) {
513 if (t instanceof ExecutionException) {
514 t = t.getCause();
515 }
516
517 sendResult(_channel, _nettyFabricWorkerConfig.getId(), null, t);
518 }
519 }
520
521 private final Channel _channel;
522 private final LoadedPaths _loadedPaths;
523 private final NettyFabricWorkerConfig<Serializable>
524 _nettyFabricWorkerConfig;
525
526 }
527
528 protected class PostLoadPathsFutureListener
529 extends BaseFutureListener<LoadedPaths> {
530
531 public PostLoadPathsFutureListener(
532 ChannelHandlerContext channelHandlerContext,
533 NettyFabricWorkerConfig<Serializable> nettyFabricWorkerConfig) {
534
535 _channelHandlerContext = channelHandlerContext;
536 _nettyFabricWorkerConfig = nettyFabricWorkerConfig;
537 }
538
539 @Override
540 public void completeWithException(
541 Future<LoadedPaths> future, Throwable throwable) {
542
543 sendResult(
544 _channelHandlerContext.channel(),
545 _nettyFabricWorkerConfig.getId(), null, throwable);
546 }
547
548 @Override
549 public void completeWithResult(
550 Future<LoadedPaths> loadPathsFuture,
551 final LoadedPaths loadedPaths) {
552
553 EventExecutor eventExecutor = _channelHandlerContext.executor();
554
555 io.netty.util.concurrent.Future<FabricWorker<Serializable>> future =
556 eventExecutor.submit(
557 new Callable<FabricWorker<Serializable>>() {
558
559 @Override
560 public FabricWorker<Serializable> call()
561 throws ProcessException {
562
563 ProcessConfig processConfig =
564 _nettyFabricWorkerConfig.getProcessConfig();
565
566 return _fabricAgent.execute(
567 loadedPaths.toProcessConfig(processConfig),
568 _nettyFabricWorkerConfig.getProcessCallable());
569 }
570
571 });
572
573 future.addListener(
574 new PostFabricWorkerExecutionFutureListener(
575 _channelHandlerContext.channel(), loadedPaths,
576 _nettyFabricWorkerConfig));
577 }
578
579 private final ChannelHandlerContext _channelHandlerContext;
580 private final NettyFabricWorkerConfig<Serializable>
581 _nettyFabricWorkerConfig;
582
583 }
584
// Logger shared by the handler and its nested listener classes.
585 private static final Log _log = LogFactoryUtil.getLog(
586 NettyFabricWorkerExecutionChannelHandler.class);
587
// Timeout passed to NettyUtil.scheduleCancellation for the RPC calls issued
// by sendResult and by the worker startup notification.
588 private final long _executionTimeout;
// Agent on which the received process callables are executed.
589 private final FabricAgent _fabricAgent;
// Repository queried by loadPaths for the files a worker needs.
590 private final Repository<Channel> _repository;
591
592 }