当前位置: 首页 > 知识库问答 >
问题:

如何异步启动Spring批处理作业

蒯华彩
2023-03-14

我遵循了spring批处理文档,无法异步运行我的作业。

因此,我从一个web容器运行该作业,该作业将通过RESTendpoint触发。

我想让JobInstance ID在完成整个作业之前传递它作为响应。因此,他们可以稍后使用JobInstance ID检查作业的状态,而不是等待。但我没能让它工作。下面是我尝试过的示例代码。请让我知道我错过了什么或错了什么。

BatchConfig创建异步JobLauncher

@Configuration
public class BatchConfig {

    @Autowired
    JobRepository jobRepository;


    @Bean
    public JobLauncher simpleJobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }
}

控制器

@Autowired
JobLauncher jobLauncher;

@RequestMapping(value="/trigger-job", method = RequestMethod.GET)
public Long workHard() throws Exception {
    JobParameters jobParameters = new JobParametersBuilder().
            addLong("time", System.currentTimeMillis())
            .toJobParameters();
    JobExecution jobExecution = jobLauncher.run(batchComponent.customJob("paramhere"), jobParameters);
    System.out.println(jobExecution.getJobInstance().getInstanceId());
    System.out.println("OK RESPONSE");
    return jobExecution.getJobInstance().getInstanceId();
}

以及作为组件的JobBuilder

@Component
public class BatchComponent {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    public Job customJob(String someParam) throws Exception {
        return jobBuilderFactory.get("personProcessor")
                .incrementer(new RunIdIncrementer()).listener(listener())
                .flow(personPorcessStep(someParam)).end().build();
    }


    private Step personPorcessStep(String someParam) throws Exception {
        return stepBuilderFactory.get("personProcessStep").<PersonInput, PersonOutput>chunk(1)
                .reader(new PersonReader(someParam)).faultTolerant().
                        skipPolicy(new DataDuplicateSkipper()).processor(new PersonProcessor())
                .writer(new PersonWriter()).build();
    }


    private JobExecutionListener listener() {
        return new PersonJobCompletionListener();
    }

    private class PersonInput {
        String firstName;

        public PersonInput(String firstName) {
            this.firstName = firstName;
        }

        public String getFirstName() {
            return firstName;
        }

        public void setFirstName(String firstName) {
            this.firstName = firstName;
        }
    }

    private class PersonOutput {
        String firstName;

        public String getFirstName() {
            return firstName;
        }

        public void setFirstName(String firstName) {
            this.firstName = firstName;
        }
    }

    public class PersonReader implements ItemReader<PersonInput> {
        private List<PersonInput> items;
        private int count = 0;

        public PersonReader(String someParam) throws InterruptedException {
            Thread.sleep(10000L); //to simulate processing
            //manipulate and provide data in the read method
            //just for testing i have given some dummy example
            items = new ArrayList<PersonInput>();
            PersonInput pi = new PersonInput("john");
            items.add(pi);
        }

        @Override
        public PersonInput read() {
            if (count < items.size()) {
                return items.get(count++);
            }
            return null;
        }
    }


    public class DataDuplicateSkipper implements SkipPolicy {

        @Override
        public boolean shouldSkip(Throwable exception, int skipCount) throws SkipLimitExceededException {
            if (exception instanceof DataIntegrityViolationException) {
                return true;
            }
            return true;
        }
    }


    private class PersonProcessor implements ItemProcessor<PersonInput, PersonOutput> {

        @Override
        public PersonOutput process(PersonInput item) throws Exception {
            return null;
        }
    }

    private class PersonWriter implements org.springframework.batch.item.ItemWriter<PersonOutput> {
        @Override
        public void write(List<? extends PersonOutput> results) throws Exception {
            return;
        }
    }

    private class PersonJobCompletionListener implements JobExecutionListener {
        public PersonJobCompletionListener() {
        }

        @Override
        public void beforeJob(JobExecution jobExecution) {

        }

        @Override
        public void afterJob(JobExecution jobExecution) {
            System.out.println("JOB COMPLETED");
        }
    }
}

主要功能

@SpringBootApplication
@EnableBatchProcessing
@EnableScheduling
@EnableAsync
public class SpringBatchTestApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBatchTestApplication.class, args);
    }
}

我正在使用基于注释的配置,并在下面的批处理包中使用gradle。

compile('org.springframework.boot:spring-boot-starter-batch')

如果需要更多信息,请告诉我。我找不到任何运行此常见用例的示例。

谢谢你的时间。

共有3个答案

闽哲
2023-03-14

