我正在使用spring批处理使用RepositoryItemReader从postgresql DB读取记录,然后将其写入主题。我看到大约有100万条记录需要处理,但它并没有处理所有的记录。我已经将reader的pageSize设置为10,000并且与提交间隔(块大小)相同
@Bean
public TaskletStep broadcastProductsStep(){
return stepBuilderFactory.get("broadcastProducts")
.<Product, Product> chunk(10000)
.reader(productsReader.repositoryItemReader())
.processor(productsProcessor)
.writer(compositeItemWriter)
.faultTolerant()
.skip(Exception.class)
.skipLimit(100000)
.processorNonTransactional()
.listener(new SkipListenerProducts())
.listener(productsChunkListener)
.build();
}
@Bean
public RepositoryItemReader repositoryItemReader() {
RepositoryItemReader<Product> repositoryReader = new RepositoryItemReader<>();
try {
repositoryReader.setRepository(skuRepository);
repositoryReader.setMethodName("findByIsUpdatedAndStatusCodeIn");
repositoryReader.setPageSize(10000);
repositoryReader.setSaveState(false);
List<List<String>> arguments = new ArrayList<>();
arguments.add(Stream.of(SkuStatus.RELEASED.getValue().toString(), SkuStatus.BLOCKED.getValue().toString(),
SkuStatus.DISCONTINUED.getValue().toString())
.collect(Collectors.toList()));
repositoryReader.setArguments(arguments);
Map sorts = new HashMap();
sorts.put("catalog_number", Sort.Direction.ASC);
repositoryReader.setSort(sorts);
repositoryReader.afterPropertiesSet();
} catch (Exception exception){
exception.printStackTrace();
}
return repositoryReader;
}
@Query(value = "SELECT * FROM CATALOG.PRODUCTS WHERE IS_UPDATED = 'true' AND STATUS_CODE IN (:statusCode)",
countQuery = "SELECT COUNT(*) FROM CATALOG.PRODUCTS WHERE IS_UPDATED = 'true' AND STATUS_CODE IN (:statusCode)",
nativeQuery = true)
public Page<Product> findByIsUpdatedAndStatusCodeIn(@Param(value = "statusCode") List<String> statusCode,
Pageable pageable);
问题可能是在reader查询(IS_UPDATED)的条件上混合了分页和更新。
页面大小=2和6行(以db为单位)的示例
第二次阅读将移至第2页,因此它将采用E和F行,而不是C和D行
其中之一:
RepositoryItemReader
的子类,并在其中重写getPage @Override
public int getPage() {
return 0;
}
我的批处理作业不处理所有已读记录。 完成作业后,Spring批处理日志中读取了198282条记录,但在处理器中我有一条日志,在开始处理之前只记录了196503条,但有时,处理器处理了所有的记录。 步进 Spring启动版本:2.0.1
我正在spring Boot中使用异步任务执行器对数百万条记录的数据进行分区,块大小为1000条,网格大小为10条。为了从数据库中获取特定的分区数据,我正在使用项目读取器的before步骤中的StepExecution获取分区数据的开始和结束索引(来自Partitioner类)。 例如:项目阅读器 Item Reader遍历testData列表并将testData值返回给writer TestDa
通过定义节流限制来解决:这里定义的相同:Spring batch Multithreading:节流限制影响 我注意到当我用20k条记录运行批处理时,一些线程已经开始处理,但在10个请求后就停止了。但是,其他线程正在正常处理。你能建议一下问题是什么吗?如果我保持corepoolsize=threadpoolsize=5,那么所有的线程都是正确分布的。
我使用的是spring批处理,和通常使用的一样,我有读取器、处理器和写入器。 我有两个问题 1>Reader查询所有200条记录(表中记录总大小为200,我给出了pageSize=200),因此它得到所有200条记录,在处理器中,我们需要所有这些记录的列表,因为我们必须将每个记录与其他199条记录进行比较,以便将它们分组在不同的层中。因此我在想,如果我们能在处理步骤中得到那个列表,我就可以操纵它们
我在表中总共有8条记录,其中6条在spring批处理调用read时可以使用jpareader。现在我将页面大小和块大小设置为1以进行测试。期望作业运行时,它应该进行6次读取调用,然后它应该逐个处理,逐个写入。但实际上发生的是,它只是调用read 4次(从日志中我可以看到这样读取页面0...1)并处理4个,其中一个由于不匹配写入标准而被过滤掉,然后它只是更新了3个记录,作业标记为成功完成。
当我使用Spring批处理管理运行长时间运行的批处理作业的多个实例时,它会在达到jobLauncher线程池任务执行程序池大小后阻止其他作业运行。但是从cron中提取多个工作似乎效果不错。下面是作业启动器配置。 Spring批处理管理员Restful API是否使用不同于xml配置中指定的作业启动器?