当前位置: 首页 > 知识库问答 >
问题:

当ExecutorService上的所有任务完成或取消时,为什么WaitTermination不能可靠地返回

仲孙思源
2023-03-14

我是我的代码,我向ExecutorService提交一些任务,然后使用shutdown()和awaitTermination()等待它们完成。但是,如果任何一个任务需要超过一个特定的时间来完成,我希望它被取消,而不影响其他任务。我使用来自ExecutorService的代码修正代码,该代码在超时后中断任务,如下所示:

package com.jthink.jaikoz.memory;

import com.jthink.jaikoz.MainWindow;

import java.util.List;
import java.util.concurrent.*;

public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor {
    private final long timeout;
    private final TimeUnit timeoutUnit;

    private boolean isShutdown = false;

    private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();

    //Map Task to the Timeout Task that could be used to interrupt it
    private final ConcurrentMap<Runnable, ScheduledFuture> runningTasks = new ConcurrentHashMap<Runnable, ScheduledFuture>();

    public long getTimeout()
    {
        return timeout;
    }

    public TimeUnit getTimeoutUnit()
    {
        return timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int workerSize, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit)
    {
        super(workerSize, workerSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    @Override
    public void shutdown() {
        isShutdown = true;
        super.shutdown();
    }

    @Override
    public List<Runnable> shutdownNow() {
        timeoutExecutor.shutdownNow();
        return super.shutdownNow();
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        if(timeout > 0) {
            //Schedule a task to interrupt the thread that is running the task after time timeout
            final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t), timeout, timeoutUnit);

            //Add Mapping
            runningTasks.put(r, scheduled);
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {

        //Remove mapping and cancel timeout task
        ScheduledFuture timeoutTask = runningTasks.remove(r);
        if(timeoutTask != null) {
            timeoutTask.cancel(false);
        }

        if (isShutdown)
        {
            if(getQueue().isEmpty())
            {
                //Queue is empty so all tasks either finished or currently running
                MainWindow.logger.severe("---Thread Pool Queue is Empty");
                timeoutExecutor.shutdown();
            }
        }
    }

    /**
     * Interrupt the thread
     *
     */
    class TimeoutTask implements Runnable {
        private final Thread thread;

        public TimeoutTask(Thread thread) {
            this.thread = thread;
        }

        @Override
        public void run() {
            MainWindow.logger.severe("Cancelling task because taking too long");
            thread.interrupt();
        }
    }
}

以及任务何时有时间完成和何时不能按预期工作的测试案例

package com.jthink.jaikoz;

import com.jthink.jaikoz.memory.TimeoutThreadPoolExecutor;
import junit.framework.TestCase;

import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Created by Paul on 08/12/2014.
 */
public class TestThreadPool extends TestCase
{
    public void testThreadPoolTasksComplete() throws Exception
    {
        final TimeoutThreadPoolExecutor executorService = new TimeoutThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), 6, TimeUnit.SECONDS);

        for (int i = 0; i < 10; i++)
        {
            executorService.submit(new Callable<Object>()
            {
                @Override
                public Object call() throws Exception
                {
                    Thread.sleep(5000);
                    System.out.println("Done");
                    return null;
                }

            });
        }
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.DAYS);
        System.out.println("Program done");
    }

    public void testThreadPoolTasksCancelled() throws Exception
    {
        final TimeoutThreadPoolExecutor executorService = new TimeoutThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), 3, TimeUnit.SECONDS);

        for (int i = 0; i < 10; i++)
        {
            executorService.submit(new Callable<Object>()
            {
                @Override
                public Object call() throws Exception
                {
                    Thread.sleep(5000);
                    System.out.println("Done");
                    return null;
                }

            });
        }
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.DAYS);
        System.out.println("Program done");
    }
}
private boolean matchToRelease(ListMultimap<MatchKey, MetadataChangedWrapper> matchKeyToSongs)
            throws JaikozException
    {
        if (stopTask)
        {
            MainWindow.logger.warning("Analyser stopped detected in matchToRelease");
            return false;
        }

        TimeoutThreadPoolExecutor es = getExecutorService();
        List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>(matchKeyToSongs.size());
        for(MatchKey matchKey:matchKeyToSongs.keySet())
        {
            List<MetadataChangedWrapper> songs = matchKeyToSongs.get(matchKey);
            futures.add(es.submit(new CorrectFromMusicBrainzWorker(this, stats, matchKey, songs)));
        }
        es.shutdown();
        try
        {
            es.awaitTermination(matchKeyToSongs.keySet().size() * es.getTimeout(), es.getTimeoutUnit());
        }
        catch(InterruptedException ie)
        {
            MainWindow.logger.warning(this.getClass() + " has been interrupted");
            return false;
        }
        return true;
    }

