当前位置: 首页 > 知识库问答 >
问题:

Java ExecutorService-任务/调用不取消/中断

鲁建茗
2023-03-14

我正在使用Java ExecutorService(ThreadPool)执行一个任务&更新UI,而特定的活动处于前台(可见)。

问题:我想要的是当用户切换到另一个活动时,我想要停止/取消所有的任务(无论是排队的还是正在运行的)。为此,我必须使用ExecutorService shutdown/shutdownNow方法,或者在调用isDone()检查未来对象状态后,对ExecutorService submit方法返回的未来对象使用cancel(true)。这会将中断对应的线程标志设置为TRUE,我必须在可调用的实现中检查(thread.CurrentThread.IsInterrupted())以确定是否中断,退出任务/线程。问题是我是调用ExecutorService shutdown方法还是Future cancel(true)方法,在这两种情况下,很少10次中的1次会将线程中断标志设置为true,这最终会导致内存泄漏等。

代码

private static class ThreadPoolManager {

    private ExecutorService executorService;
    private List<Future> queuedFutures;
    private BlockingQueue<Runnable> blockingQueue;

    private static ThreadPoolManager instance;

    private ThreadPoolManager() {
        MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-created(constructor)");
        queuedFutures = new ArrayList<>();
        blockingQueue = new LinkedBlockingDeque<>();
        executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 1, TimeUnit.SECONDS, blockingQueue);
    }

    static {
        instance = new ThreadPoolManager();
    }

    public static void submitItemTest(Callable<Object> callable) {
        MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted item test");
        if(instance.executorService.isShutdown()){
            instance=new ThreadPoolManager();
        }
        Future future = instance.executorService.submit(callable);
        instance.queuedFutures.add(future);
    }

    public static void submitTestAll(Callable<Object> callable) {
        MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted test all");
        if(instance.executorService.isShutdown()){
            instance=new ThreadPoolManager();
        }
        cancelAll();
        Future future = instance.executorService.submit(callable);
        instance.queuedFutures.add(future);
    }

    public static void cancelAll() {
        MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelling all future tasks");
        instance.blockingQueue.clear();
        for (Future future : instance.queuedFutures) {
            if (!future.isDone()) {
                boolean cancelled = future.cancel(true);
                MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelled-" + cancelled);
            }
        }
        instance.queuedFutures.clear();
    }

    public static void shutdownExecutor(){
        MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Shuttingdown threadpool");
        instance.executorService.shutdownNow();
    }
}

可调用实现(正常迭代&if子句检查中断):

private Callable<Object> getTestAllCallable() {
        return new Callable<Object>() {
            @Override
            public Object call() {
                for (int i = 0; i < inbuiltProxyPojoArrayList.size(); i++) {
                    if (!Thread.currentThread().isInterrupted()) {
                          //someWork

                    } else {
                        MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "ThreadInterrupted-Cancelling");
                        return null;
                    }
                }
                return null;
            }
        };
    }

活动/片段onStop实现(用于调用cancel task和shutdown):

@Override
public void onStop() {
    MyLogger.log(MyLogger.LOG_TYPE.INFO, "onStop called");
    ThreadPoolManager.cancelAll();
    ThreadPoolManager.shutdownExecutor();
    super.onStop();
}

更新:

