当前位置: 首页 > 知识库问答 >
问题:

Spring内存批处理(MapJobRepositoryFactoryBean)清除旧作业,而不是正在运行的作业

陆飞龙
2023-03-14

我使用spring批处理来计划批处理作业,即内存中的批处理作业,作为项目特定的需求(即不在生产中,它只是用于测试环境),下面是我的配置类

// Batch Scheulder class
    package org.learning.scheduler
    import org.springframework.batch.core.explore.JobExplorer;
    import org.springframework.batch.core.explore.support.SimpleJobExplorer;
    import org.springframework.batch.core.launch.support.SimpleJobLauncher;
    import org.springframework.batch.core.repository.JobRepository;
    import org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean;
    import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.core.task.SimpleAsyncTaskExecutor;
    import org.springframework.scheduling.annotation.EnableScheduling;

    /**
     * Job Inmemory Config
     * 
     */
    @EnableScheduling
    @Configuration
    public class InmemoryJobConfig  {


        @Bean
        public ResourcelessTransactionManager transactionManager() {
            return new ResourcelessTransactionManager();
        }

        @Bean
        public MapJobRepositoryFactoryBean mapJobRepositoryFactoryBean(ResourcelessTransactionManager resourcelessTransactionManager) throws Exception {
            MapJobRepositoryFactoryBean factoryBean = new MapJobRepositoryFactoryBean(resourcelessTransactionManager);
            factoryBean.afterPropertiesSet();
            return factoryBean;
        }

        @Bean
        public JobRepository jobRepository(MapJobRepositoryFactoryBean factoryBean) throws Exception{
            return (JobRepository) factoryBean.getObject();
        }
        @Bean
        public JobExplorer jobExplorer(MapJobRepositoryFactoryBean repositoryFactory) {
            return new SimpleJobExplorer(repositoryFactory.getJobInstanceDao(), repositoryFactory.getJobExecutionDao(),
                    repositoryFactory.getStepExecutionDao(), repositoryFactory.getExecutionContextDao());
        }

        @Bean
        public SimpleJobLauncher jobLauncher(JobRepository jobRepository) throws Exception {
            SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
            simpleJobLauncher.setJobRepository(jobRepository);
            simpleJobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());

            return simpleJobLauncher;
        }
    }

//Job ConfiguratinClass

/**
 * Batch Entry Point for Scheduler for all Jobs
 *
 * 
 */
@Import({InmemoryJobConfig.class})
@EnableBatchProcessing
@Configuration
@Slf4j
public class BatchScheduler {


    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    private StepBuilderFactory steps;

    @Autowired
    private SimpleJobLauncher jobLauncher;


    @Autowired
    private JobExplorer jobExplorer;

    @Autowired
    private MapJobRepositoryFactoryBean mapJobRepositoryFactoryBean;


    @Bean
    public ItemReader<UserDTO> userReader() {
        return new UserReader();

    }

    @Bean
    public ItemWriter<User> userWriter() {
        return new UserWriter();

    }

    @Bean
    public ItemReader<OrderDTO> orderReader() {
        return new OrderReader();
    }

    @Bean
    public ItemWriter<Order> orderWriter() {
        return new OrderWriter();
    }

    @Bean
    public Step userStep(ItemReader<UserDTO> reader, ItemWriter<User> writer) {
        return steps.get("userStep")
                .<UserDTO, User>chunk(20)
                .reader(userReader())
                .processor(new UserProcessor())
                .writer(userWriter())
                .build();
    }

    @Bean
    public Step orderStep(ItemReader<OrderDTO> reader, ItemWriter<Order> writer) {
        return steps.get("orderStep")
                .<OrderDTO, Order>chunk(20)
                .reader(orderReader())
                .processor(new OrderProcessor())
                .writer(orderWriter())
                .build();
    }


    @Bean
    public Job userJob() {
        return jobs.get("userJob").incrementer(new RunIdIncrementer()).start(userStep(userReader(), userWriter())).build();
    }

    @Bean
    public Job orderJob() {
        return jobs.get("orderJob").incrementer(new RunIdIncrementer()).start(orderStep(orderReader(), orderWriter())).build();
    }


    @Scheduled(cron = "0 0/15 * * *  ?")
    public void scheduleUserJob() throws JobExecutionException {
        Set<JobExecution> runningJob = jobExplorer.findRunningJobExecutions("userJob");

        if (!runningJob.isEmpty()) {
            throw new JobExecutionException(" User Job  is already in Start State  ");
        }

        JobParameters userParam =
                new JobParametersBuilder().addLong("date", System.currentTimeMillis())
                        .toJobParameters();
        jobLauncher.run(userJob(), userParam);

    }

