当前位置: 首页 > 知识库问答 >
问题:

Spring Batch从相同的执行和步骤重新启动未完成的作业

冉锋
2023-03-14

我使用以下逻辑重新启动Spring批处理未完成(例如,在应用程序异常终止后)作业:

public void restartUncompletedJobs() {

    LOGGER.info("Restarting uncompleted jobs");

    try {
        jobRegistry.register(new ReferenceJobFactory(documetPipelineJob));

        List<String> jobs = jobExplorer.getJobNames();
        for (String job : jobs) {
            Set<JobExecution> runningJobs = jobExplorer.findRunningJobExecutions(job);

            for (JobExecution runningJob : runningJobs) {
                runningJob.setStatus(BatchStatus.FAILED);
                runningJob.setEndTime(new Date());
                jobRepository.update(runningJob);
                jobOperator.restart(runningJob.getId());
                LOGGER.info("Job restarted: " + runningJob);
            }
        }
    } catch (Exception e) {
        LOGGER.error(e.getMessage(), e);
    }
}

这很好,但有一个副作用——它不会重新启动失败的作业执行,而是创建一个新的执行实例。如何更改这个逻辑,以便从失败的步骤重新启动失败的执行,而不创建新的执行?

更新

当我尝试以下代码时:

public void restartUncompletedJobs() {
try {
    jobRegistry.register(new ReferenceJobFactory(documetPipelineJob));

    List<String> jobs = jobExplorer.getJobNames();
    for (String job : jobs) {

    Set<JobExecution> jobExecutions = jobExplorer.findRunningJobExecutions(job);

    for (JobExecution jobExecution : jobExecutions) {
        jobOperator.restart(jobExecution.getId());
    }
    }
} catch (Exception e) {
    LOGGER.error(e.getMessage(), e);
}
}

它失败,但出现以下异常:

2018-07-30 06:50:47.090 ERROR 1588 --- [           main] c.v.p.d.service.batch.BatchServiceImpl   : Illegal state (only happens on a race condition): job execution already running with name=documetPipelineJob and parameters={ID=826407fa-d3bc-481a-8acb-b9643b849035, inputDir=/home/public/images, STORAGE_TYPE=LOCAL}

org.springframework.batch.core.UnexpectedJobExecutionException: Illegal state (only happens on a race condition): job execution already running with name=documetPipelineJob and parameters={ID=826407fa-d3bc-481a-8acb-b9643b849035, inputDir=/home/public/images, STORAGE_TYPE=LOCAL}
    at org.springframework.batch.core.launch.support.SimpleJobOperator.restart(SimpleJobOperator.java:283) ~[spring-batch-core-4.0.1.RELEASE.jar!/:4.0.1.RELEASE]
    at org.springframework.batch.core.launch.support.SimpleJobOperator$$FastClassBySpringCGLIB$$44ee6049.invoke(<generated>) ~[spring-batch-core-4.0.1.RELEASE.jar!/:4.0.1.RELEASE]
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204) [spring-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:684) [spring-aop-5.0.6.RELEASE.jar!/:5.0.6.RELEASE]
    at org.springframework.batch.core.launch.support.SimpleJobOperator$$EnhancerBySpringCGLIB$$7659d4c.restart(<generated>) ~[spring-batch-core-4.0.1.RELEASE.jar!/:4.0.1.RELEASE]
    at com.example.pipeline.domain.service.batch.BatchServiceImpl.restartUncompletedJobs(BatchServiceImpl.java:143) ~[domain-0.0.1.jar!/:0.0.1]

以下代码在jobstore数据库中创建新的执行:

public void restartUncompletedJobs() {
try {
    jobRegistry.register(new ReferenceJobFactory(documetPipelineJob));

    List<String> jobs = jobExplorer.getJobNames();
    for (String job : jobs) {

    Set<JobExecution> jobExecutions = jobExplorer.findRunningJobExecutions(job);

    for (JobExecution jobExecution : jobExecutions) {

        jobExecution.setStatus(BatchStatus.STOPPED);
        jobExecution.setEndTime(new Date());
        jobRepository.update(jobExecution);

        Long jobExecutionId = jobExecution.getId();
        jobOperator.restart(jobExecutionId);
    }
    }
} catch (Exception e) {
    LOGGER.error(e.getMessage(), e);
}

}

问题是——在应用程序重启后,如何在不创建新执行的情况下继续运行旧的未完成执行?

共有1个答案

齐雅畅
2023-03-14

TL;DR:Spring批处理将始终创建新的作业执行,并且不会重用以前失败的作业执行来继续执行。

详细回答:首先,你需要理解Spring Batch中三个相似但不同的概念:作业、作业实例和作业执行

我总是用这个例子:

  • 作业:日终批
  • 工作实例:2018-01-01的结束日批处理
  • 作业执行:2018-01-01的日终批次,执行#1

在高层,这就是Spring Batch的恢复工作原理

假设您的第一次执行在步骤3中失败。您可以使用相同的参数(2018-01-01)提交相同的作业(日终批次)。Spring Batch将尝试查找提交的作业实例(2018-01-01的日终批)的最后一次作业执行(2018-01-01的日终批,执行#1),并发现之前在步骤3中失败。然后,Spring Batch将创建一个新的执行,[2018-01-01的日终批,执行#2],并从步骤3开始执行。

因此,根据设计,Spring试图恢复的是一个以前失败的作业实例(而不是作业执行)。当您重新运行上一次失败的执行时,Spring batch将不会重用执行。

 类似资料:
  • 使用Spring Batch 3.0.4.Release。 我将作业配置为使用分区步骤。从机步骤使用块大小1。任务执行器中有六个线程。我使用从六到数百的各种网格大小来运行这个测试。我的网格大小是从StepExecutions的数量,我希望==我的分区器创建的ExecutionContexts的数量。 下面是Java配置代码:

  • 我正在使用activiti 6为我们的一个项目设计业务流程。这个过程非常简单,它由“用户任务”或“服务任务”的数量以及基于角色的分配组成。然而,我们的客户希望管理员用户能够在任何时间点重新运行之前的任何“用户任务”或“服务任务”。 例如,以下是我的流程: 开始- 客户端希望在流程执行期间的任何时间点,管理员用户都应该能够将工作流执行从:例如状态“User-Task8”更改为Service-Task

  • 我试图从步骤(实现接口Tasklet的类的execute方法)内部启动作业。 显然我收到了例外 Java语言lang.IllegalStateException:在JobRepository中检测到现有事务 如何使Spring批处理步骤不是事务性的? 有人能解决我从一步内启动工作的主要需求吗? 提前感谢您的帮助!

  • 我有一个单一的Kafka消费者,它连接到一个有3个分区的主题。一旦我从Kafka那里得到一张唱片,我就想捕捉偏移量和分区。在重新启动时,我希望从上次读取的偏移量恢复使用者的位置 摘自Kafka文档: 每个记录都有自己的偏移量,因此要管理自己的偏移量,只需执行以下操作: 配置enable.auto.commit=false 下面是我的示例代码: 这是正确的做法吗?有没有更好的办法?

  • 启动登录时,Keycloak在请求中发送一个< code>relaystate参数。但是,在成功登录后,pingbean不会返回此< code>relaystate。 奇怪的是,如果我向平联邦请求URL添加参数,它会将此参数的值返回为。我错过了什么?

  • 我有一个spring批处理工作,比如说5个步骤(