我当时正在开发一个Spring批处理应用程序,使用java配置执行两个批处理作业。最近,我添加了一个Spring调度程序来调度我编写的一个作业。侦听器在作业第一次完成时被调用,但在下一次执行后不会被调用。以下是我的作业配置代码:
@Configuration
@EnableBatchProcessing
public class BatchConfiguration{
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Autowired
public MongoTemplate mongoTemplate;
@Autowired
UnitsRepository unitsRepos;
@Autowired
UserRepository userRepository;
@Autowired
ElectraService electraService;
/*@Autowired InfrastructureConfiguration infrastructureConfiguration;*/
// tag::readerwriterprocessor[]
@Bean
@StepScope
public MongoItemReader<UserBean> reader() {
MongoItemReader<UserBean> reader = new MongoItemReader<UserBean>();
reader.setTemplate(mongoTemplate);
reader.setCollection("user");
reader.setQuery("{ '_id': 'U3'}");
reader.setSort(new HashMap<String,Direction>(){{put("_id", Direction.ASC);}});
reader.setTargetType(UserBean.class);
return reader;
}
@Bean
public ExceedUsageProcessor processor() {
return new ExceedUsageProcessor(unitsRepos,electraService);
}
@Bean
public AnomalyProcessor anomalyProcessor() {
return new AnomalyProcessor(unitsRepos);
}
@Bean
@StepScope
public MongoItemWriter<DayByDayUsage> writer() {
MongoItemWriter<DayByDayUsage> writer = new MongoItemWriter<DayByDayUsage>();
writer.setTemplate(mongoTemplate);
writer.setCollection("usage");
return writer;
}
// end::readerwriterprocessor[]
// tag::listener[]
@Bean
@StepScope
public MongoItemWriter<AnomalyBean> anomalyWriter() {
MongoItemWriter<AnomalyBean> writer = new MongoItemWriter<AnomalyBean>();
writer.setTemplate(mongoTemplate);
writer.setCollection("anomaly");
return writer;
}
@Bean
public ExceedJobNotificationListener listener() {
return new ExceedJobNotificationListener(mongoTemplate);
}
@Bean
public AnomalyJobListener anomalyListener(){
return new AnomalyJobListener(mongoTemplate,userRepository);
}
// end::listener[]
// tag::jobstep[]
@Bean
public Job notifyUserJob() {
return jobBuilderFactory.get("notifyUserJob")
.incrementer(new RunIdIncrementer())
.listener(listener())
.flow(step1())
.end()
.build();
}
@Bean
public Job anomalyJob() {
return jobBuilderFactory.get("anomalyJob")
.incrementer(new RunIdIncrementer())
.listener(anomalyListener())
.flow(step2())
.end()
.build();
}
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.<UserBean, DayByDayUsage> chunk(50)
.reader(reader())
.processor(processor())
.writer(writer())
.taskExecutor(taskExecutor())
.throttleLimit(10)
.allowStartIfComplete(true)
.build();
}
// end::jobstep[]
@Bean
public Step step2() {
return stepBuilderFactory.get("step2")
.<UserBean, AnomalyBean> chunk(50)
.reader(reader())
.processor(anomalyProcessor())
.writer(anomalyWriter())
.taskExecutor(taskExecutor())
.throttleLimit(10)
.allowStartIfComplete(true)
.build();
}
@Bean
public TaskExecutor taskExecutor() {
// TODO Auto-generated method stub
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setMaxPoolSize(10);
taskExecutor.afterPropertiesSet();
return taskExecutor;
}
@Bean
public DataSource dataSource() {
return new EmbeddedDatabaseBuilder()
.setType(EmbeddedDatabaseType.HSQL)
.addScript("classpath:/org/springframework/batch/core/schema-hsqldb.sql")
.build();
}
}
下面是我的调度程序的代码:
@Component
public class AnomalyScheduler {
private Job myImportJob;
private JobLauncher jobLauncher;
@Autowired
public AnomalyScheduler(JobLauncher jobLauncher, @Qualifier("anomalyJob") Job myImportJob){
this.myImportJob = myImportJob;
this.jobLauncher = jobLauncher;
}
@Scheduled(fixedDelay=60000)
public void runJob(){
try {
jobLauncher.run(myImportJob, new JobParameters());
} catch (JobExecutionAlreadyRunningException | JobRestartException
| JobInstanceAlreadyCompleteException
| JobParametersInvalidException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
我的听众如下:
public class AnomalyJobListener extends JobExecutionListenerSupport {
private PushNotification pushNotification = PushNotification
.getPushNotificationInstance();
@Autowired
public AnomalyJobListener(MongoTemplate mongoTemplate,
UserRepository userRepository) {
List<AnomalyBean> anomalies = new ArrayList<AnomalyBean>(0);
anomalies = mongoTemplate.findAll(AnomalyBean.class);
int numAnomalies = anomalies.size();
List<UserBean> admins = new ArrayList<UserBean>(0);
admins = userRepository.userByType("admin");
if (numAnomalies > 0) {
for (UserBean admin : admins) {
pushNotification.pushNotification(numAnomalies
+ " anomalies detected ! Keep an eye on that.",
admin.getDeveiceId());
}
}
}
}
以下是控制台输出:
2016-05-04 08:17:39.565 INFO 9348 --- [ main] com.electra.Application : Starting Application on all-PC with PID 9348 (F:\Electrck\ElectrackJobRepository\ElectrackJobs\bin started by all in F:\Electrck\ElectrackJobRepository\ElectrackJobs)
2016-05-04 08:17:39.571 INFO 9348 --- [ main] com.electra.Application : No active profile set, falling back to default profiles: default
2016-05-04 08:17:39.681 INFO 9348 --- [ main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@20b2475a: startup date [Wed May 04 08:17:39 IST 2016]; root of context hierarchy
2016-05-04 08:17:41.943 WARN 9348 --- [ main] o.s.c.a.ConfigurationClassEnhancer : @Bean method ScopeConfiguration.stepScope is non-static and returns an object assignable to Spring's BeanFactoryPostProcessor interface. This will result in a failure to process annotations such as @Autowired, @Resource and @PostConstruct within the method's declaring @Configuration class. Add the 'static' modifier to this method to avoid these container lifecycle issues; see @Bean javadoc for complete details.
2016-05-04 08:17:41.966 WARN 9348 --- [ main] o.s.c.a.ConfigurationClassEnhancer : @Bean method ScopeConfiguration.jobScope is non-static and returns an object assignable to Spring's BeanFactoryPostProcessor interface. This will result in a failure to process annotations such as @Autowired, @Resource and @PostConstruct within the method's declaring @Configuration class. Add the 'static' modifier to this method to avoid these container lifecycle issues; see @Bean javadoc for complete details.
2016-05-04 08:17:42.258 INFO 9348 --- [ main] o.s.j.d.e.EmbeddedDatabaseFactory : Starting embedded database: url='jdbc:hsqldb:mem:testdb', username='sa'
2016-05-04 08:17:42.646 INFO 9348 --- [ main] o.s.jdbc.datasource.init.ScriptUtils : Executing SQL script from class path resource [org/springframework/batch/core/schema-hsqldb.sql]
2016-05-04 08:17:42.658 INFO 9348 --- [ main] o.s.jdbc.datasource.init.ScriptUtils : Executed SQL script from class path resource [org/springframework/batch/core/schema-hsqldb.sql] in 12 ms.
2016-05-04 08:18:01.793 INFO 9348 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService
2016-05-04 08:18:01.812 INFO 9348 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'taskExecutor'
push status [ messageId=0:1462330092323002%9b3f4867f9fd7ecd ]
push status [ messageId=0:1462330095502779%9b3f4867f9fd7ecd ]
2016-05-04 08:18:16.883 INFO 9348 --- [ main] o.s.jdbc.datasource.init.ScriptUtils : Executing SQL script from class path resource [org/springframework/batch/core/schema-hsqldb.sql]
2016-05-04 08:18:16.927 INFO 9348 --- [ main] o.s.jdbc.datasource.init.ScriptUtils : Executed SQL script from class path resource [org/springframework/batch/core/schema-hsqldb.sql] in 20 ms.
2016-05-04 08:18:17.392 INFO 9348 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Registering beans for JMX exposure on startup
2016-05-04 08:18:17.413 INFO 9348 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 0
2016-05-04 08:18:17.737 INFO 9348 --- [ main] o.s.b.a.b.JobLauncherCommandLineRunner : Running default command line with: []
2016-05-04 08:18:17.766 INFO 9348 --- [ main] o.s.b.c.r.s.JobRepositoryFactoryBean : No database type set, using meta data indicating: HSQL
2016-05-04 08:18:18.032 INFO 9348 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : No TaskExecutor has been set, defaulting to synchronous executor.
2016-05-04 08:18:18.147 INFO 9348 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=notifyUserJob]] launched with the following parameters: [{run.id=1}]
2016-05-04 08:18:18.187 INFO 9348 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step1]
2016-05-04 08:18:22.044 INFO 9348 --- [pool-2-thread-1] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=anomalyJob]] launched with the following parameters: [{}]
2016-05-04 08:18:22.079 INFO 9348 --- [pool-2-thread-1] o.s.batch.core.job.SimpleStepHandler : Executing step: [step2]
2016-05-04 08:18:28.990 INFO 9348 --- [ taskExecutor-1] com.electra.service.ElectraServiceImpl : date Mon May 02 08:18:12 IST 1
2016-05-04 08:18:28.991 INFO 9348 --- [ taskExecutor-1] c.e.repository.UnitsRepositoryImpl : push new unit
2016-05-04 08:18:32.581 INFO 9348 --- [ taskExecutor-1] com.electra.service.ElectraServiceImpl : date Wed May 04 08:18:32 IST 2016
2016-05-04 08:18:32.581 INFO 9348 --- [ taskExecutor-1] c.e.repository.UnitsRepositoryImpl : push new unit
2016-05-04 08:19:16.876 INFO 9348 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=notifyUserJob]] completed with the following parameters: [{run.id=1}] and the following status: [COMPLETED]
2016-05-04 08:19:16.999 INFO 9348 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=anomalyJob]] launched with the following parameters: [{run.id=1}]
2016-05-04 08:19:17.053 INFO 9348 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step2]
2016-05-04 08:19:17.491 INFO 9348 --- [pool-2-thread-1] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=anomalyJob]] completed with the following parameters: [{}] and the following status: [COMPLETED]
2016-05-04 08:19:52.399 INFO 9348 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=anomalyJob]] completed with the following parameters: [{run.id=1}] and the following status: [COMPLETED]
2016-05-04 08:19:52.401 INFO 9348 --- [ main] com.electra.Application : Started Application in 133.639 seconds (JVM running for 134.724)
2016-05-04 08:20:21.066 INFO 9348 --- [pool-2-thread-1] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=anomalyJob]] launched with the following parameters: [{}]
2016-05-04 08:20:21.288 INFO 9348 --- [pool-2-thread-1] o.s.batch.core.job.SimpleStepHandler : Executing step: [step2]
2016-05-04 08:20:31.103 INFO 9348 --- [pool-2-thread-1] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=anomalyJob]] completed with the following parameters: [{}] and the following status: [COMPLETED]
请告诉我我做错了什么,为什么听者没有被执行后续尝试。
侦听器构造函数中只有代码。但不是在beforeJob()afterJob()或任何其他方法中。。。所以我猜听众被叫来了,但什么也没做。。。因此,您需要覆盖适当的侦听器方法。
我已经创建了Spring Batch(RESTReader、自定义处理器和自定义ItemWriter),我计划了它。Spring批工作良好。当监听器在每个计划的间隔内完成打印作业时,计划似乎可以工作,但似乎不读或写。 我的Sprint启动应用程序 BATCH_STEP_EXECUTION 控制台日志的一部分 BatchApplication-在27.638秒内启动BatchApplication(
Spring批处理作业与flatfileitemreader(从csv读取)、processor(更新adwords api提要详细信息,对于csv文件中的每个记录(大约有40条记录),这一步大约需要40秒)和正在更新DB中记录的定制writer一起使用。 web.xml
我正在寻找最好的解决方案,以创建一个java web应用程序,以生成Excel/PDF格式的报告。类似于Google Adwords的东西,用户可以创建日程报告,并在以后生成报告时下载。 我正在考虑开发一个java应用程序,在其中用户记录,选择一个预先定义的报告,并提供输入参数(如报告日期等),这个请求将被排队或保存为Quarts作业(首选持久队列)。一个作业将监视队列/作业并执行该作业,生成报告
我想一个接一个地运行两个工作。是的,我确实在网上搜索过,但他们有解决方案,包括在第一份工作中增加第二份工作。但我有不同的要求。我会在第一批作业执行完成后得到一个通知,而第二批作业只有在收到这个通知后才会运行。是否有可能在Spring Boot中一个接一个地运行两个作业。请救命!
在一个项目中,我们必须运行一个定期开始的作业(现在QA env上每5分钟开始一次),该作业处理40K用户的一些任务。我们决定使用Spring Batch,因为它非常适合,并且几乎用默认配置实现了它(例如,它使用)。好的,有一个工作由一个步骤组成: 开箱即用 在内存中执行轻量级计算的自定义 自定义,它通过多个JPQL和本机查询将数据保存到同一个PostgreSQL db。 作业本身是用调度的,并且每