励精图治---Concurrency---取消线程

宰父夕
2023-12-01

线程创建后有几种情况会取消

1. 用户点击cancel取消

2. 超时取消,比如网页加载中有部分信息没被加载出来

3. 应用程序的逻辑设计,比如在多线程情况下,有一个线程找到了result,其他线程停止作业。

4. error。exception

5. 关闭程序,关闭服务


常用中断方式

while (!isCancelled)  {

        doSomething();

}

每次都去检查一下这个中断状态变量。这有个坏处,就是如果doSomething有阻塞。呵呵,可能就永远都执行不下去了。


Thread的三个方法

interrupt: 中断目标线程----------不会立即停止目标线程正在进行的工作,只是传递请求中断的消息

isInterrupted: 返回目标线程的中断状态

interrupted: 清楚当前线程的中断状态


核心点:中断是实现取消的最合理方法

while (!Thread.currentThread().isInterrupted()) {

doSomething();

}

之前的中断状态变量改成了判断线程是否被interrupte.

这里的情况是    用线程自身来中断。中断的操作也通过线程自身来进行。每个线程都有自身的中断策略。另,除非你知道这个线程的中断策略,不然不要中断这个线程。


上一个理念上实现的代码。不要考虑细节。

1.外部线程中中断

private static final ScheduledExecutorService cancelExec = ...;

public static void timedRun (Runnable r, long timeout, TimeUnit unit) {

        final Thread taskThread = Thread.currentThread();

cancelExec.schedule(new Runnable() {

                public void run() { taskThread.interrupt(); }

        }, timeout, unit);

        r.run();

}


2.在专门线程中中断任务

public static void timedRun(final Runnable r, long timeout, TimeUnit unit) throws InteruptedException {

class RethrowableTask implements Runnable {

private volatile Throwable t;

public void run() {

try { r.run(); }catch (Throwable t) { this.t = t }

}

void rethrow() {

if(t != null) throw launderThrowable(t);

}

}

RethrowableTask task = new RethrowableTask();

final Thread taskThread = new Thread(task);

taskThread.start();

cancelExec.schedule(new Runnable() {

public void run() { taskThread.interrupt(); }

}, timeout, unit);

taskThread.join(unit.toMillis(timeout)); //#这里的阻塞等待不知道是超时还是正常退出。

task.rethrow();

}


通过Future来实现取消

当取消某个任务时,不宜直接中断任务,因为不知道现在执行的是什么任务。

public static void timedRun(Runnable r, long timeout, TimeUnit unit) throws InterruptedException {

Future<?> task = taskExec.submit(r);//#通过ExecutorService得到封装的Future

try {

task.get(timeout, unit);//#定时get

} catch (TimeoutException e) {

doSomething();

}catch(ExecutionException e){

doSomething();

throw launderThrowable(e.getCause());

}finally{

task.cancel(true);

}

}


处理不可中断的阻塞

1. java.io包中的同步Socket I/O: 这里个InputStream和OutputStream中的read和write不会相应中断,但这个基于套接字socket的。那么直接关闭这个套接字socket即可。这会抛出一个SocketException

2. Java.io包中的同步I/O: InterruptibleChannel 用来表现一个可以被异步关闭的 Channel 。这表现在两方面:

1.    当一个 InterruptibleChannel 的 close() 方法被调用时,其它 block 在这个 InterruptibleChannel 的 IO 操作上的线程会接收到一个 AsynchronousCloseException 。

2.    当一个线程 block 在 InterruptibleChannel 的 IO 操作上时,另一个线程调用该线程的 interrupt() 方法会导致 channel 被关闭,该线程收到一个ClosedByInterruptException ,同时线程的 interrupt 状态会被设置。

3. Selector的异步I/O: 如果一个线程在调用Selector.select方法时阻塞,那么调用close或者wakeup会使线程抛出ClosedSelectorException并提前返回。

4. 获取某个锁:可以使用lockInterruptibly方法中断


范例

