当前位置: 首页 > 知识库问答 >
问题:

spring Batch中的多线程Step和本地分区有什么区别?

赵俊远
2023-03-14

使用java配置时,可以向步骤中添加一个TaskExecutor,如下例所示:

@Bean
public TaskExecutor taskExecutor(){
    return new SimpleAsyncTaskExecutor("spring_batch");
}

@Bean
public Step sampleStep(TaskExecutor taskExecutor) {
        return this.stepBuilderFactory.get("sampleStep")
                                .<String, String>chunk(10)
                                .reader(itemReader())
                                .writer(itemWriter())
                                .taskExecutor(taskExecutor)
                                .build();
}

上述配置的结果是,该步骤通过在单独的执行线程中读取、处理和写入每个项块(每个提交间隔)来执行。请注意,这意味着要处理的项没有固定的顺序,与单线程的情况相比,块可能包含不连续的项。除了任务执行器设置的任何限制(例如它是否由线程池支持)之外,在tasklet配置中还有一个节流限制,默认设置为4。您可能需要增加这一点,以确保线程池得到充分利用。

但是在我认为应该通过局部分区来实现之前,我应该提供一个分区器,它说明如何将数据分成几个部分。多线程步骤应该自动完成。

@Configuration
public class MultithreadedStepConfig {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;
    @Autowired
    private ToLowerCasePersonProcessor toLowerCasePersonProcessor;

    @Autowired
    private DbPersonWriter dbPersonWriter;

    @Value("${app.single-file}")
    Resource resources;

    @Bean
    public Job job(Step databaseToDataBaseLowercaseSlaveStep) {
        return jobBuilderFactory.get("myMultiThreadedJob")
                .incrementer(new RunIdIncrementer())
                .flow(csvToDataBaseSlaveStep())
                .end()
                .build();
    }

    private Step csvToDataBaseSlaveStep() {
        return stepBuilderFactory.get("csvToDatabaseStep")
                .<Person, Person>chunk(50)
                .reader(csvPersonReaderMulti())
                .processor(toLowerCasePersonProcessor)
                .writer(dbPersonWriter)
                .taskExecutor(jobTaskExecutorMultiThreaded())
                .build();

    }

    @Bean
    @StepScope
    public FlatFileItemReader csvPersonReaderMulti() {
        return new FlatFileItemReaderBuilder()
                .name("csvPersonReaderSplitted")
                .resource(resources)
                .delimited()
                .names(new String[]{"firstName", "lastName"})
                .fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
                    setTargetType(Person.class);
                }})
                .saveState(false)
                .build();

    }

    @Bean
    public TaskExecutor jobTaskExecutorMultiThreaded() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        // there are 21 sites currently hence we have 21 threads
        taskExecutor.setMaxPoolSize(30);
        taskExecutor.setCorePoolSize(25);
        taskExecutor.setThreadGroupName("multi-");
        taskExecutor.setThreadNamePrefix("multi-");
        taskExecutor.afterPropertiesSet();
        return taskExecutor;
    }
}

共有1个答案

赏成益
2023-03-14

当您使用多线程步骤和分区时,这里基本上有根本的区别。

多线程步骤是单个进程,因此如果您已经为处理器/编写器保留了状态,那么使用此步骤就不是一个好主意。然而,假设您只生成一个报告而不保存任何内容,这是一个不错的选择。

正如您所提到的,假设您希望处理一个平面文件,并希望将记录存储在DB中,那么您可以使用远程组块概念,假设您的阅读器不太重。

 类似资料:
  • 问题内容: 我想调试Java程序的整个流程。Eclipse中(进入)和(进入)之间有什么区别? 问题答案: 考虑以下代码,将当前指令指针(将在下一步执行的行,由表示)放在in 的行,并由in 的行调用: 如果此时要 进入 ,则将移至中的行,进入函数调用。 如果要在那一步 结束 ,您将移至中的行,从而结束函数调用。 调试器的另一个有用功能是单步 退出 或单步 返回。 在这种情况下,单步返回基本上将使

  • 我试图优化两个spark dataframes之间的联接查询,让我们将它们称为df1、df2(在公共列“saleid”上联接)。df1非常小(5M),所以我在spark集群的节点中广播它。df2非常大(200米行),所以我尝试通过“saleid”对它进行桶/重新分区。 例如: 分区: 水桶: 我不知道哪一个是正确的技术使用。谢谢。

  • 本文向大家介绍线程的 run() 和 start() 有什么区别?相关面试题,主要包含被问及线程的 run() 和 start() 有什么区别?时的应答技巧和注意事项,需要的朋友参考一下 start() 方法用于启动线程,run() 方法用于执行线程的运行时代码。run() 可以重复调用,而 start() 只能调用一次。

  • 美洲狮工人和美洲狮线程之间的区别是什么? 我知道的(如果我错了,请纠正我): > 在unicorn中,我知道我可以在一个进程中使用多个unicorn Worker来添加并发性。 但在美洲狮有线程和工人...工人不是美洲狮进程中的线程吗? 我可以在Heroku中使用更多的工作线程来添加web并发吗?

  • 每个线程都是通过某个特定Thread对象所对应的方法run()来完成其操作的,run()方法称为线程体。通过调用Thread类的start()方法来启动一个线程。 start() 方法用于启动线程,run() 方法用于执行线程的运行时代码。run() 可以重复调用,而 start() 只能调用一次。 start()方法来启动一个线程,真正实现了多线程运行。调用start()方法无需等待run方法体

  • 问题内容: 进程和线程之间的技术区别是什么? 我感到像“过程”这样的词已被过度使用,并且还有硬件和软件线程。像Erlang这样的语言的轻量级进程怎么样?有确定的理由使用一个术语而不是另一个术语吗? 问题答案: 进程和线程都是独立的执行序列。典型的区别是(同一进程的)线程在共享内存空间中运行,而进程在单独的内存空间中运行。 我不确定您可能指的是“硬件”还是“软件”线程。线程是一种操作环境功能,而不是