001    /**
002     * Copyright (c) 2000-present Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.fabric.netty.handlers;
016    
017    import com.liferay.portal.fabric.agent.FabricAgent;
018    import com.liferay.portal.fabric.netty.agent.NettyFabricAgentStub;
019    import com.liferay.portal.fabric.netty.fileserver.FileHelperUtil;
020    import com.liferay.portal.fabric.netty.rpc.ChannelThreadLocal;
021    import com.liferay.portal.fabric.netty.rpc.RPCUtil;
022    import com.liferay.portal.fabric.netty.rpc.SyncProcessRPCCallable;
023    import com.liferay.portal.fabric.netty.util.NettyUtil;
024    import com.liferay.portal.fabric.netty.worker.NettyFabricWorkerConfig;
025    import com.liferay.portal.fabric.netty.worker.NettyFabricWorkerStub;
026    import com.liferay.portal.fabric.repository.Repository;
027    import com.liferay.portal.fabric.worker.FabricWorker;
028    import com.liferay.portal.kernel.concurrent.BaseFutureListener;
029    import com.liferay.portal.kernel.concurrent.FutureListener;
030    import com.liferay.portal.kernel.concurrent.NoticeableFuture;
031    import com.liferay.portal.kernel.concurrent.NoticeableFutureConverter;
032    import com.liferay.portal.kernel.log.Log;
033    import com.liferay.portal.kernel.log.LogFactoryUtil;
034    import com.liferay.portal.kernel.process.ClassPathUtil;
035    import com.liferay.portal.kernel.process.ProcessCallable;
036    import com.liferay.portal.kernel.process.ProcessConfig;
037    import com.liferay.portal.kernel.process.ProcessConfig.Builder;
038    import com.liferay.portal.kernel.process.ProcessException;
039    import com.liferay.portal.kernel.util.ArrayUtil;
040    import com.liferay.portal.kernel.util.StringBundler;
041    import com.liferay.portal.kernel.util.StringUtil;
042    
043    import io.netty.channel.Channel;
044    import io.netty.channel.ChannelFuture;
045    import io.netty.channel.ChannelFutureListener;
046    import io.netty.channel.ChannelHandlerContext;
047    import io.netty.channel.SimpleChannelInboundHandler;
048    import io.netty.util.concurrent.EventExecutor;
049    import io.netty.util.concurrent.GenericFutureListener;
050    
051    import java.io.File;
052    import java.io.IOException;
053    import java.io.Serializable;
054    
055    import java.net.MalformedURLException;
056    import java.net.URLClassLoader;
057    
058    import java.nio.file.Path;
059    import java.nio.file.Paths;
060    
061    import java.util.ArrayList;
062    import java.util.HashMap;
063    import java.util.LinkedHashMap;
064    import java.util.List;
065    import java.util.Map;
066    import java.util.concurrent.Callable;
067    import java.util.concurrent.ExecutionException;
068    import java.util.concurrent.Future;
069    
070    /**
071     * @author Shuyang Zhou
072     */
073    public class NettyFabricWorkerExecutionChannelHandler
074            extends SimpleChannelInboundHandler<NettyFabricWorkerConfig<Serializable>> {
075    
/**
 * Creates a handler that fetches the files referenced by incoming worker
 * configurations through the given repository and runs the requested
 * process on the given fabric agent.
 *
 * @param  repository the repository used to fetch files for a channel;
 *         must not be <code>null</code>
 * @param  fabricAgent the agent that executes the requested process; must
 *         not be <code>null</code>
 * @param  executionTimeout the timeout handed to
 *         <code>NettyUtil.scheduleCancellation</code> for the RPC calls
 *         issued by this handler (the unit is defined by that utility)
 * @throws NullPointerException if the repository or the fabric agent is
 *         <code>null</code>
 */
public NettyFabricWorkerExecutionChannelHandler(
        Repository<Channel> repository, FabricAgent fabricAgent,
        long executionTimeout) {

        // The repository is validated first so that it is the one reported
        // when both arguments are null

        _repository = _requireNonNull(repository, "Repository is null");
        _fabricAgent = _requireNonNull(fabricAgent, "Fabric agent is null");
        _executionTimeout = executionTimeout;
}

/**
 * Returns the given value unchanged, or fails with a
 * <code>NullPointerException</code> carrying the given message when the
 * value is <code>null</code>.
 */
private static <T> T _requireNonNull(T value, String message) {
        if (value == null) {
                throw new NullPointerException(message);
        }

        return value;
}
092    
093            @Override
094            public void exceptionCaught(
095                    ChannelHandlerContext channelHandlerContext, Throwable throwable) {
096    
097                    final Channel channel = channelHandlerContext.channel();
098    
099                    _log.error("Closing " + channel + " due to:", throwable);
100    
101                    ChannelFuture channelFuture = channel.close();
102    
103                    channelFuture.addListener(
104                            new ChannelFutureListener() {
105    
106                                    @Override
107                                    public void operationComplete(ChannelFuture channelFuture) {
108                                            if (_log.isInfoEnabled()) {
109                                                    _log.info(channel + " is closed");
110                                            }
111                                    }
112    
113                            });
114            }
115    
116            @Override
117            protected void channelRead0(
118                    ChannelHandlerContext channelHandlerContext,
119                    NettyFabricWorkerConfig<Serializable> nettyFabricWorkerConfig) {
120    
121                    NoticeableFuture<LoadedPaths> noticeableFuture = loadPaths(
122                            channelHandlerContext.channel(), nettyFabricWorkerConfig);
123    
124                    noticeableFuture.addFutureListener(
125                            new PostLoadPathsFutureListener(
126                                    channelHandlerContext, nettyFabricWorkerConfig));
127            }
128    
129            protected NoticeableFuture<LoadedPaths> loadPaths(
130                    Channel channel,
131                    NettyFabricWorkerConfig<Serializable> nettyFabricWorkerConfig) {
132    
133                    Map<Path, Path> mergedPaths = new HashMap<>();
134    
135                    ProcessConfig processConfig =
136                            nettyFabricWorkerConfig.getProcessConfig();
137    
138                    final Map<Path, Path> bootstrapPaths = new LinkedHashMap<>();
139    
140                    for (String pathString :
141                                    processConfig.getBootstrapClassPathElements()) {
142    
143                            bootstrapPaths.put(Paths.get(pathString), null);
144                    }
145    
146                    mergedPaths.putAll(bootstrapPaths);
147    
148                    final Map<Path, Path> runtimePaths = new LinkedHashMap<>();
149    
150                    for (String pathString : processConfig.getRuntimeClassPathElements()) {
151                            runtimePaths.put(Paths.get(pathString), null);
152                    }
153    
154                    mergedPaths.putAll(runtimePaths);
155    
156                    final Map<Path, Path> inputPaths =
157                            nettyFabricWorkerConfig.getInputPathMap();
158    
159                    mergedPaths.putAll(inputPaths);
160    
161                    return new NoticeableFutureConverter<LoadedPaths, Map<Path, Path>>(
162                            _repository.getFiles(channel, mergedPaths, false)) {
163    
164                            @Override
165                            protected LoadedPaths convert(Map<Path, Path> mergedPaths)
166                                    throws IOException {
167    
168                                    Map<Path, Path> loadedInputPaths = new HashMap<>();
169    
170                                    List<Path> missedInputPaths = new ArrayList<>();
171    
172                                    for (Path path : inputPaths.keySet()) {
173                                            Path loadedInputPath = mergedPaths.get(path);
174    
175                                            if (loadedInputPath == null) {
176                                                    missedInputPaths.add(path);
177                                            }
178                                            else {
179                                                    loadedInputPaths.put(path, loadedInputPath);
180                                            }
181                                    }
182    
183                                    if (!missedInputPaths.isEmpty()) {
184                                            throw new IOException(
185                                                    "Unable to get input paths: " + missedInputPaths);
186                                    }
187    
188                                    List<Path> loadedBootstrapPaths = new ArrayList<>();
189    
190                                    List<Path> missedBootstrapPaths = new ArrayList<>();
191    
192                                    for (Path path : bootstrapPaths.keySet()) {
193                                            Path loadedBootstrapPath = mergedPaths.get(path);
194    
195                                            if (loadedBootstrapPath == null) {
196                                                    missedBootstrapPaths.add(path);
197                                            }
198                                            else {
199                                                    loadedBootstrapPaths.add(loadedBootstrapPath);
200                                            }
201                                    }
202    
203                                    if (!missedBootstrapPaths.isEmpty() && _log.isWarnEnabled()) {
204                                            _log.warn(
205                                                    "Incomplete bootstrap classpath loaded, missed: " +
206                                                            missedBootstrapPaths);
207                                    }
208    
209                                    List<Path> loadedRuntimePaths = new ArrayList<>();
210    
211                                    List<Path> missedRuntimePaths = new ArrayList<>();
212    
213                                    for (Path path : runtimePaths.keySet()) {
214                                            Path loadedRuntimePath = mergedPaths.get(path);
215    
216                                            if (loadedRuntimePath == null) {
217                                                    missedRuntimePaths.add(path);
218                                            }
219                                            else {
220                                                    loadedRuntimePaths.add(loadedRuntimePath);
221                                            }
222                                    }
223    
224                                    if (!missedRuntimePaths.isEmpty() && _log.isWarnEnabled()) {
225                                            _log.warn(
226                                                    "Incomplete runtime classpath loaded, missed: " +
227                                                            missedRuntimePaths);
228                                    }
229    
230                                    return new LoadedPaths(
231                                            loadedInputPaths,
232                                            StringUtil.merge(loadedBootstrapPaths, File.pathSeparator),
233                                            StringUtil.merge(loadedRuntimePaths, File.pathSeparator));
234                            }
235    
236                    };
237            }
238    
239            protected void sendResult(
240                    Channel channel, long fabricWorkerId, Serializable result,
241                    Throwable t) {
242    
243                    final FabricWorkerResultProcessCallable
244                            fabricWorkerResultProcessCallable =
245                                    new FabricWorkerResultProcessCallable(
246                                            fabricWorkerId, result, t);
247    
248                    NoticeableFuture<Serializable> noticeableFuture = RPCUtil.execute(
249                            channel,
250                            new SyncProcessRPCCallable<Serializable>(
251                                    fabricWorkerResultProcessCallable));
252    
253                    NettyUtil.scheduleCancellation(
254                            channel, noticeableFuture, _executionTimeout);
255    
256                    noticeableFuture.addFutureListener(
257                            new BaseFutureListener<Serializable>() {
258    
259                                    @Override
260                                    public void completeWithException(
261                                            Future<Serializable> future, Throwable throwable) {
262    
263                                            _log.error(
264                                                    "Unable to send back fabric worker result " +
265                                                            fabricWorkerResultProcessCallable,
266                                                    throwable);
267                                    }
268    
269                            });
270            }
271    
272            protected static class FabricAgentFinishStartupProcessCallable
273                    implements ProcessCallable<Serializable> {
274    
275                    @Override
276                    public Serializable call() throws ProcessException {
277                            Channel channel = ChannelThreadLocal.getChannel();
278    
279                            NettyFabricAgentStub nettyStubFabricAgent =
280                                    NettyChannelAttributes.getNettyFabricAgentStub(channel);
281    
282                            if (nettyStubFabricAgent == null) {
283                                    throw new ProcessException(
284                                            "Unable to locate fabric agent on channel " + channel);
285                            }
286    
287                            nettyStubFabricAgent.finishStartup(_id);
288    
289                            return null;
290                    }
291    
292                    protected FabricAgentFinishStartupProcessCallable(long id) {
293                            _id = id;
294                    }
295    
296                    private static final long serialVersionUID = 1L;
297    
298                    private final long _id;
299    
300            }
301    
302            protected static class FabricWorkerResultProcessCallable
303                    implements ProcessCallable<Serializable> {
304    
305                    @Override
306                    public Serializable call() throws ProcessException {
307                            Channel channel = ChannelThreadLocal.getChannel();
308    
309                            NettyFabricAgentStub nettyStubFabricAgent =
310                                    NettyChannelAttributes.getNettyFabricAgentStub(channel);
311    
312                            if (nettyStubFabricAgent == null) {
313                                    throw new ProcessException(
314                                            "Unable to locate fabric agent on channel " + channel);
315                            }
316    
317                            NettyFabricWorkerStub<Serializable> nettyStubFabricWorker =
318                                    (NettyFabricWorkerStub<Serializable>)
319                                            nettyStubFabricAgent.takeNettyStubFabricWorker(_id);
320    
321                            if (nettyStubFabricWorker == null) {
322                                    throw new ProcessException(
323                                            "Unable to locate fabric worker on channel " + channel +
324                                                    ", with fabric worker id " + _id);
325                            }
326    
327                            if (_throwable != null) {
328                                    nettyStubFabricWorker.setException(_throwable);
329                            }
330                            else {
331                                    nettyStubFabricWorker.setResult(_result);
332                            }
333    
334                            return null;
335                    }
336    
337                    @Override
338                    public String toString() {
339                            StringBundler sb = new StringBundler(7);
340    
341                            sb.append("{id=");
342                            sb.append(_id);
343                            sb.append(", result=");
344                            sb.append(_result);
345                            sb.append(", throwable=");
346                            sb.append(_throwable);
347                            sb.append("}");
348    
349                            return sb.toString();
350                    }
351    
352                    protected FabricWorkerResultProcessCallable(
353                            long id, Serializable result, Throwable throwable) {
354    
355                            _id = id;
356                            _result = result;
357                            _throwable = throwable;
358                    }
359    
360                    private static final long serialVersionUID = 1L;
361    
362                    private final long _id;
363                    private final Serializable _result;
364                    private final Throwable _throwable;
365    
366            }
367    
368            protected static class LoadedPaths {
369    
370                    public LoadedPaths(
371                            Map<Path, Path> inputPaths, String bootstrapClassPath,
372                            String runtimeClassPath) {
373    
374                            _inputPaths = inputPaths;
375                            _bootstrapClassPath = bootstrapClassPath;
376                            _runtimeClassPath = runtimeClassPath;
377                    }
378    
379                    public Map<Path, Path> getInputPaths() {
380                            return _inputPaths;
381                    }
382    
383                    public ProcessConfig toProcessConfig(ProcessConfig processConfig)
384                            throws ProcessException {
385    
386                            Builder builder = new Builder();
387    
388                            builder.setArguments(processConfig.getArguments());
389                            builder.setBootstrapClassPath(_bootstrapClassPath);
390                            builder.setJavaExecutable(processConfig.getJavaExecutable());
391                            builder.setRuntimeClassPath(_runtimeClassPath);
392    
393                            try {
394                                    builder.setReactClassLoader(
395                                            new URLClassLoader(
396                                                    ArrayUtil.append(
397                                                            ClassPathUtil.getClassPathURLs(_bootstrapClassPath),
398                                                            ClassPathUtil.getClassPathURLs(
399                                                                    _runtimeClassPath))));
400                            }
401                            catch (MalformedURLException murle) {
402                                    throw new ProcessException(murle);
403                            }
404    
405                            return builder.build();
406                    }
407    
408                    private final String _bootstrapClassPath;
409                    private final Map<Path, Path> _inputPaths;
410                    private final String _runtimeClassPath;
411    
412            }
413    
414            protected class PostFabricWorkerExecutionFutureListener
415                    implements GenericFutureListener
416                            <io.netty.util.concurrent.Future<FabricWorker<Serializable>>> {
417    
418                    public PostFabricWorkerExecutionFutureListener(
419                            Channel channel, LoadedPaths loadedPaths,
420                            NettyFabricWorkerConfig<Serializable> nettyFabricWorkerConfig) {
421    
422                            _channel = channel;
423                            _loadedPaths = loadedPaths;
424                            _nettyFabricWorkerConfig = nettyFabricWorkerConfig;
425                    }
426    
427                    @Override
428                    public void operationComplete(
429                                    io.netty.util.concurrent.Future<FabricWorker<Serializable>>
430                                            future)
431                            throws Exception {
432    
433                            Throwable throwable = future.cause();
434    
435                            if (throwable != null) {
436                                    sendResult(
437                                            _channel, _nettyFabricWorkerConfig.getId(), null,
438                                            throwable);
439    
440                                    return;
441                            }
442    
443                            FabricWorker<Serializable> fabricWorker = future.get();
444    
445                            NettyChannelAttributes.putFabricWorker(
446                                    _channel, _nettyFabricWorkerConfig.getId(), fabricWorker);
447    
448                            NoticeableFuture<Serializable> noticeableFuture = RPCUtil.execute(
449                                    _channel,
450                                    new SyncProcessRPCCallable<Serializable>(
451                                            new FabricAgentFinishStartupProcessCallable(
452                                                    _nettyFabricWorkerConfig.getId())));
453    
454                            NettyUtil.scheduleCancellation(
455                                    _channel, noticeableFuture, _executionTimeout);
456    
457                            noticeableFuture.addFutureListener(
458                                    new BaseFutureListener<Serializable>() {
459    
460                                            @Override
461                                            public void completeWithException(
462                                                    Future<Serializable> future, Throwable throwable) {
463    
464                                                    _log.error(
465                                                            "Unable to finish fabric worker startup",
466                                                            throwable);
467                                            }
468    
469                                    });
470    
471                            NoticeableFuture<Serializable> processNoticeableFuture =
472                                    fabricWorker.getProcessNoticeableFuture();
473    
474                            processNoticeableFuture.addFutureListener(
475                                    new PostFabricWorkerFinishFutureListener(
476                                            _channel, _nettyFabricWorkerConfig, _loadedPaths));
477                    }
478    
479                    private final Channel _channel;
480                    private final LoadedPaths _loadedPaths;
481                    private final NettyFabricWorkerConfig<Serializable>
482                            _nettyFabricWorkerConfig;
483    
484            }
485    
486            protected class PostFabricWorkerFinishFutureListener
487                    implements FutureListener<Serializable> {
488    
489                    public PostFabricWorkerFinishFutureListener(
490                            Channel channel,
491                            NettyFabricWorkerConfig<Serializable> nettyFabricWorkerConfig,
492                            LoadedPaths loadedPaths) {
493    
494                            _channel = channel;
495                            _nettyFabricWorkerConfig = nettyFabricWorkerConfig;
496                            _loadedPaths = loadedPaths;
497                    }
498    
499                    @Override
500                    public void complete(Future<Serializable> future) {
501                            Map<Path, Path> inputPaths = _loadedPaths.getInputPaths();
502    
503                            for (Path path : inputPaths.values()) {
504                                    FileHelperUtil.delete(true, path);
505                            }
506    
507                            try {
508                                    sendResult(
509                                            _channel, _nettyFabricWorkerConfig.getId(), future.get(),
510                                            null);
511                            }
512                            catch (Throwable t) {
513                                    if (t instanceof ExecutionException) {
514                                            t = t.getCause();
515                                    }
516    
517                                    sendResult(_channel, _nettyFabricWorkerConfig.getId(), null, t);
518                            }
519                    }
520    
521                    private final Channel _channel;
522                    private final LoadedPaths _loadedPaths;
523                    private final NettyFabricWorkerConfig<Serializable>
524                            _nettyFabricWorkerConfig;
525    
526            }
527    
528            protected class PostLoadPathsFutureListener
529                    extends BaseFutureListener<LoadedPaths> {
530    
531                    public PostLoadPathsFutureListener(
532                            ChannelHandlerContext channelHandlerContext,
533                            NettyFabricWorkerConfig<Serializable> nettyFabricWorkerConfig) {
534    
535                            _channelHandlerContext = channelHandlerContext;
536                            _nettyFabricWorkerConfig = nettyFabricWorkerConfig;
537                    }
538    
539                    @Override
540                    public void completeWithException(
541                            Future<LoadedPaths> future, Throwable throwable) {
542    
543                            sendResult(
544                                    _channelHandlerContext.channel(),
545                                    _nettyFabricWorkerConfig.getId(), null, throwable);
546                    }
547    
548                    @Override
549                    public void completeWithResult(
550                            Future<LoadedPaths> loadPathsFuture,
551                            final LoadedPaths loadedPaths) {
552    
553                            EventExecutor eventExecutor = _channelHandlerContext.executor();
554    
555                            io.netty.util.concurrent.Future<FabricWorker<Serializable>> future =
556                                    eventExecutor.submit(
557                                            new Callable<FabricWorker<Serializable>>() {
558    
559                                                    @Override
560                                                    public FabricWorker<Serializable> call()
561                                                            throws ProcessException {
562    
563                                                            ProcessConfig processConfig =
564                                                                    _nettyFabricWorkerConfig.getProcessConfig();
565    
566                                                            return _fabricAgent.execute(
567                                                                    loadedPaths.toProcessConfig(processConfig),
568                                                                    _nettyFabricWorkerConfig.getProcessCallable());
569                                                    }
570    
571                                            });
572    
573                            future.addListener(
574                                    new PostFabricWorkerExecutionFutureListener(
575                                            _channelHandlerContext.channel(), loadedPaths,
576                                            _nettyFabricWorkerConfig));
577                    }
578    
579                    private final ChannelHandlerContext _channelHandlerContext;
580                    private final NettyFabricWorkerConfig<Serializable>
581                            _nettyFabricWorkerConfig;
582    
583            }
584    
// Logger shared by this handler and its nested listeners and callables
585            private static final Log _log = LogFactoryUtil.getLog(
586                    NettyFabricWorkerExecutionChannelHandler.class);
587    
// Timeout passed to NettyUtil.scheduleCancellation for the RPC calls issued
// by this handler
588            private final long _executionTimeout;
// Agent on which the requested worker processes are executed
589            private final FabricAgent _fabricAgent;
// Repository used by loadPaths to fetch the files a worker needs
590            private final Repository<Channel> _repository;
591    
592    }