001
014
015 package com.liferay.portal.kernel.process;
016
017 import com.liferay.portal.kernel.util.NamedThreadFactory;
018 import com.liferay.portal.kernel.util.ObjectValuePair;
019 import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
020
021 import java.io.IOException;
022
023 import java.util.Arrays;
024 import java.util.List;
025 import java.util.concurrent.Callable;
026 import java.util.concurrent.ExecutionException;
027 import java.util.concurrent.ExecutorService;
028 import java.util.concurrent.Executors;
029 import java.util.concurrent.Future;
030 import java.util.concurrent.RejectedExecutionException;
031 import java.util.concurrent.TimeUnit;
032 import java.util.concurrent.TimeoutException;
033
034
037 public class ProcessUtil {
038
/**
 * Shared {@code ConsumerOutputProcessor} instance, created once when this
 * class is initialized.
 *
 * NOTE(review): presumably intended to be passed as the output processor
 * argument of {@code execute}; confirm against ConsumerOutputProcessor.
 */
039 public static final ConsumerOutputProcessor CONSUMER_OUTPUT_PROCESSOR =
040 new ConsumerOutputProcessor();
041
/**
 * Shared {@code LoggingOutputProcessor} instance, created once when this
 * class is initialized.
 *
 * NOTE(review): presumably intended to be passed as the output processor
 * argument of {@code execute}; confirm against LoggingOutputProcessor.
 */
042 public static final LoggingOutputProcessor LOGGING_OUTPUT_PROCESSOR =
043 new LoggingOutputProcessor();
044
045 public static <O, E> Future<ObjectValuePair<O, E>> execute(
046 OutputProcessor<O, E> outputProcessor, List<String> arguments)
047 throws ProcessException {
048
049 if (outputProcessor == null) {
050 throw new NullPointerException("Output processor is null");
051 }
052
053 if (arguments == null) {
054 throw new NullPointerException("Arguments is null");
055 }
056
057 ProcessBuilder processBuilder = new ProcessBuilder(arguments);
058
059 try {
060 Process process = processBuilder.start();
061
062 ExecutorService executorService = _getExecutorService();
063
064 try {
065 Future<O> stdOutFuture = executorService.submit(
066 new ProcessStdOutCallable<O>(outputProcessor, process));
067
068 Future<E> stdErrFuture = executorService.submit(
069 new ProcessStdErrCallable<E>(outputProcessor, process));
070
071 return new BindedFuture<O, E>(
072 stdOutFuture, stdErrFuture, process);
073 }
074 catch (RejectedExecutionException ree) {
075 process.destroy();
076
077 throw new ProcessException(
078 "Cancelled execution because of a concurrent destroy", ree);
079 }
080 }
081 catch (IOException ioe) {
082 throw new ProcessException(ioe);
083 }
084 }
085
086 public static <O, E> Future<ObjectValuePair<O, E>> execute(
087 OutputProcessor<O, E> outputProcessor, String... arguments)
088 throws ProcessException {
089
090 return execute(outputProcessor, Arrays.asList(arguments));
091 }
092
093 public void destroy() {
094 if (_executorService == null) {
095 return;
096 }
097
098 synchronized (ProcessUtil.class) {
099 if (_executorService != null) {
100 _executorService.shutdownNow();
101
102 _executorService = null;
103 }
104 }
105 }
106
107 private static ExecutorService _getExecutorService() {
108 if (_executorService != null) {
109 return _executorService;
110 }
111
112 synchronized (ProcessUtil.class) {
113 if (_executorService == null) {
114 _executorService = Executors.newCachedThreadPool(
115 new NamedThreadFactory(
116 ProcessUtil.class.getName(), Thread.MIN_PRIORITY,
117 PortalClassLoaderUtil.getClassLoader()));
118 }
119 }
120
121 return _executorService;
122 }
123
124 private static volatile ExecutorService _executorService;
125
126 private static class BindedFuture<O, E>
127 implements Future<ObjectValuePair<O, E>> {
128
129 public BindedFuture(
130 Future<O> stdOutFuture, Future<E> stdErrFuture, Process process) {
131
132 _stdOutFuture = stdOutFuture;
133 _stdErrFuture = stdErrFuture;
134 _process = process;
135 }
136
137 @Override
138 public boolean cancel(boolean mayInterruptIfRunning) {
139 if (_stdOutFuture.isCancelled() || _stdOutFuture.isDone()) {
140 return false;
141 }
142
143 _stdErrFuture.cancel(true);
144 _stdOutFuture.cancel(true);
145 _process.destroy();
146
147 return true;
148 }
149
150 @Override
151 public ObjectValuePair<O, E> get()
152 throws ExecutionException, InterruptedException {
153
154 E stdErrResult = _stdErrFuture.get();
155 O stdOutResult = _stdOutFuture.get();
156
157 return new ObjectValuePair<O, E>(stdOutResult, stdErrResult);
158 }
159
160 @Override
161 public ObjectValuePair<O, E> get(long timeout, TimeUnit unit)
162 throws ExecutionException, InterruptedException, TimeoutException {
163
164 long startTime = System.currentTimeMillis();
165
166 E stdErrResult = _stdErrFuture.get(timeout, unit);
167
168 long elapseTime = System.currentTimeMillis() - startTime;
169
170 long secondTimeout =
171 timeout - unit.convert(elapseTime, TimeUnit.MILLISECONDS);
172
173 O stdOutResult = _stdOutFuture.get(secondTimeout, unit);
174
175 return new ObjectValuePair<O, E>(stdOutResult, stdErrResult);
176 }
177
178 @Override
179 public boolean isCancelled() {
180 return _stdOutFuture.isCancelled();
181 }
182
183 @Override
184 public boolean isDone() {
185 return _stdOutFuture.isDone();
186 }
187
188 private final Future<E> _stdErrFuture;
189 private final Future<O> _stdOutFuture;
190 private final Process _process;
191
192 }
193
194 private static class ProcessStdErrCallable<T> implements Callable<T> {
195
196 public ProcessStdErrCallable(
197 OutputProcessor<?, T> outputProcessor, Process process) {
198
199 _outputProcessor = outputProcessor;
200 _process = process;
201 }
202
203 @Override
204 public T call() throws Exception {
205 return _outputProcessor.processStdErr(_process.getErrorStream());
206 }
207
208 private final OutputProcessor<?, T> _outputProcessor;
209 private final Process _process;
210
211 }
212
213 private static class ProcessStdOutCallable<T> implements Callable<T> {
214
215 public ProcessStdOutCallable(
216 OutputProcessor<T, ?> outputProcessor, Process process) {
217
218 _outputProcessor = outputProcessor;
219 _process = process;
220 }
221
222 @Override
223 public T call() throws Exception {
224 try {
225 return _outputProcessor.processStdOut(
226 _process.getInputStream());
227 }
228 finally {
229 try {
230 int exitCode = _process.waitFor();
231
232 if (exitCode != 0) {
233 throw new ProcessException(
234 "Subprocess terminated with exit code " + exitCode);
235 }
236 }
237 catch (InterruptedException ie) {
238 _process.destroy();
239
240 throw new ProcessException(
241 "Forcibly killed subprocess on interruption", ie);
242 }
243 }
244 }
245
246 private final OutputProcessor<T, ?> _outputProcessor;
247 private final Process _process;
248
249 }
250
251 }