线程创建后有几种情况会取消
1. 用户点击cancel取消
2. 超时取消,比如网页加载中有部分信息没被加载出来
3. 应用程序的逻辑设计,比如在多线程情况下,有一个线程找到了result,其他线程停止作业。
4. error。exception
5. 关闭程序,关闭服务
常用中断方式
while (!isCancelled) {
doSomething();
}
每次都去检查一下这个中断状态变量。这有个坏处,就是如果doSomething有阻塞。呵呵,可能就永远都执行不下去了。
Thread的三个方法
interrupt: 中断目标线程----------不会立即停止目标线程正在进行的工作,只是传递请求中断的消息
isInterrupted: 返回目标线程的中断状态
interrupted: 清楚当前线程的中断状态
核心点:中断是实现取消的最合理方法
while (!Thread.currentThread().isInterrupted()) {
doSomething();
}
之前的中断状态变量改成了判断线程是否被interrupte.
这里的情况是 用线程自身来中断。中断的操作也通过线程自身来进行。每个线程都有自身的中断策略。另,除非你知道这个线程的中断策略,不然不要中断这个线程。
上一个理念上实现的代码。不要考虑细节。
1.外部线程中中断
private static final ScheduledExecutorService cancelExec = ...;
public static void timedRun (Runnable r, long timeout, TimeUnit unit) {
final Thread taskThread = Thread.currentThread();
cancelExec.schedule(new Runnable() {
public void run() { taskThread.interrupt(); }
}, timeout, unit);
r.run();
}
2.在专门线程中中断任务
public static void timedRun(final Runnable r, long timeout, TimeUnit unit) throws InteruptedException {
class RethrowableTask implements Runnable {
private volatile Throwable t;
public void run() {
try { r.run(); }catch (Throwable t) { this.t = t }
}
void rethrow() {
if(t != null) throw launderThrowable(t);
}
}
RethrowableTask task = new RethrowableTask();
final Thread taskThread = new Thread(task);
taskThread.start();
cancelExec.schedule(new Runnable() {
public void run() { taskThread.interrupt(); }
}, timeout, unit);
taskThread.join(unit.toMillis(timeout)); //#这里的阻塞等待不知道是超时还是正常退出。
task.rethrow();
}
通过Future来实现取消
当取消某个任务时,不宜直接中断任务,因为不知道现在执行的是什么任务。
public static void timedRun(Runnable r, long timeout, TimeUnit unit) throws InterruptedException {
Future<?> task = taskExec.submit(r);//#通过ExecutorService得到封装的Future
try {
task.get(timeout, unit);//#定时get
} catch (TimeoutException e) {
doSomething();
}catch(ExecutionException e){
doSomething();
throw launderThrowable(e.getCause());
}finally{
task.cancel(true);
}
}
处理不可中断的阻塞
1. java.io包中的同步Socket I/O: 这里个InputStream和OutputStream中的read和write不会相应中断,但这个基于套接字socket的。那么直接关闭这个套接字socket即可。这会抛出一个SocketException
2. Java.io包中的同步I/O: InterruptibleChannel 用来表现一个可以被异步关闭的 Channel 。这表现在两方面:
1. 当一个 InterruptibleChannel 的 close() 方法被调用时,其它 block 在这个 InterruptibleChannel 的 IO 操作上的线程会接收到一个 AsynchronousCloseException 。
2. 当一个线程 block 在 InterruptibleChannel 的 IO 操作上时,另一个线程调用该线程的 interrupt() 方法会导致 channel 被关闭,该线程收到一个ClosedByInterruptException ,同时线程的 interrupt 状态会被设置。
3. Selector的异步I/O: 如果一个线程在调用Selector.select方法时阻塞,那么调用close或者wakeup会使线程抛出ClosedSelectorException并提前返回。
4. 获取某个锁:可以使用lockInterruptibly方法中断
范例
public class ReaderThread extends Thread {
private final Socket socket;
private final InputStream in;
public ReaderThread(Socket socket) throws IOException {
this.socket = socket;
this.in = socket.getInputStream();
}
public void interrupt() {
try {
socket.close();
}catch(IOException ignored) {}
finally {
super.interrupter();
}
}
public void run() {
try {
byte[] buf = new byte[BUFSZ];
while(true) {
int count = in.read(buf);
if(count < 0) break;
else if(count > 0)
processBuffer(buf, count);
}
}catch(IOException e){}
}
}
另外,可以用newTaskFor来封装非标准的取消。
logger服务
范例:一段支持关闭,支持将队列中已有log信息打印出来的代码
public class LogService {
private final BlockingQueue<String> queue;
private final LoggerThread loggerThread;
private final PrintWriter writer;
@GuardedBy("this") private boolean isShutdown;
@GuardedBy("this") private int reservations;
public void start() { loggerThread.start(); }
public void stop() {
synchronized (this) {isShutdown = true; }
loggerThread.interrupt();
}
public void log(String msg) throws InterruptedException {
synchronized (this) {
if(isShutdown) throw new IllegalStateException(...);
++reservations;
}
queue.put(msg);
}
private class LoggerThread extends Thread {
public void run() {
try {
while(true) {
try {
synchronized (LogService.this) {
if (isShutdown && reservations == 0) break;
}
String msg = queue.take();
synchronized(LogService.this) { --reservations; }
writer.println(msg);
}catch(InterruptedException e) { .... }
}
} finally {
writer.close();
}
}
}
}
毒丸对象
适用情况:在生产者和消费者的数量已知的情况下, 才 可以使用。除非完全确定可以保证这个 毒丸对象 不会被阻塞。不然就只能在无界队列中才能安全的使用。
public class IndexingService {
private static final File POISON = new FIle("");
private final IndexerThread consumer = new IndexerThread();
private final CrawlerThread producer = new CrawlerThread();
private final BlockingQueue<File queue;
private final FileFilter filterFilter;
private final File root;
class CrawlerThread extends Thread {
public void run() {
try {
crawl(root);
} catch (InterruptedException e) { /**/ }
finally {
try {
queue.put(POISON);//#在某些情况下,put poison 如果这个queue超了,那可能永远都要阻塞等待下去
} catch (InterruptedException e) { /**/ }
}
}
}
public class IndexerThread extends Thread {
public void run() {
try {
while (true) {
File file = queue.take();
if (file == POISON)
break;
else
indexFile(file);
}
}catch(InterruptedException e) {}
}
}
public void start() {
producer.start();
consumer.start();
}
public void stop() {
producer.interrupt();
}
public void awaitTermination() throws InterruptedException {
consumer.join();
}
}
使用Executors完成一次性任务的关闭
public void someMethod() {
ExecutorService exec = Executors.newCachedThreadPool();
try{
doSomething();
} finally {
exec.shutdown();
exec.awaitTermination(timeout, unit);
}
}
shutdownNow的局限性:会取消正在执行的任务。我们就需要将这些任务保存起来或者写入日志,方便对这些进行处理。
线程在被关闭时,我们无法知道哪些线程被关闭,故 一定要在线程内部对这些状态进行处理。。。。封装还是很重要的。
处理非正常关闭的线程,可以通过Thread.UncaughtExceptionHandler。还可以用setDefaultUncaughtExceptionHandler来设置默认的UncaughtExceptionHandler。
JVM关闭
在正常关闭中,JVM首先调用所有已注册的Shutdown hook。shutdownhook是通过Runtime.addShutdownHook注册的但尚未开始的线程。shutdownhook的调用顺序是无法确认的。JVM不会停止或者中断任何在关闭时仍然运行的应用程序线程,这些线程在最终JVM结束时,会被结束。
public void start() {
Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
try { LogService.this.stop(); }
catch (InterruptedException ignored) {}
}
});
}