当前位置: 首页 > 知识库问答 >
问题:

初始化后,Spring批处理或运行读写器函数

百里飞捷
2023-03-14

我运行了一个spring批处理,它在开始和结束时运行read()函数,从此不再运行。我还有JobExecutionListenerSupport,它有beforeJobafterJob它运行这个方法,但不运行read and writer函数。

批处理配置

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Bean
    public ListItemReader<Blog> reader() {
        System.out.println("reader");
        return new ListItemReader<Blog>(new ArrayList<Blog>());
    }

    @Bean
    public JdbcBatchItemWriter<Blog> writer(DataSource dataSource) {
        return new JdbcBatchItemWriterBuilder<Blog>()
                .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Blog>())
                .sql("INSERT INTO blog (param, blog_url, blog_title) VALUES (:identity.param,:identity.url, :blogTitle)")
                .dataSource(dataSource).build();
    }

    @Bean
    public Job importCrawlingBatch(JobCompletionNotificationListener listener, Step step1) {
        return jobBuilderFactory.get("importCrawlingBatch").incrementer(new RunIdIncrementer()).listener(listener)
                .flow(step1).end().build();
    }

    @Bean
    public Step step1(JdbcBatchItemWriter<Blog> writer)
        
        return stepBuilderFactory.get("step1")
                .<Blog, Blog>chunk(10)
                .reader(reader())
                .writer(writer)
                .faultTolerant()
                .skip(DuplicateKeyException.class).skipPolicy(new ItemSkipPolicy())
                .allowStartIfComplete(true).build();
    }
}

我怎么跑了好几次

@Component
@AllArgsConstructor
@EnableScheduling
public class ScheduledJobConfiguration {
    private final JobLauncher jobLauncher;

    private final Job ourJob;

    @Scheduled(cron = "0 */1 * * * *")
    public void startBatchJob() throws Exception {
        JobParameters jobParameters = new JobParametersBuilder().addString("time", new Date().toString())
                .toJobParameters();

        jobLauncher.run(ourJob, jobParameters);
    }

}

和我的JobExecutionListenerSupport

@Component
public class JobCompletionNotificationListener extends JobExecutionListenerSupport {

    @Override
    public void beforeJob(JobExecution jobExecution) {
        super.beforeJob(jobExecution);

        System.out.println(">>> CRAWLING STARTED <<<");
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            System.out.println(">>> CRAWLING FINISHED <<<");
        }
    }

}

控制台日志:

 .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.4.4)

