当前位置: 首页 > 知识库问答 >
问题:

Spring批处理调度器:作业侦听器仅在作业第一次运行时工作

周志文
2023-03-14

我当时正在开发一个Spring批处理应用程序,使用java配置执行两个批处理作业。最近,我添加了一个Spring调度程序来调度我编写的一个作业。侦听器在作业第一次完成时被调用,但在下一次执行后不会被调用。以下是我的作业配置代码:

@Configuration
@EnableBatchProcessing
public class BatchConfiguration{

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    public MongoTemplate mongoTemplate;

    @Autowired
    UnitsRepository unitsRepos;

    @Autowired
    UserRepository userRepository;

    @Autowired
    ElectraService electraService;

    /*@Autowired InfrastructureConfiguration infrastructureConfiguration;*/

    // tag::readerwriterprocessor[]
    @Bean
    @StepScope
    public MongoItemReader<UserBean> reader() {
        MongoItemReader<UserBean> reader = new MongoItemReader<UserBean>();
        reader.setTemplate(mongoTemplate);
        reader.setCollection("user");
        reader.setQuery("{ '_id': 'U3'}");
        reader.setSort(new HashMap<String,Direction>(){{put("_id", Direction.ASC);}});
        reader.setTargetType(UserBean.class);
        return reader;
    }

    @Bean
    public ExceedUsageProcessor processor() {
        return new ExceedUsageProcessor(unitsRepos,electraService);
    }

    @Bean
    public AnomalyProcessor anomalyProcessor() {
        return new AnomalyProcessor(unitsRepos);
    }
    @Bean
    @StepScope
    public MongoItemWriter<DayByDayUsage> writer() {
        MongoItemWriter<DayByDayUsage> writer = new MongoItemWriter<DayByDayUsage>();
        writer.setTemplate(mongoTemplate);
        writer.setCollection("usage");
        return writer;
    }
    // end::readerwriterprocessor[]

    // tag::listener[]

    @Bean
    @StepScope
    public MongoItemWriter<AnomalyBean> anomalyWriter() {
        MongoItemWriter<AnomalyBean> writer = new MongoItemWriter<AnomalyBean>();
        writer.setTemplate(mongoTemplate);
        writer.setCollection("anomaly");
        return writer;
    }

    @Bean
    public ExceedJobNotificationListener listener() {
        return new ExceedJobNotificationListener(mongoTemplate);
    }

    @Bean
    public AnomalyJobListener anomalyListener(){
        return new AnomalyJobListener(mongoTemplate,userRepository);

    }
    // end::listener[]

    // tag::jobstep[]
    @Bean
    public Job notifyUserJob() {
        return jobBuilderFactory.get("notifyUserJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener())
                .flow(step1())
                .end()
                .build();
    }

    @Bean
    public Job anomalyJob() {
        return jobBuilderFactory.get("anomalyJob")
                .incrementer(new RunIdIncrementer())
                .listener(anomalyListener())
                .flow(step2())
                .end()
                .build();
    }
    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<UserBean, DayByDayUsage> chunk(50)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .taskExecutor(taskExecutor())
                .throttleLimit(10)
                .allowStartIfComplete(true)
                .build();
    }
    // end::jobstep[]

    @Bean
    public Step step2() {
        return stepBuilderFactory.get("step2")
                .<UserBean, AnomalyBean> chunk(50)
                .reader(reader())
                .processor(anomalyProcessor())
                .writer(anomalyWriter())
                .taskExecutor(taskExecutor())
                .throttleLimit(10)
                .allowStartIfComplete(true)
                .build();
    }

    @Bean
    public TaskExecutor taskExecutor() {
        // TODO Auto-generated method stub
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setMaxPoolSize(10);
        taskExecutor.afterPropertiesSet();
        return taskExecutor;
    }

    @Bean
    public DataSource dataSource() {
        return new EmbeddedDatabaseBuilder()
                .setType(EmbeddedDatabaseType.HSQL)
                .addScript("classpath:/org/springframework/batch/core/schema-hsqldb.sql")
                .build();
    }
}

下面是我的调度程序的代码:

@Component
public class AnomalyScheduler {
     private Job myImportJob;
        private JobLauncher jobLauncher;

        @Autowired
        public AnomalyScheduler(JobLauncher jobLauncher, @Qualifier("anomalyJob") Job myImportJob){
            this.myImportJob = myImportJob; 
            this.jobLauncher = jobLauncher;
       }

       @Scheduled(fixedDelay=60000)
       public void runJob(){
           try {
            jobLauncher.run(myImportJob, new JobParameters());
        } catch (JobExecutionAlreadyRunningException | JobRestartException
                | JobInstanceAlreadyCompleteException
                | JobParametersInvalidException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
       }
}

我的听众如下:

