当前位置: 首页 > 知识库问答 >
问题:

Java:ExecutorService在特定队列大小后阻止提交[重复]

苏畅
2023-03-14

我正在尝试编写一个解决方案,其中单个线程产生可以并行执行的I/O密集型任务。每个任务都有重要的内存数据。所以我希望能够限制一次挂起的任务数量。

如果我像这样创建ThreadPoolExecator:

    ThreadPoolExecutor executor = new ThreadPoolExecutor(numWorkerThreads, numWorkerThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>(maxQueue));

然后是执行器。submit(可调用)在队列已满且所有线程都已忙时抛出RejectedExecutionException。

我该怎么做才能成为执行者。提交(可调用)在队列已满且所有线程都忙时阻止?

编辑:我尝试了以下操作:

executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

它在某种程度上实现了我想要实现的效果,但以一种不优雅的方式(基本上被拒绝的线程在调用线程中运行,因此这会阻止调用线程提交更多)。

编辑:(提问后5年)

对于阅读此问题及其答案的任何人,请不要将已接受的答案视为一个正确答案。请通读所有答案和评论。

共有3个答案

子车新立
2023-03-14

当前接受的答案有一个潜在的重大问题-它改变了ThreadPoolExecutor的行为。执行时,如果您有一个corePoolSize

来自ThreadPoolExecutor。执行(可运行):

    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);

具体来说,最后的“else”块永远不会被击中。

一个更好的替代方案是做一些类似于OP已经在做的事情-使用Reject tedExecutionHandler执行相同的put逻辑:

public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    try {
        if (!executor.isShutdown()) {
            executor.getQueue().put(r);
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RejectedExecutionException("Executor was interrupted while the task was waiting to put on work queue", e);
    }
}

正如评论中所指出的(参考此答案),这种方法需要注意一些事情:

  1. 如果corePoolSize==0,则存在竞态条件,池中的所有线程都可能在任务可见之前死亡
  2. 使用包装队列任务的实现(不适用于ThreadPoolExecutor)将导致问题,除非处理程序也以相同的方式包装它

记住这些问题,此解决方案将适用于大多数典型的ThreadPoolExecutors,并将正确处理corePoolSize

丁阳羽
2023-03-14

这是我如何解决这个问题的:

(注意:此解决方案会阻止提交可调用的线程,因此可以防止抛出RejectedExecutionException)

public class BoundedExecutor extends ThreadPoolExecutor{

    private final Semaphore semaphore;

    public BoundedExecutor(int bound) {
        super(bound, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        semaphore = new Semaphore(bound);
    }

    /**Submits task to execution pool, but blocks while number of running threads 
     * has reached the bound limit
     */
    public <T> Future<T> submitButBlockIfFull(final Callable<T> task) throws InterruptedException{

        semaphore.acquire();            
        return submit(task);                    
    }


    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);

        semaphore.release();
    }
}
丁承德
2023-03-14

我也做过同样的事。诀窍是创建一个BlockingQueue,其中offer()方法实际上是put()。(您可以使用所需的任何基本BlockingQueue impl)。

public class LimitedQueue<E> extends LinkedBlockingQueue<E> 
{
    public LimitedQueue(int maxSize)
    {
        super(maxSize);
    }

    @Override
    public boolean offer(E e)
    {
        // turn offer() and add() into a blocking calls (unless interrupted)
        try {
            put(e);
            return true;
        } catch(InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return false;
    }

}

注意,这只适用于corePoolSize==maxPoolSize的线程池,所以要小心(参见注释)。

 类似资料:
  • 问题内容: 我正在尝试编写一个解决方案,其中单个线程会生成可并行执行的I / O密集型任务。每个任务都有重要的内存数据。因此,我希望能够限制当前待处理的任务数。 如果我这样创建ThreadPoolExecutor: 然后,当队列已满并且所有线程都已经繁忙时,抛出该异常。 当队列已满并且所有线程都忙时,我该怎么做才能阻塞? 编辑 :我试过了: 它以某种微妙的方式达到了我想要达到的效果(基本上被拒绝的

  • 问题内容: 我搜索了一堆页面,但是找不到我的问题,所以我不得不发表一个帖子。 我有一个带有提交按钮的表单,提交后我不希望它刷新或重定向。我只希望jQuery执行功能。 形式如下: 这是jQuery: 问题答案: 只需处理Submit事件中的表单提交,然后返回false: 您不再需要提交按钮上的onclick事件:

  • 我不知所措。我有一个阻塞请求 此代码通常会阻塞<code>队列。takeFirst()当队列中没有项目时。但是,一旦我按预期添加了项目,它就不会解除阻止。调试时,我可以看到中的项目,并且当我停止Tomcat时,我会序列化。启动后,我将对队列进行反序列化,并在那一点上对<code>队列进行反串行化。takeFirst()检索项目(与之前未检索的相同)并提交它。 有人有什么想法吗? 编辑 再强调一下我

  • 假设,我有一个服务A,它有一个线程池执行器来调用服务B。我们可以用自己的值设置该池的核心池大小和队列。现在,服务B对请求的响应速度很慢,因为服务A线程池中的活动线程增加,导致阻塞队列大小增加。如何防止服务A的队列大小增加? 背景

  • 我想要实现的是,我有一个来自UI的REST调用,它调用添加一个用户。因此,用户必须执行异步队列(这是一个约束),但在将结果发送回UI之前,必须等待响应队列一段配置的时间并对其进行处理。如果队列返回时引用号为空,那么我必须删除用户记录并抛出异常,说明用户无效。如果响应返回时引用有效(或者超时发生),那么我假设它有效并返回success。 我有一个应用程序,我在其中发送一条队列消息,以获取我的用户对象

  • 下面是两种主要的方法。 代码: