当前位置: 首页 > 知识库问答 >
问题:

Spring批处理的集成测试因线程执行而失败

柯翔
2023-03-14

我有一个Spring批处理项目。目标是对Spring作业中的各个步骤执行集成测试。

我正在使用JobLauncherTestUtils来启动步骤。但是,当这个util启动步骤时,它会在一个单独的线程中运行它。一旦线程完成执行,应该会有一些值分配给jobExecution.getStepExections()

问题:由于某些原因,即使在线程完成执行之前,测试也会转到下一行List

问题:有没有什么优雅的方法可以等待步骤执行线程完成,然后继续测试中的下一行进行验证?

代码:

集成测试类:

@Slf4j
@SpringBatchTest
@SpringBootTest
@ActiveProfiles({"test", "master"})
@ContextConfiguration(classes = {InhouseClass3.class, InhouseClass1.class, InhouseClass2.class})
public class BatchJobIntegrationTest {

    private static final String Param1 = "someParam";

    @Autowired
    @Qualifier("hikariDatasource")
    DataSource hikariDatasource;

    @Autowired
    Job BatchJob;

    @Autowired
    JobLauncher jobLauncher;

    JobExecution jobExecution;

    @Autowired
    CreateDirectoryTasklet createDirectoryTasklet;

    JobParameters jobParameters;

    JobLauncherTestUtils jobLauncherTestUtils;

    @BeforeEach
    void setUp() {
        String startTimestamp = Timestamp.from(Instant.now()).toString();
        jobParameters = new JobParametersBuilder()
                .addString(Param1, startTimestamp)
                .toJobParameters();

        jobLauncherTestUtils = new JobLauncherTestUtils();
        jobLauncherTestUtils.setJob(BatchJob);
        jobLauncherTestUtils.setJobLauncher(jobLauncher);
    }


    @SneakyThrows
    @Test
    void TaskletToTest_Test() {
        jobExecution = jobLauncherTestUtils.launchStep("loadTaskletToTest", jobParameters);

        List<StepExecution> actualStepExecutions = new ArrayList<>(jobExecution.getStepExecutions());
        // ERROR: jobExecution.getStepExecutions() is NULL. 

        ExitStatus actualJobExitStatus = actualStepExecutions.get(0).getExitStatus();
        // ERROR: Index 0 out of bounds for length 0

        assertEquals("loadGdxClaims", actualStepExecutions.get(0).getStepName());
        assertEquals(ExitStatus.COMPLETED, actualJobExitStatus);
    }

    @SneakyThrows
    @Test
    // This is my workaround to make my above test run. 
    // I added a sleep for 2 seconds. But this doesn't look like an ideal way, coz what 
    // if the launchstep thread running the tasklet took more than 2 seconds? 
    void loadGDXClaimTaskletTest_Working_() {
        jobExecution = jobLauncherTestUtils.launchStep("createDirectory", jobParameters);
        boolean counter = true;
        while(counter) {
            if (jobExecution.getStepExecutions().size()!=0 ) {
                List<StepExecution> actualStepExecutions = new ArrayList<>(jobExecution.getStepExecutions());

                ExitStatus actualJobExitStatus = actualStepExecutions.get(0).getExitStatus();
                log.info("------- step executions : {}", actualStepExecutions);
                assertEquals("createDirectory", actualStepExecutions.get(0).getStepName());
                assertEquals(ExitStatus.COMPLETED, actualJobExitStatus);
                counter = false;
            } else {
                TimeUnit.SECONDS.sleep(2);
            }
        }
    }



}

要测试的Tasklet步骤:

@Slf4j
@Component
public class TaskletToTest implements Tasklet {

    private final InhouseService inhouseService;

    public TaskletToTest(InhouseService inhouseService) {
        this.inhouseService = inhouseService;
    }

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws InterruptedException, IllegalJobNameException, JSchException, IOException {
        log.info("TaskletToTest before");
        inhouseService.retry();
        log.info("TaskletToTest after");
        return RepeatStatus.FINISHED;
    }
}

批处理作业,包括需要测试的步骤列表:

