当前位置: 首页 > 知识库问答 >
问题:

如何解决spring批处理中的InvalidIsolationLevelException?

慕弘义
2023-03-14

我正在使用spring batch,在执行批处理作业时,我得到了一个InvalidIsolationLevelException。当我使用MapJobRepositoryFactoryBean时,该作业可以工作,但我需要在数据库中生成batch_*表。

应用程序已经设置了事务管理器。

@Configuration
@ComponentScan({"com..."})
public class BatchConfig {
    private static final Logger LOGGER = LoggerFactory.getLogger(BatchConfig.class);

    private static final String REPORTS_STEP_NAME = "reportStep";
    private static final String REPORTS_JOB_NAME = "reportsJob";
    private static final int REPORTS_STEP_CHUNK_SIZE = 20;
    private static final String MAX_END_DATE = "2999-12-31 00:00:00";
    private static final String MAX_END_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
    private static final String END_DATE_PARAM = "endDate";
    private static final String REPORT_STATUS_PARAM = "reportSend";
    private static final YesNo NO = new YesNo("N");

    @Autowired
    private ApplicationConfig applicationConfig;

    @Autowired
    private ReportProcessor reportProcessor;

    @Autowired
    private ReportItemWriter reportItemWriter;

    @Bean
    public JobBuilderFactory jobBuilderFactory() throws Exception {
        JobBuilderFactory jobBuilderFactory = new JobBuilderFactory(jobRepository());
        return jobBuilderFactory;
    }

    @Bean
    public StepBuilderFactory stepBuilderFactory() throws Exception {
        StepBuilderFactory stepBuilderFactory = new StepBuilderFactory(jobRepository(), applicationConfig.transactionManager());
        return stepBuilderFactory;
    }

    @Bean
    public PlatformTransactionManager transactionManager() {
        JpaTransactionManager jpaTransactionManager = new JpaTransactionManager();
        jpaTransactionManager.setDataSource(dataSource());
        jpaTransactionManager.setEntityManagerFactory(applicationConfig.entityManagerFactory().getObject());
        return jpaTransactionManager;
    }

    @Bean
    public JobRegistry jobRegistry() {
        return new MapJobRegistry();
    }

    @Bean
    public JobExplorer jobExplorer() throws Exception {
        JobExplorerFactoryBean jobExplorerFactoryBean = new JobExplorerFactoryBean();
        jobExplorerFactoryBean.setDataSource(dataSource());
        jobExplorerFactoryBean.afterPropertiesSet();
        return jobExplorerFactoryBean.getObject();
    }

    @Bean
    public JobOperator jobOperator() throws Exception {
        SimpleJobOperator jobOperator = new SimpleJobOperator();
        jobOperator.setJobExplorer(jobExplorer());
        jobOperator.setJobRegistry(jobRegistry());
        jobOperator.setJobLauncher(jobLauncher());
        jobOperator.setJobRepository(jobRepository());
        return jobOperator;
    }

    @Bean
    public DataSource dataSource() {
        DriverManagerDataSource dataSource = new DriverManagerDataSource();
        dataSource.setDriverClassName("...");
        dataSource.setUrl("...");
        dataSource.setUsername("...");
        dataSource.setPassword("...");
        return dataSource;
    }

    @Bean
    public JobRepository jobRepository() throws Exception {
        JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
        jobRepositoryFactoryBean.setDataSource(dataSource());
        jobRepositoryFactoryBean.setTransactionManager(transactionManager());
        jobRepositoryFactoryBean.setIsolationLevelForCreate("ISOLATION_SERIALIZABLE");
        return jobRepositoryFactoryBean.getObject();
    }

    @Bean
    public JobLauncher jobLauncher() throws Exception {
        SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
        simpleJobLauncher.setJobRepository(jobRepository());
        return simpleJobLauncher;
    }

    public JpaPagingItemReader<FishingVessel> readFishingVesselRecords() throws ParseException {
        LOGGER.info("Inside JpaPagingItemReader");
        JpaPagingItemReader<FishingVessel> fishingVesselsReader = new JpaPagingItemReader<>();
        fishingVesselsReader.setEntityManagerFactory(applicationConfig.entityManagerFactory().getObject());
        fishingVesselsReader.setQueryString(FishingVessel.FISHING_VESSEL_REPORTS);
        fishingVesselsReader.setTransacted(false);
        fishingVesselsReader.setSaveState(false);
        Map<String, Object> params = createQueryParams();
        fishingVesselsReader.setParameterValues(params);
        fishingVesselsReader.setPageSize(100);
        return fishingVesselsReader;
    }

    private Map<String, Object> createQueryParams() throws ParseException {
        Map<String, Object> params = new HashMap<>();
        DateFormatter dateFormatter = new DateFormatter(MAX_END_DATE_FORMAT);
        Date date = dateFormatter.parse(MAX_END_DATE, Locale.ENGLISH);
        params.put(END_DATE_PARAM, date);
        //params.put(REPORT_STATUS_PARAM, NO);
        return params;
    }

    @Bean
    public Step reportsStep() throws Exception {
        DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
        attribute.setPropagationBehavior(Propagation.REQUIRED.value());
        attribute.setIsolationLevel(Isolation.SERIALIZABLE.value());
        attribute.setTimeout(30);

        return stepBuilderFactory()
                .get(REPORTS_STEP_NAME)
                .<FishingVessel, FishingVessel>chunk(REPORTS_STEP_CHUNK_SIZE)
                .reader(readFishingVesselRecords())
                .processor(reportProcessor)
                .writer(reportItemWriter)
                .transactionAttribute(attribute)
                .build();
    }