23:46:02 INFO  c.b.a.Application:55 - Starting Application using Java 15 on katesel with PID 6081 (/home/atesel/Projects/PostgreSQLBlogCrawling/Workspace/blogcrawling/target/classes started by atesel in /home/atesel/Projects/PostgreSQLBlogCrawling/Workspace/blogcrawling)
23:46:02 INFO  c.b.a.Application:662 - No active profile set, falling back to default profiles: default
23:46:03 INFO  o.s.d.r.c.RepositoryConfigurationDelegate:128 - Bootstrapping Spring Data JPA repositories in DEFAULT mode.
23:46:03 INFO  o.s.d.r.c.RepositoryConfigurationDelegate:188 - Finished Spring Data repository scanning in 31 ms. Found 2 JPA repository interfaces.
23:46:03 INFO  o.s.b.w.e.t.TomcatWebServer:108 - Tomcat initialized with port(s): 8080 (http)
23:46:03 INFO  o.a.c.c.StandardService:173 - Starting service [Tomcat]
23:46:03 INFO  o.a.c.c.StandardEngine:173 - Starting Servlet engine: [Apache Tomcat/9.0.44]
23:46:03 INFO  o.a.c.c.C.[.[.[/]:173 - Initializing Spring embedded WebApplicationContext
23:46:03 INFO  o.s.b.w.s.c.ServletWebServerApplicationContext:289 - Root WebApplicationContext: initialization completed in 1074 ms
23:46:03 INFO  c.z.h.HikariDataSource:110 - HikariPool-1 - Starting...
23:46:04 INFO  c.z.h.HikariDataSource:123 - HikariPool-1 - Start completed.
23:46:04 INFO  o.h.j.i.u.LogHelper:31 - HHH000204: Processing PersistenceUnitInfo [name: default]
23:46:04 INFO  o.h.Version:44 - HHH000412: Hibernate ORM core version 5.4.29.Final
23:46:04 INFO  o.h.a.c.Version:56 - HCANN000001: Hibernate Commons Annotations {5.1.2.Final}
23:46:04 INFO  o.h.d.Dialect:175 - HHH000400: Using dialect: org.hibernate.dialect.PostgreSQL10Dialect
23:46:04 INFO  o.h.e.t.j.p.i.JtaPlatformInitiator:52 - HHH000490: Using JtaPlatform implementation: [org.hibernate.engine.transaction.jta.platform.internal.NoJtaPlatform]
23:46:04 INFO  o.s.o.j.LocalContainerEntityManagerFactoryBean:437 - Initialized JPA EntityManagerFactory for persistence unit 'default'
Writer
step 1
Reader
23:46:04 INFO  o.s.s.c.ThreadPoolTaskScheduler:181 - Initializing ExecutorService
23:46:04 INFO  o.s.s.c.ThreadPoolTaskScheduler:181 - Initializing ExecutorService 'poolScheduler'
23:46:04 WARN  o.s.b.a.o.j.JpaBaseConfiguration$JpaWebConfiguration:221 - spring.jpa.open-in-view is enabled by default. Therefore, database queries may be performed during view rendering. Explicitly configure spring.jpa.open-in-view to disable this warning
23:46:05 INFO  o.s.s.c.ThreadPoolTaskExecutor:181 - Initializing ExecutorService 'applicationTaskExecutor'
23:46:05 WARN  o.s.b.a.b.JpaBatchConfigurer:57 - JPA does not support custom isolation levels, so locks may not be taken when launching Jobs
23:46:05 INFO  o.s.b.c.r.s.JobRepositoryFactoryBean:185 - No database type set, using meta data indicating: POSTGRES
23:46:05 INFO  o.s.b.c.l.s.SimpleJobLauncher:215 - No TaskExecutor has been set, defaulting to synchronous executor.
23:46:05 INFO  o.s.b.w.e.t.TomcatWebServer:220 - Tomcat started on port(s): 8080 (http) with context path ''
23:46:05 INFO  o.s.s.c.ThreadPoolTaskScheduler:181 - Initializing ExecutorService
Hibernate: 
    select
        cronconfig0_.id as id1_1_0_,
        cronconfig0_.cron_exp as cron_exp2_1_0_ 
    from
        cron_conf cronconfig0_ 
    where
        cronconfig0_.id=?
23:46:05 INFO  c.b.a.Application:61 - Started Application in 3.271 seconds (JVM running for 3.512)
Hibernate: 
    select
        cronconfig0_.id as id1_1_0_,
        cronconfig0_.cron_exp as cron_exp2_1_0_ 
    from
        cron_conf cronconfig0_ 
    where
        cronconfig0_.id=?
23:47:00 INFO  o.s.b.c.l.s.SimpleJobLauncher:146 - Job: [FlowJob: [name=importCrawlingBatch]] launched with the following parameters: [{time=Wed Mar 09 23:47:00 TRT 2022}]
>>> BATCH CRAWLING STARTED <<<
23:47:00 INFO  o.s.b.c.j.SimpleStepHandler:149 - Executing step: [step1]
23:47:00 INFO  o.s.b.c.s.AbstractStep:273 - Step: [step1] executed in 16ms
>>> BATCH  CRAWLING FINISHED <<<

从运行中可以看出,没有调用读和写函数。它们只在初始化期间调用一次,再也不会调用了。

共有1个答案

越福
2023-03-14

这是因为我没有为阅读器和写入器函数添加@StepScop属性。如果您想了解更多添加@StepScop时会发生什么

@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

@Autowired
public JobBuilderFactory jobBuilderFactory;

@Autowired
public StepBuilderFactory stepBuilderFactory;

@Bean
@StepScope
public ListItemReader<Blog> reader() {
    System.out.println("reader");
    return new ListItemReader<Blog>(new ArrayList<Blog>());
}

@Bean
@StepScope
public JdbcBatchItemWriter<Blog> writer(DataSource dataSource) {
    return new JdbcBatchItemWriterBuilder<Blog>()
            .itemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Blog>())
            .sql("INSERT INTO blog (param, blog_url, blog_title) VALUES (:identity.param,:identity.url, :blogTitle)")
            .dataSource(dataSource).build();
}

@Bean
public Job importCrawlingBatch(JobCompletionNotificationListener listener, Step step1) {
    return jobBuilderFactory.get("importCrawlingBatch").incrementer(new RunIdIncrementer()).listener(listener)
            .flow(step1).end().build();
}

@Bean
public Step step1(JdbcBatchItemWriter<Blog> writer)
    
    return stepBuilderFactory.get("step1")
            .<Blog, Blog>chunk(10)
            .reader(reader())
            .writer(writer)
            .faultTolerant()
            .skip(DuplicateKeyException.class).skipPolicy(new ItemSkipPolicy())
            .allowStartIfComplete(true).build();
   }
}
 类似资料:
  • 我有一个Spring批处理工作,分两步。第一个下载文件,第二个处理文件。问题是,在第一步运行之前,第二步不知道文件的名称。 作业已自动实例化这些步骤,以便在需要时运行。我想不出任何方法来确保第一步运行后该步骤会初始化。 以下是代码: 你可以看到获取我在第一步的Tasklet中设置的局部变量。

  • 我试图在JUnit 5和SpringBatchTest注释中加载JobLauncherTestUtils对象。但是,它无法加载应用程序上下文。所有其他自动构建的bean都成功加载,但是JobLauncherTestUtils无法加载。这是我的测试配置,省略了导入。我尝试在BeforeAll中手动加载它,但是JobRepository和JobLauncher无法加载。我只对能够成功实例化JobLau

  • 问题: 我正在为我的一个spring批处理作业方法编写单元测试。我使用mockito来模拟我的批处理作业依赖关系。在jobExecution发挥作用之前,一切都很好。我要测试的方法调用了jobExecution变量,但它给了我NPE(NullPointerException)并且我没有成功地用mockito模拟它。 删除此currentJobExecution时 从我要测试的方法,然后测试成功完成

  • 我正在寻找测量Spring批处理读取、处理和写入操作的执行时间的最佳方法。在元数据中,有关于整个步骤的信息,而不是关于每个动作的信息。 谢谢你所有的回答!

  • 我有一个批处理步骤 读取器和处理器流程如何工作?读取器是读取块并等待处理器处理它,还是一次读取所有块。

  • 我有一个JavaFX应用程序,它与用Java编写的控制器类一起使用FXML。在Java控制器中,在FXML节点初始化之前,我需要小心不要对其进行操作(否则我会得到一个NullPointerException),这在运行初始化方法之前是无法保证的。所以我发现自己经常这样做: 控制器在FXML文件中设置如下: 然后是Java文件中的控制器。 这是可行的,但它笨重且重复。我必须创建globalValue