我有点困惑,因为当通过HTTP请求启动Spring Batch作业的执行时,如果我在作业执行时收到另一个HTTP请求来启动相同的作业,但参数不同,则正在执行的作业停止未完成并开始处理新作业。
我开发了一个API REST来加载和处理Excel文件的内容。web服务公开了两个endpoint,一个用于加载、验证和存储数据库中Excel文件的内容,另一个用于开始处理存储在html" target="_blank">数据库中的记录。
>
POST /api/excel/upload此endpoint接收Excel文件。当接收到请求时,每个文件都会被分配一个唯一标识符并验证其内容。如果内容正确,它会将其插入到等待处理的临时表中。
获取/api/Excel/process?id=x此endpoint接收要处理的文件的标识符。当收到请求时,一个Spring批处理作业开始处理临时表中的记录。
@PostMapping(produces = {APPLICATION_JSON_VALUE})
public ResponseEntity<Page<ExcelLoad>> post(@RequestParam("file") MultipartFile multipartFile)
{
return super.getResponse().returnPage(service.upload(multipartFile));
}
@GetMapping(value = "/process", produces = APPLICATION_JSON_VALUE)
public DeferredResult<ResponseEntity<Void>> get(@RequestParam("id") Integer idCarga)
{
DeferredResult<ResponseEntity<Void>> response = new DeferredResult<>(1000L);
response.onTimeout(() -> response.setResult(super.getResponse().returnVoid()));
ForkJoinPool.commonPool().submit(() -> service.startJob(idCarga));
return response;
}
我使用DelferredResult在收到请求后向客户端发送响应,而不用等待作业完成
public void startJob(int idCarga)
{
JobParameters params = new JobParametersBuilder()
.addString("mainJob", String.valueOf(System.currentTimeMillis()))
.addString("idCarga", String.valueOf(idCarga))
.toJobParameters();
try
{
jobLauncher.run(job, params);
}
catch (JobExecutionException e)
{
log.error("---ERROR: {}", e.getMessage());
}
}
@Bean
public Step mainStep(ReaderImpl reader, ProcessorImpl processor, WriterImpl writer)
{
return stepBuilderFactory.get("step")
.<List<ExcelLoad>, Invoice>chunk(10)
.reader(reader)
.processor(processor)
.writer(writer)
.faultTolerant().skipPolicy(new ExceptionSkipPolicy())
.listener(stepSkipListener)
.build();
}
@Bean
public Job mainJob(Step mainStep)
{
return jobBuilderFactory.get("mainJob")
.listener(mainJobExecutionListener)
.incrementer(new RunIdIncrementer())
.start(mainStep)
.build();
}
在执行一些测试时,我观察到以下行为:
>
如果我向endpoint/进程发出请求,请求第一个处理文件1,在它完成之前,我再次请求处理文件2:在这种情况下,并不是所有存储在临时表中的记录都被处理:
多亏了@Mahmoud Ben Hassine的帮助,我才得以解决这个问题。为了帮助实现,如果有人遇到这个问题,我将分享代码,在我的例子中,这些代码已经解决了这个问题:
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job job;
@GetMapping(value = "/process", produces = APPLICATION_JSON_VALUE)
public void get(@RequestParam("id") Integer idCarga) throws JobExecutionException
{
JobParameters params = new JobParametersBuilder()
.addString("mainJob", String.valueOf(System.currentTimeMillis()))
.addString("idCarga", String.valueOf(idCarga))
.toJobParameters();
jobLauncher.run(job, params);
}
@Configuration
@EnableBatchProcessing
public class BatchConfig extends DefaultBatchConfigurer
{
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private StepSkipListener stepSkipListener;
@Autowired
private MainJobExecutionListener mainJobExecutionListener;
@Bean
public TaskExecutor taskExecutor()
{
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setMaxPoolSize(10);
taskExecutor.setThreadNamePrefix("batch-thread-");
return taskExecutor;
}
@Bean
public JobLauncher jobLauncher() throws Exception
{
SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
jobLauncher.setJobRepository(getJobRepository());
jobLauncher.setTaskExecutor(taskExecutor());
jobLauncher.afterPropertiesSet();
return jobLauncher;
}
@Bean
public Step mainStep(ReaderImpl reader, ProcessorImpl processor, WriterImpl writer)
{
return stepBuilderFactory.get("step")
.<List<ExcelLoad>, Invoice>chunk(10)
.reader(reader)
.processor(processor)
.writer(writer)
.faultTolerant().skipPolicy(new ExceptionSkipPolicy())
.listener(stepSkipListener)
.build();
}
@Bean
public Job mainJob(Step mainStep)
{
return jobBuilderFactory.get("mainJob")
.listener(mainJobExecutionListener)
.incrementer(new RunIdIncrementer())
.start(mainStep)
.build();
}
}
如果在应用了这个代码之后,就像我遇到的那样,你在数据库中插入记录时也遇到了问题,你可以通过这个问题,我也把适用于我的代码放在这里。
JobLauncher
不会停止作业执行,它只会启动它们。Spring Batch提供的默认作业启动程序是SimpleZoblancher
,它将作业启动委托给TaskExecutor
。现在,根据您使用的任务执行器实现以及如何配置它以启动并发任务,您可以看到不同的行为。例如,当启动新作业执行并将新任务提交给任务执行者时,如果所有工作人员都很忙,任务执行者可以决定拒绝此提交,或者将其放入等待队列,或者停止另一个任务并提交新任务。这些策略取决于几个参数(TaskExecutor
实现、幕后使用的队列类型、RejectedExecutionHandler
实现等)。
在您的情况下,您似乎在使用以下内容:
ForkJoinPool.commonPool().submit(() -> service.startJob(idCarga));
因此,您需要检查这个池在处理新任务提交方面的行为(我猜这就是停止工作的原因,但您需要确认这一点)。也就是说,我不明白你为什么需要这个。如果您的要求如下:
我使用DelferredResult在收到请求后向客户端发送响应,而不用等待作业完成
然后,可以在作业启动器中使用异步任务执行器实现(如线程池任务执行器
),请参阅从Web容器中运行作业。
我有一个作业流,我希望以以下方式运行它: 作业流将从Job1开始。在Job1成功完成后,Job1将同时启动Job2和Job4。 Job2和Job4将并行运行。 在Job2成功完成后,Job2将启动Job3。 在Job4成功完成后,Job4将启动Job5。 下面是job1.xml和job1的作业启动器类的代码片段: job1.xml uijobLauncher.java “job2,Job3”对和“
问题内容: 运行main方法时,将执行作业。这样我无法弄清楚如何控制作业的执行。例如,您如何安排作业,访问作业执行或设置作业参数的方式。 我试图注册自己的JobLauncher 但是当我尝试在主要方法中使用它时: 当加载上下文时,该作业再次执行,而当我尝试手动运行它时,我得到了。有没有办法防止自动作业执行? 问题答案: 通过设置可以防止作业执行 在application.properties中。或
我按照这个示例使用Boot进行Spring批处理。 运行main方法时,作业将执行。这样我就不知道如何控制作业的执行了。例如如何排定作业、访问作业执行或设置作业参数。 我尝试注册自己的JobLauncher 但当我尝试在主法中使用时: 当加载上下文时,再次执行作业,并且尝试手动运行作业时得到。有没有办法防止自动执行作业?
我在BatchScheduler中有多个计划作业,它在特定时间运行。简单的内置JobLauncher,这是同步的。在自然界中最初使用。现在,我想并行运行这些作业,这样没有作业可以等待其他作业完成。 我在不同的作业上尝试过@Async注释,但都不起作用。 然后,我尝试设置joblauncher.settaskexecutor(新的SimpleAsyncTaskExecutor())。但这并不奏效。
我正在使用spring批处理读取CSV文件并使用controller触发器将其写入DB。在启动应用程序时,在我从浏览器url中点击之前,我会在启动时看到来自阅读器的打印语句。虽然它不为我的处理器或写入器打印它,它们是在单独的类中,我已经自动连线。是因为读者是豆子吗?
我不知道如何使用调用Spring批处理中定义的作业,文档细节对我来说是不够的。 我遵循了Spring Batch官方指南,使用Java注释(例如)在Spring Batch中编写作业,因为我希望避免使用XML配置文件来描述作业、步骤等。 到目前为止我已经: 配置类(见下文),我使用AnnotaIon将定义、、、和(带有的所有内容放入其中。 具有方法的类,该方法具有并具有注释,以导入处理作业中的数据