当前位置: 首页 > 知识库问答 >
问题:

Spring批处理——如何使用分区并行地读写数据?

莫繁
2023-03-14

我有一个Spring Batch应用程序(3.0.7),通过Spring Boot启动,它并行读取多个XML文件,处理它们,并针对Oracle DB“吐出”INSERT或UPDATE语句。

为了并行处理文件,我使用了分区器。这项工作运行良好,除了JdbcWriter,它似乎只绑定到一个线程。由于我使用的是ThreadPoolTaskExecitor,我希望Step可以为阅读器、处理器和写入器并行运行。但是,JdbcWriter似乎总是绑定到Thread-1(我可以看到在日志中,但也分析了数据库连接,只有一个连接处于活动状态-请注意,我的数据源配置为使用具有20个连接的池)。

我已经将读者、处理者和作者注释为@StepScope。如何有效地使用taskExecutor中配置的所有线程并行读写?

这是我的配置摘录:

@Bean
public Job parallelJob() throws Exception {
    return jobBuilderFactory.get("parallelJob")
            .start(splitFileStep())
            .next(recordPartitionStep())
            .build();
}

@Bean
public Step recordPartitionStep() {
    return stepBuilderFactory.get("factiva-recordPartitionStep")
            .partitioner(recordStep())
            .partitioner("recordStep", recordPartitioner(null)) <!-- this is used to inject some data from the job context
            .taskExecutor(taskExecutor())
            .build();
}

@Bean
public Step recordStep() {
    return stepBuilderFactory.get("recordStep")
            .<Object, StatementHolderMap>chunk(1000)
            .reader(recordReader(null)) <!-- this is used to inject some data from the job context
            .processor(recordProcessor) <!-- this is @Autowired, and the bean is marked as @StepScope
            .writer(jdbcItemWriter())
            .build();
}

@Bean
@StepScope
public ItemStreamReader recordReader(@Value("#{stepExecutionContext['record-file']}") Resource resource) {
    // THIS IS A StaxEventItemReader
}

@Bean
@StepScope
public JdbcItemWriter jdbcItemWriter() {

    JdbcItemWriter jdbcItemWriter = new JdbcItemWriter();
    jdbcItemWriter.setDataSource(dataSource);
    ...
    return jdbcItemWriter;
}

@Value("${etl.factiva.partition.cores}")
private int threadPoolSize;

@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    if (threadPoolSize == 0) {
        threadPoolSize = Runtime.getRuntime().availableProcessors();
    }
    taskExecutor.setMaxPoolSize(threadPoolSize);
    taskExecutor.afterPropertiesSet();

    return taskExecutor;
}

共有1个答案

庄弘业
2023-03-14

我弄明白了为什么Spring批处理没有使用所有配置的线程。

首先,分区器的Spring配置是错误的。原始配置没有设置gridSize值,并且错误地引用了要在分区中运行的步骤。

其次,原始配置中使用的ThreadPoolTaskExecutor似乎不起作用。切换到SimpleAsyncTaskExecutor成功了。

我仍然不确定ThreadPoolTaskExecutor为什么不工作。SimpleAsynctAskeExecutor的javadoc实际上建议使用池来重用线程。

我也不是100%确定我完全理解设置gridSize值的含义。目前,我正在将gridSize设置为一个值,该值等于分区步骤中使用的线程数。如果有人能对这种方法发表评论(Michael Minella?:)

这是正确的配置,仅供参考。

@Bean
public Job parallelJob() throws Exception {
    return jobBuilderFactory.get("parallelJob")
            .start(splitFileStep())
            .next(recordPartitionStep())
            .build();
}

@Bean
public Step recordPartitionStep() {
    return stepBuilderFactory.get("factiva-recordPartitionStep")
            .partitioner(recordStep().getName(), recordPartitioner(null)) <!-- the value for the recordPartitioner constructor is injected at runtime
            .step(recordStep())
            .gridSize(determineWorkerThreads()) <!-- GRID SIZE VALUE MUST BE EQUAL TO THE NUMBER OF THREAD CONFIGURED FOR THE THREAD POOL
            .taskExecutor(taskExecutor())
            .build();


}

