当前位置: 首页 > 知识库问答 >
问题:

用于并行处理的Spring批处理中TaskExecutor的实现

杨昆
2023-03-14

考虑一个阶跃豆:

@Bean
  public Step stepForChunkProcessing() {
    return stepBuilderFactory
        .get("stepForChunkProcessing")
        .<Entity1, Entity2>chunk(1000)
        .reader(reader())
        .processor(processor())
        .writer(writer())
        .taskExecutor(taskExecutor())
        .throttleLimit(10)
        .build();
  }
//@formatter:on

  @Bean
  public TaskExecutor taskExecutor(){
      return new SimpleAsyncTaskExecutor("MyApplication");
  }

要求:在Reader中,它从文件中读取(Entity1的)记录。在处理器中,它进行处理,在Writer中,它写入数据库。

在TaskExecutor之前,只创建了一个线程,它将在读取器和处理器中循环1000次,如上面的块设置中所定义的。然后它将移动到writer并写入所有1000条记录。它将再次从记录编号1001开始,然后在读取器和处理器中处理另外1000条记录。这是一个同步执行。

在TaskExecutor和throttle限制为10之后,创建了10个彼此独立的线程。他们如何维护文件中已经被其他线程处理的记录数量?还要考虑一下,如果我在reader的Read方法中给出synchronized关键字,不同的线程如何检查文件中已经处理过的记录?

共有1个答案

汪臻
2023-03-14

这在多线程环境中是不可能的,如参考文档的多线程部分所述:

 Many participants in a Step (such as readers and writers) are stateful.
 If the state is not segregated by thread, then those components are not
 usable in a multi-threaded Step

这就是为什么文档提到关闭 AbstractItemCountingItemStreamItemReader#setSaveState 的 javadoc 上的状态管理的原因,以下是摘录:

Always set it to false if the reader is being used in a concurrent environment.
 类似资料:
  • 当我使用Spring批处理管理运行长时间运行的批处理作业的多个实例时,它会在达到jobLauncher线程池任务执行程序池大小后阻止其他作业运行。但是从cron中提取多个工作似乎效果不错。下面是作业启动器配置。 Spring批处理管理员Restful API是否使用不同于xml配置中指定的作业启动器?

  • 我是Spring批处理的新手,我只想问如何从多行结果集中检索数据。我有以下场景: > 有两个不同的表说员工 使用时,我只能创建一个工资单子级,但该表可能有多个子级。请帮助...

  • 我目前正在构建一个spring批处理应用程序,其中执行了几个步骤。除了一个,所有的步骤都是简单的tasklet(没有读取器或写入器),它们负责各种任务,如复制文件、发送请求、启动批处理(*.bat)文件等。 大多数步骤应该是串行执行的。在一个特定的步骤中,我希望启动X文件,这些文件最多可以有Y个实例。 null 如果:)我想我必须使用taskExecutor,下面我有一个示例,在这里我开始第一步(

  • 我有以下工作要处理在一定的时间间隔或特别的基础上。 作业中的步骤如下: 我也想要用户界面,在那里我可以触发一个特别的基础上的工作,而且我应该能够提供参数从用户界面。 我想用Spring batch来完成这个任务,但它更多的是用于读->处理->写之类的工作。这里,在第一步中,我正在生成由第二步读取的数据。我不确定我是否还可以使用Spring batch来实现这个,或者有更好的方法来实现这个。

  • 我想分散加工大批量。这个想法是使用Spring Batch在云中激发一堆AMQP消费者,然后加载廉价的任务(如项目ID)并将它们提交给AMQP交换。结果的书写将由消费者自己完成。 null

  • 我正在尝试实现一个Spring批处理作业,为了处理记录,它需要2-3个db调用,这会减慢记录的处理速度(大小为100万)。如果我使用基于块的处理,它会单独处理每条记录,性能会很慢。因此,我需要一次性处理1000条记录,作为批量处理,这将减少数据库调用,并提高性能。但我的问题是,如果我实现Tasklet,那么我也会失去可重启性和重试/跳过功能,如果使用AggregateInputReader实现,我