Apache commons exec框架的简介说明

白翰海
2023-12-01

转自:

Apache commons exec框架的简介说明

下文笔者讲述Apache commons exec框架的简介说明,如下所示

Apache commmons exec框架的功能

Apache commons  exec框架是对
  Process进行封装
  对外提供如下功能:
     为Process的stdin, stdout, stderr重定向流
      而不是File
     并发向Process stdin写入数据、读取Process stdout和stderr的数据
     避免进程阻塞
     超时终止Process
实现思路:
    1.引入相应的Jar包
    2.调用相应的方法,即可运行Process中的方法

例:
引入相应的jar包

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-exec</artifactId>
    <version>1.3</version>
</dependency>

DefaultExecutor

DefaultExecutor类的功能用于
    DefaultExecutor是启动子进程的执行类

例:

// 设置工作目录
public void setWorkingDirectory(final File dir);

// 设置流处理器
public void setStreamHandler(final ExecuteStreamHandler streamHandler);

// 执行命令
public int execute(final CommandLine command);
public int execute(final CommandLine command, final Map<String, String> environment);
public void execute(final CommandLine command, final ExecuteResultHandler handler);
public void execute(final CommandLine command, final Map<String, String> environment, 
    final ExecuteResultHandler handler);
实际的执行方法
private int executeInternal(final CommandLine command, final Map<String, String> environment,
        final File dir, final ExecuteStreamHandler streams) throws IOException {

    setExceptionCaught(null);

    // launch():使用Runtime.exec创建Process
    // Runtime.getRuntime().exec(cmd.toStrings(), EnvironmentUtils.toStrings(env), workingDir);
    final Process process = this.launch(command, environment, dir);

    // 将用户提供的流与进程的流连接起来
    try {
        streams.setProcessInputStream(process.getOutputStream());
        streams.setProcessOutputStream(process.getInputStream());
        streams.setProcessErrorStream(process.getErrorStream());
    } catch (final IOException e) {
        process.destroy();
        throw e;
    }

    // 启动线程,从输入流复制数据写入输出流
    streams.start();

    try {

        // add the process to the list of those to destroy if the VM exits
        if (this.getProcessDestroyer() != null) {
          this.getProcessDestroyer().add(process);
        }

        // associate the watchdog with the newly created process
        if (watchdog != null) {
            watchdog.start(process);
        }

        int exitValue = Executor.INVALID_EXITVALUE;

        try {
            exitValue = process.waitFor();
        } catch (final InterruptedException e) {
            process.destroy();
        }
        finally {
            // see http://bugs.sun.com/view_bug.do?bug_id=6420270
            // see https://issues.apache.org/jira/browse/EXEC-46
            // Process.waitFor should clear interrupt status when throwing InterruptedException
            // but we have to do that manually
            Thread.interrupted();
        }            

        if (watchdog != null) {
            watchdog.stop();
        }

        try {
            streams.stop();
        }
        catch (final IOException e) {
            setExceptionCaught(e);
        }

        closeProcessStreams(process);

        if (getExceptionCaught() != null) {
            throw getExceptionCaught();
        }

        if (watchdog != null) {
            try {
                watchdog.checkException();
            } catch (final IOException e) {
                throw e;
            } catch (final Exception e) {
                throw new IOException(e.getMessage());
            }
        }

        if (this.isFailure(exitValue)) {
            throw new ExecuteException("Process exited with an error: " + exitValue, exitValue);
        }

        return exitValue;
    } finally {
        // remove the process to the list of those to destroy if the VM exits
        if (this.getProcessDestroyer() != null) {
          this.getProcessDestroyer().remove(process);
        }
    }
}

CommandLine

封装命令和参数。
// 设置命令,无参数
public CommandLine(final String executable);
// 设置命令文件,无参数
public CommandLine(final File executable);

// 设置参数
public CommandLine addArguments(final String[] addArguments);
// 设置未分拆的参数
public CommandLine addArguments(final String addArguments);
// 设置替换值,替换参数中${}的占位变量
public void setSubstitutionMap(final Map<String, ?> substitutionMap);

// 直接解析命令行:第一个元素是命令,其余是参数
public static CommandLine parse(final String line);
public static CommandLine parse(final String line, final Map<String, ?> substitutionMap);
3. ExecuteStreamHandler
处理进程的输入流、输出流、错误流。

// os为process.getOutputStream(),连接着进程stdin
void setProcessInputStream(OutputStream os) throws IOException;

// is为process.getInputStream(),连接着进程stdout
void setProcessOutputStream(InputStream is) throws IOException;

// is为process.getErrorStream(),连接着进程stderr
void setProcessErrorStream(InputStream is) throws IOException;

// 启动处理
void start() throws IOException;

