当前位置: 首页 > 知识库问答 >
问题:

如何扩展FutureTask并确保释放对Callable的引用?

莘聪
2023-03-14

我有自定义的ExecutorService,其中包含一个可用于中断提交给ExecutorSerice的任务,如果他们花了太长时间,我把完备类放在这篇文章的末尾。

这工作正常,除了有时中断本身会引起问题,所以我把一个不稳定的布尔取消标志添加到一个新的CanceableWork类中,并把它们子类为this,这样它们就可以检查并停止自己,如果它们的布尔值已经被删除了发送到真实。请注意,它们是提交给执行器服务的每个类中布尔值的实例,因此可以取消长时间运行的任务,而不取消其他任务。

但是,FutureTask作为参数传递给beforeExecute(线程t,Runnable r),这不允许访问可调用类,因此我的超时代码无法设置取消标志。

我通过重写newTaskFor方法返回一个只提供对可调用函数的引用的类来解决这个问题

public class FutureCallable<V> extends FutureTask<V>
{
    private Callable<V> callable;
    public FutureCallable(Callable<V> callable) {
        super(callable);
        this.callable = callable;
    }
    public Callable<V> getCallable() {
        return callable;
    }
}

一切都很顺利,至少我是这么想的。

不幸的是,我的应用程序现在使用越来越多的内存,因为新任务被提交给ExecutorService,并最终运行内存溢出,当我配置文件的应用程序,我发现有一个线程堆栈本地引用的所有FutureCallable,即使在任务完成后,因为FutureCallable有一个对正在运行的类的引用,它使用大量的内存。

当我查看(FutureCallable扩展的)FutureWork的代码时,有一个针对私有Callable引用的注释说

/** The underlying callable; nulled out after running */

那么,我如何改进我的FutureCallable以消除它对Callable的引用呢?或者,为什么在任务完成后会维护一个对未来调用的引用。

我已经确认,如果我注释掉newTaskFor方法,就不会有过多的内存使用,但不幸的是,我无法取消该类。

完成类是:

public class TimeoutThreadPoolExecutor extends ThreadPoolExecutor {
    private final long timeout;
    private final TimeUnit timeoutUnit;

    private final static int WAIT_BEFORE_INTERRUPT = 10000;
    private final static int WAIT_BEFORE_STOP      = 10000;


    private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();

    //Map Task to the Future of the Timeout Task that could be used to interrupt it
    private final ConcurrentMap<Runnable, ScheduledFuture> runningTasks = new ConcurrentHashMap<Runnable, ScheduledFuture>();

    public long getTimeout()
    {
        return timeout;
    }

