当前位置: 首页 > 知识库问答 >
问题:

如果开始运行同一个作业(Spring批处理),则运行作业将停止

夏侯渊
2023-03-14

我有点困惑,因为当通过HTTP请求启动Spring Batch作业的执行时,如果我在作业执行时收到另一个HTTP请求来启动相同的作业,但参数不同,则正在执行的作业停止未完成并开始处理新作业。

我开发了一个API REST来加载和处理Excel文件的内容。web服务公开了两个endpoint,一个用于加载、验证和存储数据库中Excel文件的内容,另一个用于开始处理存储在html" target="_blank">数据库中的记录。

>

  • POST /api/excel/upload此endpoint接收Excel文件。当接收到请求时,每个文件都会被分配一个唯一标识符并验证其内容。如果内容正确,它会将其插入到等待处理的临时表中。

    获取/api/Excel/process?id=x此endpoint接收要处理的文件的标识符。当收到请求时,一个Spring批处理作业开始处理临时表中的记录。

    • 控制器
    @PostMapping(produces = {APPLICATION_JSON_VALUE})
    public ResponseEntity<Page<ExcelLoad>> post(@RequestParam("file") MultipartFile multipartFile)
    {
        return super.getResponse().returnPage(service.upload(multipartFile));
    }
    
    @GetMapping(value = "/process", produces = APPLICATION_JSON_VALUE)
    public DeferredResult<ResponseEntity<Void>> get(@RequestParam("id") Integer idCarga)
    {
        DeferredResult<ResponseEntity<Void>> response = new DeferredResult<>(1000L);
        response.onTimeout(() -> response.setResult(super.getResponse().returnVoid()));
    
        ForkJoinPool.commonPool().submit(() -> service.startJob(idCarga));
    
        return response;
    }
    

    我使用DelferredResult在收到请求后向客户端发送响应,而不用等待作业完成

    • 服务
    public void startJob(int idCarga)
    {
        JobParameters params = new JobParametersBuilder()
                .addString("mainJob", String.valueOf(System.currentTimeMillis()))
                .addString("idCarga", String.valueOf(idCarga))
                .toJobParameters();
    
        try
        {
            jobLauncher.run(job, params);
        }
        catch (JobExecutionException e)
        {
            log.error("---ERROR: {}", e.getMessage());
        }
    }
    
    • 一批
    @Bean
    public Step mainStep(ReaderImpl reader, ProcessorImpl processor, WriterImpl writer)
    {
        return stepBuilderFactory.get("step")
                .<List<ExcelLoad>, Invoice>chunk(10)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .faultTolerant().skipPolicy(new ExceptionSkipPolicy())
                .listener(stepSkipListener)
                .build();
    }
    
    @Bean
    public Job mainJob(Step mainStep)
    {
        return jobBuilderFactory.get("mainJob")
                                .listener(mainJobExecutionListener)
                                .incrementer(new RunIdIncrementer())
                                .start(mainStep)
                                .build();
    }
    

    在执行一些测试时,我观察到以下行为:

    >

    • 记录处理文件1:3606(预期为3606)。
    • 记录处理文件2:1776(预期为1776)。在此处输入图像描述在此处输入图像描述

    如果我向endpoint/进程发出请求,请求第一个处理文件1,在它完成之前,我再次请求处理文件2:在这种情况下,并不是所有存储在临时表中的记录都被处理:

    • 记录处理文件1:1080(预计3606)
    • 记录处理文件2:1774(预计为1776)
  • 共有2个答案

    干照
    2023-03-14

    多亏了@Mahmoud Ben Hassine的帮助,我才得以解决这个问题。为了帮助实现,如果有人遇到这个问题,我将分享代码,在我的例子中,这些代码已经解决了这个问题:

    • 控制器
    @Autowired
    private JobLauncher jobLauncher;
    
    @Autowired
    private Job job;
    
    @GetMapping(value = "/process", produces = APPLICATION_JSON_VALUE)
    public void get(@RequestParam("id") Integer idCarga) throws JobExecutionException
    {
        JobParameters params = new JobParametersBuilder()
                .addString("mainJob", String.valueOf(System.currentTimeMillis()))
                .addString("idCarga", String.valueOf(idCarga))
                .toJobParameters();
    
        jobLauncher.run(job, params);
    }
    
    • 批处理配置、作业和步骤
    @Configuration
    @EnableBatchProcessing
    public class BatchConfig extends DefaultBatchConfigurer
    {
        @Autowired
        private JobBuilderFactory jobBuilderFactory;
    
        @Autowired
        private StepBuilderFactory stepBuilderFactory;
    
        @Autowired
        private StepSkipListener stepSkipListener;
    
        @Autowired
        private MainJobExecutionListener mainJobExecutionListener;
    
        @Bean
        public TaskExecutor taskExecutor()
        {
            ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
            taskExecutor.setMaxPoolSize(10);
            taskExecutor.setThreadNamePrefix("batch-thread-");
    
            return taskExecutor;
        }
    
        @Bean
        public JobLauncher jobLauncher() throws Exception
        {
            SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
            jobLauncher.setJobRepository(getJobRepository());
            jobLauncher.setTaskExecutor(taskExecutor());
            jobLauncher.afterPropertiesSet();
    
            return jobLauncher;
        }
    
        @Bean
        public Step mainStep(ReaderImpl reader, ProcessorImpl processor, WriterImpl writer)
        {
            return stepBuilderFactory.get("step")
                    .<List<ExcelLoad>, Invoice>chunk(10)
                    .reader(reader)
                    .processor(processor)
                    .writer(writer)
                    .faultTolerant().skipPolicy(new ExceptionSkipPolicy())
                    .listener(stepSkipListener)
                    .build();
        }
    
        @Bean
        public Job mainJob(Step mainStep)
        {
            return jobBuilderFactory.get("mainJob")
                                    .listener(mainJobExecutionListener)
                                    .incrementer(new RunIdIncrementer())
                                    .start(mainStep)
                                    .build();
        }
    }
    

    如果在应用了这个代码之后,就像我遇到的那样,你在数据库中插入记录时也遇到了问题,你可以通过这个问题,我也把适用于我的代码放在这里。

    沈畅
    2023-03-14

    JobLauncher不会停止作业执行,它只会启动它们。Spring Batch提供的默认作业启动程序SimpleZoblancher,它将作业启动委托给TaskExecutor。现在,根据您使用的任务执行器实现以及如何配置它以启动并发任务,您可以看到不同的行为。例如,当启动新作业执行并将新任务提交给任务执行者时,如果所有工作人员都很忙,任务执行者可以决定拒绝此提交,或者将其放入等待队列,或者停止另一个任务并提交新任务。这些策略取决于几个参数(TaskExecutor实现、幕后使用的队列类型、RejectedExecutionHandler实现等)。

    在您的情况下,您似乎在使用以下内容:

    ForkJoinPool.commonPool().submit(() -> service.startJob(idCarga));
    

    因此,您需要检查这个池在处理新任务提交方面的行为(我猜这就是停止工作的原因,但您需要确认这一点)。也就是说,我不明白你为什么需要这个。如果您的要求如下:

    我使用DelferredResult在收到请求后向客户端发送响应,而不用等待作业完成

    然后,可以在作业启动器中使用异步任务执行器实现(如线程池任务执行器),请参阅从Web容器中运行作业。

     类似资料:
    • 我有一个作业流,我希望以以下方式运行它: 作业流将从Job1开始。在Job1成功完成后,Job1将同时启动Job2和Job4。 Job2和Job4将并行运行。 在Job2成功完成后,Job2将启动Job3。 在Job4成功完成后,Job4将启动Job5。 下面是job1.xml和job1的作业启动器类的代码片段: job1.xml uijobLauncher.java “job2,Job3”对和“

    • 问题内容: 运行main方法时,将执行作业。这样我无法弄清楚如何控制作业的执行。例如,您如何安排作业,访问作业执行或设置作业参数的方式。 我试图注册自己的JobLauncher 但是当我尝试在主要方法中使用它时: 当加载上下文时,该作业再次执行,而当我尝试手动运行它时,我得到了。有没有办法防止自动作业执行? 问题答案: 通过设置可以防止作业执行 在application.properties中。或

    • 我按照这个示例使用Boot进行Spring批处理。 运行main方法时,作业将执行。这样我就不知道如何控制作业的执行了。例如如何排定作业、访问作业执行或设置作业参数。 我尝试注册自己的JobLauncher 但当我尝试在主法中使用时: 当加载上下文时,再次执行作业,并且尝试手动运行作业时得到。有没有办法防止自动执行作业?

    • 我在BatchScheduler中有多个计划作业,它在特定时间运行。简单的内置JobLauncher,这是同步的。在自然界中最初使用。现在,我想并行运行这些作业,这样没有作业可以等待其他作业完成。 我在不同的作业上尝试过@Async注释,但都不起作用。 然后,我尝试设置joblauncher.settaskexecutor(新的SimpleAsyncTaskExecutor())。但这并不奏效。

    • 我正在使用spring批处理读取CSV文件并使用controller触发器将其写入DB。在启动应用程序时,在我从浏览器url中点击之前,我会在启动时看到来自阅读器的打印语句。虽然它不为我的处理器或写入器打印它,它们是在单独的类中,我已经自动连线。是因为读者是豆子吗?

    • 我不知道如何使用调用Spring批处理中定义的作业,文档细节对我来说是不够的。 我遵循了Spring Batch官方指南,使用Java注释(例如)在Spring Batch中编写作业,因为我希望避免使用XML配置文件来描述作业、步骤等。 到目前为止我已经: 配置类(见下文),我使用AnnotaIon将定义、、、和(带有的所有内容放入其中。 具有方法的类,该方法具有并具有注释,以导入处理作业中的数据