    @Bean
    public Job reportsJob() throws Exception {
        return jobBuilderFactory()
                .get(REPORTS_JOB_NAME)
                .start(reportsStep())
                .build();
    }
}
@Configuration
@ComponentScan({
    "com....components",
    "com....services",
    "com....converters",
    "com....controllers",
    "com....fluxFolder",
    "com....processors",
    "com....flux.service",
    "com....flux.gateway"
})
@EnableJpaRepositories("com....repositories")
@EnableTransactionManagement
@EnableJpaAuditing
@EnableWebMvc
@Import({SecurityConfig.class})
@PropertySource("classpath:application.properties")
public class ApplicationConfig {
    @Bean
    public LocalContainerEntityManagerFactoryBean entityManagerFactory() {

        LocalContainerEntityManagerFactoryBean factory = new LocalContainerEntityManagerFactoryBean();
        factory.setPersistenceXmlLocation("classpath:/META-INF/persistence.xml");
        return factory;
    }

    @Bean
    @Primary
    public PlatformTransactionManager transactionManager() {
        JpaTransactionManager transactionManager = new JpaTransactionManager();
        transactionManager.setEntityManagerFactory(entityManagerFactory().getObject());
        return transactionManager;
    }

    @Bean
    public static PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() {
        return new PropertySourcesPlaceholderConfigurer();
    }

    @Bean
    public ViewResolver viewResolver() {
        UrlBasedViewResolver viewResolver = new UrlBasedViewResolver();
        viewResolver.setViewClass(JsfView.class);
        viewResolver.setSuffix(".xhtml");
        return viewResolver;
    }
}
@RestController
public class BatchController {
    private static final String JOB_ID = "JobID";

    private JobExecution jobExecution;

    @Autowired
    private JobOperator jobOperator;

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job reportsJob;

    @Autowired
    private JobExplorer jobExplorer;

    @GetMapping("/batch/enable")
    public String batchTest() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
        JobParameters jobParameters = new JobParametersBuilder()
                .addString(JOB_ID, String.valueOf(System.currentTimeMillis()))
                .toJobParameters();
        JobInstance job;
        jobExecution = new JobExecution(jobLauncher.run(reportsJob, jobParameters));
        return "batch job started";
    }

    @GetMapping("/batch/disable")
    public String disable() {
        String jobName = jobExecution.getJobInstance().getJobName();
        Set<JobExecution> jobExecutions = jobExplorer.findRunningJobExecutions(jobName);
        for (JobExecution je : jobExecutions) {
            try {
                jobOperator.stop(je.getId());
            } catch (NoSuchJobExecutionException e) {

            } catch (JobExecutionNotRunningException e) {

            }
        }
        return jobName + " disabled.";
    }
}

共有1个答案

管和志
2023-03-14

您可以将JobRepositoryFactoryBean中的隔离级别更改为默认:

 @Bean
public JobRepository jobRepository() throws Exception {
    JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
    jobRepositoryFactoryBean.setDataSource(dataSource());
    jobRepositoryFactoryBean.setTransactionManager(transactionManager());
    jobRepositoryFactoryBean.setIsolationLevelForCreate("ISOLATION_DEFAULT");
    return jobRepositoryFactoryBean.getObject();
}
 类似资料:
  • 我在表中总共有8条记录,其中6条在spring批处理调用read时可以使用jpareader。现在我将页面大小和块大小设置为1以进行测试。期望作业运行时,它应该进行6次读取调用,然后它应该逐个处理,逐个写入。但实际上发生的是,它只是调用read 4次(从日志中我可以看到这样读取页面0...1)并处理4个,其中一个由于不匹配写入标准而被过滤掉,然后它只是更新了3个记录,作业标记为成功完成。

  • 我有以下工作要处理在一定的时间间隔或特别的基础上。 作业中的步骤如下: 我也想要用户界面,在那里我可以触发一个特别的基础上的工作,而且我应该能够提供参数从用户界面。 我想用Spring batch来完成这个任务,但它更多的是用于读->处理->写之类的工作。这里,在第一步中,我正在生成由第二步读取的数据。我不确定我是否还可以使用Spring batch来实现这个,或者有更好的方法来实现这个。

  • 我是Spring批处理的新手,我只想问如何从多行结果集中检索数据。我有以下场景: > 有两个不同的表说员工 使用时,我只能创建一个工资单子级,但该表可能有多个子级。请帮助...

  • 我有一个带有spring Batch的spring Boot2应用程序。 实际上,当应用程序启动时,批处理就启动了。

  • 我正在尝试使用StoredProcedureItemReader for Cursor读取spring批处理中的一个DB2存储过程。sql字符串未被执行,默认sql被传递给jdbctemplate 我正在使用由作业调用的批处理步骤: 为什么我没有得到结果集或者无法执行查询。我是个新手,有点卡住了。调试显示,数据源配置正确。 谢谢!

  • 我正在尝试实现一个Spring批处理作业,为了处理记录,它需要2-3个db调用,这会减慢记录的处理速度(大小为100万)。如果我使用基于块的处理,它会单独处理每条记录,性能会很慢。因此,我需要一次性处理1000条记录,作为批量处理,这将减少数据库调用,并提高性能。但我的问题是,如果我实现Tasklet,那么我也会失去可重启性和重试/跳过功能,如果使用AggregateInputReader实现,我