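I am using Spring Batch, and when I run a batch job I get an InvalidIsolationLevelException. The job works when I use MapJobRepositoryFactoryBean, but I need the batch_* tables to be created in the database.
The application already has a transaction manager configured.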
@Configuration
@ComponentScan({"com..."})
public class BatchConfig {

    private static final Logger LOGGER = LoggerFactory.getLogger(BatchConfig.class);
    private static final String REPORTS_STEP_NAME = "reportStep";
    private static final String REPORTS_JOB_NAME = "reportsJob";
    private static final int REPORTS_STEP_CHUNK_SIZE = 20;
    private static final String MAX_END_DATE = "2999-12-31 00:00:00";
    private static final String MAX_END_DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
    private static final String END_DATE_PARAM = "endDate";
    private static final String REPORT_STATUS_PARAM = "reportSend";
    private static final YesNo NO = new YesNo("N");

    @Autowired
    private ApplicationConfig applicationConfig;

    @Autowired
    private ReportProcessor reportProcessor;

    @Autowired
    private ReportItemWriter reportItemWriter;

    @Bean
    public JobBuilderFactory jobBuilderFactory() throws Exception {
        JobBuilderFactory jobBuilderFactory = new JobBuilderFactory(jobRepository());
        return jobBuilderFactory;
    }

    @Bean
    public StepBuilderFactory stepBuilderFactory() throws Exception {
        StepBuilderFactory stepBuilderFactory = new StepBuilderFactory(jobRepository(), applicationConfig.transactionManager());
        return stepBuilderFactory;
    }

    @Bean
    public PlatformTransactionManager transactionManager() {
        JpaTransactionManager jpaTransactionManager = new JpaTransactionManager();
        jpaTransactionManager.setDataSource(dataSource());
        jpaTransactionManager.setEntityManagerFactory(applicationConfig.entityManagerFactory().getObject());
        return jpaTransactionManager;
    }

    @Bean
    public JobRegistry jobRegistry() {
        return new MapJobRegistry();
    }

    @Bean
    public JobExplorer jobExplorer() throws Exception {
        JobExplorerFactoryBean jobExplorerFactoryBean = new JobExplorerFactoryBean();
        jobExplorerFactoryBean.setDataSource(dataSource());
        jobExplorerFactoryBean.afterPropertiesSet();
        return jobExplorerFactoryBean.getObject();
    }

    @Bean
    public JobOperator jobOperator() throws Exception {
        SimpleJobOperator jobOperator = new SimpleJobOperator();
        jobOperator.setJobExplorer(jobExplorer());
        jobOperator.setJobRegistry(jobRegistry());
        jobOperator.setJobLauncher(jobLauncher());
        jobOperator.setJobRepository(jobRepository());
        return jobOperator;
    }

    @Bean
    public DataSource dataSource() {
        DriverManagerDataSource dataSource = new DriverManagerDataSource();
        dataSource.setDriverClassName("...");
        dataSource.setUrl("...");
        dataSource.setUsername("...");
        dataSource.setPassword("...");
        return dataSource;
    }

    @Bean
    public JobRepository jobRepository() throws Exception {
        JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
        jobRepositoryFactoryBean.setDataSource(dataSource());
        jobRepositoryFactoryBean.setTransactionManager(transactionManager());
        jobRepositoryFactoryBean.setIsolationLevelForCreate("ISOLATION_SERIALIZABLE");
        return jobRepositoryFactoryBean.getObject();
    }

    @Bean
    public JobLauncher jobLauncher() throws Exception {
        SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
        simpleJobLauncher.setJobRepository(jobRepository());
        return simpleJobLauncher;
    }

    public JpaPagingItemReader<FishingVessel> readFishingVesselRecords() throws ParseException {
        LOGGER.info("Inside JpaPagingItemReader");
        JpaPagingItemReader<FishingVessel> fishingVesselsReader = new JpaPagingItemReader<>();
        fishingVesselsReader.setEntityManagerFactory(applicationConfig.entityManagerFactory().getObject());
        fishingVesselsReader.setQueryString(FishingVessel.FISHING_VESSEL_REPORTS);
        fishingVesselsReader.setTransacted(false);
        fishingVesselsReader.setSaveState(false);
        Map<String, Object> params = createQueryParams();
        fishingVesselsReader.setParameterValues(params);
        fishingVesselsReader.setPageSize(100);
        return fishingVesselsReader;
    }

    private Map<String, Object> createQueryParams() throws ParseException {
        Map<String, Object> params = new HashMap<>();
        DateFormatter dateFormatter = new DateFormatter(MAX_END_DATE_FORMAT);
        Date date = dateFormatter.parse(MAX_END_DATE, Locale.ENGLISH);
        params.put(END_DATE_PARAM, date);
        //params.put(REPORT_STATUS_PARAM, NO);
        return params;
    }

    @Bean
    public Step reportsStep() throws Exception {
        DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
        attribute.setPropagationBehavior(Propagation.REQUIRED.value());
        attribute.setIsolationLevel(Isolation.SERIALIZABLE.value());
        attribute.setTimeout(30);
        return stepBuilderFactory()
                .get(REPORTS_STEP_NAME)
                .<FishingVessel, FishingVessel>chunk(REPORTS_STEP_CHUNK_SIZE)
                .reader(readFishingVesselRecords())
                .processor(reportProcessor)
                .writer(reportItemWriter)
                .transactionAttribute(attribute)
                .build();
    }

    @Bean
    public Job reportsJob() throws Exception {
        return jobBuilderFactory()
                .get(REPORTS_JOB_NAME)
                .start(reportsStep())
                .build();
    }
}
@Configuration
@ComponentScan({
        "com....components",
        "com....services",
        "com....converters",
        "com....controllers",
        "com....fluxFolder",
        "com....processors",
        "com....flux.service",
        "com....flux.gateway"
})
@EnableJpaRepositories("com....repositories")
@EnableTransactionManagement
@EnableJpaAuditing
@EnableWebMvc
@Import({SecurityConfig.class})
@PropertySource("classpath:application.properties")
public class ApplicationConfig {

    @Bean
    public LocalContainerEntityManagerFactoryBean entityManagerFactory() {
        LocalContainerEntityManagerFactoryBean factory = new LocalContainerEntityManagerFactoryBean();
        factory.setPersistenceXmlLocation("classpath:/META-INF/persistence.xml");
        return factory;
    }

    @Bean
    @Primary
    public PlatformTransactionManager transactionManager() {
        JpaTransactionManager transactionManager = new JpaTransactionManager();
        transactionManager.setEntityManagerFactory(entityManagerFactory().getObject());
        return transactionManager;
    }

    @Bean
    public static PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() {
        return new PropertySourcesPlaceholderConfigurer();
    }

    @Bean
    public ViewResolver viewResolver() {
        UrlBasedViewResolver viewResolver = new UrlBasedViewResolver();
        viewResolver.setViewClass(JsfView.class);
        viewResolver.setSuffix(".xhtml");
        return viewResolver;
    }
}
@RestController
public class BatchController {

    private static final String JOB_ID = "JobID";

    // Last execution started through /batch/enable; null until a job has been launched.
    private JobExecution jobExecution;

    @Autowired
    private JobOperator jobOperator;

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job reportsJob;

    @Autowired
    private JobExplorer jobExplorer;

    @GetMapping("/batch/enable")
    public String batchTest() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
        // A unique parameter value makes every request start a new job instance.
        JobParameters jobParameters = new JobParametersBuilder()
                .addString(JOB_ID, String.valueOf(System.currentTimeMillis()))
                .toJobParameters();
        jobExecution = jobLauncher.run(reportsJob, jobParameters);
        return "batch job started";
    }

    @GetMapping("/batch/disable")
    public String disable() {
        String jobName = jobExecution.getJobInstance().getJobName();
        Set<JobExecution> jobExecutions = jobExplorer.findRunningJobExecutions(jobName);
        for (JobExecution je : jobExecutions) {
            try {
                jobOperator.stop(je.getId());
            } catch (NoSuchJobExecutionException e) {
                // Ignored: the execution no longer exists.
            } catch (JobExecutionNotRunningException e) {
                // Ignored: the execution has already stopped.
            }
        }
        return jobName + " disabled.";
    }
}
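You can change the isolation level in JobRepositoryFactoryBean to the default. A JpaTransactionManager that uses the standard JPA dialect does not support custom isolation levels, which is why ISOLATION_SERIALIZABLE triggers the exception: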
@Bean
public JobRepository jobRepository() throws Exception {
    JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
    jobRepositoryFactoryBean.setDataSource(dataSource());
    jobRepositoryFactoryBean.setTransactionManager(transactionManager());
    jobRepositoryFactoryBean.setIsolationLevelForCreate("ISOLATION_DEFAULT");
    return jobRepositoryFactoryBean.getObject();
}
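Note that reportsStep() in the question also requests Isolation.SERIALIZABLE through its transaction attribute, so the step will probably hit the same exception once the job repository is fixed. The fragment below is only a sketch of two possible ways around that, not a tested configuration. The class name IsolationWorkarounds and both method names are hypothetical. The second method assumes Hibernate is the JPA provider, which the question does not state.

```java
import javax.persistence.EntityManagerFactory;

import org.springframework.orm.jpa.JpaTransactionManager;
import org.springframework.orm.jpa.vendor.HibernateJpaDialect;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.interceptor.DefaultTransactionAttribute;

// Hypothetical helper class, for illustration only.
public class IsolationWorkarounds {

    // Option 1: keep the plain JPA transaction manager and let the step
    // use the database's default isolation level.
    public static DefaultTransactionAttribute defaultIsolationAttribute() {
        DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
        attribute.setPropagationBehavior(Propagation.REQUIRED.value());
        attribute.setIsolationLevel(Isolation.DEFAULT.value());
        attribute.setTimeout(30); // same timeout (seconds) as in the question
        return attribute;
    }

    // Option 2 (assumes Hibernate is the JPA provider): give the transaction
    // manager a vendor dialect that can apply custom isolation levels.
    public static JpaTransactionManager hibernateAwareTransactionManager(EntityManagerFactory emf) {
        JpaTransactionManager transactionManager = new JpaTransactionManager();
        transactionManager.setEntityManagerFactory(emf);
        transactionManager.setJpaDialect(new HibernateJpaDialect());
        return transactionManager;
    }
}
```

With option 1, the result of defaultIsolationAttribute() would be passed to .transactionAttribute(...) in reportsStep(). With option 2, SERIALIZABLE could be kept, provided the database and driver support it.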