>

  • 从使用Runnable而不是Callable移动。

    现在不对ExecutorService使用singleton。

      private class ThreadPoolManager {
    
        private ExecutorService executorService;
        private List<Future> queuedFutures;
        private BlockingQueue<Runnable> blockingQueue;
    
        private ThreadPoolManager() {
            MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-created(constructor)");
            queuedFutures = new ArrayList<>();
            blockingQueue = new LinkedBlockingDeque<>();
            executorService =getNewExecutorService();
        }
    
        private ExecutorService getNewExecutorService(){
            return new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 1, TimeUnit.SECONDS, blockingQueue);
        }
    
        private void submitItemTest(Runnable runnable) {
            MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted item test");
            if(executorService.isShutdown()){
                executorService=getNewExecutorService();
            }
            Future future = executorService.submit(runnable);
            queuedFutures.add(future);
        }
    
        private void submitTestAll(Runnable runnable) {
            MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted test all");
            if(executorService.isShutdown()){
                executorService=getNewExecutorService();
            }
            cancelAll();
            Future future = executorService.submit(runnable);
            queuedFutures.add(future);
        }
    
        private void cancelAll() {
            MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelling all future tasks");
            blockingQueue.clear();
            for (Future future : queuedFutures) {
                if (!future.isDone()) {
                    boolean cancelled = future.cancel(true);
                    MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelled-" + cancelled);
                }
            }
            queuedFutures.clear();
        }
    
        private void shutdownExecutor(){
            MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Shuttingdown threadpool");
            executorService.shutdownNow();
            blockingQueue.clear();
            queuedFutures.clear();
        }
    }
    

    找出了罪魁祸首但还没找到解决办法。下面是Runnables 1的实现,其中1正在工作(IsInterruptedException返回true,或出现InterupptedException和than task ended)但不是Other。

    new Runnable() {
              @Override
              public void run() {
                        int i=0;
                        while(!Thread.currentThread().isInterrupted()){
                            try {
                                System.out.println(i);
                                Thread.currentThread().sleep(2000);
                            } catch (InterruptedException e) {
                                MyLogger.log(MyLogger.LOG_TYPE.DEBUG,"Interrupted");
                                return;
                            }
                            i++;
                        }
                    }
                }
    
    new Runnable(){
                @Override
                public void run() {
                    for (int i = 0; i < inbuiltProxyPojoArrayList.size(); i++) {
                        if (!Thread.currentThread().isInterrupted()) {
    
                        } else {
                            MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Thread Interrupted (Cancelled)");
                            break;
                        }
                    }
                }
            };
    

    一个可能的解决方案是使用变量(布尔值)作为runnable内部的中断标志,我将考虑作为最后的手段,但很乐意了解错误所在。

  • 共有1个答案

    闻人德庸
    2023-03-14

    根据executorservice文档,关闭正在执行的任务是在尽最大努力的基础上完成的。

    因此,当您调用ExecutorService.ShutdownNow()时,实现将尝试关闭当前正在执行的所有任务。每个任务将保持运行,直到它检测到任务被中断为止。

    为了确保您的线程在早期达到该点,最好在您的循环中添加一个检查,检查线程是否被互连,如下所示:

    Thread.currentThread().isInterrupted();
    

    通过在每次迭代中进行此调用,您的线程将检测到与实际插入间隔很短的中断。

    因此修改后的可调用代码如下所示:

    private Callable<Object> getTestAllCallable() {
        return new Callable<Object>() {
            @Override
            public Object call() {
                for (int i = 0; i < inbuiltProxyPojoArrayList.size(); i++) {
                    if(Thread.currentThread().isInterrupted()) {
                        return null;
                    }
                    if(someCondition) {
                        //someWork
                    } else {
                        MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "ThreadInterrupted-Cancelling");
                        return null;
                    }
                }
                return null;
            }
        };
    }
    

    顺便说一下,如果您不打算从call()方法返回任何值,那么使用callable是没有意义的。如果在任务中需要参数化类型,只需创建一个参数化的runnable,如下所示:

    public class ParameterizedRunnable<T> implements Runnable {
        private final T t;
    
        public ParameterizedRunnable(T t) {
            this.t = t;
        }
    
        public void run() {
            //do some work here
        }
    }
    
     类似资料:
    • 我正在使用执行器服务并行运行任务。并行运行方法采用输入整数并返回整数 。由于并行任务具有返回类型,因此我使用了可调用的匿名类。您可以在下面的示例中看到 是从 executer 调用的。任务方法也有1秒的等待时间,并为抛出异常; 在下面的实现中,我使用invokeAll和isDone,并尝试收集数据。 下面的程序抛出。 未来任务迭代和检查有什么问题 isDone 和 get() 。如何处理特定调用的

    • 问题内容: 我正在寻找可以提供超时的ExecutorService实现。如果提交到ExecutorService的任务花费的时间超过了超时时间,则这些任务将被中断。实现这样的野兽并不是一个困难的任务,但是我想知道是否有人知道现有的实现。 这是我根据以下一些讨论得出的。任何意见? 问题答案: 你可以为此使用ScheduledExecutorService。首先,你只提交一次即可立即开始,并保留创建的

    • 问题内容: 等待所有任务完成的最简单方法是什么?我的任务主要是计算,所以我只想运行大量的作业-每个内核上一个。现在,我的设置如下所示: 实现可运行。这似乎是正确执行的任务,但代码崩溃上用。这很奇怪,因为我玩了一些玩具示例,而且看起来很奏效。 包含数以万计的元素。我应该使用其他方法吗?我正在寻找尽可能简单的东西 问题答案: 最简单的方法是使用单行代码执行所需的操作。用你的话来说,你需要修改或包装以实

    • 本文向大家介绍uwp 取消注册任务,包括了uwp 取消注册任务的使用技巧和注意事项,需要的朋友参考一下 示例            

    • 我们已经在 无阻塞调用 一节中看到了取消任务的示例。 在这节,我们将回顾一下,在一些更加详细的情况下取消的语义。 一旦任务被 fork,可以使用 yield cancel(task) 来中止任务执行。取消正在运行的任务,将抛出 SagaCancellationException 错误。 来看看它是如何工作的,让我们先考虑一个简单的例子:一个可通过某些 UI 命令启动或停止的后台同步任务。 在接收到

    • 我正在将一些用于iOS和OSX的Ant构建转换为Gradle。创建了以下内容: 这可能很简单,但我做错了什么?如何从自定义类中调用exec任务?