我有一个简单的控制器,它接收一个包含文件路径的JSON字符串,并对这些文件运行Spring Batch作业。为了实现Spring Batch,我参考了一个教程,该教程最终完成的代码位于 https://github.com/michaelhoffmantech/patter-batch-loader 。
/**
 * REST controller exposing a single endpoint that launches the injected batch
 * {@link Job} for a JSON description of uploaded files.
 */
@RestController
public class JobController {

    private static final Log _logger = LogFactory.getLog(JobController.class);

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job job;

    /**
     * Handles {@code POST /job/import}: passes the raw request body to the job as the
     * {@code Constants.JOB_PARAM_UPLOADED_FILES} parameter and runs the job.
     *
     * <p>Any exception raised while launching is logged at error level and rethrown
     * unchanged.
     *
     * @param uploadedFiles the request body, forwarded verbatim as a job parameter
     * @throws JobParametersInvalidException if the launcher rejects the parameters
     * @throws JobExecutionAlreadyRunningException if the launcher reports a running execution
     * @throws JobRestartException if the launcher reports an illegal restart
     * @throws JobInstanceAlreadyCompleteException if the launcher reports a completed instance
     */
    @RequestMapping(value = "/job/import", method = RequestMethod.POST)
    public void importAsset(@RequestBody String uploadedFiles)
            throws JobParametersInvalidException,
                   JobExecutionAlreadyRunningException,
                   JobRestartException,
                   JobInstanceAlreadyCompleteException {
        _logger.debug("importAsset() >>");
        try {
            _logger.debug("Uploaded files: ");
            _logger.debug(uploadedFiles);
            launchWith(uploadedFiles);
            _logger.debug("<< importAsset()");
        } catch (Exception failure) {
            _logger.error("An error occured while importing a batch. " + failure.getLocalizedMessage(), failure);
            throw failure;
        }
    }

    /** Wraps the payload in a single-entry parameter map and runs the job with it. */
    private void launchWith(String payload)
            throws JobParametersInvalidException,
                   JobExecutionAlreadyRunningException,
                   JobRestartException,
                   JobInstanceAlreadyCompleteException {
        _logger.debug("Putting values into the parameter map...");
        Map<String, JobParameter> params = new LinkedHashMap<>();
        JobParameter filesParam = new JobParameter(payload);
        params.put(Constants.JOB_PARAM_UPLOADED_FILES, filesParam);

        _logger.debug("Launching job [" + job.getName() + "]...");
        JobParameters jobParameters = new JobParameters(params);
        jobLauncher.run(job, jobParameters);
    }
}
@Configuration
@EnableBatchProcessing
public class BatchConfig implements BatchConfigurer {
@Autowired
private PlatformTransactionManager platformTransactionManager;
@Autowired
private DataSource dataSource;
private JobRepository jobRepository;
private JobExplorer jobExplorer;
private JobLauncher jobLauncher;
@Override
public JobRepository getJobRepository() throws Exception {
return this.jobRepository;
}
@Override
public PlatformTransactionManager getTransactionManager() throws Exception {
return this.platformTransactionManager;
}
@Override
public JobLauncher getJobLauncher() throws Exception {
return this.jobLauncher;
}
@Override
public JobExplorer getJobExplorer() throws Exception {
return this.jobExplorer;
}
protected JobLauncher createJobLauncher() throws Exception {
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(this.jobRepository);
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
protected JobRepository createJobRepository() throws Exception {
JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
factory.setDataSource(this.dataSource);
factory.setTransactionManager(getTransactionManager());
factory.afterPropertiesSet();
return factory.getObject();
}
@PostConstruct
public void afterPropertiesSet() throws Exception{
this.jobRepository = createJobRepository();
JobExplorerFactoryBean jobExplorerFactoryBean = new JobExplorerFactoryBean();
jobExplorerFactoryBean.setDataSource(this.dataSource);
jobExplorerFactoryBean.afterPropertiesSet();
this.jobExplorer = jobExplorerFactoryBean.getObject();
this.jobLauncher = createJobLauncher();
}
}
/**
 * Declares the import job, its single tasklet step and the validator that checks the
 * uploaded-files job parameter before the job is allowed to run.
 */
@Configuration
public class BatchJobConfig {

    private static final Log _logger = LogFactory.getLog(BatchJobConfig.class);

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    /**
     * Registers a post-processor wired to the given job registry.
     *
     * @param jobRegistry the registry the post-processor should use
     * @return the configured post-processor
     */
    @Bean
    JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry jobRegistry) {
        JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
        postProcessor.setJobRegistry(jobRegistry);
        return postProcessor;
    }

