我最近开始使用java配置方式编写spring批处理程序,并使用spring批处理和starter包。我使用了分区的步骤和任务执行器来完成我的工作,我面临的问题是,一旦作业完成,批处理过程就不会停止,它一直在我的eclipse和Linux盒子中运行。我手动找到并终止作业。你能帮个忙吗。当我在没有分区步骤的情况下以单线程的方式运行作业时,这工作很好。
我的作业配置:
@Bean
@StepScope
public ItemReader<MediaAsset> metaDataExportReader(@Value("#{jobParameters[sourceSystemCode]}") String sourceSystemCode,@Value("#{jobParameters[assetType]}") String assetType,@Value("#{stepExecutionContext[startingMediaAssetId]}") long startingMediaAssetId,
@Value("#{stepExecutionContext[endingMediaAssetId]}") long endingMediaAssetId,@Value("#{stepExecutionContext[threadName]}") String threadName) throws Exception {
// Step-scoped reader: one instance is created per partition, bounded by the
// media-asset id range the partitioner placed in the step execution context.
logger.debug("Reader is called...."+sourceSystemCode);
logger.debug("page size---------->"+jobConfig.getPageOrChunkSizeMetaDataExport());
logger.debug("startingMediaAssetId---------->"+startingMediaAssetId);
logger.debug("endingMediaAssetId"+endingMediaAssetId);
logger.debug("threadName"+threadName);
// Named SQL parameters consumed by the paging reader's WHERE clause.
final Map<String,Object> rangeParams = new HashMap<>();
rangeParams.put("startingMediaAssetId", startingMediaAssetId);
rangeParams.put("endingMediaAssetId", endingMediaAssetId);
final JdbcPagingItemReader<MediaAsset> pagingReader = getJdbcPagingItemReader(sourceSystemCode, assetType);
pagingReader.setParameterValues(rangeParams);
return pagingReader;
}
// Step-scoped writer: wraps a StAX XML writer in a MultiResourceItemWriter so
// the output is split into multiple files, one per chunk of items. The file
// suffix encodes the partition's thread name and id range so partitions never
// collide on file names.
// NOTE(review): destroyMethod="close" targets the returned MultiResourceItemWriter;
// confirm close() is actually invoked when the step scope is destroyed.
@Bean(destroyMethod="close")
@StepScope
public ItemWriter<MediaAsset> metaDataExportWriter(@Value("#{jobParameters[sourceSystemCode]}") String sourceSystemCode,@Value("#{jobParameters[assetType]}") String assetType,@Value("#{stepExecutionContext[startingMediaAssetId]}") long startingMediaAssetId,
@Value("#{stepExecutionContext[endingMediaAssetId]}") long endingMediaAssetId,@Value("#{stepExecutionContext[threadName]}") String threadName) throws Exception {
logger.debug("Coming here Item Writer,..."+threadName);
logger.debug("getItemsPerFile---------->"+jobConfig.getPageOrChunkSizeMetaDataExport());
// Delegate writer that marshals each MediaAsset chunk into one XML document.
StaxEventItemWriter<MediaAsset> staxEventItemWriter = new StaxEventItemWriter<>();
staxEventItemWriter.setRootTagName(DL3ConstantUtil.EXPORT_ASSET_METADATA_BY_SOURCESYSTEM_CODE_ROOT_TAG);
staxEventItemWriter.setMarshaller(marshaller);
staxEventItemWriter.setOverwriteOutput(true);
// Splits output into a new file every itemCountLimitPerResource items.
MultiResourceItemWriter<MediaAsset> multiResourceItemWriter = new MultiResourceItemWriter<>();
multiResourceItemWriter.setItemCountLimitPerResource(jobConfig.getPageOrChunkSizeMetaDataExport());
multiResourceItemWriter.setDelegate(staxEventItemWriter);
// Per-file suffix: _<thread>_<startId>_<endId>_<index><fixed-suffix>, making
// each partition's files uniquely named.
multiResourceItemWriter.setResourceSuffixCreator(new ResourceSuffixCreator() {
@Override
public String getSuffix(int index) {
return DL3ConstantUtil.UNDERSCORE+threadName+DL3ConstantUtil.UNDERSCORE+startingMediaAssetId+DL3ConstantUtil.UNDERSCORE+endingMediaAssetId+DL3ConstantUtil.UNDERSCORE+index+DL3ConstantUtil.EXPORT_ASSET_METADATA_BY_SOURCESYSTEM_CODE_FILE_NAME_PREFIX_NAME_IMG == null ? null : DL3ConstantUtil.UNDERSCORE+threadName+DL3ConstantUtil.UNDERSCORE+startingMediaAssetId+DL3ConstantUtil.UNDERSCORE+endingMediaAssetId+DL3ConstantUtil.UNDERSCORE+index+DL3ConstantUtil.EXPORT_ASSET_METADATA_BY_SOURCESYSTEM_CODE_FILE_NAME_SUFFIX;
}
});
logger.debug("writer sourceSystemCode"+sourceSystemCode);
// Output directory is selected from assetType (IA = image, DA = document)
// crossed with sourceSystemCode (DPL / SOLAR / DDDS).
switch (assetType) {
case DL3ConstantUtil.IMAGE_ASSET:
switch (sourceSystemCode) {
case DL3ConstantUtil.LIGHTBOX:
multiResourceItemWriter.setResource(new FileSystemResource(jobConfig.getTargetFileLocation()+jobConfig.getBackSlash()+"IA"+jobConfig.getBackSlash()+"DPL"+jobConfig.getBackSlash()+DL3ConstantUtil.EXPORT_ASSET_METADATA_BY_SOURCESYSTEM_CODE_LIGHT_BOX_FILE_NAME_PREFIX_NAME_IMG));
break;
case DL3ConstantUtil.SOLAR:
multiResourceItemWriter.setResource(new FileSystemResource(jobConfig.getTargetFileLocation()+jobConfig.getBackSlash()+"IA"+jobConfig.getBackSlash()+"SOLAR"+jobConfig.getBackSlash()+DL3ConstantUtil.EXPORT_ASSET_METADATA_BY_SOURCESYSTEM_CODE_SOLAR_BOX_FILE_NAME_PREFIX_NAME_IMG));
break;
case DL3ConstantUtil.MANUAL_UPLOAD:
multiResourceItemWriter.setResource(new FileSystemResource(jobConfig.getTargetFileLocation()+jobConfig.getBackSlash()+"IA"+jobConfig.getBackSlash()+"DDDS"+jobConfig.getBackSlash()+DL3ConstantUtil.EXPORT_ASSET_METADATA_BY_SOURCESYSTEM_CODE_DDDS_BOX_FILE_NAME_PREFIX_NAME_IMG));
break;
default:
// NOTE(review): unknown sourceSystemCode silently leaves the resource unset;
// the writer will fail later when opened — confirm this is intended.
break;
}
break;
case DL3ConstantUtil.DOCUMENT_ASSET:
switch (sourceSystemCode) {
case DL3ConstantUtil.SOLAR:
multiResourceItemWriter.setResource(new FileSystemResource(jobConfig.getTargetFileLocation()+jobConfig.getBackSlash()+"DA"+jobConfig.getBackSlash()+"SOLAR"+jobConfig.getBackSlash()+DL3ConstantUtil.EXPORT_ASSET_METADATA_BY_SOURCESYSTEM_CODE_SOLAR_BOX_FILE_NAME_PREFIX_NAME_DOC));
break;
case DL3ConstantUtil.MANUAL_UPLOAD:
multiResourceItemWriter.setResource(new FileSystemResource(jobConfig.getTargetFileLocation()+jobConfig.getBackSlash()+"DA"+jobConfig.getBackSlash()+"DDDS"+jobConfig.getBackSlash()+DL3ConstantUtil.EXPORT_ASSET_METADATA_BY_SOURCESYSTEM_CODE_DDDS_BOX_FILE_NAME_PREFIX_NAME_DOC));
break;
default:
// NOTE(review): same silent fall-through as above.
break;
}
break;
default:
// Unknown asset type is a hard error (unlike unknown source system above).
throw new Exception("no matching assetType ");
}
return multiResourceItemWriter;
}
@Bean(name="GenerateXMLFilesMaster")
public Step generateXMLFilesMaster(ItemReader<MediaAsset> metaDataExportReader,ItemWriter<MediaAsset> metaDataExportWriter) {
logger.debug("Master Step initialization...");
// Master (manager) step of the partitioned flow.
// Fix: the original chained BOTH partitioner(Step) and
// partitioner(String, Partitioner). The partitioner(Step) call is redundant
// and misleading — the worker step is supplied by the explicit
// TaskExecutorPartitionHandler below (via its setStep(...)), so only the
// (name, Partitioner) form is needed here.
return stepBuilderFactory.get("GenerateXMLFilesMaster")
        .partitioner("GenerateXMLFilesSlave", metaDataExportPartioner(null, null, null))
        .partitionHandler(metaDataExportPartionHandler(metaDataExportReader, metaDataExportWriter))
        .build();
}
@Bean(name="GenerateXMLFilesSlave")
public Step generateXMLFilesSlave(ItemReader<MediaAsset> metaDataExportReader,ItemWriter<MediaAsset> metaDataExportWriter) {
// Worker step, executed once per partition. The chunk size is the commit
// interval and also drives the writer's items-per-file limit.
final int chunkSize = jobConfig.getPageOrChunkSizeMetaDataExport();
return stepBuilderFactory.get("GenerateXMLFilesSlave")
        .<MediaAsset, MediaAsset>chunk(chunkSize)
        .reader(metaDataExportReader)
        .writer(metaDataExportWriter)
        .build();
}
// Step-scoped tasklet that uploads the generated XML files. The source
// directory mirrors the directory layout chosen by the writer bean:
// assetType (IA = image, DA = document) crossed with sourceSystemCode
// (DPL / SOLAR / DDDS). The target folder is a single configured root.
@Bean(name="uploadTaskletMetaData")
@StepScope
public Tasklet uploadTaskletMetaData(@Value("#{jobParameters[sourceSystemCode]}") String sourceSystemCode,@Value("#{jobParameters[assetType]}") String assetType){
MetaDataUploadTasklet metaDataUploadTasklet = new MetaDataUploadTasklet();
logger.debug("sourceSystemCode----->"+sourceSystemCode);
logger.debug("assetType----->"+assetType);
metaDataUploadTasklet.setTargetFolder(jobConfig.getTargetMetaDataRootPath());
switch (assetType) {
case DL3ConstantUtil.IMAGE_ASSET:
switch (sourceSystemCode) {
case DL3ConstantUtil.LIGHTBOX:
metaDataUploadTasklet.setSourceDirectory(jobConfig.getTargetFileLocation()+jobConfig.getBackSlash()+"IA"+jobConfig.getBackSlash()+"DPL"+jobConfig.getBackSlash());
//metaDataUploadTasklet.setTargetFolder(jobConfig.getTargetMetaDataRootPath()+"/IA/DPL");
break;
case DL3ConstantUtil.SOLAR:
metaDataUploadTasklet.setSourceDirectory(jobConfig.getTargetFileLocation()+jobConfig.getBackSlash()+"IA"+jobConfig.getBackSlash()+"SOLAR"+jobConfig.getBackSlash());
//metaDataUploadTasklet.setTargetFolder(jobConfig.getTargetMetaDataRootPath()+"/IA/SOLAR");
break;
case DL3ConstantUtil.MANUAL_UPLOAD:
metaDataUploadTasklet.setSourceDirectory(jobConfig.getTargetFileLocation()+jobConfig.getBackSlash()+"IA"+jobConfig.getBackSlash()+"DDDS"+jobConfig.getBackSlash());
//metaDataUploadTasklet.setTargetFolder(jobConfig.getTargetMetaDataRootPath()+"/IA/DDDS");
break;
default:
// NOTE(review): unknown sourceSystemCode leaves sourceDirectory unset —
// confirm the tasklet tolerates that.
break;
}
break;
case DL3ConstantUtil.DOCUMENT_ASSET:
switch (sourceSystemCode) {
case DL3ConstantUtil.SOLAR:
metaDataUploadTasklet.setSourceDirectory(jobConfig.getTargetFileLocation()+jobConfig.getBackSlash()+"DA"+jobConfig.getBackSlash()+"SOLAR"+jobConfig.getBackSlash());
//metaDataUploadTasklet.setTargetFolder(jobConfig.getTargetMetaDataRootPath()+"/DA/SOLAR");
break;
case DL3ConstantUtil.MANUAL_UPLOAD:
metaDataUploadTasklet.setSourceDirectory(jobConfig.getTargetFileLocation()+jobConfig.getBackSlash()+"DA"+jobConfig.getBackSlash()+"DDDS"+jobConfig.getBackSlash());
//metaDataUploadTasklet.setTargetFolder(jobConfig.getTargetMetaDataRootPath()+"/DA/DDDS");
break;
default:
break;
}
break;
default:
// NOTE(review): unlike the writer bean, an unknown assetType does NOT throw
// here — the tasklet is returned unconfigured. Confirm this asymmetry is wanted.
break;
}
return metaDataUploadTasklet;
}
@Bean(name="UploadXMLFiles")
public Step uploadXMLFiles(){
// Tasklet step that pushes the generated XML files to the target folder.
// Nulls are placeholders: the @StepScope proxy injects the real job parameters.
final Tasklet uploadTasklet = uploadTaskletMetaData(null, null);
return stepBuilderFactory.get("UploadXMLFiles").tasklet(uploadTasklet).build();
}
@Bean
@StepScope
public Partitioner metaDataExportPartioner(@Value("#{jobParameters[sourceSystemCode]}") String sourceSystemCode,@Value("#{jobParameters[assetType]}") String assetType,@Value("#{jobExecutionContext[totalCount]}") String totalCount){
logger.debug("source system code--->"+sourceSystemCode);
logger.debug("assetType--->"+assetType);
// Splits the total record count into per-partition id ranges that the
// step-scoped reader/writer pick up from the step execution context.
MetaDataExportPartioner partitioner = new MetaDataExportPartioner();
partitioner.setSourceSystemCode(sourceSystemCode);
partitioner.setAssetType(assetType);
logger.debug("In the partioner initiliazation------>"+totalCount);
// totalCount arrives as a String from the job execution context; treat a
// missing/empty value as zero.
final int resolvedTotalCount;
if (StringUtils.isEmpty(totalCount)) {
    resolvedTotalCount = 0;
} else {
    resolvedTotalCount = Integer.parseInt(totalCount);
}
partitioner.setTotalCount(resolvedTotalCount);
return partitioner;
}
@Bean
public PartitionHandler metaDataExportPartionHandler(ItemReader<MediaAsset> reader,ItemWriter<MediaAsset> writer){
logger.debug("Initializing partionHandler------>");
// Runs up to gridSize copies of the slave step concurrently on the shared
// thread pool, one per partition produced by the partitioner.
final TaskExecutorPartitionHandler handler = new TaskExecutorPartitionHandler();
handler.setStep(generateXMLFilesSlave(reader, writer));
handler.setGridSize(6);
handler.setTaskExecutor(taskExecutor());
return handler;
}
@Bean
public TaskExecutor taskExecutor() {
// Thread pool used by the partition handler.
// Fix for "job completes but the process never exits": ThreadPoolTaskExecutor
// creates NON-daemon threads by default, and the idle pool threads keep the
// JVM alive after the partitioned job finishes (the single-threaded run has
// no such threads, which is why it exits normally). Marking the pool threads
// as daemon lets the JVM shut down once the job — the last non-daemon
// activity — completes.
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(10);
taskExecutor.setMaxPoolSize(10);
taskExecutor.setThreadNamePrefix("partition-");
taskExecutor.setDaemon(true); // do not block JVM shutdown
taskExecutor.afterPropertiesSet();
return taskExecutor;
}
@Bean()
public JobExecutionListener metaDataExportJobExecutionListener(){
// Job-level listener providing the before/after-job hooks for the export job.
return new MetaDataExportJobListener();
}
@Bean
public Job exportMetaDataJob(JobExecutionListener metaDataExportJobExecutionListener) throws Exception {
// Job flow: a single partitioned master step that generates the XML files.
// The null/zero arguments are placeholders only — Spring resolves the real
// values through the @StepScope proxies at step execution time.
final Step masterStep = generateXMLFilesMaster(
        metaDataExportReader(null, null, 0L, 0L, null),
        metaDataExportWriter(null, null, 0L, 0L, null));
return jobBuilderFactory.get("ExportMetaDataJob")
        .incrementer(new RunIdIncrementer())
        .listener(metaDataExportJobExecutionListener)
        .flow(masterStep)
        //.next(uploadXMLFiles())
        .end()
        .build();
}
<!-- Spring Boot 1.3.x parent: manages dependency versions below. -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.3.2.RELEASE</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
<spring-cloud-version>1.0.4.RELEASE</spring-cloud-version>
<spring-batch-admin.version>1.3.0.RELEASE</spring-batch-admin.version>
</properties>
<dependencies>
<!-- <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId>
</dependency> -->
<!-- Core Spring Batch support (JobBuilderFactory, StepBuilderFactory, etc.). -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-mail</artifactId>
</dependency>
<!-- Thymeleaf templating; its transitive web starter is excluded so the app
     stays a non-web batch process. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- <dependency> <groupId>org.springframework.batch</groupId> <artifactId>spring-batch-admin-manager</artifactId>