    public TimeUnit getTimeoutUnit()
    {
        return timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int workerSize, ThreadFactory threadFactory, long timeout, TimeUnit timeoutUnit)
    {
        super(workerSize, workerSize, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory);
        MainWindow.logger.severe("Init:"+workerSize+":Timeout:"+timeout+":"+timeoutUnit);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    public TimeoutThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, long timeout, TimeUnit timeoutUnit) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.timeout = timeout;
        this.timeoutUnit = timeoutUnit;
    }

    @Override
    public <T> FutureCallable<T> newTaskFor(Callable<T> callable) {
        return new FutureCallable<T>(callable);
    }

    @Override
    public List<Runnable> shutdownNow() {
        timeoutExecutor.shutdownNow();
        return super.shutdownNow();
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        if(timeout > 0) {
            //Schedule a task to interrupt the thread that is running the task after time timeout starting from now
            final ScheduledFuture<?> scheduled = timeoutExecutor.schedule(new TimeoutTask(t, r), timeout, timeoutUnit);

            //Add Mapping
            runningTasks.put(r, scheduled);
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {

        //AfterExecute will be called after the task has completed, either of its own accord or because it
        //took too long and was interrupted by corresponding timeout task
        //Remove mapping and cancel timeout task
        ScheduledFuture timeoutTask = runningTasks.remove(r);
        if(timeoutTask != null) {
            timeoutTask.cancel(false);
        }

    }

    @Override
    protected void terminated()
    {
        //All tasks have completed either naturally or via being cancelled by timeout task so close the timeout task
        MainWindow.logger.severe("---Shutdown TimeoutExecutor");
        timeoutExecutor.shutdown();
    }

    /**
     * Interrupt or possibly stop the thread
     *
     */
    class TimeoutTask implements Runnable {
        private final       Thread thread;
        private             Callable c;

        public TimeoutTask(Thread thread, Runnable c) {
            this.thread = thread;
            if(c instanceof FutureCallable)
            {
                this.c = ((FutureCallable) c).getCallable();
            }
        }

        @Override
        public void run()
        {
            String msg = "";
            if (c != null)
            {
                if (c != null && c instanceof CancelableTask)
                {
                    MainWindow.logger.severe("+++Cancelling " + msg + " task because taking too long");
                    ((CancelableTask) c).setCancelTask(true);
                }
            }
        }
    }
}

    public abstract class CancelableTask  extends ExecutorServiceEnabledAnalyser
    {
        private volatile boolean cancelTask = false;

        public boolean isCancelTask() {
            return cancelTask;
        }

        public void setCancelTask(boolean cancelTask) {
            this.cancelTask = cancelTask;
        }

        CancelableTask(final MainWindow start, boolean isSelectedRecords, boolean isUseRowSelection)
        {
            super(start, isSelectedRecords, isUseRowSelection);
        }

        CancelableTask(final MainWindow start, List<MetadataChangedWrapper> songs)
        {
            super(start, songs );
        }

    }

共有1个答案

陶星波
2023-03-14

这个ThreadLocal在哪里?我觉得很奇怪,很难相信你说的话,它会引用所有曾经运行过的任务,即使是在完成之后。如果是这种情况,即使没有重写,它最终也会耗尽内存(任务本身使用了一些内存,尽管可能比可调用的内存少,但仍然不是零)。

无论如何,您可以重写FutureCallable上的do方法,以在执行后为空。

 类似资料:
  • 问题内容: 我已从提供回调来跟踪提交给的任务的执行。 现在,我看到的是任务是否已提交,但最终进入队列,并且任务仍被调用(方法仍然被调用),并且(可能)检查任务已取消且未调用包装的可调用对象。 我应该做例如 还是应该打电话给我?在检查取消和对其进行任何操作之间,这两种方法似乎都容易受到比赛条件的影响。 问题答案: 您说对了,那是对的。系统 最多 会调用 一次 ,因此如果任务在通过运行之前已被取消,您

  • 在我的项目中,我使用预定义的注释: 的源代码: 我想编写自定义注释,它将与使用(Secure.class)的

  • 问题内容: 我必须在Web应用程序中使用3个不同的事务管理器。因此,我根据Spring参考(第10.5.6.3节“自定义快捷方式注释”)编写了自己的注释。 一个注释(用于使用一个特定的transactionmanager)如下所示: 使用自定义的@CustomerTX批注对我的服务层进行批注时,一切工作正常。但是我必须为注释提供更多选项,例如readonly = true,rollbackFor

  • 问题内容: Java是按价值传递的。您如何修改语言以引入引用传递(或某些等效行为)? 以类似的东西为例 哪个(不带)编译为以下字节码 的代码将参考从变量加载到堆栈上。 我正在考虑的一种可能性是让编译器确定一个方法可以通过引用传递,并可能通过来更改该方法以接受一个Holder对象,该对象存储与我们的变量相同的引用。该方法完成后,并可能更改了持有人的引用,则调用方方值的变量将替换为持有人的引用值。 它

  • 问题 你想让C扩展代码和Python解释器中的其他进程一起正确的执行, 那么你就需要去释放并重新获取全局解释器锁(GIL)。 解决方案 在C扩展代码中,GIL可以通过在代码中插入下面这样的宏来释放和重新获取: #include "Python.h" ... PyObject *pyfunc(PyObject *self, PyObject *args) { ... Py_BEGIN_

  • 因此,我们目前正在将基于MQTT的消息后端中的Netty3.x升级到Netty4.1。在我们的应用程序中,我们使用自定义MQTT消息解码器和编码器。 对于我们的解码器,我目前使用的是一个ByteToMessageDecoder,如下所示: 其中是我们的自定义对象,它被传递到下一个的。正如您所看到的,当我从传入的对象中的传入的对象。那么,既然在netty中是引用计数的,那么我需要在这里通过调用来释放