    public class AnomalyJobListener extends JobExecutionListenerSupport {
    private PushNotification pushNotification = PushNotification
            .getPushNotificationInstance();

    @Autowired
    public AnomalyJobListener(MongoTemplate mongoTemplate,
            UserRepository userRepository) {
        List<AnomalyBean> anomalies = new ArrayList<AnomalyBean>(0);
        anomalies = mongoTemplate.findAll(AnomalyBean.class);
        int numAnomalies = anomalies.size();
        List<UserBean> admins = new ArrayList<UserBean>(0);
        admins = userRepository.userByType("admin");
        if (numAnomalies > 0) {
            for (UserBean admin : admins) {
                pushNotification.pushNotification(numAnomalies
                        + " anomalies detected ! Keep an eye on that.",
                        admin.getDeveiceId());
            }

        }
    }
}

以下是控制台输出:

2016-05-04 08:17:39.565  INFO 9348 --- [           main] com.electra.Application                  : Starting Application on all-PC with PID 9348 (F:\Electrck\ElectrackJobRepository\ElectrackJobs\bin started by all in F:\Electrck\ElectrackJobRepository\ElectrackJobs)
    2016-05-04 08:17:39.571  INFO 9348 --- [           main] com.electra.Application                  : No active profile set, falling back to default profiles: default
    2016-05-04 08:17:39.681  INFO 9348 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@20b2475a: startup date [Wed May 04 08:17:39 IST 2016]; root of context hierarchy
    2016-05-04 08:17:41.943  WARN 9348 --- [           main] o.s.c.a.ConfigurationClassEnhancer       : @Bean method ScopeConfiguration.stepScope is non-static and returns an object assignable to Spring's BeanFactoryPostProcessor interface. This will result in a failure to process annotations such as @Autowired, @Resource and @PostConstruct within the method's declaring @Configuration class. Add the 'static' modifier to this method to avoid these container lifecycle issues; see @Bean javadoc for complete details.
    2016-05-04 08:17:41.966  WARN 9348 --- [           main] o.s.c.a.ConfigurationClassEnhancer       : @Bean method ScopeConfiguration.jobScope is non-static and returns an object assignable to Spring's BeanFactoryPostProcessor interface. This will result in a failure to process annotations such as @Autowired, @Resource and @PostConstruct within the method's declaring @Configuration class. Add the 'static' modifier to this method to avoid these container lifecycle issues; see @Bean javadoc for complete details.
    2016-05-04 08:17:42.258  INFO 9348 --- [           main] o.s.j.d.e.EmbeddedDatabaseFactory        : Starting embedded database: url='jdbc:hsqldb:mem:testdb', username='sa'
    2016-05-04 08:17:42.646  INFO 9348 --- [           main] o.s.jdbc.datasource.init.ScriptUtils     : Executing SQL script from class path resource [org/springframework/batch/core/schema-hsqldb.sql]
    2016-05-04 08:17:42.658  INFO 9348 --- [           main] o.s.jdbc.datasource.init.ScriptUtils     : Executed SQL script from class path resource [org/springframework/batch/core/schema-hsqldb.sql] in 12 ms.
    2016-05-04 08:18:01.793  INFO 9348 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 
    2016-05-04 08:18:01.812  INFO 9348 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService  'taskExecutor'
    push status [ messageId=0:1462330092323002%9b3f4867f9fd7ecd ]
    push status [ messageId=0:1462330095502779%9b3f4867f9fd7ecd ]
    2016-05-04 08:18:16.883  INFO 9348 --- [           main] o.s.jdbc.datasource.init.ScriptUtils     : Executing SQL script from class path resource [org/springframework/batch/core/schema-hsqldb.sql]
    2016-05-04 08:18:16.927  INFO 9348 --- [           main] o.s.jdbc.datasource.init.ScriptUtils     : Executed SQL script from class path resource [org/springframework/batch/core/schema-hsqldb.sql] in 20 ms.
    2016-05-04 08:18:17.392  INFO 9348 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Registering beans for JMX exposure on startup
    2016-05-04 08:18:17.413  INFO 9348 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 0
    2016-05-04 08:18:17.737  INFO 9348 --- [           main] o.s.b.a.b.JobLauncherCommandLineRunner   : Running default command line with: []
    2016-05-04 08:18:17.766  INFO 9348 --- [           main] o.s.b.c.r.s.JobRepositoryFactoryBean     : No database type set, using meta data indicating: HSQL
    2016-05-04 08:18:18.032  INFO 9348 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : No TaskExecutor has been set, defaulting to synchronous executor.
    2016-05-04 08:18:18.147  INFO 9348 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=notifyUserJob]] launched with the following parameters: [{run.id=1}]
    2016-05-04 08:18:18.187  INFO 9348 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step1]
    2016-05-04 08:18:22.044  INFO 9348 --- [pool-2-thread-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=anomalyJob]] launched with the following parameters: [{}]
    2016-05-04 08:18:22.079  INFO 9348 --- [pool-2-thread-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step2]
    2016-05-04 08:18:28.990  INFO 9348 --- [ taskExecutor-1] com.electra.service.ElectraServiceImpl   : date Mon May 02 08:18:12 IST 1
    2016-05-04 08:18:28.991  INFO 9348 --- [ taskExecutor-1] c.e.repository.UnitsRepositoryImpl       : push new unit
    2016-05-04 08:18:32.581  INFO 9348 --- [ taskExecutor-1] com.electra.service.ElectraServiceImpl   : date Wed May 04 08:18:32 IST 2016
    2016-05-04 08:18:32.581  INFO 9348 --- [ taskExecutor-1] c.e.repository.UnitsRepositoryImpl       : push new unit
    2016-05-04 08:19:16.876  INFO 9348 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=notifyUserJob]] completed with the following parameters: [{run.id=1}] and the following status: [COMPLETED]
    2016-05-04 08:19:16.999  INFO 9348 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=anomalyJob]] launched with the following parameters: [{run.id=1}]
    2016-05-04 08:19:17.053  INFO 9348 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step2]
    2016-05-04 08:19:17.491  INFO 9348 --- [pool-2-thread-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=anomalyJob]] completed with the following parameters: [{}] and the following status: [COMPLETED]
    2016-05-04 08:19:52.399  INFO 9348 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=anomalyJob]] completed with the following parameters: [{run.id=1}] and the following status: [COMPLETED]
    2016-05-04 08:19:52.401  INFO 9348 --- [           main] com.electra.Application                  : Started Application in 133.639 seconds (JVM running for 134.724)
    2016-05-04 08:20:21.066  INFO 9348 --- [pool-2-thread-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=anomalyJob]] launched with the following parameters: [{}]
    2016-05-04 08:20:21.288  INFO 9348 --- [pool-2-thread-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step2]
    2016-05-04 08:20:31.103  INFO 9348 --- [pool-2-thread-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=anomalyJob]] completed with the following parameters: [{}] and the following status: [COMPLETED]

