当前位置: 首页 > 知识库问答 >
问题:

从Spring集成启动Spring批处理作业

楚俊逸
2023-03-14

我需要从远程SFTP服务器下载一个文件,并使用spring batch处理它们。我已经实现了使用Spring集成下载文件的代码。但我无法从Spring集成组件启动Spring批处理作业。我有以下代码:

    @Autowired
private JobLauncher jobLauncher;

public String OUTPUT_DIR = "temp_dir";

@Value("${sftp.remote.host}")
private String sftpRemoteHost;

@Value("${sftp.remote.user}")
private String sftpUsername;

@Value("${sftp.remote.password}")
private String sftpPassword;

@Value("${sftp.remote.folder}")
private String sftpFolder;

@Bean
public DefaultSftpSessionFactory sftpSessionFactory() {
    final DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory();
    factory.setHost(sftpRemoteHost);
    factory.setAllowUnknownKeys(true);
    factory.setUser(sftpUsername);
    factory.setPassword(sftpPassword);
    return factory;
}

@Bean
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
    final SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
    fileSynchronizer.setDeleteRemoteFiles(false);
    fileSynchronizer.setRemoteDirectory(sftpFolder);
    fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.csv"));
    return fileSynchronizer;
}

@Bean
@InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<File> sftpMessageSource() {
    final SftpInboundFileSynchronizingMessageSource source =
            new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
    source.setLocalDirectory(new File(OUTPUT_DIR));
    source.setAutoCreateLocalDirectory(true);
    source.setLocalFilter(new AcceptOnceFileListFilter<>());
    return source;
}

@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handler() {
    final FileWritingMessageHandler handler = new FileWritingMessageHandler(new File(OUTPUT_DIR));
    handler.setFileExistsMode(FileExistsMode.REPLACE);
    handler.setExpectReply(true);
    handler.setOutputChannelName("parse-csv-channel");
    return handler;
}

@ServiceActivator(inputChannel = "parse-csv-channel", outputChannel = "job-channel")
public JobLaunchRequest adapt(final File file) throws Exception {
    final JobParameters jobParameters = new JobParametersBuilder().addString(
            "input.file", file.getAbsolutePath()).toJobParameters();
    return new JobLaunchRequest(batchConfiguration.job(), jobParameters);
}

@ServiceActivator(inputChannel = "job-channel", outputChannel = "finish")
public JobLaunchingMessageHandler jobHandler(JobLaunchRequest request) throws JobExecutionException {
    return new JobLaunchingMessageHandler(jobLauncher);//.launch(request);
}

@ServiceActivator(inputChannel = "finish")
public void finish() {
    System.out.println("FINISH");
}

但这不起作用(上一个方法adapt中的错误),因为找不到文件类型的bean。我不能把这两部分连在一起。如何连接集成和批处理?

共有1个答案

司马宏邈
2023-03-14

您肯定只需要从adapt()方法中删除@Bean注释。如果我们真的构建了MessageHandlerBean,我们需要@Bean,例如JobLaunchingMessageHandler来接受JobLaunchRequest负载:https://docs.spring.io/spring-batch/trunk/reference/html/springBatchIntegration.html#launching-通过消息批处理作业。

请参阅参考手册中有关消息注释的更多信息:https://docs.spring.io/spring-integration/docs/4.3.12.RELEASE/reference/html/configuration.html#annotations_on_beans

使现代化

@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handler() {
    final FileWritingMessageHandler handler = new FileWritingMessageHandler(new File(OUTPUT_DIR));
    handler.setFileExistsMode(FileExistsMode.REPLACE);
    handler.setExpectReply(true);
    handler.setOutputChannelName("parse-csv-channel");
    return handler;
}

@ServiceActivator(inputChannel = "parse-csv-channel", outputChannel = "job-channel")
public JobLaunchRequest adapt(final File file) throws Exception {
    final JobParameters jobParameters = new JobParametersBuilder().addString(
            "input.file", file.getAbsolutePath()).toJobParameters();
    return new JobLaunchRequest(batchConfiguration.job(), jobParameters);
}

@Bean
@ServiceActivator(inputChannel = "job-channel")
public JobLaunchingGateway jobHandler() {
    JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher);
    jobLaunchingGateway.setOutputChannelName("finish");
    return jobLaunchingGateway;
}
 类似资料:
  • 我有一个正在工作的Spring Boot/批处理项目,包含2个作业。 我现在尝试添加集成,仅使用java配置/java DSL从远程SFTP轮询文件,然后启动作业。 文件轮询工作正常,但我不知道如何在我的流中启动作业,尽管阅读了以下链接: 一些代码段: 对于.handle(“FileMessageToJobRequest”,“ToRequest”),我使用的是http://docs.spring.

  • 是否可以配置Spring批处理管理员来启动主作业和从作业。我们有一个进程作为主节点和3-4个从节点。 Spring batch admin在单独的JVM进程中运行,但所有Spring批处理作业都使用相同的批处理数据库模式。

  • 我有一个spring批处理作业,从CSV文件读取并写入数据库。我想让它重新启动。例如,如果在读取文件或写入db时出现异常,导致作业失败,则应从失败的同一点/块重新开始,而不是从头开始读取整个文件。 我正在从一个endpoint触发作业启动器,并在我的控制器中配置了它。 目前,我正在通过控制器将参数(这是一个唯一的标识符/数字)传递给作业参数,以运行新的作业实例。如果作业失败,我将使用与GET请求中

  • 我最近开始进行groovy测试,以测试使用一个步骤(ItemReader、ItemProcessor、ItemWriter)的spring批处理作业 知道我正在使用h2内存数据库,我已尝试通过单元测试启动作业,但无法验证作业是否已完成: 此外,当我试图调试它时,它不会从一端到另一端,而是在测试完成后在ItemReader中停止。我知道这不是等待工作完成。 有没有办法在工作完成后才完成测试? 非常感

  • 我遵循了spring批处理文档,无法异步运行我的作业。 因此,我从一个web容器运行该作业,该作业将通过RESTendpoint触发。 我想让JobInstance ID在完成整个作业之前传递它作为响应。因此,他们可以稍后使用JobInstance ID检查作业的状态,而不是等待。但我没能让它工作。下面是我尝试过的示例代码。请让我知道我错过了什么或错了什么。 BatchConfig创建异步JobL