我知道这是一个老生常谈的问题,但我还是把这个答案发布给未来的用户。

在检查了你的代码后,我不能告诉你为什么会有这个问题,但是我可以建议你使用一个限定符注释,再加上像这样使用ThreadPoolTaskExec导师,看看它是否解决了你的问题。

您还可以查看本教程:异步Spring批处理作业处理以了解更多详细信息。它将帮助您异步配置spring批处理作业。这篇教程是我写的。

@Configuration
public class BatchConfig {

 @Autowired
 private JobRepository jobRepository;

 @Bean
 public TaskExecutor threadPoolTaskExecutor(){

  ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setMaxPoolSize(12);
        executor.setCorePoolSize(8);
        executor.setQueueCapacity(15);

   return executor;
 }

 @Bean
    public JobLauncher asyncJobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();

        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(threadPoolTaskExecutor());
        return jobLauncher;
 }
}
齐昊焱
2023-03-14

如果我看一下你的代码,我会发现几个错误。首先,您的自定义配置没有加载,因为如果加载了,那么对于同一接口的重复bean实例,注入将失败。

spring boot有很多魔力,但如果你不告诉他做一些组件扫描,就不会加载任何东西。

我看到的第二个问题是BatchConfig类:它既不扩展DefaultBatchConfigure,也不重写getJobLauncher(),所以即使引导魔法将加载所有内容,也会得到默认的。这是一个可以工作的配置,它符合@enableBackProcessing API文档

BatchConfig

@Configuration
@EnableBatchProcessing(modular = true)
@Slf4j
public class BatchConfig extends DefaultBatchConfigurer {

  @Override
  @Bean
  public JobLauncher getJobLauncher() {
    try {
      SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
      jobLauncher.setJobRepository(getJobRepository());
      jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
      jobLauncher.afterPropertiesSet();
      return jobLauncher;

    } catch (Exception e) {
      log.error("Can't load SimpleJobLauncher with SimpleAsyncTaskExecutor: {} fallback on default", e);
      return super.getJobLauncher();
    }
  }
}

主要功能

@SpringBootApplication
@EnableScheduling
@EnableAsync
@ComponentScan(basePackageClasses = {BatchConfig.class})
public class SpringBatchTestApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBatchTestApplication.class, args);
    }
}
公西良骏
2023-03-14

试试这个,在您的配置中,您需要使用@Bean(name=“myJobLauncher”)创建带有SimpleAsynctAskeExecutor的customJobLauncher,并且在控制器中的@Qualifier中也会使用同样的方法。

@Bean(name = "myJobLauncher")
public JobLauncher simpleJobLauncher() throws Exception {
    SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
    jobLauncher.setJobRepository(jobRepository);
    jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
    jobLauncher.afterPropertiesSet();
    return jobLauncher;
}

在你的控制器里

@Autowired
@Qualifier("myJobLauncher")
private JobLauncher jobLauncher;
 类似资料:
  • 我有一个spring批处理作业,从CSV文件读取并写入数据库。我想让它重新启动。例如,如果在读取文件或写入db时出现异常,导致作业失败,则应从失败的同一点/块重新开始,而不是从头开始读取整个文件。 我正在从一个endpoint触发作业启动器,并在我的控制器中配置了它。 目前,我正在通过控制器将参数(这是一个唯一的标识符/数字)传递给作业参数,以运行新的作业实例。如果作业失败,我将使用与GET请求中

  • 我正在用异步JobLauncher在Spring Batch中配置一个(长时间运行的)作业,我有两个RESTendpoint: null 谢谢朱利奥

  • 我需要从远程SFTP服务器下载一个文件,并使用spring batch处理它们。我已经实现了使用Spring集成下载文件的代码。但我无法从Spring集成组件启动Spring批处理作业。我有以下代码: 但这不起作用(上一个方法中的错误),因为找不到文件类型的bean。我不能把这两部分连在一起。如何连接集成和批处理?

  • 是否可以配置Spring批处理管理员来启动主作业和从作业。我们有一个进程作为主节点和3-4个从节点。 Spring batch admin在单独的JVM进程中运行,但所有Spring批处理作业都使用相同的批处理数据库模式。

  • 我们需要执行从一个数据库到其他数据库的数据移动,并为此探索spring batch。我们应用程序的用户选择源数据源和目标数据源,以及需要为其移动数据的表列表。 在以下方面需要帮助: 构建作业所需的信息在运行时来自我们的web应用程序-包括数据源详细信息和表名列表。我们希望通过将这些详细信息发送到job builder模块来创建一个新作业,并使用JobLauncher启动它。我们如何编写这个job