    @Scheduled(cron = "0 0/15 * * *  ?")
    public void scheduleOrderJob() throws JobExecutionException {
        Set<JobExecution> runningJob = jobExplorer.findRunningJobExecutions("orderJob");

        if (!runningJob.isEmpty()) {
            throw new JobExecutionException(" Order Job  is already in Start State  ");
        }

        JobParameters orderParam =
                new JobParametersBuilder().addLong("date", System.currentTimeMillis())
                        .toJobParameters();
        jobLauncher.run(orderJob(), orderParam);

    }

    @Scheduled(cron = "0 0/30 * * *  ?")
    public void scheduleCleanupMemoryJob() throws BatchException {
        Set<JobExecution> orderRunningJob = jobExplorer.findRunningJobExecutions("orderJob");
        Set<JobExecution> userRunningJob = jobExplorer.findRunningJobExecutions("userJob");
        if (!orderRunningJob.isEmpty() || !userRunningJob.isEmpty()) {
            throw new BatchException(" Order/user Job  is running state , cleanup job is aborted  ");
        }

        mapJobRepositoryFactoryBean.clear();

    }
}

我每0/15分钟安排两个作业,这将执行一些业务逻辑,我还安排了内存清理作业,仅当这两个作业中的任何一个未处于运行状态时,才从“mapJobRepositoryFactoryBean”bean清理内存中的作业数据。

我想建议找到如何删除已经执行的旧作业的最佳方法,如果其中任何作业处于运行状态,上述方法不会删除旧作业的详细信息。

或者spring批处理中是否有任何API可以在作业执行后从内存中清除特定的作业详细信息。?在记忆中被JobId清除

注意:我只想使用MapJobRepositoryFactoryBean,而不是持久数据库或任何嵌入式数据库(H2)

共有1个答案

闻人举
2023-03-14

MapJobRepository提供了一个clear()方法,可以清除基于地图的作业存储库中的所有数据,但我看不到任何明显的方法来删除特定作业的元数据

我只想使用MapJobRepositoryFactoryBean,而不是持久数据库或任何嵌入式数据库(H2)

我真的建议使用基于JDBC的作业存储库和内存中的数据库。这种方法更好,因为它允许您对内存中的数据库运行查询,并删除特定作业的数据。

 类似资料:
  • 我试图检查是否有任何正在运行的作业实例。 但是,当我们有未正确完成的旧执行时,上述代码将不起作用。在这种情况下,作业执行的大小大于 1。

  • 我正在使用spring批处理读取CSV文件并使用controller触发器将其写入DB。在启动应用程序时,在我从浏览器url中点击之前,我会在启动时看到来自阅读器的打印语句。虽然它不为我的处理器或写入器打印它,它们是在单独的类中,我已经自动连线。是因为读者是豆子吗?

  • Spring批处理作业与flatfileitemreader(从csv读取)、processor(更新adwords api提要详细信息,对于csv文件中的每个记录(大约有40条记录),这一步大约需要40秒)和正在更新DB中记录的定制writer一起使用。 web.xml

  • 我有以下工作要处理在一定的时间间隔或特别的基础上。 作业中的步骤如下: 我也想要用户界面,在那里我可以触发一个特别的基础上的工作,而且我应该能够提供参数从用户界面。 我想用Spring batch来完成这个任务,但它更多的是用于读->处理->写之类的工作。这里,在第一步中,我正在生成由第二步读取的数据。我不确定我是否还可以使用Spring batch来实现这个,或者有更好的方法来实现这个。

  • 问题内容: 运行main方法时,将执行作业。这样我无法弄清楚如何控制作业的执行。例如,您如何安排作业,访问作业执行或设置作业参数的方式。 我试图注册自己的JobLauncher 但是当我尝试在主要方法中使用它时: 当加载上下文时,该作业再次执行,而当我尝试手动运行它时,我得到了。有没有办法防止自动作业执行? 问题答案: 通过设置可以防止作业执行 在application.properties中。或

  • 我按照这个示例使用Boot进行Spring批处理。 运行main方法时,作业将执行。这样我就不知道如何控制作业的执行了。例如如何排定作业、访问作业执行或设置作业参数。 我尝试注册自己的JobLauncher 但当我尝试在主法中使用时: 当加载上下文时,再次执行作业,并且尝试手动运行作业时得到。有没有办法防止自动执行作业?