    /**
     * Builds the step named {@code Constants.STEP_NAME}. Its tasklet currently only
     * logs entry and exit and finishes immediately.
     *
     * @return the tasklet step
     * @throws Exception declared for compatibility with the original signature
     */
    @Bean
    public Step step() throws Exception {
        return this.stepBuilderFactory
                .get(Constants.STEP_NAME)
                .tasklet(new Tasklet() {
                    @Override
                    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext)
                            throws Exception {
                        _logger.debug("execute() >>");
                        _logger.debug("<< execute()");
                        return RepeatStatus.FINISHED;
                    }
                })
                .build();
    }

    /**
     * Builds the job named {@code Constants.JOB_NAME}, guarded by {@link #validator()}
     * and starting with the supplied step.
     *
     * @param step the first (and only) step of the job
     * @return the assembled job
     */
    @Bean
    public Job job(Step step) {
        return this.jobBuilderFactory
                .get(Constants.JOB_NAME)
                .validator(validator())
                .start(step)
                .build();
    }

    /**
     * Creates the validator for the {@code Constants.JOB_PARAM_UPLOADED_FILES} parameter.
     *
     * <p>The parameter must be a non-blank JSON array of {@code UploadFile} objects,
     * and every entry must point at an existing, readable file. Every violation,
     * including malformed JSON, a JSON {@code null}, or an entry without a path, is
     * reported as a {@link JobParametersInvalidException} rather than leaking an
     * unchecked parser exception or a {@code NullPointerException}.
     *
     * @return a new validator instance
     */
    public JobParametersValidator validator() {
        return new JobParametersValidator() {
            @Override
            public void validate(JobParameters parameters) throws JobParametersInvalidException {
                _logger.debug("validate() >>");

                String filePathsJsonStr = parameters == null
                        ? null
                        : parameters.getString(Constants.JOB_PARAM_UPLOADED_FILES);
                if (StringUtils.isBlank(filePathsJsonStr)) {
                    throw new JobParametersInvalidException("'" + Constants.JOB_PARAM_UPLOADED_FILES + "' parameter is required for job '" + Constants.JOB_NAME + "'.");
                }

                for (UploadFile uploadFile : parseUploadedFiles(filePathsJsonStr)) {
                    requireReadableFile(uploadFile);
                }

                _logger.debug("<< validate()");
            }
        };
    }

    /** Parses the JSON parameter into a list, converting parser failures into validation errors. */
    private static ArrayList<UploadFile> parseUploadedFiles(String json) throws JobParametersInvalidException {
        Type listType = new TypeToken<ArrayList<UploadFile>>() {}.getType();
        ArrayList<UploadFile> uploadedFiles;
        try {
            uploadedFiles = new Gson().fromJson(json, listType);
        } catch (com.google.gson.JsonParseException e) {
            JobParametersInvalidException invalid = new JobParametersInvalidException(
                    "'" + Constants.JOB_PARAM_UPLOADED_FILES + "' parameter is not a valid JSON list of files: " + e.getMessage());
            // The exception type only offers a message constructor; attach the cause explicitly.
            invalid.initCause(e);
            throw invalid;
        }
        if (uploadedFiles == null) {
            // Gson yields null for a JSON "null" document, which is not blank but carries no files.
            throw new JobParametersInvalidException(
                    "'" + Constants.JOB_PARAM_UPLOADED_FILES + "' parameter must be a JSON list of files.");
        }
        return uploadedFiles;
    }

    /** Ensures one parsed entry names a file that exists and can be read. */
    private static void requireReadableFile(UploadFile uploadFile) throws JobParametersInvalidException {
        if (uploadFile == null || uploadFile.getPath() == null) {
            throw new JobParametersInvalidException(
                    "'" + Constants.JOB_PARAM_UPLOADED_FILES + "' parameter contains an entry without a path.");
        }
        File file = new File(uploadFile.getPath());
        // The message promises a readability check, so test canRead() as well as exists().
        if (!file.exists() || !file.canRead()) {
            throw new JobParametersInvalidException("File '" + uploadFile.getPath() + "' did not exist or was not readable.");
        }
    }
}
2018-09-29 20:00:51.680 DEBUG 10104 --- [nio-8102-exec-2] c.g.m.t.t.controllers.JobController : importAsset() >>
2018-09-29 20:00:51.680 DEBUG 10104 --- [nio-8102-exec-2] c.g.m.t.t.controllers.JobController : Uploaded files:
2018-09-29 20:00:51.680 DEBUG 10104 --- [nio-8102-exec-2] c.g.m.t.t.controllers.JobController : [{"path":"C:\\app-import-staging\\cat-pet-animal-domestic-104827.jpeg","username":"test"},{"path":"C:\\app-import-staging\\kittens-cat-cat-puppy-rush-45170.jpeg","username":"test"}]
2018-09-29 20:00:51.680 DEBUG 10104 --- [nio-8102-exec-2] c.g.m.t.t.controllers.JobController : Putting values into the parameter map...
2018-09-29 20:00:51.681 DEBUG 10104 --- [nio-8102-exec-2] c.g.m.t.t.controllers.JobController : Launching job [app-import]...
2018-09-29 20:00:51.745 ERROR 10104 --- [nio-8102-exec-2] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Handler dispatch failed; nested exception is java.lang.StackOverflowError] with root cause
java.lang.StackOverflowError: null
at java.util.HashMap.putVal(HashMap.java:657) ~[na:1.8.0_181]
at java.util.HashMap.put(HashMap.java:612) ~[na:1.8.0_181]
at java.util.HashSet.add(HashSet.java:220) ~[na:1.8.0_181]
at java.util.Collections$SynchronizedCollection.add(Collections.java:2035) ~[na:1.8.0_181]
at java.lang.ClassLoader.checkPackageAccess(ClassLoader.java:508) ~[na:1.8.0_181]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:333) ~[spring-aop-4.3.14.RELEASE.jar:4.3.14.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190) ~[spring-aop-4.3.14.RELEASE.jar:4.3.14.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157) ~[spring-aop-4.3.14.RELEASE.jar:4.3.14.RELEASE]
at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127) ~[spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) ~[spring-aop-4.3.14.RELEASE.jar:4.3.14.RELEASE]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213) ~[spring-aop-4.3.14.RELEASE.jar:4.3.14.RELEASE]
at com.sun.proxy.$Proxy83.getTransaction(Unknown Source) ~[na:na]
at sun.reflect.GeneratedMethodAccessor93.invoke(Unknown Source) ~[na:na]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_181]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_181]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:333) ~[spring-aop-4.3.14.RELEASE.jar:4.3.14.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190) ~[spring-aop-4.3.14.RELEASE.jar:4.3.14.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157) ~[spring-aop-4.3.14.RELEASE.jar:4.3.14.RELEASE]
at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127) ~[spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) ~[spring-aop-4.3.14.RELEASE.jar:4.3.14.RELEASE]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213) ~[spring-aop-4.3.14.RELEASE.jar:4.3.14.RELEASE]
at com.sun.proxy.$Proxy83.getTransaction(Unknown Source) ~[na:na]
at sun.reflect.GeneratedMethodAccessor93.invoke(Unknown Source) ~[na:na]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_181]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_181]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:333) ~[spring-aop-4.3.14.RELEASE.jar:4.3.14.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190) ~[spring-aop-4.3.14.RELEASE.jar:4.3.14.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157) ~[spring-aop-4.3.14.RELEASE.jar:4.3.14.RELEASE]
at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127) ~[spring-batch-core-3.0.8.RELEASE.jar:3.0.8.RELEASE]
上述调用栈会不断重复下去,直到抛出StackOverflowError。
任何关于改变什么来修复此问题的建议或帮助都将不胜感激。
解决方法:修改BatchConfig.java,把下面的方法:
// Original version: returns the platformTransactionManager field that was injected into BatchConfig.
// NOTE(review): the stack trace above shows $Proxy83.getTransaction re-entering itself via
// SimpleBatchConfiguration$PassthruAdvice, which suggests the injected bean is a proxy that
// resolves back to this method — confirm against the Spring Batch sources.
@Override
public PlatformTransactionManager getTransactionManager() throws Exception {
return this.platformTransactionManager;
}
改为:
// Replacement version: builds a DataSourceTransactionManager directly from the dataSource
// field instead of returning the injected transaction manager.
// NOTE(review): a new manager instance is created on every call; consider creating it once
// and keeping it in a field if this getter is invoked repeatedly.
@Override
public PlatformTransactionManager getTransactionManager() throws Exception {
return new DataSourceTransactionManager(dataSource);
}
我有一个spring批处理作业,从CSV文件读取并写入数据库。我想让它重新启动。例如,如果在读取文件或写入db时出现异常,导致作业失败,则应从失败的同一点/块重新开始,而不是从头开始读取整个文件。 我正在从一个endpoint触发作业启动器,并在我的控制器中配置了它。 目前,我正在通过控制器将参数(这是一个唯一的标识符/数字)传递给作业参数,以运行新的作业实例。如果作业失败,我将使用与GET请求中
我需要从远程SFTP服务器下载一个文件,并使用spring batch处理它们。我已经实现了使用Spring集成下载文件的代码。但我无法从Spring集成组件启动Spring批处理作业。我有以下代码: 但这不起作用(上一个方法中的错误),因为找不到文件类型的bean。我不能把这两部分连在一起。如何连接集成和批处理?
我正在处理一个Spring批处理应用程序,该应用程序包含两个不同的作业bean(表示两个不同的作业)。这两项工作都必须由我的应用程序执行(目前,它可以顺序和并行地完成。目前它不是那么重要)。 我会试着解释我的情况和遇到的问题是什么: 首先,我有一个配置类,其中声明了我的两个作业对象(以及相关步骤): 然后,在第一时间,我创建了另一个SpringBatchExampleJobLauncher启动器类
我有一个Spring批处理tasklet,我不知道如何从中失败。我想检查某些参数,如果它们不存在,则在该步骤中使作业失败。 注释掉的行是我试图让工作退出的行。有人有过这样的经历吗?
是否可以配置Spring批处理管理员来启动主作业和从作业。我们有一个进程作为主节点和3-4个从节点。 Spring batch admin在单独的JVM进程中运行,但所有Spring批处理作业都使用相同的批处理数据库模式。
我正试图从Oozie启动一个Pig作业,但失败了。 Oozie stacktrace: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.oozie.action.hadoop.PigMain not found at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:1897) at org.apache.oozie.acti