当前位置: 首页 > 面试题库 >

具有有限队列的线程池

祁驰
2023-03-14
问题内容

我已经看到了线程池执行程序的实现及其所提供的拒绝执行策略。但是,我有一个自定义要求-
我想拥有一个回调机制,在该机制中,当达到队列大小限制时,我会收到通知,并说何时队列大小减少到最大允许队列大小的80%。

public interface ISaturatedPoolObserver {
 void onSaturated(); // called when the blocking queue reaches the size limit
 void onUnsaturated(); // called when blocking queues size goes below the threshold.
}

我觉得可以通过子类化线程池执行程序来实现,但是已经有一个实现的版本吗?我很乐意在需要时提供更多详细信息和我的工作,以便提供清晰的信息。


问题答案:

我希望有一个回调机制,当达到队列大小限制时,我会在其中收到通知…

我不会将执行器子类化,但会将执行BlockingQueue器使用的子类化。像下面这样的东西应该起作用。checkUnsaturated()如果删除条目并将某人放回原处,则代码中存在竞争条件。如果这些条件需要完善,则可能必须在队列上进行同步。另外,我也不知道执行器实现使用什么方法,因此您可能不需要覆盖其中的一些方法。

public class ObservableBlockingQueue<E> extends LinkedBlockingQueue<E> {
     private ISaturatedPoolObserver observer;
     private int capacity;
     public ObservableBlockingQueue(ISaturatedPoolObserver observer,
         int capacity) {
         super(capacity);
         this.observer = observer;
         this.capacity = capacity;
    }
    @Override
    public boolean offer(E o) {
        boolean offered = super.offer(o);
        if (!offered) {
            observer.onSaturated();
        }
        return offered;
    }
    @Override
    public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException {
        boolean offered = super.offer(o, timeout, unit);
        if (!offered) {
            observer.onSaturated();
        }
        return offered;
    }
    @Override
    public E poll() {
        E e = super.poll();
        if (e != null) {
             checkUnsaturated();
        }
        return e;
    }
    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E e = super.poll(timeout, unit);
        if (e != null) {
             checkUnsaturated();
        }
        return e;
    }
    @Override
    public E take() throws InterruptedException {
        E e = super.take();
        checkUnsaturated();
        return e;
    }
    @Override
    public boolean remove(E e) throws InterruptedException {
        boolean removed = super.remove(e);
        if (removed) {
            checkUnsaturated();
        }
        return removed;
    }
    private void checkUnsaturated() {
        if (super.size() * 100 / capacity < UNSATURATED_PERCENTAGE) {
            observer.onUnsaturated();
        }
    }
}


 类似资料:
  • 问题内容: 我正在使用的类创建用于运行Web服务器的请求处理程序的固定线程池: 并且说明是: 创建一个线程池,该线程池重用在共享的 无边界 队列上运行的一组固定线程。 但是,我正在寻找实现与缓冲池完全相同的线程池实现,除了使用有 界 队列。有这样的实现吗?还是我需要为固定线程池实现自己的包装器? 问题答案: 您想要做的是新建自己的ExecutorService,可能使用ThreadPoolExec

  • 问题内容: 我无法创建新线程。实际上,我编写了一个有点黑的程序,可以接受任何任务(即,它是不受限制的),但是调用了一个附加处理程序- 在我的应用程序中,它会发出警告跟踪表明该池已落后-这为我提供了TPE拒绝创建的非常明确的信息新队列,即使队列中有成千上万的条目。我的构造函数如下: 为什么不创建新线程? 问题答案: 此博客文章介绍了这一难题: 这种线程池的构造根本无法按预期工作。这是由于Thread

  • 我必须在一个系统中管理计划的文件复制。文件复制是由用户安排的,我需要限制复制期间使用的系统资源数量。没有定义每次复制可能需要的时间(即,可能计划每15分钟运行一次复制,并且在下一次运行到期时,上一次运行可能仍在运行)。 我有一个调度器,它定期检查到期的文件复制,对于每个文件复制,(1)如果它没有排队也没有运行,就将它添加到阻塞队列中;(2)否则就删除它。 我还有一个线程池,等待直到队列中有复制并执

  • 我正在尝试使用Lambda的一纸空文队列。我已将其配置为将消息发送到SNS队列。我放入了一个不正确的处理程序,以使Lambda调用错误消失。错误消息永远不会到达SNS队列。我相信这是一个权限问题。下面是SNS队列的“我的访问策略”设置 lambda函数附加了一个角色,该角色具有、到以及到 我错过什么了吗?有没有其他原因消息可能不会到达DLQ?

  • 问题内容: 我有一个要 并行 处理的文件夹名称字典。在每个文件夹,里面是文件名的数组,我想在加工 系列 : 最终,我将创建一个名为的文件夹,其中包含名称为的文件。我有这样的方法: 请注意上面的内容,由于通过UDP端口进行的调用会花费很长时间。我也仅限于在上面的阵列中使用UDP端口。因此,我必须等待UDP端口完成操作,然后才能再次使用该UDP端口。这意味着我一次只能运行线程。 因此,我最终将通过调用

  • 问题内容: 是否有这样的Java类: 可以通过ID添加可执行任务,其中保证所有具有相同ID的任务都不会同时运行 线程数可以限制为固定数量 Map的幼稚解决方案很容易解决(1),但难以管理(2)。同样,我所知道的所有线程池类都将从单个队列中提取,这意味着不能保证(1)。 欢迎使用涉及外部库的解决方案。 问题答案: 如果找不到开箱即用的东西,那么推出自己的产品就不难了。您可以做的一件事是将每个任务包装