001    /**
002     * Copyright (c) 2000-2013 Liferay, Inc. All rights reserved.
003     *
004     * This library is free software; you can redistribute it and/or modify it under
005     * the terms of the GNU Lesser General Public License as published by the Free
006     * Software Foundation; either version 2.1 of the License, or (at your option)
007     * any later version.
008     *
009     * This library is distributed in the hope that it will be useful, but WITHOUT
010     * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
011     * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
012     * details.
013     */
014    
015    package com.liferay.portal.kernel.process;
016    
017    import com.liferay.portal.kernel.concurrent.ConcurrentHashSet;
018    import com.liferay.portal.kernel.io.unsync.UnsyncBufferedInputStream;
019    import com.liferay.portal.kernel.io.unsync.UnsyncBufferedOutputStream;
020    import com.liferay.portal.kernel.io.unsync.UnsyncByteArrayOutputStream;
021    import com.liferay.portal.kernel.log.Log;
022    import com.liferay.portal.kernel.log.LogFactoryUtil;
023    import com.liferay.portal.kernel.process.log.ProcessOutputStream;
024    import com.liferay.portal.kernel.util.ClassLoaderObjectInputStream;
025    import com.liferay.portal.kernel.util.NamedThreadFactory;
026    import com.liferay.portal.kernel.util.PortalClassLoaderUtil;
027    import com.liferay.portal.kernel.util.StreamUtil;
028    import com.liferay.portal.kernel.util.StringPool;
029    
030    import java.io.EOFException;
031    import java.io.File;
032    import java.io.FileDescriptor;
033    import java.io.FileOutputStream;
034    import java.io.IOException;
035    import java.io.ObjectInputStream;
036    import java.io.ObjectOutputStream;
037    import java.io.PrintStream;
038    import java.io.Serializable;
039    import java.io.StreamCorruptedException;
040    
041    import java.util.ArrayList;
042    import java.util.Collections;
043    import java.util.Iterator;
044    import java.util.List;
045    import java.util.Set;
046    import java.util.concurrent.Callable;
047    import java.util.concurrent.ConcurrentHashMap;
048    import java.util.concurrent.ConcurrentMap;
049    import java.util.concurrent.ExecutionException;
050    import java.util.concurrent.ExecutorService;
051    import java.util.concurrent.Executors;
052    import java.util.concurrent.Future;
053    import java.util.concurrent.RejectedExecutionException;
054    import java.util.concurrent.TimeUnit;
055    import java.util.concurrent.TimeoutException;
056    import java.util.concurrent.atomic.AtomicReference;
057    
058    /**
059     * @author Shuyang Zhou
060     */
061    public class ProcessExecutor {
062    
063            public static <T extends Serializable> Future<T> execute(
064                            String classPath, List<String> arguments,
065                            ProcessCallable<? extends Serializable> processCallable)
066                    throws ProcessException {
067    
068                    return execute("java", classPath, arguments, processCallable);
069            }
070    
071            public static <T extends Serializable> Future<T> execute(
072                            String classPath,
073                            ProcessCallable<? extends Serializable> processCallable)
074                    throws ProcessException {
075    
076                    return execute(
077                            "java", classPath, Collections.<String>emptyList(),
078                            processCallable);
079            }
080    
081            public static <T extends Serializable> Future<T> execute(
082                            String java, String classPath, List<String> arguments,
083                            ProcessCallable<? extends Serializable> processCallable)
084                    throws ProcessException {
085    
086                    try {
087                            List<String> commands = new ArrayList<String>(arguments.size() + 4);
088    
089                            commands.add(java);
090                            commands.add("-cp");
091                            commands.add(classPath);
092                            commands.addAll(arguments);
093                            commands.add(ProcessExecutor.class.getName());
094    
095                            ProcessBuilder processBuilder = new ProcessBuilder(commands);
096    
097                            Process process = processBuilder.start();
098    
099                            ObjectOutputStream objectOutputStream = new ObjectOutputStream(
100                                    process.getOutputStream());
101    
102                            try {
103                                    objectOutputStream.writeObject(processCallable);
104                            }
105                            finally {
106                                    objectOutputStream.close();
107                            }
108    
109                            ExecutorService executorService = _getExecutorService();
110    
111                            SubprocessReactor subprocessReactor = new SubprocessReactor(
112                                    process);
113    
114                            try {
115                                    Future<ProcessCallable<? extends Serializable>>
116                                            futureResponseProcessCallable = executorService.submit(
117                                                    subprocessReactor);
118    
119                                    // Consider the newly created process as a managed process only
120                                    // after the subprocess reactor is taken by the thread pool
121    
122                                    _managedProcesses.add(process);
123    
124                                    return new ProcessExecutionFutureResult<T>(
125                                            futureResponseProcessCallable, process);
126                            }
127                            catch (RejectedExecutionException ree) {
128                                    process.destroy();
129    
130                                    throw new ProcessException(
131                                            "Cancelled execution because of a concurrent destroy", ree);
132                            }
133                    }
134                    catch (IOException ioe) {
135                            throw new ProcessException(ioe);
136                    }
137            }
138    
139            public static void main(String[] arguments)
140                    throws ClassNotFoundException, IOException {
141    
142                    PrintStream oldOutPrintStream = System.out;
143    
144                    ObjectOutputStream objectOutputStream = null;
145                    ProcessOutputStream outProcessOutputStream = null;
146    
147                    synchronized (oldOutPrintStream) {
148                            oldOutPrintStream.flush();
149    
150                            FileOutputStream fileOutputStream = new FileOutputStream(
151                                    FileDescriptor.out);
152    
153                            objectOutputStream = new ObjectOutputStream(
154                                    new UnsyncBufferedOutputStream(fileOutputStream));
155    
156                            outProcessOutputStream = new ProcessOutputStream(
157                                    objectOutputStream, false);
158    
159                            ProcessContext._setProcessOutputStream(outProcessOutputStream);
160    
161                            PrintStream newOutPrintStream = new PrintStream(
162                                    outProcessOutputStream, true);
163    
164                            System.setOut(newOutPrintStream);
165                    }
166    
167                    ProcessOutputStream errProcessOutputStream = new ProcessOutputStream(
168                            objectOutputStream, true);
169    
170                    PrintStream errPrintStream = new PrintStream(
171                            errProcessOutputStream, true);
172    
173                    System.setErr(errPrintStream);
174    
175                    try {
176                            ObjectInputStream objectInputStream = new ObjectInputStream(
177                                    System.in);
178    
179                            ProcessCallable<?> processCallable =
180                                    (ProcessCallable<?>)objectInputStream.readObject();
181    
182                            String logPrefixString =
183                                    StringPool.OPEN_BRACKET.concat(
184                                            processCallable.toString()).concat(
185                                                    StringPool.CLOSE_BRACKET);
186    
187                            byte[] logPrefix = logPrefixString.getBytes(StringPool.UTF8);
188    
189                            outProcessOutputStream.setLogPrefix(logPrefix);
190                            errProcessOutputStream.setLogPrefix(logPrefix);
191    
192                            Serializable result = processCallable.call();
193    
194                            System.out.flush();
195    
196                            outProcessOutputStream.writeProcessCallable(
197                                    new ReturnProcessCallable<Serializable>(result));
198    
199                            outProcessOutputStream.flush();
200                    }
201                    catch (ProcessException pe) {
202                            errPrintStream.flush();
203    
204                            errProcessOutputStream.writeProcessCallable(
205                                    new ExceptionProcessCallable(pe));
206    
207                            errProcessOutputStream.flush();
208                    }
209            }
210    
211            public void destroy() {
212                    if (_executorService == null) {
213                            return;
214                    }
215    
216                    synchronized (ProcessExecutor.class) {
217                            if (_executorService != null) {
218                                    _executorService.shutdownNow();
219    
220                                    // At this point, the thread pool will no longer take in any
221                                    // more subprocess reactors, so we know the list of managed
222                                    // processes is in a safe state. The worst case is that the
223                                    // destroyer thread and the thread pool thread concurrently
224                                    // destroy the same process, but this is JDK's job to ensure
225                                    // that processes are destroyed in a thread safe manner.
226    
227                                    Iterator<Process> iterator = _managedProcesses.iterator();
228    
229                                    while (iterator.hasNext()) {
230                                            Process process = iterator.next();
231    
232                                            process.destroy();
233    
234                                            iterator.remove();
235                                    }
236    
237                                    // The current thread has a more comprehensive view of the list
238                                    // of managed processes than any thread pool thread. After the
239                                    // previous iteration, we are safe to clear the list of managed
240                                    // processes.
241    
242                                    _managedProcesses.clear();
243    
244                                    _executorService = null;
245                            }
246                    }
247            }
248    
249            public static class ProcessContext {
250    
251                    public static boolean attach(
252                            String message, long interval, ShutdownHook shutdownHook) {
253    
254                            HeartbeatThread heartbeatThread = new HeartbeatThread(
255                                    message, interval, shutdownHook);
256    
257                            boolean value = _heartbeatThreadReference.compareAndSet(
258                                    null, heartbeatThread);
259    
260                            if (value) {
261                                    heartbeatThread.start();
262                            }
263    
264                            return value;
265                    }
266    
267                    public static void detach() throws InterruptedException {
268                            HeartbeatThread heartbeatThread =
269                                    _heartbeatThreadReference.getAndSet(null);
270    
271                            if (heartbeatThread != null) {
272                                    heartbeatThread.detach();
273                                    heartbeatThread.join();
274                            }
275                    }
276    
277                    public static ConcurrentMap<String, Object> getAttributes() {
278                            return _attributes;
279                    }
280    
281                    public static ProcessOutputStream getProcessOutputStream() {
282                            return _processOutputStream;
283                    }
284    
285                    public static boolean isAttached() {
286                            HeartbeatThread attachThread = _heartbeatThreadReference.get();
287    
288                            if (attachThread != null) {
289                                    return true;
290                            }
291                            else {
292                                    return false;
293                            }
294                    }
295    
296                    private static void _setProcessOutputStream(
297                            ProcessOutputStream processOutputStream) {
298    
299                            _processOutputStream = processOutputStream;
300                    }
301    
302                    private ProcessContext() {
303                    }
304    
305                    private static ConcurrentMap<String, Object> _attributes =
306                            new ConcurrentHashMap<String, Object>();
307                    private static AtomicReference<HeartbeatThread>
308                            _heartbeatThreadReference = new AtomicReference<HeartbeatThread>();
309                    private static ProcessOutputStream _processOutputStream;
310    
311            }
312    
313            public static interface ShutdownHook {
314    
315                    public static final int BROKEN_PIPE_CODE = 1;
316    
317                    public static final int INTERRUPTION_CODE = 2;
318    
319                    public static final int UNKNOWN_CODE = 3;
320    
321                    public boolean shutdown(int shutdownCode, Throwable shutdownThrowable);
322    
323            }
324    
325            private static ExecutorService _getExecutorService() {
326                    if (_executorService != null) {
327                            return _executorService;
328                    }
329    
330                    synchronized (ProcessExecutor.class) {
331                            if (_executorService == null) {
332                                    _executorService = Executors.newCachedThreadPool(
333                                            new NamedThreadFactory(
334                                                    ProcessExecutor.class.getName(), Thread.MIN_PRIORITY,
335                                                    PortalClassLoaderUtil.getClassLoader()));
336                            }
337                    }
338    
339                    return _executorService;
340            }
341    
342            private static Log _log = LogFactoryUtil.getLog(ProcessExecutor.class);
343    
344            private static volatile ExecutorService _executorService;
345            private static Set<Process> _managedProcesses =
346                    new ConcurrentHashSet<Process>();
347    
348            private static class HeartbeatThread extends Thread {
349    
350                    public HeartbeatThread(
351                            String message, long interval, ShutdownHook shutdownHook) {
352    
353                            if (shutdownHook == null) {
354                                    throw new IllegalArgumentException("Shutdown hook is null");
355                            }
356    
357                            _interval = interval;
358                            _shutdownHook = shutdownHook;
359    
360                            _pringBackProcessCallable = new PingbackProcessCallable(message);
361    
362                            setDaemon(true);
363                            setName(HeartbeatThread.class.getSimpleName());
364                    }
365    
366                    public void detach() {
367                            _detach = true;
368    
369                            interrupt();
370                    }
371    
372                    @Override
373                    public void run() {
374                            ProcessOutputStream processOutputStream =
375                                    ProcessContext.getProcessOutputStream();
376    
377                            int shutdownCode = 0;
378                            Throwable shutdownThrowable = null;
379    
380                            while (!_detach) {
381                                    try {
382                                            sleep(_interval);
383    
384                                            processOutputStream.writeProcessCallable(
385                                                    _pringBackProcessCallable);
386                                    }
387                                    catch (InterruptedException ie) {
388                                            if (_detach) {
389                                                    return;
390                                            }
391                                            else {
392                                                    shutdownThrowable = ie;
393    
394                                                    shutdownCode = ShutdownHook.INTERRUPTION_CODE;
395                                            }
396                                    }
397                                    catch (IOException ioe) {
398                                            shutdownThrowable = ioe;
399    
400                                            shutdownCode = ShutdownHook.BROKEN_PIPE_CODE;
401                                    }
402                                    catch (Throwable throwable) {
403                                            shutdownThrowable = throwable;
404    
405                                            shutdownCode = ShutdownHook.UNKNOWN_CODE;
406                                    }
407    
408                                    if (shutdownCode != 0) {
409                                            _detach = _shutdownHook.shutdown(
410                                                    shutdownCode, shutdownThrowable);
411                                    }
412                            }
413                    }
414    
415                    private volatile boolean _detach;
416                    private final long _interval;
417                    private final ProcessCallable<String> _pringBackProcessCallable;
418                    private final ShutdownHook _shutdownHook;
419    
420            }
421    
422            private static class PingbackProcessCallable
423                    implements ProcessCallable<String> {
424    
425                    public PingbackProcessCallable(String message) {
426                            _message = message;
427                    }
428    
429                    @Override
430                    public String call() {
431                            return _message;
432                    }
433    
434                    private static final long serialVersionUID = 1L;
435    
436                    private final String _message;
437    
438            }
439    
440            private static class ProcessExecutionFutureResult<T> implements Future<T> {
441    
442                    public ProcessExecutionFutureResult(
443                            Future<ProcessCallable<? extends Serializable>> future,
444                            Process process) {
445    
446                            _future = future;
447                            _process = process;
448                    }
449    
450                    @Override
451                    public boolean cancel(boolean mayInterruptIfRunning) {
452                            if (_future.isCancelled() || _future.isDone()) {
453                                    return false;
454                            }
455    
456                            _future.cancel(true);
457                            _process.destroy();
458    
459                            return true;
460                    }
461    
462                    @Override
463                    public boolean isCancelled() {
464                            return _future.isCancelled();
465                    }
466    
467                    @Override
468                    public boolean isDone() {
469                            return _future.isDone();
470                    }
471    
472                    @Override
473                    public T get() throws ExecutionException, InterruptedException {
474                            ProcessCallable<?> processCallable = _future.get();
475    
476                            return get(processCallable);
477                    }
478    
479                    @Override
480                    public T get(long timeout, TimeUnit timeUnit)
481                            throws ExecutionException, InterruptedException, TimeoutException {
482    
483                            ProcessCallable<?> processCallable = _future.get(timeout, timeUnit);
484    
485                            return get(processCallable);
486                    }
487    
488                    private T get(ProcessCallable<?> processCallable)
489                            throws ExecutionException {
490    
491                            try {
492                                    if (processCallable instanceof ReturnProcessCallable<?>) {
493                                            return (T)processCallable.call();
494                                    }
495    
496                                    ExceptionProcessCallable exceptionProcessCallable =
497                                            (ExceptionProcessCallable)processCallable;
498    
499                                    throw exceptionProcessCallable.call();
500                            }
501                            catch (ProcessException pe) {
502                                    throw new ExecutionException(pe);
503                            }
504                    }
505    
506                    private final Future<ProcessCallable<?>> _future;
507                    private final Process _process;
508    
509            }
510    
511            private static class SubprocessReactor
512                    implements Callable<ProcessCallable<? extends Serializable>> {
513    
514                    public SubprocessReactor(Process process) {
515                            _process = process;
516                    }
517    
518                    @Override
519                    public ProcessCallable<? extends Serializable> call() throws Exception {
520                            ProcessCallable<?> resultProcessCallable = null;
521    
522                            UnsyncBufferedInputStream unsyncBufferedInputStream =
523                                    new UnsyncBufferedInputStream(_process.getInputStream());
524    
525                            try {
526                                    ObjectInputStream objectInputStream = null;
527    
528                                    UnsyncByteArrayOutputStream unsyncByteArrayOutputStream =
529                                            new UnsyncByteArrayOutputStream();
530    
531                                    while (true) {
532                                            try {
533    
534                                                    // Be ready for a bad header
535    
536                                                    unsyncBufferedInputStream.mark(4);
537    
538                                                    objectInputStream = new ClassLoaderObjectInputStream(
539                                                            unsyncBufferedInputStream,
540                                                            PortalClassLoaderUtil.getClassLoader());
541    
542                                                    // Found the beginning of the object input stream. Flush
543                                                    // out corrupted log if necessary.
544    
545                                                    if (unsyncByteArrayOutputStream.size() > 0) {
546                                                            if (_log.isWarnEnabled()) {
547                                                                    _log.warn(
548                                                                            "Found corrupt leading log " +
549                                                                                    unsyncByteArrayOutputStream.toString());
550                                                            }
551                                                    }
552    
553                                                    unsyncByteArrayOutputStream = null;
554    
555                                                    break;
556                                            }
557                                            catch (StreamCorruptedException sce) {
558    
559                                                    // Collecting bad header as log information
560    
561                                                    unsyncBufferedInputStream.reset();
562    
563                                                    unsyncByteArrayOutputStream.write(
564                                                            unsyncBufferedInputStream.read());
565                                            }
566                                    }
567    
568                                    while (true) {
569                                            ProcessCallable<?> processCallable =
570                                                    (ProcessCallable<?>)objectInputStream.readObject();
571    
572                                            if ((processCallable instanceof ExceptionProcessCallable) ||
573                                                    (processCallable instanceof ReturnProcessCallable<?>)) {
574    
575                                                    resultProcessCallable = processCallable;
576    
577                                                    continue;
578                                            }
579    
580                                            Serializable returnValue = processCallable.call();
581    
582                                            if (_log.isDebugEnabled()) {
583                                                    _log.debug(
584                                                            "Invoked generic process callable " +
585                                                                    processCallable + " with return value " +
586                                                                            returnValue);
587                                            }
588                                    }
589                            }
590                            catch (StreamCorruptedException sce) {
591                                    File file = File.createTempFile(
592                                            "corrupted-stream-dump-" + System.currentTimeMillis(),
593                                            ".log");
594    
595                                    _log.error(
596                                            "Dumping content of corrupted object input stream to " +
597                                                    file.getAbsolutePath(),
598                                            sce);
599    
600                                    FileOutputStream fileOutputStream = new FileOutputStream(file);
601    
602                                    StreamUtil.transfer(
603                                            unsyncBufferedInputStream, fileOutputStream);
604    
605                                    throw new ProcessException(
606                                            "Corrupted object input stream", sce);
607                            }
608                            catch (EOFException eofe) {
609                                    throw new ProcessException(
610                                            "Subprocess piping back ended prematurely", eofe);
611                            }
612                            finally {
613                                    try {
614                                            int exitCode = _process.waitFor();
615    
616                                            if (exitCode != 0) {
617                                                    throw new ProcessException(
618                                                            "Subprocess terminated with exit code " + exitCode);
619                                            }
620                                    }
621                                    catch (InterruptedException ie) {
622                                            _process.destroy();
623    
624                                            throw new ProcessException(
625                                                    "Forcibly killed subprocess on interruption", ie);
626                                    }
627    
628                                    _managedProcesses.remove(_process);
629    
630                                    if (resultProcessCallable != null) {
631    
632                                            // Override previous process exception if there was one
633    
634                                            return resultProcessCallable;
635                                    }
636                            }
637                    }
638    
639                    private final Process _process;
640    
641            }
642    
643    }