@Bean
public Step recordStep() {
    return stepBuilderFactory.get("recordStep")
            .<Object, StatementHolderMap>chunk(1000)
            .reader(recordReader(null)) <!-- this is used to inject some data from the job context
            .processor(recordProcessor) <!-- this is @Autowired, and the bean is marked as @StepScope
            .writer(jdbcItemWriter())
            .build();
}

@Bean
@StepScope
public ItemStreamReader recordReader(@Value("#{stepExecutionContext['record-file']}") Resource resource) {
    // THIS IS A StaxEventItemReader
}

@Bean
@StepScope
public JdbcItemWriter jdbcItemWriter() {

    JdbcItemWriter jdbcItemWriter = new JdbcItemWriter();
    jdbcItemWriter.setDataSource(dataSource);
    ...
    return jdbcItemWriter;
}

@Value("${etl.factiva.partition.cores}")
private int threadPoolSize;

@Bean
public TaskExecutor taskExecutor() {
    SimpleAsyncTaskExecutor taskExecutor = new SimpleAsyncTaskExecutor("fac-thrd-");

    taskExecutor.setConcurrencyLimit(determineWorkerThreads());
    return taskExecutor;
}

// threadPoolSize is a configuration parameter for the job
private int determineWorkerThreads() {
    if (threadPoolSize == 0) {
        threadPoolSize = Runtime.getRuntime().availableProcessors();
    }
    return threadPoolSize;

}
 类似资料:
  • 我正在实现spring批处理作业,用于使用分区方法处理一个DB表中的数百万条记录,如下所示- > 从分区器中的表中提取唯一的分区代码,并在执行上下文中设置相同的代码。 创建一个包含读取器、处理器和写入器的块步骤,以基于特定分区代码处理记录。 是否可以创建分区/线程来处理像thread1进程1-1000,thread2进程1001-2000等? 如何控制创建的线程数,因为分区代码可以是100个左右,

  • 我正在寻找测量Spring批处理读取、处理和写入操作的执行时间的最佳方法。在元数据中,有关于整个步骤的信息,而不是关于每个动作的信息。 谢谢你所有的回答!

  • 当我使用Spring批处理管理运行长时间运行的批处理作业的多个实例时,它会在达到jobLauncher线程池任务执行程序池大小后阻止其他作业运行。但是从cron中提取多个工作似乎效果不错。下面是作业启动器配置。 Spring批处理管理员Restful API是否使用不同于xml配置中指定的作业启动器?

  • 我有一个批处理任务,从SQLServer读取记录并写入MARIADB。尽管我在批处理过程中实现了分区的概念,但该过程非常缓慢 下面是源系统和目标系统的数据源配置。 以下是配置的步骤和分区步骤 用读者和作者更新帖子 有人能介绍如何使用Spring Batch提高读写性能吗?

  • 我有一个作业流,我希望以以下方式运行它: 作业流将从Job1开始。在Job1成功完成后,Job1将同时启动Job2和Job4。 Job2和Job4将并行运行。 在Job2成功完成后,Job2将启动Job3。 在Job4成功完成后,Job4将启动Job5。 下面是job1.xml和job1的作业启动器类的代码片段: job1.xml uijobLauncher.java “job2,Job3”对和“

  • 我目前正在构建一个spring批处理应用程序,其中执行了几个步骤。除了一个,所有的步骤都是简单的tasklet(没有读取器或写入器),它们负责各种任务,如复制文件、发送请求、启动批处理(*.bat)文件等。 大多数步骤应该是串行执行的。在一个特定的步骤中,我希望启动X文件,这些文件最多可以有Y个实例。 null 如果:)我想我必须使用taskExecutor,下面我有一个示例,在这里我开始第一步(