当前位置: 首页 > 知识库问答 >
问题:

使用Java DSL/启动作业的Spring批处理集成

阎祖鹤
2023-03-14

我有一个正在工作的Spring Boot/批处理项目,包含2个作业。

我现在尝试添加集成,仅使用java配置/java DSL从远程SFTP轮询文件,然后启动作业。

文件轮询工作正常,但我不知道如何在我的流中启动作业,尽管阅读了以下链接:

一些代码段:

@Bean
public SessionFactory SftpSessionFactory()
    {
        DefaultSftpSessionFactory sftpSessionFactory = new DefaultSftpSessionFactory();
        sftpSessionFactory.setHost("myip");
        sftpSessionFactory.setPort(22);
        sftpSessionFactory.setUser("user");
        sftpSessionFactory.setPrivateKey(new FileSystemResource("path to my key"));

    return sftpSessionFactory;
}

@Bean
public IntegrationFlow ftpInboundFlow() {
    return IntegrationFlows
        .from(Sftp.inboundAdapter(SftpSessionFactory())
            .deleteRemoteFiles(Boolean.FALSE)
            .preserveTimestamp(Boolean.TRUE)
            .autoCreateLocalDirectory(Boolean.TRUE)
            .remoteDirectory("remote dir")
            .regexFilter(".*\\.txt$")
            .localDirectory(new File("C:/sftp/")),
                e -> e.id("sftpInboundAdapter").poller(Pollers.fixedRate(600000)))
        .handle("FileMessageToJobRequest","toRequest")
        // what to put next to process the jobRequest ?

对于.handle(“FileMessageToJobRequest”,“ToRequest”),我使用的是http://docs.spring.io/spring-batch/trunk/reference/html/springbatchintegration.html

我将非常感谢任何帮助,非常感谢。

在我添加了Gary comment之后编辑,它不会编译--当然--因为我不理解请求是如何传播的:

.handle("FileMessageToJobRequest","toRequest")
.handle(jobLaunchingGw())
.get();
}

@Bean
public MessageHandler jobLaunchingGw() {
    return new JobLaunchingGateway(jobLauncher());
}

@Autowired
private JobLauncher jobLauncher;

@Bean
public JobExecution jobLauncher(JobLaunchRequest req) throws JobExecutionException {
    JobExecution execution = jobLauncher.run(req.getJob(), req.getJobParameters());
    return execution;
}

我找到了一种使用@ServiceActivator启动作业并将其添加到我的流中的方法,但我不确定这是否是一种好的做法:

 .handle("lauchBatchService", "launch")

@Component("lauchBatchService")
public class LaunchBatchService {
    private static Logger log = LoggerFactory.getLogger(LaunchBatchService.class);

@Autowired
private JobLauncher jobLauncher;

@ServiceActivator
public JobExecution launch(JobLaunchRequest req) throws JobExecutionException {

    JobExecution execution = jobLauncher.run(req.getJob(), req.getJobParameters());

    return execution;
}

}

共有1个答案

陈浩
2023-03-14
    .handle(jobLaunchingGw())
    // handle result

...

@Bean
public MessageHandler jobLaunchingGw() {
    return new JobLaunchingGateway(jobLauncher());
}

其中joblauncher()joblauncherbean。

编辑

您的服务激活器的操作与JLG的操作大致相同;它使用这个代码。

@Bean
public JobExecution jobLauncher(JobLaunchRequest req) throws JobExecutionException {
    JobExecution execution = jobLauncher.run(req.getJob(), req.getJobParameters());
    return execution;
}
@Autowired
private JobLauncher jobLauncher;

@Bean
public MessageHandler jobLaunchingGw() {
    return new JobLaunchingGateway(jobLauncher);   
} 
 类似资料:
  • 我需要从远程SFTP服务器下载一个文件,并使用spring batch处理它们。我已经实现了使用Spring集成下载文件的代码。但我无法从Spring集成组件启动Spring批处理作业。我有以下代码: 但这不起作用(上一个方法中的错误),因为找不到文件类型的bean。我不能把这两部分连在一起。如何连接集成和批处理?

  • 我有一个spring批处理作业,从CSV文件读取并写入数据库。我想让它重新启动。例如,如果在读取文件或写入db时出现异常,导致作业失败,则应从失败的同一点/块重新开始,而不是从头开始读取整个文件。 我正在从一个endpoint触发作业启动器,并在我的控制器中配置了它。 目前,我正在通过控制器将参数(这是一个唯一的标识符/数字)传递给作业参数,以运行新的作业实例。如果作业失败,我将使用与GET请求中

  • 我有以下工作要处理在一定的时间间隔或特别的基础上。 作业中的步骤如下: 我也想要用户界面,在那里我可以触发一个特别的基础上的工作,而且我应该能够提供参数从用户界面。 我想用Spring batch来完成这个任务,但它更多的是用于读->处理->写之类的工作。这里,在第一步中,我正在生成由第二步读取的数据。我不确定我是否还可以使用Spring batch来实现这个,或者有更好的方法来实现这个。

  • 是否可以配置Spring批处理管理员来启动主作业和从作业。我们有一个进程作为主节点和3-4个从节点。 Spring batch admin在单独的JVM进程中运行,但所有Spring批处理作业都使用相同的批处理数据库模式。

  • 我遵循了spring批处理文档,无法异步运行我的作业。 因此,我从一个web容器运行该作业,该作业将通过RESTendpoint触发。 我想让JobInstance ID在完成整个作业之前传递它作为响应。因此,他们可以稍后使用JobInstance ID检查作业的状态,而不是等待。但我没能让它工作。下面是我尝试过的示例代码。请让我知道我错过了什么或错了什么。 BatchConfig创建异步JobL