我有一个正在工作的Spring Boot/批处理项目,包含2个作业。
我现在尝试添加集成,仅使用java配置/java DSL从远程SFTP轮询文件,然后启动作业。
文件轮询工作正常,但我不知道如何在我的流中启动作业,尽管阅读了以下链接:
一些代码段:
@Bean
public SessionFactory SftpSessionFactory()
{
DefaultSftpSessionFactory sftpSessionFactory = new DefaultSftpSessionFactory();
sftpSessionFactory.setHost("myip");
sftpSessionFactory.setPort(22);
sftpSessionFactory.setUser("user");
sftpSessionFactory.setPrivateKey(new FileSystemResource("path to my key"));
return sftpSessionFactory;
}
@Bean
public IntegrationFlow ftpInboundFlow() {
return IntegrationFlows
.from(Sftp.inboundAdapter(SftpSessionFactory())
.deleteRemoteFiles(Boolean.FALSE)
.preserveTimestamp(Boolean.TRUE)
.autoCreateLocalDirectory(Boolean.TRUE)
.remoteDirectory("remote dir")
.regexFilter(".*\\.txt$")
.localDirectory(new File("C:/sftp/")),
e -> e.id("sftpInboundAdapter").poller(Pollers.fixedRate(600000)))
.handle("FileMessageToJobRequest","toRequest")
// what to put next to process the jobRequest ?
对于.handle(“FileMessageToJobRequest”,“ToRequest”),我使用的是http://docs.spring.io/spring-batch/trunk/reference/html/springbatchintegration.html
我将非常感谢任何帮助,非常感谢。
在我添加了Gary comment之后编辑,它不会编译--当然--因为我不理解请求是如何传播的:
.handle("FileMessageToJobRequest","toRequest")
.handle(jobLaunchingGw())
.get();
}
@Bean
public MessageHandler jobLaunchingGw() {
return new JobLaunchingGateway(jobLauncher());
}
@Autowired
private JobLauncher jobLauncher;
@Bean
public JobExecution jobLauncher(JobLaunchRequest req) throws JobExecutionException {
JobExecution execution = jobLauncher.run(req.getJob(), req.getJobParameters());
return execution;
}
我找到了一种使用@ServiceActivator启动作业并将其添加到我的流中的方法,但我不确定这是否是一种好的做法:
.handle("lauchBatchService", "launch")
@Component("lauchBatchService")
public class LaunchBatchService {
private static Logger log = LoggerFactory.getLogger(LaunchBatchService.class);
@Autowired
private JobLauncher jobLauncher;
@ServiceActivator
public JobExecution launch(JobLaunchRequest req) throws JobExecutionException {
JobExecution execution = jobLauncher.run(req.getJob(), req.getJobParameters());
return execution;
}
}
.handle(jobLaunchingGw())
// handle result
...
@Bean
public MessageHandler jobLaunchingGw() {
return new JobLaunchingGateway(jobLauncher());
}
其中joblauncher()
是joblauncher
bean。
编辑
您的服务激活器的操作与JLG的操作大致相同;它使用这个代码。
@Bean
public JobExecution jobLauncher(JobLaunchRequest req) throws JobExecutionException {
JobExecution execution = jobLauncher.run(req.getJob(), req.getJobParameters());
return execution;
}
@Autowired
private JobLauncher jobLauncher;
@Bean
public MessageHandler jobLaunchingGw() {
return new JobLaunchingGateway(jobLauncher);
}
我需要从远程SFTP服务器下载一个文件,并使用spring batch处理它们。我已经实现了使用Spring集成下载文件的代码。但我无法从Spring集成组件启动Spring批处理作业。我有以下代码: 但这不起作用(上一个方法中的错误),因为找不到文件类型的bean。我不能把这两部分连在一起。如何连接集成和批处理?
布尔沙赫布尔
我有一个spring批处理作业,从CSV文件读取并写入数据库。我想让它重新启动。例如,如果在读取文件或写入db时出现异常,导致作业失败,则应从失败的同一点/块重新开始,而不是从头开始读取整个文件。 我正在从一个endpoint触发作业启动器,并在我的控制器中配置了它。 目前,我正在通过控制器将参数(这是一个唯一的标识符/数字)传递给作业参数,以运行新的作业实例。如果作业失败,我将使用与GET请求中
我有以下工作要处理在一定的时间间隔或特别的基础上。 作业中的步骤如下: 我也想要用户界面,在那里我可以触发一个特别的基础上的工作,而且我应该能够提供参数从用户界面。 我想用Spring batch来完成这个任务,但它更多的是用于读->处理->写之类的工作。这里,在第一步中,我正在生成由第二步读取的数据。我不确定我是否还可以使用Spring batch来实现这个,或者有更好的方法来实现这个。
是否可以配置Spring批处理管理员来启动主作业和从作业。我们有一个进程作为主节点和3-4个从节点。 Spring batch admin在单独的JVM进程中运行,但所有Spring批处理作业都使用相同的批处理数据库模式。
我遵循了spring批处理文档,无法异步运行我的作业。 因此,我从一个web容器运行该作业,该作业将通过RESTendpoint触发。 我想让JobInstance ID在完成整个作业之前传递它作为响应。因此,他们可以稍后使用JobInstance ID检查作业的状态,而不是等待。但我没能让它工作。下面是我尝试过的示例代码。请让我知道我错过了什么或错了什么。 BatchConfig创建异步JobL