然而对于一个客户即使

---Thread Pool Queue is Empty

是output awaitTermination()不返回,仅在用户两小时后取消任务时最终返回-此处为完整日志解压缩

14/12/2014 20.44.19:com.jthink.jaikoz.manipulate.CorrectFromMusicBrainzWorker:getSongsNotMatched:SEVERE: /Volumes/2TB External/New iTunes Library/iTunes Media/Music/XTC:albumMetadataMatchingCounts11:AlreadyMatched:2:ToMatch:11
14/12/2014 20.44.19:com.jthink.jaikoz.memory.TimeoutThreadPoolExecutor:afterExecute:SEVERE: ---Thread Pool Queue is Empty
14/12/2014 22.18.01:com.jthink.jaikoz.manipulate.ExecutorServiceEnabledAnalyser:cancelTask:WARNING: Cancelling class com.jthink.jaikoz.manipulate.CorrectFromMusicBrainzAnalyser Task
14/12/2014 22.18.01:com.jthink.jaikoz.manipulate.CorrectFromMusicBrainzAnalyser:matchToRelease:WARNING: class com.jthink.jaikoz.manipulate.CorrectFromMusicBrainzAnalyser has been interrupted

那么,即使logs show队列是空的,并且因此对执行器本身和嵌入的timeoutExecutor都调用了shutdown(),怎么可能awaiTermination()不返回呢?

其次,为什么线程池队列为空有时会得到多次输出

TimeOutExecutor是单线程的,这是正确的/必要的吗?

基于Holgers答案的更新

所以您是说我shutdown()timeoutExecutor太早了(可能有高达WorkerSize-1的任务仍在运行),这意味着仍在为尚未完成的任务运行的所有timeoutExecutor都被中断了。因此,如果剩余的任务中的任何一个由于某种原因没有自行完成,则它们的超时任务不再存在,因此不能用于中断它们。但是waitTermination()woiuldnt返回的唯一原因是最后一个(WorkerSize-1)任务中的一个没有完成。

我自己已将beforeExecute()更改为

protected void afterExecute(Runnable r, Throwable t) {
    ScheduledFuture timeoutTask = runningTasks.remove(r);
    if(timeoutTask != null) {
        timeoutTask.cancel(false);
    }
    if (isShutdown)
    {
        if(getQueue().isEmpty())
        {

            if(runningTasks.size()==0)
            {
                this.shutdownNow();
            }
        }
    }
}

为了确保它能完成,我使用了shutdownNow(),但要等到全部完成,但根据您的评论,这仍然可能无法工作

我应该这么做

protected void afterExecute(Runnable r, Throwable t) {
    ScheduledFuture timeoutTask = runningTasks.remove(r);
    if(timeoutTask != null) {
        timeoutTask.cancel(false);
    }
}

而且

protected void terminated() 
{
    timeoutExecutor.shutdown();
}

并且一旦所有提交的任务完成(自然完成或通过相应的timeoutExecutor取消)就会调用终止()timeoutExecutor在此时仍然存在并不重要?

public void testThreadPoolTasksCancelled() throws Exception
    {
        Instant t1, t2;
        t1 = Instant.now();
        final TimeoutThreadPoolExecutor executorService = new TimeoutThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), 3, TimeUnit.SECONDS);

        for (int i = 0; i < 50; i++)
        {
            executorService.submit(new Callable<Object>()
            {
                @Override
                public Object call() throws Exception
                {
                    Thread.sleep(500000000);
                    System.out.println("Done");
                    return null;
                }

            });
        }
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.DAYS);
        t2 = Instant.now();
        System.out.println("Program done:"+(Duration.between(t1, t2).toMillis()/ 1000+ " seconds"));
    }

