使用java配置时,可以向步骤中添加一个TaskExecutor,如下例所示:
@Bean
public TaskExecutor taskExecutor(){
return new SimpleAsyncTaskExecutor("spring_batch");
}
@Bean
public Step sampleStep(TaskExecutor taskExecutor) {
return this.stepBuilderFactory.get("sampleStep")
.<String, String>chunk(10)
.reader(itemReader())
.writer(itemWriter())
.taskExecutor(taskExecutor)
.build();
}
上述配置的结果是,该步骤通过在单独的执行线程中读取、处理和写入每个项块(每个提交间隔)来执行。请注意,这意味着要处理的项没有固定的顺序,与单线程的情况相比,块可能包含不连续的项。除了任务执行器设置的任何限制(例如它是否由线程池支持)之外,在tasklet配置中还有一个节流限制,默认设置为4。您可能需要增加这一点,以确保线程池得到充分利用。
但是在我认为应该通过局部分区来实现之前,我应该提供一个分区器,它说明如何将数据分成几个部分。多线程步骤应该自动完成。
@Configuration
public class MultithreadedStepConfig {
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Autowired
private ToLowerCasePersonProcessor toLowerCasePersonProcessor;
@Autowired
private DbPersonWriter dbPersonWriter;
@Value("${app.single-file}")
Resource resources;
@Bean
public Job job(Step databaseToDataBaseLowercaseSlaveStep) {
return jobBuilderFactory.get("myMultiThreadedJob")
.incrementer(new RunIdIncrementer())
.flow(csvToDataBaseSlaveStep())
.end()
.build();
}
private Step csvToDataBaseSlaveStep() {
return stepBuilderFactory.get("csvToDatabaseStep")
.<Person, Person>chunk(50)
.reader(csvPersonReaderMulti())
.processor(toLowerCasePersonProcessor)
.writer(dbPersonWriter)
.taskExecutor(jobTaskExecutorMultiThreaded())
.build();
}
@Bean
@StepScope
public FlatFileItemReader csvPersonReaderMulti() {
return new FlatFileItemReaderBuilder()
.name("csvPersonReaderSplitted")
.resource(resources)
.delimited()
.names(new String[]{"firstName", "lastName"})
.fieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
setTargetType(Person.class);
}})
.saveState(false)
.build();
}
@Bean
public TaskExecutor jobTaskExecutorMultiThreaded() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
// there are 21 sites currently hence we have 21 threads
taskExecutor.setMaxPoolSize(30);
taskExecutor.setCorePoolSize(25);
taskExecutor.setThreadGroupName("multi-");
taskExecutor.setThreadNamePrefix("multi-");
taskExecutor.afterPropertiesSet();
return taskExecutor;
}
}
当您使用多线程步骤和分区时,这里基本上有根本的区别。
多线程步骤是单个进程,因此如果您已经为处理器/编写器保留了状态,那么使用此步骤就不是一个好主意。然而,假设您只生成一个报告而不保存任何内容,这是一个不错的选择。
正如您所提到的,假设您希望处理一个平面文件,并希望将记录存储在DB中,那么您可以使用远程组块概念,假设您的阅读器不太重。
问题内容: 我想调试Java程序的整个流程。Eclipse中(进入)和(进入)之间有什么区别? 问题答案: 考虑以下代码,将当前指令指针(将在下一步执行的行,由表示)放在in 的行,并由in 的行调用: 如果此时要 进入 ,则将移至中的行,进入函数调用。 如果要在那一步 结束 ,您将移至中的行,从而结束函数调用。 调试器的另一个有用功能是单步 退出 或单步 返回。 在这种情况下,单步返回基本上将使
我试图优化两个spark dataframes之间的联接查询,让我们将它们称为df1、df2(在公共列“saleid”上联接)。df1非常小(5M),所以我在spark集群的节点中广播它。df2非常大(200米行),所以我尝试通过“saleid”对它进行桶/重新分区。 例如: 分区: 水桶: 我不知道哪一个是正确的技术使用。谢谢。
本文向大家介绍线程的 run() 和 start() 有什么区别?相关面试题,主要包含被问及线程的 run() 和 start() 有什么区别?时的应答技巧和注意事项,需要的朋友参考一下 start() 方法用于启动线程,run() 方法用于执行线程的运行时代码。run() 可以重复调用,而 start() 只能调用一次。
美洲狮工人和美洲狮线程之间的区别是什么? 我知道的(如果我错了,请纠正我): > 在unicorn中,我知道我可以在一个进程中使用多个unicorn Worker来添加并发性。 但在美洲狮有线程和工人...工人不是美洲狮进程中的线程吗? 我可以在Heroku中使用更多的工作线程来添加web并发吗?
每个线程都是通过某个特定Thread对象所对应的方法run()来完成其操作的,run()方法称为线程体。通过调用Thread类的start()方法来启动一个线程。 start() 方法用于启动线程,run() 方法用于执行线程的运行时代码。run() 可以重复调用,而 start() 只能调用一次。 start()方法来启动一个线程,真正实现了多线程运行。调用start()方法无需等待run方法体
问题内容: 进程和线程之间的技术区别是什么? 我感到像“过程”这样的词已被过度使用,并且还有硬件和软件线程。像Erlang这样的语言的轻量级进程怎么样?有确定的理由使用一个术语而不是另一个术语吗? 问题答案: 进程和线程都是独立的执行序列。典型的区别是(同一进程的)线程在共享内存空间中运行,而进程在单独的内存空间中运行。 我不确定您可能指的是“硬件”还是“软件”线程。线程是一种操作环境功能,而不是