001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.process;
016    
017    import com.liferay.portal.kernel.io.unsync.UnsyncBufferedInputStream;
018    import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayOutputStream;
019    import com.liferay.portal.kernel.log.Log;
020    import com.liferay.portal.kernel.log.LogFactoryUtil;
021    import com.liferay.portal.kernel.process.log.ProcessOutputStream;
022    import com.liferay.portal.kernel.util.ClassLoaderObjectInputStream;
023    import com.liferay.portal.kernel.util.NamedThreadFactory;
024    import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
025    
026    import java.io.EOFException;
027    import java.io.IOException;
028    import java.io.InputStream;
029    import java.io.ObjectInputStream;
030    import java.io.ObjectOutputStream;
031    import java.io.OutputStream;
032    import java.io.PrintStream;
033    import java.io.Serializable;
034    import java.io.StreamCorruptedException;
035    
036    import java.util.ArrayList;
037    import java.util.Collections;
038    import java.util.List;
039    import java.util.concurrent.Callable;
040    import java.util.concurrent.ExecutorService;
041    import java.util.concurrent.Executors;
042    import java.util.concurrent.Future;
043    
044    /**
045     * @author Shuyang Zhou
046     */
047    public class ProcessExecutor {
048    
049            public static <T extends Serializable> T execute(
050                            ProcessCallable<T> processCallable, String classPath)
051                    throws ProcessException {
052    
053                    return execute(
054                            processCallable, classPath, Collections.<String>emptyList());
055            }
056    
057            public static <T extends Serializable> T execute(
058                            ProcessCallable<T> processCallable, String classPath,
059                            List<String> arguments)
060                    throws ProcessException {
061    
062                    try {
063                            List<String> commands = new ArrayList<String>(arguments.size() + 4);
064    
065                            commands.add("java");
066                            commands.add("-cp");
067                            commands.add(classPath);
068                            commands.addAll(arguments);
069                            commands.add(ProcessExecutor.class.getName());
070    
071                            ProcessBuilder processBuilder = new ProcessBuilder(commands);
072    
073                            Process process = processBuilder.start();
074    
075                            _writeObject(process.getOutputStream(), processCallable);
076    
077                            ExecutorService executorService = _getExecutorService();
078    
079                            SubprocessReactor subprocessReactor = new SubprocessReactor(
080                                    process.getInputStream());
081    
082                            Future<ProcessCallable<?>> futureResponseProcessCallable =
083                                    executorService.submit(subprocessReactor);
084    
085                            int exitCode = process.waitFor();
086    
087                            if (exitCode != 0) {
088                                    throw new ProcessException(
089                                            "Subprocess terminated with exit code " + exitCode);
090                            }
091    
092                            ProcessCallable<?> responseProcessCallable =
093                                    futureResponseProcessCallable.get();
094    
095                            if (responseProcessCallable instanceof ReturnProcessCallable<?>) {
096                                    return (T)responseProcessCallable.call();
097                            }
098    
099                            if (responseProcessCallable instanceof ExceptionProcessCallable) {
100                                    ExceptionProcessCallable exceptionProcessCallable =
101                                            (ExceptionProcessCallable)responseProcessCallable;
102    
103                                    throw exceptionProcessCallable.call();
104                            }
105    
106                            if (_log.isWarnEnabled()) {
107                                    _log.warn(
108                                            "Subprocess reactor exited without a valid return " +
109                                                    "because the subprocess terminated with an exception");
110                            }
111    
112                            return null;
113                    }
114                    catch (ProcessException pe) {
115                            throw pe;
116                    }
117                    catch (Exception e) {
118                            throw new ProcessException(e);
119                    }
120            }
121    
122            public static void main(String[] arguments)
123                    throws ClassNotFoundException, IOException {
124    
125                    ObjectOutputStream objectOutputStream = new ObjectOutputStream(
126                            System.out);
127    
128                    ProcessOutputStream outProcessOutputStream = new ProcessOutputStream(
129                            objectOutputStream, false);
130    
131                    PrintStream outPrintStream = new PrintStream(
132                            outProcessOutputStream, true);
133    
134                    System.setOut(outPrintStream);
135    
136                    ProcessOutputStream errProcessOutputStream = new ProcessOutputStream(
137                            objectOutputStream, true);
138    
139                    PrintStream errPrintStream = new PrintStream(
140                            errProcessOutputStream, true);
141    
142                    System.setErr(errPrintStream);
143    
144                    try {
145                            ProcessCallable<?> processCallable =
146                                    (ProcessCallable<?>)_readObject(System.in, false);
147    
148                            Serializable result = processCallable.call();
149    
150                            outPrintStream.flush();
151    
152                            outProcessOutputStream.writeProcessCallable(
153                                    new ReturnProcessCallable<Serializable>(result));
154    
155                            outProcessOutputStream.close();
156                    }
157                    catch (ProcessException pe) {
158                            errPrintStream.flush();
159    
160                            errProcessOutputStream.writeProcessCallable(
161                                    new ExceptionProcessCallable(pe));
162    
163                            errProcessOutputStream.close();
164                    }
165            }
166    
167            public void destroy() {
168                    if (_executorService == null) {
169                            return;
170                    }
171    
172                    synchronized (ProcessExecutor.class) {
173                            if (_executorService != null) {
174                                    _executorService.shutdownNow();
175    
176                                    _executorService = null;
177                            }
178                    }
179            }
180    
181            private static ExecutorService _getExecutorService() {
182                    if (_executorService != null) {
183                            return _executorService;
184                    }
185    
186                    synchronized (ProcessExecutor.class) {
187                            if (_executorService == null) {
188                                    _executorService = Executors.newCachedThreadPool(
189                                            new NamedThreadFactory(
190                                                    ProcessExecutor.class.getName(), Thread.MIN_PRIORITY,
191                                                    PortalClassLoaderUtil.getClassLoader()));
192                            }
193                    }
194    
195                    return _executorService;
196            }
197    
198            private static Object _readObject(InputStream inputStream, boolean close)
199                    throws ClassNotFoundException, IOException {
200    
201                    ObjectInputStream objectInputStream = new ObjectInputStream(
202                            inputStream);
203    
204                    try {
205                            return objectInputStream.readObject();
206                    }
207                    finally {
208                            if (close) {
209                                    objectInputStream.close();
210                            }
211                    }
212            }
213    
214            private static void _writeObject(OutputStream outputStream, Object object)
215                    throws IOException {
216    
217                    ObjectOutputStream objectOutputStream = new ObjectOutputStream(
218                            outputStream);
219    
220                    try {
221                            objectOutputStream.writeObject(object);
222                    }
223                    finally {
224                            objectOutputStream.close();
225                    }
226            }
227    
228            private static Log _log = LogFactoryUtil.getLog(ProcessExecutor.class);
229    
230            private static volatile ExecutorService _executorService;
231    
232            private static class SubprocessReactor
233                    implements Callable<ProcessCallable<? extends Serializable>> {
234    
235                    public SubprocessReactor(InputStream inputStream) {
236                            _unsyncBufferedInputStream = new UnsyncBufferedInputStream(
237                                    inputStream);
238                    }
239    
240                    public ProcessCallable<? extends Serializable> call() throws Exception {
241                            try {
242                                    ObjectInputStream objectInputStream = null;
243    
244                                    UnsyncByteArrayOutputStream unsyncByteArrayOutputStream =
245                                            new UnsyncByteArrayOutputStream();
246    
247                                    while (true) {
248                                            try {
249    
250                                                    // Be ready for a bad header
251    
252                                                    _unsyncBufferedInputStream.mark(4);
253    
254                                                    objectInputStream = new ClassLoaderObjectInputStream(
255                                                            _unsyncBufferedInputStream,
256                                                            PortalClassLoaderUtil.getClassLoader());
257    
258                                                    // Found the beginning of the object input stream. Flush
259                                                    // out corrupted log if necessary.
260    
261                                                    if (unsyncByteArrayOutputStream.size() > 0) {
262                                                            if (_log.isWarnEnabled()) {
263                                                                    _log.warn(
264                                                                            "Found corrupted leading log: " +
265                                                                                    unsyncByteArrayOutputStream.toString());
266                                                            }
267                                                    }
268    
269                                                    unsyncByteArrayOutputStream = null;
270    
271                                                    break;
272                                            }
273                                            catch (StreamCorruptedException sce) {
274    
275                                                    // Collecting bad header as log information
276    
277                                                    _unsyncBufferedInputStream.reset();
278    
279                                                    unsyncByteArrayOutputStream.write(
280                                                            _unsyncBufferedInputStream.read());
281                                            }
282                                    }
283    
284                                    while (true) {
285                                            ProcessCallable<?> processCallable =
286                                                    (ProcessCallable<?>)objectInputStream.readObject();
287    
288                                            if (processCallable instanceof ExceptionProcessCallable) {
289                                                    return processCallable;
290                                            }
291    
292                                            if (processCallable instanceof ReturnProcessCallable<?>) {
293                                                    return processCallable;
294                                            }
295    
296                                            Serializable result = processCallable.call();
297    
298                                            if (_log.isDebugEnabled()) {
299                                                    _log.debug(
300                                                            "Invoked generic process callable " +
301                                                                    processCallable + " with return value " +
302                                                                            result);
303                                            }
304                                    }
305                            }
306                            catch (EOFException eofe) {
307                            }
308    
309                            return null;
310                    }
311    
312                    private final UnsyncBufferedInputStream _unsyncBufferedInputStream;
313    
314            }
315    
316    }