共有1个答案

索正豪
2023-03-14

队列只包含尚未启动的作业。拥有空队列并不意味着没有挂起的作业;他们可能只是为了被处决而被移除。特别是在您的示例代码中,认为空队列意味着没有正在运行的作业的假设是错误的;由于您将执行器配置为具有十个核心线程并提交十个作业,因此在示例代码的整个执行过程中,队列将始终为空。

因此,您遇到的问题是您关闭timeoutexecutor得太早了,因此它可能会错过一个或多个任务,从而中断线程池执行器的挂起任务。

注意,原则上,作业甚至可能处于从队列中删除(如果添加了作业)的状态,但beforeexecute尚未被调用。因此,即使有一个空队列和一个空的runningtasks映射,也不能保证没有挂起的作业。

要回答您的另一个问题,您必须关闭TimeoutExecutor,因为它有一个关联的活动线程,该线程将始终保持执行程序活动。因此不关闭它将会产生内存泄漏并进一步保持线程活动,因此始终防止JVM自动关闭。

但是,关闭TimeoutExecutor的正确位置是对Protected void terminated()方法的重写,该方法正好用于清理。

最后一点是,timeoutexecutor有多少个线程并不重要,但考虑到任务的简单程度,多个线程并没有什么好处,单线程执行器是最简单也可能是最有效的解决方案。

 类似资料:
  • 1、任务中有未完成的子任务; 2、没有任务状态置位权限(有此权限的角色位【任务创建者/执行者】、【项目创建者】)

  • 问题内容: 我将一堆可运行的对象放入ExecutorService中: 我希望我的程序/过程在所有工作人员完成后立即停止。但是根据我的日志,这种情况还需要20到30秒。工人没有分配任何资源,实际上,他们目前什么都不做。 不要误会我的意思,这对我来说不是一个关键问题,我只是想了解正在发生的事情,而且我想知道这是否是正常行为。 问题答案: 使用它的。的javadocs说“每个新线程都被创建 为非守护

  • 问题内容: 我有ConcurrentLinkedDeque,它用于同步push / pop元素,还有一些异步任务,这些任务正在从堆栈中获取一个元素,如果该元素具有邻居,则会将其推入堆栈。 示例代码: 我想在while循环中有另外一条语句来回答问题-“执行程序中的任何任务都在工作?” 问题答案: 如果使用,没有一种干净的方法来检查所有Runnable是否都已完成。除非您在Runnable本身中构建了

  • 我正在用Java编写一个应用程序,它使用ExecutorService来运行多个线程。 我希望将多个任务(一次数千个)作为调用项提交给执行器,完成后,检索它们的结果。我处理这个问题的方式是,每次调用submit()函数时,都会得到一个Future(未来),并将其存储在ArrayList中。稍后,我将列表传递给一个线程,该线程不断对其进行迭代,调用future.get()函数并给出一个超时时间,以查

  • 我将未来从ExecutorService推送到哈希映射中。稍后,我可以在散列图中调用Futures上的cancel。虽然结果是真的,但我后来在可调用过程中遇到了断点,好像Future cancel()没有任何效果。我认为这里可能有两个不同的引用(即使在中断时引用ID是一样的),但我想知道是否有一些专家可以插话。代码如下所示: 我允许继续处理(这是一个在传入任务时提交任务的循环),稍后我可能会尝试通

  • 我正在使用Java ExecutorService(ThreadPool)执行一个任务&更新UI,而特定的活动处于前台(可见)。 问题:我想要的是当用户切换到另一个活动时,我想要停止/取消所有的任务(无论是排队的还是正在运行的)。为此,我必须使用ExecutorService shutdown/shutdownNow方法,或者在调用isDone()检查未来对象状态后,对ExecutorServic