// 停止处理
void stop() throws IOException;
4. PumpStreamHandler
ExecuteStreamHandler的实现类
启动三个线程:
从用户提供的输入流input复制数据,到与进程stdin连接的输出流
从与进程stdout连接的输入流,复制数据到用户提供的输出流out
从与进程stderr连接的输入流,复制数据到用户提供的输出流err

用户提供的输入流需要手动关闭,提供的输出流若是PipedOutputStream则会被自动关闭。

// 设置进程的stdin、stdout、stderr
public PumpStreamHandler(final OutputStream out, final OutputStream err, final InputStream input);

// 设置进程的stdout、stderr
public PumpStreamHandler(final OutputStream out, final OutputStream err);

// 设置进程的stdout、stderr
public PumpStreamHandler(final OutputStream outAndErr);

// 设置为System.out, System.err
public PumpStreamHandler();

处理进程stdin

用户提供一个输入流input,进程提供一个与stdin连接的输出流os
启动一个线程,不断的从input复制数据到os,这样input就成为了进程的stdin
input关闭或出错后,复制结束,关闭os
// input是用户提供的输入流
// os是进程提供的与stdin连接的输出流
public void setProcessInputStream(final OutputStream os) {
    if (input != null) {
        if (input == System.in) {
            inputThread = createSystemInPump(input, os);
        } else {
            inputThread = createPump(input, os, true);
        }
    } else { // 无需输入流
        try {
            os.close();
        } catch (final IOException e) {
            final String msg = "Got exception while closing output stream";
            DebugUtils.handleException(msg, e);
        }
    }
}
创建线程和数据泵,连接输入流is和输出流os,线程启动后将不断从输入流复制数据到输出流。

protected Thread createPump(final InputStream is, final OutputStream os, final boolean closeWhenExhausted) {
    final Thread result = new Thread(new StreamPumper(is, os, closeWhenExhausted), "Exec Stream Pumper");
    result.setDaemon(true);
    return result;
}
输入流与输出流之间的数据泵StreamPumper

public class StreamPumper implements Runnable {
    // is,输入流;os,输出流;
    // closeWhenExhausted,输入流无数据是是否关闭输出流
    public StreamPumper(final InputStream is, final OutputStream os,
            final boolean closeWhenExhausted);

    public void run() {
        synchronized (this) {
            // Just in case this object is reused in the future
            finished = false;
        }

        final byte[] buf = new byte[this.size];

        int length;
        try {
            // 从输入流复制数据写入输出流
            while ((length = is.read(buf)) > 0) {
                os.write(buf, 0, length);
            }
        } catch (final Exception e) {
            // nothing to do - happens quite often with watchdog
        } finally {
            if (closeWhenExhausted) {
                try {
                    os.close();
                } catch (final IOException e) {
                    final String msg = "Got exception while closing exhausted output stream";
                    DebugUtils.handleException(msg ,e);
                }
            }
            synchronized (this) {
                finished = true;
                notifyAll(); // 通知等待泵结束的线程
            }
        }
    }

    // 阻塞等待泵结束
    public synchronized void waitFor() throws InterruptedException;
}

处理进程stdout

用户提供一个输出流out,进程提供一个与stdout连接的输入流is
启动一个线程,不断的从is复制数据到out,这样out就成为了进程的stdout
is关闭或出错后,复制结束,关闭out
// out是用户提供的输出流
// is是进程提供的与stdout连接的输出流
public void setProcessOutputStream(final InputStream is) {
    if (out != null) {
        createProcessOutputPump(is, out);
    }
}

protected void createProcessOutputPump(final InputStream is, final OutputStream os) {
    outputThread = createPump(is, os);
}

protected Thread createPump(final InputStream is, final OutputStream os) {
    final boolean closeWhenExhausted = os instanceof PipedOutputStream ? true : false;
    return createPump(is, os, closeWhenExhausted);
}

处理进程stderr

用户提供一个输出流err,进程提供一个与stderr连接的输入流is
启动一个线程,不断的从is复制数据到err,这样err就成为了进程的stderr
is关闭或出错后,复制结束,关闭err
public void setProcessErrorStream(final InputStream is) {
    if (err != null) {
        createProcessErrorPump(is, err);
    }
}

protected void createProcessErrorPump(final InputStream is, final OutputStream os) {
    errorThread = createPump(is, os);
}

启动与停止

启动线程,开始复制数据
public void start() {
    if (outputThread != null) {
        outputThread.start();
    }
    if (errorThread != null) {
        errorThread.start();
    }
    if (inputThread != null) {
        inputThread.start();
    }
}
等待线程结束,flush残留数据
public void stop() throws IOException {
    // inputStreamPumper是处理System.in的泵
    if (inputStreamPumper != null) {
        inputStreamPumper.stopProcessing();
    }

    // 超时等待线程结束
    stopThread(outputThread, stopTimeout);
    stopThread(errorThread, stopTimeout);
    stopThread(inputThread, stopTimeout);

    // flush输出流
    if (err != null && err != out) {
        try {
            err.flush();
        } catch (final IOException e) {
            final String msg = "Got exception while flushing the error stream : " + e.getMessage();
            DebugUtils.handleException(msg, e);
        }
    }

    if (out != null) {
        try {
            out.flush();
        } catch (final IOException e) {
            final String msg = "Got exception while flushing the output stream";
            DebugUtils.handleException(msg, e);
        }
    }

    if (caught != null) {
        throw caught;
    }
}