public class ReaderThread extends Thread {

private final Socket socket;

private final InputStream in;


public ReaderThread(Socket socket) throws IOException {

this.socket = socket;

this.in = socket.getInputStream();

}


public void interrupt() {

try {

socket.close();

}catch(IOException ignored) {}

finally {

super.interrupter();

}

}


public void run() {

try {

byte[] buf = new byte[BUFSZ];

while(true) {

int count = in.read(buf);

if(count < 0) break;

else if(count > 0)

processBuffer(buf, count);

}

}catch(IOException e){}

}

}


另外,可以用newTaskFor来封装非标准的取消。


logger服务

范例:一段支持关闭,支持将队列中已有log信息打印出来的代码

public class LogService {

private final BlockingQueue<String> queue;

private final LoggerThread loggerThread;

private final PrintWriter writer;

@GuardedBy("this") private boolean isShutdown;

@GuardedBy("this") private int reservations;

public void start() { loggerThread.start(); }


public void stop() {

synchronized (this) {isShutdown = true; }

loggerThread.interrupt();

}


public void log(String msg) throws InterruptedException  {

synchronized (this) {

if(isShutdown) throw new IllegalStateException(...);

++reservations;

}

queue.put(msg);

}


private class LoggerThread extends Thread {

public void run() {

try {

while(true) {

try {

synchronized (LogService.this) {

if (isShutdown && reservations == 0) break;

}

String msg = queue.take();

synchronized(LogService.this) { --reservations; }

writer.println(msg);

}catch(InterruptedException e) { .... }

}

} finally {

writer.close();

}

}

}

}


毒丸对象

适用情况:在生产者和消费者的数量已知的情况下, 才 可以使用。除非完全确定可以保证这个 毒丸对象 不会被阻塞。不然就只能在无界队列中才能安全的使用。

public class IndexingService {
    private static final File POISON = new FIle("");
    private final IndexerThread consumer = new IndexerThread();
    private final CrawlerThread producer = new CrawlerThread();
    private final BlockingQueue<File queue;
    private final FileFilter filterFilter;
    private final File root;
    
    class CrawlerThread extends Thread {
        public void run() {
            try {
                crawl(root);
            } catch (InterruptedException e) { /**/ }
            finally {
                try {
                    queue.put(POISON);//#在某些情况下,put poison   如果这个queue超了,那可能永远都要阻塞等待下去
                } catch (InterruptedException e) { /**/ }
            }
        }
    }
    
    public class IndexerThread extends Thread {
        public void run() {
            try {
                while (true) {
                    File file = queue.take();
                    if (file == POISON)
                        break;
                    else
                        indexFile(file);
                }
            }catch(InterruptedException e) {}
        }
    }
    
    public void start() {
        producer.start();
        consumer.start();
    }
    
    public void stop() {
        producer.interrupt();
    }
    
    public void awaitTermination() throws InterruptedException {
        consumer.join();
    }
}


使用Executors完成一次性任务的关闭

public void someMethod() {
ExecutorService exec = Executors.newCachedThreadPool();
try{
doSomething();
} finally {
exec.shutdown();
exec.awaitTermination(timeout, unit);
}
}


shutdownNow的局限性:会取消正在执行的任务。我们就需要将这些任务保存起来或者写入日志,方便对这些进行处理。

线程在被关闭时,我们无法知道哪些线程被关闭,故  一定要在线程内部对这些状态进行处理。。。。封装还是很重要的。


处理非正常关闭的线程,可以通过Thread.UncaughtExceptionHandler。还可以用setDefaultUncaughtExceptionHandler来设置默认的UncaughtExceptionHandler。


JVM关闭

在正常关闭中,JVM首先调用所有已注册的Shutdown hook。shutdownhook是通过Runtime.addShutdownHook注册的但尚未开始的线程。shutdownhook的调用顺序是无法确认的。JVM不会停止或者中断任何在关闭时仍然运行的应用程序线程,这些线程在最终JVM结束时,会被结束。

public void start() {
    Runtime.getRuntime().addShutdownHook(new Thread() {
        public void run() {
            try { LogService.this.stop(); }
            catch (InterruptedException ignored) {}
        }
    });
}


 类似资料: