当前位置: 首页 > 知识库问答 >
问题:

spring批处理不处理所有记录

楚乐逸
2023-03-14

我正在使用spring批处理使用RepositoryItemReader从postgresql DB读取记录,然后将其写入主题。我看到大约有100万条记录需要处理,但它并没有处理所有的记录。我已经将reader的pageSize设置为10,000并且与提交间隔(块大小)相同

@Bean
public TaskletStep broadcastProductsStep(){
    return stepBuilderFactory.get("broadcastProducts")
            .<Product, Product> chunk(10000)
            .reader(productsReader.repositoryItemReader())
            .processor(productsProcessor)
            .writer(compositeItemWriter)                    
            .faultTolerant()
            .skip(Exception.class)                              
            .skipLimit(100000)
            .processorNonTransactional()                        
            .listener(new SkipListenerProducts())               
            .listener(productsChunkListener)
            .build();
}


@Bean
public RepositoryItemReader repositoryItemReader() {

    RepositoryItemReader<Product> repositoryReader = new RepositoryItemReader<>();

    try {
        repositoryReader.setRepository(skuRepository);
        repositoryReader.setMethodName("findByIsUpdatedAndStatusCodeIn");
        repositoryReader.setPageSize(10000);
        repositoryReader.setSaveState(false);

        List<List<String>> arguments = new ArrayList<>();
        arguments.add(Stream.of(SkuStatus.RELEASED.getValue().toString(), SkuStatus.BLOCKED.getValue().toString(),
                SkuStatus.DISCONTINUED.getValue().toString())
                  .collect(Collectors.toList()));
        repositoryReader.setArguments(arguments);

        Map sorts = new HashMap();
        sorts.put("catalog_number", Sort.Direction.ASC);

        repositoryReader.setSort(sorts);
        repositoryReader.afterPropertiesSet();

    } catch (Exception exception){
        exception.printStackTrace();
    }

    return repositoryReader;
}

@Query(value = "SELECT * FROM CATALOG.PRODUCTS WHERE IS_UPDATED = 'true' AND STATUS_CODE IN (:statusCode)",
       countQuery = "SELECT COUNT(*) FROM CATALOG.PRODUCTS WHERE IS_UPDATED = 'true' AND STATUS_CODE IN (:statusCode)",
       nativeQuery = true)
public Page<Product> findByIsUpdatedAndStatusCodeIn(@Param(value = "statusCode") List<String> statusCode, 
        Pageable pageable);

共有1个答案

江智
2023-03-14

问题可能是在reader查询(IS_UPDATED)的条件上混合了分页和更新。

页面大小=2和6行(以db为单位)的示例

  • a is_updated=true
  • B is_updated=true
  • C is_updated=true
  • d is_updated=true
  • E is_updated=true
  • f is_updated=true
    null

第二次阅读将移至第2页,因此它将采用E和F行,而不是C和D行

其中之一:

  1. 不应更新IS_UPDATED列。
  2. 或者创建RepositoryItemReader的子类,并在其中重写getPage
    @Override
    public int getPage() {
        return 0;
    }
 类似资料:
  • 我的批处理作业不处理所有已读记录。 完成作业后,Spring批处理日志中读取了198282条记录,但在处理器中我有一条日志,在开始处理之前只记录了196503条,但有时,处理器处理了所有的记录。 步进 Spring启动版本:2.0.1

  • 我正在spring Boot中使用异步任务执行器对数百万条记录的数据进行分区,块大小为1000条,网格大小为10条。为了从数据库中获取特定的分区数据,我正在使用项目读取器的before步骤中的StepExecution获取分区数据的开始和结束索引(来自Partitioner类)。 例如:项目阅读器 Item Reader遍历testData列表并将testData值返回给writer TestDa

  • 通过定义节流限制来解决:这里定义的相同:Spring batch Multithreading:节流限制影响 我注意到当我用20k条记录运行批处理时,一些线程已经开始处理,但在10个请求后就停止了。但是,其他线程正在正常处理。你能建议一下问题是什么吗?如果我保持corepoolsize=threadpoolsize=5,那么所有的线程都是正确分布的。

  • 我使用的是spring批处理,和通常使用的一样,我有读取器、处理器和写入器。 我有两个问题 1>Reader查询所有200条记录(表中记录总大小为200,我给出了pageSize=200),因此它得到所有200条记录,在处理器中,我们需要所有这些记录的列表,因为我们必须将每个记录与其他199条记录进行比较,以便将它们分组在不同的层中。因此我在想,如果我们能在处理步骤中得到那个列表,我就可以操纵它们

  • 我在表中总共有8条记录,其中6条在spring批处理调用read时可以使用jpareader。现在我将页面大小和块大小设置为1以进行测试。期望作业运行时,它应该进行6次读取调用,然后它应该逐个处理,逐个写入。但实际上发生的是,它只是调用read 4次(从日志中我可以看到这样读取页面0...1)并处理4个,其中一个由于不匹配写入标准而被过滤掉,然后它只是更新了3个记录,作业标记为成功完成。

  • 当我使用Spring批处理管理运行长时间运行的批处理作业的多个实例时,它会在达到jobLauncher线程池任务执行程序池大小后阻止其他作业运行。但是从cron中提取多个工作似乎效果不错。下面是作业启动器配置。 Spring批处理管理员Restful API是否使用不同于xml配置中指定的作业启动器?