当前位置: 首页 > 知识库问答 >
问题:

容错spring批处理作业读取少量记录后卡顿

习洲
2023-03-14

Im使用存储库项目读取器从数据库读取事务,使用flatfileitem写入器处理和写入文件。我使用容错跳过任何记录,如果它是错误的。为了检查一条记录是否有错误,我们在处理器中进行一些验证并抛出自定义异常。

批处理配置

return stepBuilderFactory.get("test")
        .<Transaction, Transaction>chunk(1)
        .reader(this.reader.read())
        .processor(transactionProcessor)
        .writer(transactionWriter)
        .faultTolerant()
        .skipPolicy(new UnlimitedSkipPolicy()
        .listener(new TransactionSkipListenerTransaction())
        .stream(this.transactionwriter)

事务读取器bean

 @Override
@StepScope
public RepositoryItemReader<Transaction> read() throws Exception {
    final ZonedDateTime zonedDateTime = LocalDate.now().atStartOfDay(ZoneId.systemDefault());
    final Date today = Date.from(zonedDateTime.toInstant());
    final Date yesterday = Date.from(zonedDateTime.minusDays(1L).toInstant());
    RepositoryItemReader<Transaction> reader = new RepositoryItemReader<>();
    reader.setRepository(transactionDao);
    reader.setMethodName("findByTransactionDateGreaterThanEqualAndTransactionDateLessThan");
    reader.setArguments(Arrays.asList(yesterday, today));
    reader.setSort(Collections.singletonMap("transactionId", Sort.Direction.ASC));
    reader.setPageSize(10000);
    return reader;
}


Processor bean 
@Override
public Transaction process(Transaction transaction) throws Exception {
if(utility.isInvalidTransaction(transaction)) {
  log.error("Invalid transaction {} and skipped from reporting", transaction.getTransactionId());
  throw new CustomException("invalid transaction" + transaction.getTransactionid());
}

而writer只是一个flatfile编写器。问题是,在读取了很少事务之后,在我的例子中,在读取并处理了9个无效事务之后,批处理作业被卡住了。不知道为什么。

.reader(this.reader.read())
        .processor(transactionProcessor)
        .writer(transactionWriter)
        .faultTolerant()
        .skipPolicy(new UnlimitedSkipPolicy()
        .noRetry(Exception.class)
        .noRollBack(Exception.class)

困在这里

共有1个答案

别宏盛
2023-03-14

在我看来,processbean的职责是添加自定义转换或逻辑,在此基础上将数据传递给编写器。在这种情况下,我只是记录错误并返回null,在编写器处理null对象引用的情况下,即无效事务(null)没有传递给编写器。

 类似资料:
  • 我希望我的Spring批处理应用程序一次从数据库中读取50条记录,然后将这50条记录发送给处理器,然后发送给写入器。 有人可以告诉我如何做到这一点吗? 我尝试使用JdbcPagingItemReader并将pageSize设置为50,这样可以读取50条记录,但是rowMapper、处理器和编写器一次接收一条记录,而不是获得50条记录。 如何使处理器和写入器在dto中获得50条记录,而不是一次接收一

  • 我有以下工作要处理在一定的时间间隔或特别的基础上。 作业中的步骤如下: 我也想要用户界面,在那里我可以触发一个特别的基础上的工作,而且我应该能够提供参数从用户界面。 我想用Spring batch来完成这个任务,但它更多的是用于读->处理->写之类的工作。这里,在第一步中,我正在生成由第二步读取的数据。我不确定我是否还可以使用Spring batch来实现这个,或者有更好的方法来实现这个。

  • 我当前正在命令行中传递文件名在spring批处理作业中的参数并运行我的作业,spring批处理作业将查找文件并读取、处理和写入该文件。我目前在读取器中的作业参数文件名和读取器文件名,如何才能在处理器和写入器中使用相同的作业参数文件名。

  • 我有以下步骤:

  • 我正在使用spring批处理使用RepositoryItemReader从postgresql DB读取记录,然后将其写入主题。我看到大约有100万条记录需要处理,但它并没有处理所有的记录。我已经将reader的pageSize设置为10,000并且与提交间隔(块大小)相同