当前位置: 首页 > 知识库问答 >
问题:

Spring批处理线程安全项目阅读器(进程指示模式)

贺君浩
2023-03-14
@Controller
public class JobController {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job testJob;

    @RequestMapping("/job/test")
    public void test() {
        JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
        jobParametersBuilder.addDate("date",new Date());
        try {
            jobLauncher.run(personJob,jobParametersBuilder.toJobParameters());
        } catch (JobExecutionAlreadyRunningException | JobRestartException | JobParametersInvalidException | JobInstanceAlreadyCompleteException e) {
            e.printStackTrace();
        }

    }

}

TestJob从文件系统(主块)读取数据并将其发送到远程块(从块)。问题是ItemReader不是线程安全的。

对于一些常见的批处理用例,使用多线程步骤有一些实际的限制。一个步骤中的许多参与者(例如,读取器和写入器)都是有状态的,如果状态不按线程隔离,那么这些组件在多线程步骤中是不可用的。特别是,Spring Batch中的大多数现成阅读器和编写器都不是为多线程使用而设计的。但是,可以使用无状态或线程安全的读取器和写入器,并且Spring批处理示例中有一个示例(parallelJob)显示了使用进程指示符(请参见第6.12节“防止状态持久性”)来跟踪数据库输入表中已处理的项。

spring batch github存储库上的parallelJob示例https://github.com/spring-projects/spring-batch/blob/master/spring-batch-samples/src/main/java/org/springframework/batch/sample/common/stagingItemReader.java

我对过程指示器模式有点困惑。我在哪里可以找到更多关于这种模式的详细信息?

共有1个答案

左仰岳
2023-03-14

如果您所关心的只是ItemReader实例将在作业调用中共享,那么您可以将ItemReader声明为一个步骤范围,并且每次调用都将获得一个新实例,这将消除线程问题。

但是要回答您关于过程指示符模式的直接问题,我不确定关于它的好文档本身在哪里。Spring批处理示例中有一个它的实现示例(并行作业使用它)。

它背后的思想是,您为将要处理的记录提供一个状态。在作业/步骤开始时,将这些记录标记为正在处理。当记录提交时,您将它们标记为已处理。这样就不需要在读取器中跟踪状态,因为您的状态实际上在数据库中(查询只查找标记为正在处理的记录)。

 类似资料:
  • 在Spring批处理作业中,我将项目写入目标文件(使用FlatFileItemWriter),并将输入记录“process indicator”字段更新为“processed”/“failed”(使用JdbcBatchItemWriter)。在“物品交易”中实现这一点的最佳方式是什么? 使用CompositeItemWriter(委托FlatFileItemWriter写入文件,委托JdbcBat

  • 每次作业运行时,都在从不断增长的现有表中读取数据。我正在寻找Spring batch中的选项,以便在每次运行调度作业时只查询新记录。 如果我读了50000条记录,下一个时间表应该从50001开始。 我的想法是将ItemReader读取的最后一条记录的id(整个读卡器输出的最后一条,而不是每个块的最后一条)保存在DB中,并在后续的作业计划中使用。我将从主表返回按id排序的数据。 我怎么知道作者最后的

  • 我正在尝试使用多个处理器类在处理器步骤中处理记录。这些类可以并行工作。目前我已经编写了一个多线程步骤,其中我 设置处理器类的输入和输出行 提交给遗嘱执行人服务 获取所有未来对象并收集最终输出

  • 我必须使用Spring Batch配置一个作业。是否可以有一个单线程的项目阅读器,但多线程处理器? 在这种情况下,ItemReader将通过从数据库中读取工作项(通过执行预定义的查询)来创建要处理的工作项,每个处理器将并行处理项/块。

  • 我正在使用JpaPagingItemReaderBuilder查询一个DB,结果被插入到另一个DB中。 查询返回的结果没有任何问题,但我得到了一个错误与读取器的返回,在处理器中,您可以检查我的编码和错误下面。 有谁能给我一点启示吗?为什么我不能处理结果?

  • 在Spring批处理中,我试图读取CSV文件,并希望将每一行分配给一个单独的线程并对其进行处理。我试图通过使用TaskExecutor来实现它,但所有线程都在一次拾取同一行。我还尝试使用Partioner实现这个概念,同样的事情也发生了。请参阅下面我的配置Xml。 步骤说明 我尝试过不同类型的任务执行器,但它们的行为方式都是一样的。如何将每一行分配给单独的线程?