001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.process;
016    
017    import com.liferay.portal.kernel.util.NamedThreadFactory;
018    import com.liferay.portal.kernel.util.ObjectValuePair;
019    import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
020    
021    import java.io.IOException;
022    
023    import java.util.Arrays;
024    import java.util.List;
025    import java.util.concurrent.Callable;
026    import java.util.concurrent.ExecutionException;
027    import java.util.concurrent.ExecutorService;
028    import java.util.concurrent.Executors;
029    import java.util.concurrent.Future;
030    import java.util.concurrent.RejectedExecutionException;
031    import java.util.concurrent.TimeUnit;
032    import java.util.concurrent.TimeoutException;
033    
034    /**
035     * @author Shuyang Zhou
036     */
037    public class ProcessUtil {
038    
039            public static final ConsumerOutputProcessor CONSUMER_OUTPUT_PROCESSOR =
040                    new ConsumerOutputProcessor();
041    
042            public static final LoggingOutputProcessor LOGGING_OUTPUT_PROCESSOR =
043                    new LoggingOutputProcessor();
044    
045            public static <O, E> Future<ObjectValuePair<O, E>> execute(
046                            OutputProcessor<O, E> outputProcessor, List<String> arguments)
047                    throws ProcessException {
048    
049                    if (outputProcessor == null) {
050                            throw new NullPointerException("Output processor is null");
051                    }
052    
053                    if (arguments == null) {
054                            throw new NullPointerException("Arguments is null");
055                    }
056    
057                    ProcessBuilder processBuilder = new ProcessBuilder(arguments);
058    
059                    try {
060                            Process process = processBuilder.start();
061    
062                            ExecutorService executorService = _getExecutorService();
063    
064                            try {
065                                    Future<O> stdOutFuture = executorService.submit(
066                                            new ProcessStdOutCallable<O>(outputProcessor, process));
067    
068                                    Future<E> stdErrFuture = executorService.submit(
069                                            new ProcessStdErrCallable<E>(outputProcessor, process));
070    
071                                    return new BindedFuture<O, E>(
072                                            stdOutFuture, stdErrFuture, process);
073                            }
074                            catch (RejectedExecutionException ree) {
075                                    process.destroy();
076    
077                                    throw new ProcessException(
078                                            "Cancelled execution because of a concurrent destroy", ree);
079                            }
080                    }
081                    catch (IOException ioe) {
082                            throw new ProcessException(ioe);
083                    }
084            }
085    
086            public static <O, E> Future<ObjectValuePair<O, E>> execute(
087                            OutputProcessor<O, E> outputProcessor, String... arguments)
088                    throws ProcessException {
089    
090                    return execute(outputProcessor, Arrays.asList(arguments));
091            }
092    
093            public void destroy() {
094                    if (_executorService == null) {
095                            return;
096                    }
097    
098                    synchronized (ProcessUtil.class) {
099                            if (_executorService != null) {
100                                    _executorService.shutdownNow();
101    
102                                    _executorService = null;
103                            }
104                    }
105            }
106    
107            private static ExecutorService _getExecutorService() {
108                    if (_executorService != null) {
109                            return _executorService;
110                    }
111    
112                    synchronized (ProcessUtil.class) {
113                            if (_executorService == null) {
114                                    _executorService = Executors.newCachedThreadPool(
115                                            new NamedThreadFactory(
116                                                    ProcessUtil.class.getName(), Thread.MIN_PRIORITY,
117                                                    PortalClassLoaderUtil.getClassLoader()));
118                            }
119                    }
120    
121                    return _executorService;
122            }
123    
124            private static volatile ExecutorService _executorService;
125    
126            private static class BindedFuture<O, E>
127                    implements Future<ObjectValuePair<O, E>> {
128    
129                    public BindedFuture(
130                            Future<O> stdOutFuture, Future<E> stdErrFuture, Process process) {
131    
132                            _stdOutFuture = stdOutFuture;
133                            _stdErrFuture = stdErrFuture;
134                            _process = process;
135                    }
136    
137                    @Override
138                    public boolean cancel(boolean mayInterruptIfRunning) {
139                            if (_stdOutFuture.isCancelled() || _stdOutFuture.isDone()) {
140                                    return false;
141                            }
142    
143                            _stdErrFuture.cancel(true);
144                            _stdOutFuture.cancel(true);
145                            _process.destroy();
146    
147                            return true;
148                    }
149    
150                    @Override
151                    public ObjectValuePair<O, E> get()
152                            throws ExecutionException, InterruptedException {
153    
154                            E stdErrResult = _stdErrFuture.get();
155                            O stdOutResult = _stdOutFuture.get();
156    
157                            return new ObjectValuePair<O, E>(stdOutResult, stdErrResult);
158                    }
159    
160                    @Override
161                    public ObjectValuePair<O, E> get(long timeout, TimeUnit unit)
162                            throws ExecutionException, InterruptedException, TimeoutException {
163    
164                            long startTime = System.currentTimeMillis();
165    
166                            E stdErrResult = _stdErrFuture.get(timeout, unit);
167    
168                            long elapseTime = System.currentTimeMillis() - startTime;
169    
170                            long secondTimeout =
171                                    timeout - unit.convert(elapseTime, TimeUnit.MILLISECONDS);
172    
173                            O stdOutResult = _stdOutFuture.get(secondTimeout, unit);
174    
175                            return new ObjectValuePair<O, E>(stdOutResult, stdErrResult);
176                    }
177    
178                    @Override
179                    public boolean isCancelled() {
180                            return _stdOutFuture.isCancelled();
181                    }
182    
183                    @Override
184                    public boolean isDone() {
185                            return _stdOutFuture.isDone();
186                    }
187    
188                    private final Future<E> _stdErrFuture;
189                    private final Future<O> _stdOutFuture;
190                    private final Process _process;
191    
192            }
193    
194            private static class ProcessStdErrCallable<T> implements Callable<T> {
195    
196                    public ProcessStdErrCallable(
197                            OutputProcessor<?, T> outputProcessor, Process process) {
198    
199                            _outputProcessor = outputProcessor;
200                            _process = process;
201                    }
202    
203                    @Override
204                    public T call() throws Exception {
205                            return _outputProcessor.processStdErr(_process.getErrorStream());
206                    }
207    
208                    private final OutputProcessor<?, T> _outputProcessor;
209                    private final Process _process;
210    
211            }
212    
213            private static class ProcessStdOutCallable<T> implements Callable<T> {
214    
215                    public ProcessStdOutCallable(
216                            OutputProcessor<T, ?> outputProcessor, Process process) {
217    
218                            _outputProcessor = outputProcessor;
219                            _process = process;
220                    }
221    
222                    @Override
223                    public T call() throws Exception {
224                            try {
225                                    return _outputProcessor.processStdOut(
226                                            _process.getInputStream());
227                            }
228                            finally {
229                                    try {
230                                            int exitCode = _process.waitFor();
231    
232                                            if (exitCode != 0) {
233                                                    throw new ProcessException(
234                                                            "Subprocess terminated with exit code " + exitCode);
235                                            }
236                                    }
237                                    catch (InterruptedException ie) {
238                                            _process.destroy();
239    
240                                            throw new ProcessException(
241                                                    "Forcibly killed subprocess on interruption", ie);
242                                    }
243                            }
244                    }
245    
246                    private final OutputProcessor<T, ?> _outputProcessor;
247                    private final Process _process;
248    
249            }
250    
251    }