<version>${spring-batch-admin.version}</version> <exclusions> <exclusion>
<artifactId>slf4j-log4j12</artifactId> <groupId>org.slf4j</groupId> </exclusion>
<exclusion> <artifactId>slf4j-api</artifactId> <groupId>org.slf4j</groupId>
</exclusion> </exclusions> </dependency> -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-aws-context</artifactId>
<version>${spring-cloud-version}</version>
</dependency>
<!-- NOTE(review): sqljdbc4 and ojdbc14 are typically NOT in Maven Central
     (Oracle/Microsoft license restrictions) — confirm they come from an
     internal repository or are installed locally. -->
<dependency>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>sqljdbc4</artifactId>
<version>4.0</version>
</dependency>
<dependency>
<groupId>com.oracle</groupId>
<artifactId>ojdbc14</artifactId>
<version>10.2.0.3.0</version>
</dependency>
<!-- OXM marshalling used by StaxEventItemWriter's Marshaller. -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-oxm</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- NOTE(review): devtools is usually declared <optional>true</optional> so it
     is not packaged into production builds — confirm. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.3</version>
</dependency>
<!-- <dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-dbcp2</artifactId>
<version>2.0.1</version>
</dependency> -->
<!-- <dependency> <groupId>com.sun.xml.bind</groupId> <artifactId>jaxb-impl</artifactId>
<version>2.0.1</version> </dependency> -->
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
当没有任何非守护进程线程运行时,JVM会自动关闭。在非分区情况下,当作业完成时没有非守护进程线程运行,因此JVM关闭。但是,在分区用例中,必须有一些等待工作的东西仍然阻止应用程序关闭。执行线程转储将有助于诊断问题,但我敢打赌,ThreadPoolTaskExecutor
持有的线程才是问题所在。如果是这样，可以把线程池的线程设置为守护线程（调用 ThreadPoolTaskExecutor 的 setDaemon(true)），或者改用不会创建常驻线程池的执行器（例如 SimpleAsyncTaskExecutor）——这样空闲的池线程就不会阻止 JVM 正常关闭。
下面是JobLauncher.run()返回的JobExecution的内容,它发出作业运行成功的信号。 JobExecution:ID=0,Version=2,StartTime=Fri Nov 27 06:05:23 CST 2015,EndTime=Fri Nov 27 06:05:39 CST 2015,LastUpdated=Fri Nov 27 06:05:39 CST 2015,Status=COMPLETED（状态为 COMPLETED，表明作业已成功结束；原文此处被截断）
我正在尝试在后台运行作业,允许我根据某种条件或在超时发生后停止它。 我有这两块代码:
我已经启动了我的作业，当我尝试用另一个请求停止作业时，得到异常：JobExecutionNotRunningException：JobExecution 必须正在运行才能停止。而打印作业状态时显示作业并未运行，但批处理作业实际仍在执行。这是一个 Web 应用：先上传一些 CSV 文件并用 Spring Batch 启动处理，在执行过程中如果用户需要停止，则从另一个控制器方法发出停止请求，并试
我正在做一个项目,我们正在使用Spring Boot、Spring Batch和Camel。 关于如何在JobExecution数据可用时立即返回它,有什么想法吗?
我知道我可以用云函数和PubSub通知来完成每个写入的文件,但我更喜欢只在整个文件夹完成时这样做一次。 谢了!
我们有一个用例,需要从一些分页的API读取数据,然后写入一些下游的Kafka主题。 我们已经能够通过Spring批处理集成远程分区来实现解决方案,其中管理器通过创建包含页码和偏移量以读取数据的执行上下文来处理任务的分区。管理器创建此执行上下文并将它们放在MessagingChannel上(我可以使用RabbitMQ和Kafka主题,以提供解决方案者为准)。工作人员(超过1个)从MessagingC