@Slf4j
@Profile("master")
@Configuration
public class MasterConfig extends DefaultBatchConfigurer {
    @Bean(name = "BatchJob")
    public Job remoteChunkingJob(TaskletStep someOtherTasklet,
                                 TaskletStep loadTaskletToTest,
                                 JobExecutionListener jobExecutionListener) {
        return this.jobBuilderFactory.get("extract gdx load")
                .incrementer(new RunIdIncrementer())     
                .listener(jobExecutionListener)
                .start(someOtherTasklet) 
                .next(loadTaskletToTest)
                .build();
    }

    @Bean
    TaskletStep loadTaskletToTest(TaskletToTest taskletToTest) {
        return this.stepBuilderFactory.get("loadTaskletToTest").tasklet(taskletToTest).build();
    }
}

共有1个答案

孔皓
2023-03-14

我正在使用JobLauncherTestUtils启动步骤。然而,当这个util启动这些步骤时,它会在一个单独的线程中运行它。

问题:由于某种原因,即使在线程完成执行之前,测试也会转到下一行

JobLauncherTestUtils使用JobLauncher启动作业和步骤。因此,根据您使用的JobLauncher实现,您可以在当前线程或单独的线程中运行作业/步骤。

您没有分享哪个JobLauncher在您的测试中是自动加载的,但是您似乎已经定义了一个基于异步TaskExecator实现的作业启动器。这就是为什么您的作业/步骤在后台执行:

// This returns immediately with an asynchrnous TaskExecutor
// However, with a synchronous TaskExectuor it will block waiting for the step to finish 
jobExecution = jobLauncherTestUtils.launchStep("loadTaskletToTest", jobParameters);

List<StepExecution> actualStepExecutions = new ArrayList<>(jobExecution.getStepExecutions());
        

因此,您需要检查测试中自动连接的是哪个JobLauncher(通常是带有同步或异步TaskExecutorSimpleJobLauncher)。

 类似资料:
  • 我正在寻找一些关于测试Spring批处理步骤和步骤执行的一般性意见和建议。 我的基本步骤是从api读入数据,处理实体对象,然后写入数据库。我已经测试了快乐之路,这一步成功地完成了。我现在想做的是在处理器阶段数据丢失时测试异常处理。我可以单独测试processor类,但我更愿意测试整个步骤,以确保在步骤/作业级别正确反映流程故障。 我已经阅读了spring批量测试指南,如果我是诚实的,我对它有点迷茫

  • 我是spring batch的新手。我已经使用多个线程从spring创建并成功执行了作业,它工作得很好,只是当程序执行完成时,程序流不会结束/停止。i、 即使main方法的最后一条语句被执行,程序也不会退出。我不确定它是否一直在等待线程完成,或者是什么。有人能给我一些建议吗?“下面是我的作业配置文件 下面是启动器代码 如上所述,代码在5个不同的线程中为任务“hello”运行,为任务“world”运

  • 我正在使用STS 2.81附带的Spring Batch模板和Manning的Spring Batch in Action中的示例创建一个Spring Batch作业。我可以毫无问题地执行块读取器和写入器,但我的代码跳过了处理器。我甚至尝试过在处理器中取消所有对象,但什么也没有,对象仍然设法被写入,就像处理器被忽略一样。我尝试在处理器中调用system.out.println,但没有在终端中打印出

  • 我正在尝试将BeanIO与spring Batch集成。使用BeanIO,我正在读取一个固定长度的流文件。我已经测试并验证了使用独立类读取平面文件的代码,它可以无缝地工作,但是当我试图将它与Spring Batch集成时,BeanIOFlatFileItemReader的doRead()方法没有被调用,而且我编写的RedemptionEventCustomProcessor是如何直接被调用的。 我

  • null 我更新了我的步骤并添加了一个ThreadPoolTaskExecutor,如下所示 在此之后,我的处理器将被多个线程调用,但使用相同的源数据。我还有什么需要做的吗?

  • 我正在尝试使用多个处理器类在处理器步骤中处理记录。这些类可以并行工作。目前我已经编写了一个多线程步骤,其中我 设置处理器类的输入和输出行 提交给遗嘱执行人服务 获取所有未来对象并收集最终输出