当前位置: 首页 > 知识库问答 >
问题:

移交ScheduledThreadPoolExecutor

孟昊空
2023-03-14

我在理解ScheduledExecutorService/ScheduledThreadPoolExecutor的具体工作方式时遇到一些问题。我想进行以下设置:

  1. 一个类,它设置ScheduledThreadPoolExecutor,在调度任务之前将RemoveOnCancelPolicy设置为true,并关闭ScheduledThreadPoolExecutor(因为我想在try-catch块中调用product函数)。TAK的处理应在第二类中完成,这就是为什么我想移交ScheduledThreadPoolExecutor
public class SetupExecutorService implements AutoCloseable {
    
    private final ScheduledThreadPoolExecutor executorService;
    private final Producer producer;

    public SetupExecutorService(Producer producer) {
        executorService = new ScheduledThreadPoolExecutor(1);
        this.producer = producer;
    }

    public void produce() {
        executorService.setRemoveOnCancelPolicy(true);
        producer.schedule(executorService);
    }
    
    @Override
    public void close() throws Exception {
        executorService.shutdown();
    }

}
public class Producer implements Runnable {

    public Producer(int pauseInMilliSec) {
        this.pauseInMilliSec = pauseInMilliSec;
    }

    @Override
    public void run() {
        System.out.println("calling run");
        ...
    }

    public void schedule(ScheduledExecutorService executorService) {
         executorService.scheduleAtFixedRate(this, 0, pauseInMilliSec, TimeUnit.MILLISECONDS);
    }

}

不幸的是,此设置不起作用。我不确定哪里出了问题,但是,如果我只是在Producer类中拉取executorService,一切都会按预期工作,并且任务会定期执行:

public class Producer implements Runnable {

    public Producer(int pauseInMilliSec) {
        this.pauseInMilliSec = pauseInMilliSec;
    }

    @Override
    public void run() {
        System.out.println("calling run");
        ...
    }

    public void schedule() {
         ScheduledThreadPoolExecutor executorService = new ScheduledThreadPoolExecutor(1);
         executorService.setRemoveOnCancelPolicy(true);
         executorService.scheduleAtFixedRate(this, 0, pauseInMilliSec, TimeUnit.MILLISECONDS);
    }

}

有人能告诉我错误在哪里吗?

非常感谢您的帮助:)

共有1个答案

吕飞翼
2023-03-14

当您关闭执行器时,它将不再接受任何任务,只会耗尽当前队列中的挂起任务。在ScheduledExecutor的情况下,所有未来计划的任务都将被取消,并且不会触发。

换句话说,在第一个计划任务运行之前,您正在关闭执行程序,有效地取消了它。在第二种情况下,你没有关闭执行者——因此它起作用了。

如果在“已损坏”设置中删除关机,则其行为方式将与“已修复”版本相同。

 类似资料:
  • 我目前正在从具有特定偏移量的主题中获取消息。我正在使用寻求()来实现它。但是当我将enable.auto.commit设置为true或使用手动同步(委托同步()/委托同步())时,Seek()不起作用,因为它没有轮询来自特定偏移量的消息,而是从最后提交的偏移量中选择。 因此,在使用Seek()时,是否必须将偏移量存储在外部DB中,而不提交给Kafka?Seek和Commit不能并行工作吗? 客户端

  • 我有一个Kafka消费者,我从它消费数据从一个特定的主题,我看到下面的例外。我使用的是Kafka版本。 我添加了这两个额外的消费者属性,但仍然没有帮助: 那个错误意味着什么?我该如何解决它?我需要添加一些其他消费者属性吗?

  • 进入考勤打卡-设置-“我是考勤负责人”-“移交考勤负责人权限”-选择需要移交的成员

  • 假设我有以下类及其接口 步骤1:使用Autofac在测试项目中设置构造函数: 步骤2:设置测试 现在,我如何注入配置器未知但“MyApplication”已知的参数“value”? 在我想要这么做之前 但是现在我正在尝试我们依赖反转和依赖注入,最终出现了上面描述的问题。

  • 我有一个ReactorKafka项目,它消耗来自Kafka主题的消息,转换消息,然后写入到另一个主题。 我的理解是,只有在Reactor中成功完成所有顺序步骤后,才会提交偏移量。对吗?我想确保不会处理下一条记录,除非当前记录成功发送到目标Kafka主题。

  • 我需要把一个项目交给第三个人。该项目使用Maven进行依赖管理,我们安装了一个Nexus存储库管理器。 我认为使用dependency:go offline plugin/goal like生成包含项目所需的所有依赖项的存储库是很容易的 此命令将按预期创建存储库。我将此存储库复制到目标计算机,并将localRepository(settings.xml)设置到存储库文件夹。 尝试在目标计算机上构建