请告诉我我做错了什么,为什么听者没有被执行后续尝试。

共有1个答案

燕琨
2023-03-14

侦听器构造函数中只有代码。但不是在beforeJob()afterJob()或任何其他方法中。。。所以我猜听众被叫来了,但什么也没做。。。因此,您需要覆盖适当的侦听器方法。

 类似资料:
  • 我已经创建了Spring Batch(RESTReader、自定义处理器和自定义ItemWriter),我计划了它。Spring批工作良好。当监听器在每个计划的间隔内完成打印作业时,计划似乎可以工作,但似乎不读或写。 我的Sprint启动应用程序 BATCH_STEP_EXECUTION 控制台日志的一部分 BatchApplication-在27.638秒内启动BatchApplication(

  • Spring批处理作业与flatfileitemreader(从csv读取)、processor(更新adwords api提要详细信息,对于csv文件中的每个记录(大约有40条记录),这一步大约需要40秒)和正在更新DB中记录的定制writer一起使用。 web.xml

  • 我正在寻找最好的解决方案,以创建一个java web应用程序,以生成Excel/PDF格式的报告。类似于Google Adwords的东西,用户可以创建日程报告,并在以后生成报告时下载。 我正在考虑开发一个java应用程序,在其中用户记录,选择一个预先定义的报告,并提供输入参数(如报告日期等),这个请求将被排队或保存为Quarts作业(首选持久队列)。一个作业将监视队列/作业并执行该作业,生成报告

  • 我想一个接一个地运行两个工作。是的,我确实在网上搜索过,但他们有解决方案,包括在第一份工作中增加第二份工作。但我有不同的要求。我会在第一批作业执行完成后得到一个通知,而第二批作业只有在收到这个通知后才会运行。是否有可能在Spring Boot中一个接一个地运行两个作业。请救命!

  • 在一个项目中,我们必须运行一个定期开始的作业(现在QA env上每5分钟开始一次),该作业处理40K用户的一些任务。我们决定使用Spring Batch,因为它非常适合,并且几乎用默认配置实现了它(例如,它使用)。好的,有一个工作由一个步骤组成: 开箱即用 在内存中执行轻量级计算的自定义 自定义,它通过多个JPQL和本机查询将数据保存到同一个PostgreSQL db。 作业本身是用调度的,并且每