001
014
015 package com.liferay.portal.kernel.process;
016
017 import com.liferay.portal.kernel.io.unsync.UnsyncBufferedInputStream;
018 import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayOutputStream;
019 import com.liferay.portal.kernel.log.Log;
020 import com.liferay.portal.kernel.log.LogFactoryUtil;
021 import com.liferay.portal.kernel.process.log.ProcessOutputStream;
022 import com.liferay.portal.kernel.util.ClassLoaderObjectInputStream;
023 import com.liferay.portal.kernel.util.NamedThreadFactory;
024 import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
025
026 import java.io.EOFException;
027 import java.io.IOException;
028 import java.io.InputStream;
029 import java.io.ObjectInputStream;
030 import java.io.ObjectOutputStream;
031 import java.io.OutputStream;
032 import java.io.PrintStream;
033 import java.io.Serializable;
034 import java.io.StreamCorruptedException;
035
036 import java.util.ArrayList;
037 import java.util.Collections;
038 import java.util.List;
039 import java.util.concurrent.Callable;
040 import java.util.concurrent.ExecutorService;
041 import java.util.concurrent.Executors;
042 import java.util.concurrent.Future;
043
044
047 public class ProcessExecutor {
048
049 public static <T extends Serializable> T execute(
050 ProcessCallable<T> processCallable, String classPath)
051 throws ProcessException {
052
053 return execute(
054 processCallable, classPath, Collections.<String>emptyList());
055 }
056
057 public static <T extends Serializable> T execute(
058 ProcessCallable<T> processCallable, String classPath,
059 List<String> arguments)
060 throws ProcessException {
061
062 try {
063 List<String> commands = new ArrayList<String>(arguments.size() + 4);
064
065 commands.add("java");
066 commands.add("-cp");
067 commands.add(classPath);
068 commands.addAll(arguments);
069 commands.add(ProcessExecutor.class.getName());
070
071 ProcessBuilder processBuilder = new ProcessBuilder(commands);
072
073 Process process = processBuilder.start();
074
075 _writeObject(process.getOutputStream(), processCallable);
076
077 ExecutorService executorService = _getExecutorService();
078
079 SubprocessReactor subprocessReactor = new SubprocessReactor(
080 process.getInputStream());
081
082 Future<ProcessCallable<?>> futureResponseProcessCallable =
083 executorService.submit(subprocessReactor);
084
085 int exitCode = process.waitFor();
086
087 if (exitCode != 0) {
088 throw new ProcessException(
089 "Subprocess terminated with exit code " + exitCode);
090 }
091
092 ProcessCallable<?> responseProcessCallable =
093 futureResponseProcessCallable.get();
094
095 if (responseProcessCallable instanceof ReturnProcessCallable<?>) {
096 return (T)responseProcessCallable.call();
097 }
098
099 if (responseProcessCallable instanceof ExceptionProcessCallable) {
100 ExceptionProcessCallable exceptionProcessCallable =
101 (ExceptionProcessCallable)responseProcessCallable;
102
103 throw exceptionProcessCallable.call();
104 }
105
106 if (_log.isWarnEnabled()) {
107 _log.warn(
108 "Subprocess reactor exited without a valid return " +
109 "because the subprocess terminated with an exception");
110 }
111
112 return null;
113 }
114 catch (ProcessException pe) {
115 throw pe;
116 }
117 catch (Exception e) {
118 throw new ProcessException(e);
119 }
120 }
121
122 public static void main(String[] arguments)
123 throws ClassNotFoundException, IOException {
124
125 ObjectOutputStream objectOutputStream = new ObjectOutputStream(
126 System.out);
127
128 ProcessOutputStream outProcessOutputStream = new ProcessOutputStream(
129 objectOutputStream, false);
130
131 PrintStream outPrintStream = new PrintStream(
132 outProcessOutputStream, true);
133
134 System.setOut(outPrintStream);
135
136 ProcessOutputStream errProcessOutputStream = new ProcessOutputStream(
137 objectOutputStream, true);
138
139 PrintStream errPrintStream = new PrintStream(
140 errProcessOutputStream, true);
141
142 System.setErr(errPrintStream);
143
144 try {
145 ProcessCallable<?> processCallable =
146 (ProcessCallable<?>)_readObject(System.in, false);
147
148 Serializable result = processCallable.call();
149
150 outPrintStream.flush();
151
152 outProcessOutputStream.writeProcessCallable(
153 new ReturnProcessCallable<Serializable>(result));
154
155 outProcessOutputStream.close();
156 }
157 catch (ProcessException pe) {
158 errPrintStream.flush();
159
160 errProcessOutputStream.writeProcessCallable(
161 new ExceptionProcessCallable(pe));
162
163 errProcessOutputStream.close();
164 }
165 }
166
167 public void destroy() {
168 if (_executorService == null) {
169 return;
170 }
171
172 synchronized (ProcessExecutor.class) {
173 if (_executorService != null) {
174 _executorService.shutdownNow();
175
176 _executorService = null;
177 }
178 }
179 }
180
181 private static ExecutorService _getExecutorService() {
182 if (_executorService != null) {
183 return _executorService;
184 }
185
186 synchronized (ProcessExecutor.class) {
187 if (_executorService == null) {
188 _executorService = Executors.newCachedThreadPool(
189 new NamedThreadFactory(
190 ProcessExecutor.class.getName(), Thread.MIN_PRIORITY,
191 PortalClassLoaderUtil.getClassLoader()));
192 }
193 }
194
195 return _executorService;
196 }
197
198 private static Object _readObject(InputStream inputStream, boolean close)
199 throws ClassNotFoundException, IOException {
200
201 ObjectInputStream objectInputStream = new ObjectInputStream(
202 inputStream);
203
204 try {
205 return objectInputStream.readObject();
206 }
207 finally {
208 if (close) {
209 objectInputStream.close();
210 }
211 }
212 }
213
214 private static void _writeObject(OutputStream outputStream, Object object)
215 throws IOException {
216
217 ObjectOutputStream objectOutputStream = new ObjectOutputStream(
218 outputStream);
219
220 try {
221 objectOutputStream.writeObject(object);
222 }
223 finally {
224 objectOutputStream.close();
225 }
226 }
227
228 private static Log _log = LogFactoryUtil.getLog(ProcessExecutor.class);
229
230 private static volatile ExecutorService _executorService;
231
232 private static class SubprocessReactor
233 implements Callable<ProcessCallable<? extends Serializable>> {
234
235 public SubprocessReactor(InputStream inputStream) {
236 _unsyncBufferedInputStream = new UnsyncBufferedInputStream(
237 inputStream);
238 }
239
240 public ProcessCallable<? extends Serializable> call() throws Exception {
241 try {
242 ObjectInputStream objectInputStream = null;
243
244 UnsyncByteArrayOutputStream unsyncByteArrayOutputStream =
245 new UnsyncByteArrayOutputStream();
246
247 while (true) {
248 try {
249
250
251
252 _unsyncBufferedInputStream.mark(4);
253
254 objectInputStream = new ClassLoaderObjectInputStream(
255 _unsyncBufferedInputStream,
256 PortalClassLoaderUtil.getClassLoader());
257
258
259
260
261 if (unsyncByteArrayOutputStream.size() > 0) {
262 if (_log.isWarnEnabled()) {
263 _log.warn(
264 "Found corrupted leading log: " +
265 unsyncByteArrayOutputStream.toString());
266 }
267 }
268
269 unsyncByteArrayOutputStream = null;
270
271 break;
272 }
273 catch (StreamCorruptedException sce) {
274
275
276
277 _unsyncBufferedInputStream.reset();
278
279 unsyncByteArrayOutputStream.write(
280 _unsyncBufferedInputStream.read());
281 }
282 }
283
284 while (true) {
285 ProcessCallable<?> processCallable =
286 (ProcessCallable<?>)objectInputStream.readObject();
287
288 if (processCallable instanceof ExceptionProcessCallable) {
289 return processCallable;
290 }
291
292 if (processCallable instanceof ReturnProcessCallable<?>) {
293 return processCallable;
294 }
295
296 Serializable result = processCallable.call();
297
298 if (_log.isDebugEnabled()) {
299 _log.debug(
300 "Invoked generic process callable " +
301 processCallable + " with return value " +
302 result);
303 }
304 }
305 }
306 catch (EOFException eofe) {
307 }
308
309 return null;
310 }
311
312 private final UnsyncBufferedInputStream _unsyncBufferedInputStream;
313
314 }
315
316 }