超时终止ExecuteWatchdog

执行器启动时,另起一个线程进行异步计时;
进程提前执行完毕,则通知计时器终止;
若计时器先计时结束,则通知执行器超时,执行器终止进程;
创建时指定超时时间
public ExecuteWatchdog(final long timeout) {
    this.killedProcess = false;
    this.watch = false;
    this.hasWatchdog = timeout != INFINITE_TIMEOUT; // -1表示不超时
    this.processStarted = false;
    if (this.hasWatchdog) {
        this.watchdog = new Watchdog(timeout); // 新建Watchdog进行计时
        this.watchdog.addTimeoutObserver(this); // ExecuteWatchdog作为WatchDog的观察者,计时超时后得到通知
    }
    else {
        this.watchdog = null;
    }
}

提交给DefaultExecutor
public void setWatchdog(final ExecuteWatchdog watchDog);
DefaultExecutor执行时启动ExecuteWatchdog,执行完毕停止:

private int executeInternal(...) {
    ...
    if (watchdog != null) {
        watchdog.start(process);
    }
    ...
    if (watchdog != null) {
        watchdog.stop();
    }
}
ExecuteWatchdog启动:

public synchronized void start(final Process processToMonitor) {
    if (processToMonitor == null) {
        throw new NullPointerException("process is null.");
    }
    if (this.process != null) {
        throw new IllegalStateException("Already running.");
    }
    this.caught = null;
    this.killedProcess = false;
    this.watch = true;
    this.process = processToMonitor;
    this.processStarted = true;
    this.notifyAll();
    if (this.hasWatchdog) {
        watchdog.start();  // 启动WatchDog计时
    }
}

WatchDog启动
public synchronized void start() {
    stopped = false;
    final Thread t = new Thread(this, "WATCHDOG"); // 启动线程进行异步计时
    t.setDaemon(true);
    t.start();
}

public void run() {
    final long startTime = System.currentTimeMillis();
    boolean isWaiting; // 未超时
    synchronized (this) {
        long timeLeft = timeout - (System.currentTimeMillis() - startTime);
        isWaiting = timeLeft > 0;
        while (!stopped && isWaiting) { // stopped表示Process是否执行完毕,执行完毕时退出计时循环
            try {
                wait(timeLeft); // 等待指定超时时间
            } catch (final InterruptedException e) {
            }
            timeLeft = timeout - (System.currentTimeMillis() - startTime);
            isWaiting = timeLeft > 0;
        }
    }

    // notify the listeners outside of the synchronized block (see EXEC-60)
    if (!isWaiting) { // 超时
        fireTimeoutOccured();
    }
}

// 观察者模式,通知观察者超时
protected final void fireTimeoutOccured() {
    final Enumeration<TimeoutObserver> e = observers.elements();
    while (e.hasMoreElements()) {
        e.nextElement().timeoutOccured(this);
    }
}

ExecuteWatchdog被通知超时
public synchronized void timeoutOccured(final Watchdog w) {
    try {
        try {
            if (process != null) {
                process.exitValue(); // 再检查一次是否执行完毕
            }
        } catch (final IllegalThreadStateException itse) {
            // 未执行完毕
            if (watch) {
                killedProcess = true;
                process.destroy(); // 终止进程
            }
        }
    } catch (final Exception e) {
        caught = e;
        DebugUtils.handleException("Getting the exit value of the process failed", e);
    } finally {
        cleanUp();
    }
}

public static void main(String[] args) throws Exception {
    // 命令行
    CommandLine commandLine = CommandLine.parse("ping baidu.com");

    // 重定向stdout和stderr到文件
    FileOutputStream fileOutputStream = new FileOutputStream("D:\\Test\\exec.log");
    PumpStreamHandler pumpStreamHandler = new PumpStreamHandler(fileOutputStream);

    // 超时终止:1秒
    ExecuteWatchdog executeWatchdog = new ExecuteWatchdog(1000);

    // 创建执行器
    DefaultExecutor executor = new DefaultExecutor();
    executor.setStreamHandler(pumpStreamHandler);
    executor.setWatchdog(executeWatchdog);

    // 执行,打印退出码
    int exitValue = executor.execute(commandLine);
    System.out.println(exitValue);

    // 关闭流
    fileOutputStream.close();
}

版权声明

本文仅代表作者观点,不代表本站立场。

 类似资料: