当前位置: 首页 > 知识库问答 >
问题:

Spring Boot批处理-ThreadPoolTaskExecutor未处理所有线程

陆才俊
2023-03-14

通过定义节流限制来解决:这里定义的相同:Spring batch Multithreading:节流限制影响

@Bean
public ThreadPoolTaskExecutor getJobTaskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    taskExecutor.setCorePoolSize(10);
    taskExecutor.setMaxPoolSize(10);
    taskExecutor.setQueueCapacity(0);
    taskExecutor.setThreadNamePrefix("ProcessJob-");
    taskExecutor.afterPropertiesSet();
    taskExecutor.initialize();
    return taskExecutor;
}

我注意到当我用20k条记录运行批处理时,一些线程已经开始处理,但在10个请求后就停止了。但是,其他线程正在正常处理。你能建议一下问题是什么吗?如果我保持corepoolsize=threadpoolsize=5,那么所有的线程都是正确分布的。

CorePoolSize=MaxPoolSize=10 (Threads are not distributed properly)
Thread Name           Count
---------------------------
Thread-ProcessJob-1     10
Thread-ProcessJob-10    4200
Thread-ProcessJob-2     10
Thread-ProcessJob-3     10
Thread-ProcessJob-4     10
Thread-ProcessJob-5     1290
Thread-ProcessJob-6     10
Thread-ProcessJob-7     4980
Thread-ProcessJob-8     4479
Thread-ProcessJob-9     4999

CorePoolSize=MaxPoolSize=5 (Threads are distributed properly)
Thread-ProcessJob-1     1199
Thread-ProcessJob-2     1201
Thread-ProcessJob-3     1214
Thread-ProcessJob-4     1211
Thread-ProcessJob-5     1209

共有1个答案

仲孙夕
2023-03-14

您正在将任务执行器的QueueCapacity设置为0,这可能是问题的原因。还有一个参数可能在这里发挥作用,它是步骤的ThrottleLimit,默认值为4。您应该尝试增加它,并为您的用例找到最佳价值。

这里有多种因素:线程池核心/最大大小、步骤的chunkSize和throttleLimit、taskExecutor队列大小等。所有这些都可以在观察到的现象中发挥作用,没有找到最佳组合的方法,您需要以经验的方式进行。

 类似资料:
  • 我正在使用spring批处理使用RepositoryItemReader从postgresql DB读取记录,然后将其写入主题。我看到大约有100万条记录需要处理,但它并没有处理所有的记录。我已经将reader的pageSize设置为10,000并且与提交间隔(块大小)相同

  • 我正在spring Boot中使用异步任务执行器对数百万条记录的数据进行分区,块大小为1000条,网格大小为10条。为了从数据库中获取特定的分区数据,我正在使用项目读取器的before步骤中的StepExecution获取分区数据的开始和结束索引(来自Partitioner类)。 例如:项目阅读器 Item Reader遍历testData列表并将testData值返回给writer TestDa

  • 我正在尝试使用多个处理器类在处理器步骤中处理记录。这些类可以并行工作。目前我已经编写了一个多线程步骤,其中我 设置处理器类的输入和输出行 提交给遗嘱执行人服务 获取所有未来对象并收集最终输出

  • null 我更新了我的步骤并添加了一个ThreadPoolTaskExecutor,如下所示 在此之后,我的处理器将被多个线程调用,但使用相同的源数据。我还有什么需要做的吗?

  • 批处理 本书展示的几个例子中,ElasticSearch提供了高效的批量索引数据的功能,用户只需按批量索引的格式组织数据即可。同时,ElasticSearch也为获取数据和搜索数据提供了批处理功能。值得一提的是,该功能使用方式与批量索引类似,只需把多个请求组合到一起,每个请求可以独立指定索引及索引类型。接下来了解这些功能。 MultiGetMultiGet操作允许用户通过_mget端点在单个请求命

  • 我在表中总共有8条记录,其中6条在spring批处理调用read时可以使用jpareader。现在我将页面大小和块大小设置为1以进行测试。期望作业运行时,它应该进行6次读取调用,然后它应该逐个处理,逐个写入。但实际上发生的是,它只是调用read 4次(从日志中我可以看到这样读取页面0...1)并处理4个,其中一个由于不匹配写入标准而被过滤掉,然后它只是更新了3个记录,作业标记为成功完成。