我遵循了spring批处理文档,无法异步运行我的作业。
因此,我从一个web容器运行该作业,该作业将通过RESTendpoint触发。
我想让JobInstance ID在完成整个作业之前传递它作为响应。因此,他们可以稍后使用JobInstance ID检查作业的状态,而不是等待。但我没能让它工作。下面是我尝试过的示例代码。请让我知道我错过了什么或错了什么。
BatchConfig创建异步JobLauncher
@Configuration
public class BatchConfig {
@Autowired
JobRepository jobRepository;
@Bean
public JobLauncher simpleJobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
}
控制器
@Autowired
JobLauncher jobLauncher;
@RequestMapping(value="/trigger-job", method = RequestMethod.GET)
public Long workHard() throws Exception {
JobParameters jobParameters = new JobParametersBuilder().
addLong("time", System.currentTimeMillis())
.toJobParameters();
JobExecution jobExecution = jobLauncher.run(batchComponent.customJob("paramhere"), jobParameters);
System.out.println(jobExecution.getJobInstance().getInstanceId());
System.out.println("OK RESPONSE");
return jobExecution.getJobInstance().getInstanceId();
}
以及作为组件的JobBuilder
@Component
public class BatchComponent {
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
public Job customJob(String someParam) throws Exception {
return jobBuilderFactory.get("personProcessor")
.incrementer(new RunIdIncrementer()).listener(listener())
.flow(personPorcessStep(someParam)).end().build();
}
private Step personPorcessStep(String someParam) throws Exception {
return stepBuilderFactory.get("personProcessStep").<PersonInput, PersonOutput>chunk(1)
.reader(new PersonReader(someParam)).faultTolerant().
skipPolicy(new DataDuplicateSkipper()).processor(new PersonProcessor())
.writer(new PersonWriter()).build();
}
private JobExecutionListener listener() {
return new PersonJobCompletionListener();
}
private class PersonInput {
String firstName;
public PersonInput(String firstName) {
this.firstName = firstName;
}
public String getFirstName() {
return firstName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
}
private class PersonOutput {
String firstName;
public String getFirstName() {
return firstName;
}
public void setFirstName(String firstName) {
this.firstName = firstName;
}
}
public class PersonReader implements ItemReader<PersonInput> {
private List<PersonInput> items;
private int count = 0;
public PersonReader(String someParam) throws InterruptedException {
Thread.sleep(10000L); //to simulate processing
//manipulate and provide data in the read method
//just for testing i have given some dummy example
items = new ArrayList<PersonInput>();
PersonInput pi = new PersonInput("john");
items.add(pi);
}
@Override
public PersonInput read() {
if (count < items.size()) {
return items.get(count++);
}
return null;
}
}
public class DataDuplicateSkipper implements SkipPolicy {
@Override
public boolean shouldSkip(Throwable exception, int skipCount) throws SkipLimitExceededException {
if (exception instanceof DataIntegrityViolationException) {
return true;
}
return true;
}
}
private class PersonProcessor implements ItemProcessor<PersonInput, PersonOutput> {
@Override
public PersonOutput process(PersonInput item) throws Exception {
return null;
}
}
private class PersonWriter implements org.springframework.batch.item.ItemWriter<PersonOutput> {
@Override
public void write(List<? extends PersonOutput> results) throws Exception {
return;
}
}
private class PersonJobCompletionListener implements JobExecutionListener {
public PersonJobCompletionListener() {
}
@Override
public void beforeJob(JobExecution jobExecution) {
}
@Override
public void afterJob(JobExecution jobExecution) {
System.out.println("JOB COMPLETED");
}
}
}
主要功能
@SpringBootApplication
@EnableBatchProcessing
@EnableScheduling
@EnableAsync
public class SpringBatchTestApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBatchTestApplication.class, args);
}
}
我正在使用基于注释的配置,并在下面的批处理包中使用gradle。
compile('org.springframework.boot:spring-boot-starter-batch')
如果需要更多信息,请告诉我。我找不到任何运行此常见用例的示例。
谢谢你的时间。
我知道这是一个老生常谈的问题,但我还是把这个答案发布给未来的用户。
在检查了你的代码后,我不能告诉你为什么会有这个问题,但是我可以建议你使用一个限定符注释,再加上像这样使用ThreadPoolTaskExec导师,看看它是否解决了你的问题。
您还可以查看本教程:异步Spring批处理作业处理以了解更多详细信息。它将帮助您异步配置spring批处理作业。这篇教程是我写的。
@Configuration
public class BatchConfig {
@Autowired
private JobRepository jobRepository;
@Bean
public TaskExecutor threadPoolTaskExecutor(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setMaxPoolSize(12);
executor.setCorePoolSize(8);
executor.setQueueCapacity(15);
return executor;
}
@Bean
public JobLauncher asyncJobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.setTaskExecutor(threadPoolTaskExecutor());
return jobLauncher;
}
}
如果我看一下你的代码,我会发现几个错误。首先,您的自定义配置没有加载,因为如果加载了,那么对于同一接口的重复bean实例,注入将失败。
spring boot有很多魔力,但如果你不告诉他做一些组件扫描,就不会加载任何东西。
我看到的第二个问题是BatchConfig类:它既不扩展DefaultBatchConfigure,也不重写getJobLauncher(),所以即使引导魔法将加载所有内容,也会得到默认的。这是一个可以工作的配置,它符合@enableBackProcessing API文档
BatchConfig
@Configuration
@EnableBatchProcessing(modular = true)
@Slf4j
public class BatchConfig extends DefaultBatchConfigurer {
@Override
@Bean
public JobLauncher getJobLauncher() {
try {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(getJobRepository());
jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
jobLauncher.afterPropertiesSet();
return jobLauncher;
} catch (Exception e) {
log.error("Can't load SimpleJobLauncher with SimpleAsyncTaskExecutor: {} fallback on default", e);
return super.getJobLauncher();
}
}
}
主要功能
@SpringBootApplication
@EnableScheduling
@EnableAsync
@ComponentScan(basePackageClasses = {BatchConfig.class})
public class SpringBatchTestApplication {
public static void main(String[] args) {
SpringApplication.run(SpringBatchTestApplication.class, args);
}
}
试试这个,在您的配置中,您需要使用@Bean(name=“myJobLauncher”)创建带有SimpleAsynctAskeExecutor的customJobLauncher,并且在控制器中的@Qualifier中也会使用同样的方法。
@Bean(name = "myJobLauncher")
public JobLauncher simpleJobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
在你的控制器里
@Autowired
@Qualifier("myJobLauncher")
private JobLauncher jobLauncher;
我有一个spring批处理作业,从CSV文件读取并写入数据库。我想让它重新启动。例如,如果在读取文件或写入db时出现异常,导致作业失败,则应从失败的同一点/块重新开始,而不是从头开始读取整个文件。 我正在从一个endpoint触发作业启动器,并在我的控制器中配置了它。 目前,我正在通过控制器将参数(这是一个唯一的标识符/数字)传递给作业参数,以运行新的作业实例。如果作业失败,我将使用与GET请求中
我正在用异步JobLauncher在Spring Batch中配置一个(长时间运行的)作业,我有两个RESTendpoint: null 谢谢朱利奥
我需要从远程SFTP服务器下载一个文件,并使用spring batch处理它们。我已经实现了使用Spring集成下载文件的代码。但我无法从Spring集成组件启动Spring批处理作业。我有以下代码: 但这不起作用(上一个方法中的错误),因为找不到文件类型的bean。我不能把这两部分连在一起。如何连接集成和批处理?
是否可以配置Spring批处理管理员来启动主作业和从作业。我们有一个进程作为主节点和3-4个从节点。 Spring batch admin在单独的JVM进程中运行,但所有Spring批处理作业都使用相同的批处理数据库模式。
布尔沙赫布尔
我们需要执行从一个数据库到其他数据库的数据移动,并为此探索spring batch。我们应用程序的用户选择源数据源和目标数据源,以及需要为其移动数据的表列表。 在以下方面需要帮助: 构建作业所需的信息在运行时来自我们的web应用程序-包括数据源详细信息和表名列表。我们希望通过将这些详细信息发送到job builder模块来创建一个新作业,并使用JobLauncher启动它。我们如何编写这个job