我正在使用Java ExecutorService(ThreadPool)执行一个任务&更新UI,而特定的活动处于前台(可见)。
问题:我想要的是当用户切换到另一个活动时,我想要停止/取消所有的任务(无论是排队的还是正在运行的)。为此,我必须使用ExecutorService shutdown/shutdownNow方法,或者在调用isDone()检查未来对象状态后,对ExecutorService submit方法返回的未来对象使用cancel(true)。这会将中断对应的线程标志设置为TRUE,我必须在可调用的实现中检查(thread.CurrentThread.IsInterrupted())以确定是否中断,退出任务/线程。问题是我是调用ExecutorService shutdown方法还是Future cancel(true)方法,在这两种情况下,很少10次中的1次会将线程中断标志设置为true,这最终会导致内存泄漏等。
代码:
private static class ThreadPoolManager {
private ExecutorService executorService;
private List<Future> queuedFutures;
private BlockingQueue<Runnable> blockingQueue;
private static ThreadPoolManager instance;
private ThreadPoolManager() {
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-created(constructor)");
queuedFutures = new ArrayList<>();
blockingQueue = new LinkedBlockingDeque<>();
executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 1, TimeUnit.SECONDS, blockingQueue);
}
static {
instance = new ThreadPoolManager();
}
public static void submitItemTest(Callable<Object> callable) {
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted item test");
if(instance.executorService.isShutdown()){
instance=new ThreadPoolManager();
}
Future future = instance.executorService.submit(callable);
instance.queuedFutures.add(future);
}
public static void submitTestAll(Callable<Object> callable) {
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted test all");
if(instance.executorService.isShutdown()){
instance=new ThreadPoolManager();
}
cancelAll();
Future future = instance.executorService.submit(callable);
instance.queuedFutures.add(future);
}
public static void cancelAll() {
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelling all future tasks");
instance.blockingQueue.clear();
for (Future future : instance.queuedFutures) {
if (!future.isDone()) {
boolean cancelled = future.cancel(true);
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelled-" + cancelled);
}
}
instance.queuedFutures.clear();
}
public static void shutdownExecutor(){
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Shuttingdown threadpool");
instance.executorService.shutdownNow();
}
}
可调用实现(正常迭代&if子句检查中断):
private Callable<Object> getTestAllCallable() {
return new Callable<Object>() {
@Override
public Object call() {
for (int i = 0; i < inbuiltProxyPojoArrayList.size(); i++) {
if (!Thread.currentThread().isInterrupted()) {
//someWork
} else {
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "ThreadInterrupted-Cancelling");
return null;
}
}
return null;
}
};
}
活动/片段onStop实现(用于调用cancel task和shutdown):
@Override
public void onStop() {
MyLogger.log(MyLogger.LOG_TYPE.INFO, "onStop called");
ThreadPoolManager.cancelAll();
ThreadPoolManager.shutdownExecutor();
super.onStop();
}
更新:
>
从使用Runnable而不是Callable移动。
现在不对ExecutorService使用singleton。
private class ThreadPoolManager {
private ExecutorService executorService;
private List<Future> queuedFutures;
private BlockingQueue<Runnable> blockingQueue;
private ThreadPoolManager() {
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-created(constructor)");
queuedFutures = new ArrayList<>();
blockingQueue = new LinkedBlockingDeque<>();
executorService =getNewExecutorService();
}
private ExecutorService getNewExecutorService(){
return new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 1, TimeUnit.SECONDS, blockingQueue);
}
private void submitItemTest(Runnable runnable) {
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted item test");
if(executorService.isShutdown()){
executorService=getNewExecutorService();
}
Future future = executorService.submit(runnable);
queuedFutures.add(future);
}
private void submitTestAll(Runnable runnable) {
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted test all");
if(executorService.isShutdown()){
executorService=getNewExecutorService();
}
cancelAll();
Future future = executorService.submit(runnable);
queuedFutures.add(future);
}
private void cancelAll() {
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelling all future tasks");
blockingQueue.clear();
for (Future future : queuedFutures) {
if (!future.isDone()) {
boolean cancelled = future.cancel(true);
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelled-" + cancelled);
}
}
queuedFutures.clear();
}
private void shutdownExecutor(){
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Shuttingdown threadpool");
executorService.shutdownNow();
blockingQueue.clear();
queuedFutures.clear();
}
}
找出了罪魁祸首但还没找到解决办法。下面是Runnables 1的实现,其中1正在工作(IsInterruptedException返回true,或出现InterupptedException和than task ended)但不是Other。
new Runnable() {
@Override
public void run() {
int i=0;
while(!Thread.currentThread().isInterrupted()){
try {
System.out.println(i);
Thread.currentThread().sleep(2000);
} catch (InterruptedException e) {
MyLogger.log(MyLogger.LOG_TYPE.DEBUG,"Interrupted");
return;
}
i++;
}
}
}
new Runnable(){
@Override
public void run() {
for (int i = 0; i < inbuiltProxyPojoArrayList.size(); i++) {
if (!Thread.currentThread().isInterrupted()) {
} else {
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Thread Interrupted (Cancelled)");
break;
}
}
}
};
一个可能的解决方案是使用变量(布尔值)作为runnable内部的中断标志,我将考虑作为最后的手段,但很乐意了解错误所在。
根据executorservice
文档,关闭正在执行的任务是在尽最大努力的基础上完成的。
因此,当您调用ExecutorService.ShutdownNow()
时,实现将尝试关闭当前正在执行的所有任务。每个任务将保持运行,直到它检测到任务被中断为止。
为了确保您的线程在早期达到该点,最好在您的循环中添加一个检查,检查线程是否被互连,如下所示:
Thread.currentThread().isInterrupted();
通过在每次迭代中进行此调用,您的线程将检测到与实际插入间隔很短的中断。
因此修改后的可调用
代码如下所示:
private Callable<Object> getTestAllCallable() {
return new Callable<Object>() {
@Override
public Object call() {
for (int i = 0; i < inbuiltProxyPojoArrayList.size(); i++) {
if(Thread.currentThread().isInterrupted()) {
return null;
}
if(someCondition) {
//someWork
} else {
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "ThreadInterrupted-Cancelling");
return null;
}
}
return null;
}
};
}
顺便说一下,如果您不打算从call()
方法返回任何值,那么使用callable
是没有意义的。如果在任务中需要参数化类型,只需创建一个参数化的runnable
,如下所示:
public class ParameterizedRunnable<T> implements Runnable {
private final T t;
public ParameterizedRunnable(T t) {
this.t = t;
}
public void run() {
//do some work here
}
}
我正在使用执行器服务并行运行任务。并行运行方法采用输入整数并返回整数 。由于并行任务具有返回类型,因此我使用了可调用的匿名类。您可以在下面的示例中看到 是从 executer 调用的。任务方法也有1秒的等待时间,并为抛出异常; 在下面的实现中,我使用invokeAll和isDone,并尝试收集数据。 下面的程序抛出。 未来任务迭代和检查有什么问题 isDone 和 get() 。如何处理特定调用的
问题内容: 我正在寻找可以提供超时的ExecutorService实现。如果提交到ExecutorService的任务花费的时间超过了超时时间,则这些任务将被中断。实现这样的野兽并不是一个困难的任务,但是我想知道是否有人知道现有的实现。 这是我根据以下一些讨论得出的。任何意见? 问题答案: 你可以为此使用ScheduledExecutorService。首先,你只提交一次即可立即开始,并保留创建的
问题内容: 等待所有任务完成的最简单方法是什么?我的任务主要是计算,所以我只想运行大量的作业-每个内核上一个。现在,我的设置如下所示: 实现可运行。这似乎是正确执行的任务,但代码崩溃上用。这很奇怪,因为我玩了一些玩具示例,而且看起来很奏效。 包含数以万计的元素。我应该使用其他方法吗?我正在寻找尽可能简单的东西 问题答案: 最简单的方法是使用单行代码执行所需的操作。用你的话来说,你需要修改或包装以实
本文向大家介绍uwp 取消注册任务,包括了uwp 取消注册任务的使用技巧和注意事项,需要的朋友参考一下 示例
我们已经在 无阻塞调用 一节中看到了取消任务的示例。 在这节,我们将回顾一下,在一些更加详细的情况下取消的语义。 一旦任务被 fork,可以使用 yield cancel(task) 来中止任务执行。取消正在运行的任务,将抛出 SagaCancellationException 错误。 来看看它是如何工作的,让我们先考虑一个简单的例子:一个可通过某些 UI 命令启动或停止的后台同步任务。 在接收到
我正在将一些用于iOS和OSX的Ant构建转换为Gradle。创建了以下内容: 这可能很简单,但我做错了什么?如何从自定义类中调用exec任务?