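I am working on a Spring Batch application that contains two different Job beans (representing two different jobs). Both jobs must be executed by my application (for now they can run either sequentially or in parallel; that is not important at the moment).
I will try to explain my situation and the problem I am running into.
First, I have a configuration class in which I declare my two Job objects (and the related steps):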
@Configuration
public class UpdateInfoBatchConfig {
private static final String PROPERTY_REST_API_URL = "rest.api.url";
@Autowired
private NotaryListServiceAdapter notaryListServiceAdapter;
@Autowired
private JobBuilderFactory jobs;
@Autowired
private StepBuilderFactory steps;
@Autowired
private NotaryService notaryService;
@Bean("firstStepItemReader")
public ItemReader<NotaryDistrict> itemReader(Environment environment, RestTemplate restTemplate) throws IllegalStateException, URISyntaxException {
System.out.println("itemReader() START !!!");
return new RESTNotaryDistrictsReader();
}
@Bean("firstStepItemWriter")
public ItemWriter<NotaryDistrict> itemWriter() {
return new LoggingItemWriter();
}
@Bean("secondStepItemReader")
public ItemReader<NotaryDistrict> secondStepReader(Environment environment) throws IllegalStateException {
System.out.println("secondStepItemReader() creation !!!");
return new SecondStepItemReader();
}
@Bean("secondStepItemProcessor")
public ItemProcessor<NotaryDistrict, NotaryDistrict> secondStepItemProcessor() {
return new SecondStepItemProcessor();
}
@Bean("secondStepItemWriter")
public ItemWriter<NotaryDistrict> secondStepItemWriter() {
return new SecondStepItemWriter();
}
/**
************************************ UPDATE NOTARY DISTRICTS LIST JOB SECTION ********************************************
*/
/**
* Creates a bean that represents the first step of the batch.
* How it works:
* 1) Call an external API in order to retrieve notary districts list
* 2) Return notary district one by one to the second step
* @param reader a custom reader calling an external API
* @param writer
* @param stepBuilderFactory
* @return
*/
@Bean("firstStep")
public Step firstStep(@Qualifier("firstStepItemReader") ItemReader<NotaryDistrict> reader,
@Qualifier("firstStepItemWriter") ItemWriter<NotaryDistrict> writer,
StepBuilderFactory stepBuilderFactory) {
return stepBuilderFactory.get("updateNotaryDistrictsStep")
.<NotaryDistrict, NotaryDistrict>chunk(1)
.reader(reader)
.writer(writer)
.build();
}
@Bean("secondStep")
public Step secondStep(@Qualifier("secondStepItemReader") ItemReader<NotaryDistrict> secondStepItemReader,
@Qualifier("secondStepItemProcessor") ItemProcessor<NotaryDistrict, NotaryDistrict> secondStepItemProcessor,
@Qualifier("secondStepItemWriter") ItemWriter<NotaryDistrict> secondStepItemWriter,
StepBuilderFactory stepBuilderFactory) {
return stepBuilderFactory.get("secondStep")
.<NotaryDistrict, NotaryDistrict>chunk(1)
.reader(secondStepItemReader)
.processor(secondStepItemProcessor)
.writer(secondStepItemWriter)
.build();
}
@Bean("updateNotaryDistrictsJob")
public Job updateNotaryDistrictsJob(JobBuilderFactory jobBuilderFactory,
@Qualifier("firstStep") Step firstStep,
@Qualifier("secondStep") Step secondStep) {
return jobBuilderFactory.get("updateNotaryDistrictsJob")
.start(firstStep)
.next(secondStep)
//.next(playerSummarization())
.build();
}
@Bean
public ExecutionContext executionContext() {
return new ExecutionContext();
}
/**
************************************ UPDATE NOTARY LIST JOB SECTION ********************************************
*/
@Bean()
public ItemReaderAdapter serviceItemReader() {
ItemReaderAdapter reader = new ItemReaderAdapter();
reader.setTargetObject(notaryListServiceAdapter);
reader.setTargetMethod("nextNotaryElement");
return reader;
}
@Bean
public Step readNotaryListStep(){
return steps.get("readNotaryListStep").
<Integer,Integer>chunk(1)
.reader(serviceItemReader())
.processor(new NotaryDetailsEnrichProcessor(notaryService))
.writer(new ConsoleItemWriter())
.build();
}
@Bean("updateNotaryListInfoJob")
public Job updateNotaryListInfoJob(){
return jobs.get("updateNotaryListInfoJob")
.incrementer(new RunIdIncrementer())
.start(readNotaryListStep())
.build();
}
}
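Initially, I then created another class, SpringBatchExampleJobLauncher, that acts as the launcher. It worked fine and was originally used to launch a single job (I think I have to change the logic of this launcher class so that it runs two jobs instead of one):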
public class SpringBatchExampleJobLauncher {
private static final Logger LOGGER = LoggerFactory.getLogger(SpringBatchExampleJobLauncher.class);
private final Job job;
private final JobLauncher jobLauncher;
private ExecutionContext executionContext;
@Autowired
public SpringBatchExampleJobLauncher(@Qualifier("updateNotaryDistrictsJob") Job job,
JobLauncher jobLauncher,
ExecutionContext executionContext) {
this.job = job;
this.jobLauncher = jobLauncher;
this.executionContext = executionContext;
}
//@Scheduled(cron = "0 */3 * * * *")
@Scheduled(cron = "0/30 * * * * *")
public void runSpringBatchExampleJob() throws JobParametersInvalidException, JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException {
LOGGER.info("Spring Batch example job was started");
List<NotaryDistrict> notaryDistrictsList = new ArrayList<NotaryDistrict>();
executionContext.put("notaryDistrictsList", notaryDistrictsList);
jobLauncher.run(job, newExecution());
LOGGER.info("Spring Batch example job was stopped");
}
private JobParameters newExecution() {
Map<String, JobParameter> parameters = new HashMap<>();
JobParameter parameter = new JobParameter(new Date());
parameters.put("currentTime", parameter);
return new JobParameters(parameters);
}
}
Here is what I did. First, I added ThreadPoolTaskExecutor and JobLauncher bean definitions to the UpdateInfoBatchConfig configuration class, which now looks like this:
@Configuration
public class UpdateInfoBatchConfig {
private static final String PROPERTY_REST_API_URL = "rest.api.url";
@Autowired
private NotaryListServiceAdapter notaryListServiceAdapter;
@Autowired
private JobBuilderFactory jobs;
@Autowired
private StepBuilderFactory steps;
@Autowired
private NotaryService notaryService;
@Bean("firstStepItemReader")
public ItemReader<NotaryDistrict> itemReader(Environment environment, RestTemplate restTemplate) throws IllegalStateException, URISyntaxException {
System.out.println("itemReader() START !!!");
return new RESTNotaryDistrictsReader();
}
@Bean("firstStepItemWriter")
public ItemWriter<NotaryDistrict> itemWriter() {
return new LoggingItemWriter();
}
@Bean("secondStepItemReader")
public ItemReader<NotaryDistrict> secondStepReader(Environment environment) throws IllegalStateException {
System.out.println("secondStepItemReader() creation !!!");
return new SecondStepItemReader();
}
@Bean("secondStepItemProcessor")
public ItemProcessor<NotaryDistrict, NotaryDistrict> secondStepItemProcessor() {
return new SecondStepItemProcessor();
}
@Bean("secondStepItemWriter")
public ItemWriter<NotaryDistrict> secondStepItemWriter() {
return new SecondStepItemWriter();
}
/**
* Creates a bean that represents the first step of the batch.
* How it works:
* 1) Call an external API in order to retrieve notary districts list
* 2) Return notary district one by one to the second step
* @param reader a custom reader calling an external API
* @param writer
* @param stepBuilderFactory
* @return
*/
@Bean("firstStep")
public Step firstStep(@Qualifier("firstStepItemReader") ItemReader<NotaryDistrict> reader,
@Qualifier("firstStepItemWriter") ItemWriter<NotaryDistrict> writer,
StepBuilderFactory stepBuilderFactory) {
return stepBuilderFactory.get("updateNotaryDistrictsStep")
.<NotaryDistrict, NotaryDistrict>chunk(1)
.reader(reader)
.writer(writer)
.build();
}
@Bean("secondStep")
public Step secondStep(@Qualifier("secondStepItemReader") ItemReader<NotaryDistrict> secondStepItemReader,
@Qualifier("secondStepItemProcessor") ItemProcessor<NotaryDistrict, NotaryDistrict> secondStepItemProcessor,
@Qualifier("secondStepItemWriter") ItemWriter<NotaryDistrict> secondStepItemWriter,
StepBuilderFactory stepBuilderFactory) {
return stepBuilderFactory.get("secondStep")
.<NotaryDistrict, NotaryDistrict>chunk(1)
.reader(secondStepItemReader)
.processor(secondStepItemProcessor)
.writer(secondStepItemWriter)
.build();
}
@Bean("updateNotaryDistrictsJob")
public Job updateNotaryDistrictsJob(JobBuilderFactory jobBuilderFactory,
@Qualifier("firstStep") Step firstStep,
@Qualifier("secondStep") Step secondStep) {
return jobBuilderFactory.get("updateNotaryDistrictsJob")
.start(firstStep)
.next(secondStep)
//.next(playerSummarization())
.build();
}
@Bean
public ExecutionContext executionContext() {
return new ExecutionContext();
}
/**
************************************ UPDATE NOTARY LIST JOB ********************************************
*/
@Bean()
public ItemReaderAdapter serviceItemReader() {
ItemReaderAdapter reader = new ItemReaderAdapter();
reader.setTargetObject(notaryListServiceAdapter);
reader.setTargetMethod("nextNotaryElement");
return reader;
}
@Bean
public Step readNotaryListStep(){
return steps.get("readNotaryListStep").
<Integer,Integer>chunk(1)
.reader(serviceItemReader())
.processor(new NotaryDetailsEnrichProcessor(notaryService))
.writer(new ConsoleItemWriter())
.build();
}
@Bean("updateNotaryListInfoJob")
public Job updateNotaryListInfoJob(){
return jobs.get("updateNotaryListInfoJob")
.incrementer(new RunIdIncrementer())
.start(readNotaryListStep())
.build();
}
/**
************************************ MULTIPLE JOB CONFIGURATION ********************************************
*/
@Bean
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(15);
taskExecutor.setMaxPoolSize(20);
taskExecutor.setQueueCapacity(30);
return taskExecutor;
}
@Bean
public JobLauncher jobLauncher(ThreadPoolTaskExecutor taskExecutor, JobRepository jobRepository){
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setTaskExecutor(taskExecutor);
jobLauncher.setJobRepository(jobRepository);
return jobLauncher;
}
}
As you can see, the last two beans are the ThreadPoolTaskExecutor and my JobLauncher bean.
Then I changed SpringBatchExampleJobLauncher so that it uses this launcher and runs both of my jobs instead of one. This is what I did:
/**
* This bean schedules and runs our Spring Batch job.
*/
@Component
public class SpringBatchExampleJobLauncher {
private static final Logger LOGGER = LoggerFactory.getLogger(SpringBatchExampleJobLauncher.class);
@Autowired
private JobLauncher jobLauncher;
@Autowired
@Qualifier("updateNotaryDistrictsJob")
private Job updateNotaryDistrictsJob;
@Autowired
@Qualifier("updateNotaryListInfoJob")
private Job updateNotaryListInfoJob;
@Scheduled(cron = "0/30 * * * * *")
public void run1(){
Map<String, JobParameter> confMap = new HashMap<>();
confMap.put("time", new JobParameter(System.currentTimeMillis()));
JobParameters jobParameters = new JobParameters(confMap);
try {
jobLauncher.run(updateNotaryDistrictsJob, jobParameters);
}catch (Exception ex){
LOGGER.error(ex.getMessage());
}
}
@Scheduled(cron = "0/50 * * * * *")
public void run2(){
Map<String, JobParameter> confMap = new HashMap<>();
confMap.put("time", new JobParameter(System.currentTimeMillis()));
JobParameters jobParameters = new JobParameters(confMap);
try {
jobLauncher.run(updateNotaryListInfoJob, jobParameters);
}catch (Exception ex){
LOGGER.error(ex.getMessage());
}
}
}
Starting the application now fails with the following error:
***************************
APPLICATION FAILED TO START
***************************
Description:
The bean 'jobLauncher', defined in class path resource [org/springframework/batch/core/configuration/annotation/SimpleBatchConfiguration.class], could not be registered. A bean with that name has already been defined in class path resource [com/notariato/updateInfo/UpdateInfoBatchConfig.class] and overriding is disabled.
Action:
Consider renaming one of the beans or enabling overriding by setting spring.main.allow-bean-definition-overriding=true
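Basically, this error seems to be telling me that the JobLauncher bean I am trying to inject into the launcher class is not yet defined in the UpdateInfoBatchConfig configuration class. But defining it there is exactly what I intended: I define the bean in the configuration class and then inject it into the launcher class to use it.
What is wrong? What am I missing? How can I fix this?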
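This happens because you define a JobLauncher bean in your application context, and Spring Batch also defines a bean with that name through @EnableBatchProcessing (see its Javadoc).
If you want to use a custom JobLauncher, you should provide a BatchConfigurer bean and override getJobLauncher. One way to do that is to make your configuration class extend DefaultBatchConfigurer and override createJobLauncher(). This is explained in more detail in the documentation.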
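The approach described above might look roughly like the following sketch. It assumes Spring Batch 4.x (the question uses JobBuilderFactory and StepBuilderFactory), that @EnableBatchProcessing is already present somewhere in the application, and that the jobLauncher @Bean method has been removed from UpdateInfoBatchConfig. The class name AsyncBatchConfigurer is hypothetical, and the pool sizes are simply copied from the question.

```java
import org.springframework.batch.core.configuration.annotation.DefaultBatchConfigurer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

// Hypothetical class name; assumes @EnableBatchProcessing is declared elsewhere.
@Configuration
public class AsyncBatchConfigurer extends DefaultBatchConfigurer {

    // Called by DefaultBatchConfigurer while it initializes; the returned
    // launcher is what Spring Batch exposes as the single "jobLauncher" bean.
    @Override
    protected JobLauncher createJobLauncher() throws Exception {
        // The executor is created here rather than as a Spring bean,
        // so it has to be initialized by hand.
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(15);
        taskExecutor.setMaxPoolSize(20);
        taskExecutor.setQueueCapacity(30);
        taskExecutor.initialize();

        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(getJobRepository());
        // With a task executor set, run(...) hands the job to the pool
        // instead of executing it on the calling thread.
        jobLauncher.setTaskExecutor(taskExecutor);
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }
}
```

With something like this in place, SpringBatchExampleJobLauncher can keep injecting JobLauncher as before; only one bean named jobLauncher is registered, so the name clash reported at startup should no longer occur.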