当前位置: 首页 > 知识库问答 >
问题:

监视远程目录中添加的文件,并将其流式传输以通过SFTP读取数据

穆单鹗
2023-03-14

我想在远程机器上添加一个手表,用于新添加的CSV文件或未读文件。一旦文件被识别,根据文件名中的时间戳读取。文件将使用流媒体而不是本地机器读取。读取文件时,将读取附加到文件名,并在读取文件后附加读取。该文件将通过SFTP协议读取,我计划使用spring集成SFTP。如果读取文件时出错或文件中的数据不符合预期,我想将该文件移动到子目录中。

我尝试轮询远程目录并读取一次CSV文件。一旦读取,我将从目录中删除该文件。

 <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-sftp</artifactId>
            <version>5.1.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
            <version>5.0.6.RELEASE</version>
        </dependency>

Spring boot version 2.0.3.RELEASE   
 @Bean
    public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost(hostname);
        factory.setPort(22);
        factory.setUser(username);
        factory.setPassword(password);
        factory.setAllowUnknownKeys(true);
        return new CachingSessionFactory<ChannelSftp.LsEntry>(factory);
    }

    @Bean
    public MessageSource<InputStream> sftpMessageSource() {
        SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(template());
        messageSource.setRemoteDirectory(path);
        messageSource.setFilter(compositeFilters());
        return messageSource;
    }

    public CompositeFileListFilter compositeFilters() {
        return new CompositeFileListFilter()
                .addFilter(new SftpRegexPatternFileListFilter(".*csv"));
    }

    @Bean
    public SftpRemoteFileTemplate template() {
        return new SftpRemoteFileTemplate(sftpSessionFactory());
    }

    @Bean
    public IntegrationFlow sftpOutboundListFlow() {
        return IntegrationFlows.from(this.sftpMessageSource(), e -> e.poller(Pollers.fixedDelay(5, TimeUnit.SECONDS)))
                .handle(Sftp.outboundGateway(template(), NLST, path).options(Option.RECURSIVE)))
                .filter(compositeFilters())
                .transform(sorter())
                .split()
                .handle(Sftp.outboundGateway(template(), GET, "headers['file_remoteDirectory'] + headers['file_remoteFile']").options(STREAM))
                .transform(csvToPojoTransformer())
                .handle(service())
                .handle(Sftp.outboundGateway(template(), MV, "headers['file_remoteDirectory'] + headers['file_remoteFile'] + _read"))
                .handle(after())
                .get();
    }

    @Bean
    public MessageHandler sorter() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                List<String> fileNames = (List<String>) message.getPayload();
                Collections.sort(fileNames);
            }
        };
    }

    @Bean
    public MessageHandler csvToPojoTransformer() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                InputStream streamData = (InputStream) message.getPayload();
                convertStreamtoObject(streamData, Class.class);
            }
        };
    }

    public List<?> convertStreamtoObject(InputStream inputStream, Class clazz) {
        HeaderColumnNameMappingStrategy ms = new HeaderColumnNameMappingStrategy();
        ms.setType(clazz);
        Reader reader = new InputStreamReader(inputStream);

        CsvToBean cb = new CsvToBeanBuilder(reader)
                .withType(clazz)
                .withMappingStrategy(ms)
                .withSkipLines(0)
                .withSeparator('|')
                .withThrowExceptions(true)
                .build();
        return cb.parse();
    }

    @Bean
    public MessageHandler service() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                List<Class> csvDataAsListOfPojo = List < Class > message.getPayload();
                // use this
            }
        };
    }
    @Bean
    public ExpressionEvaluatingRequestHandlerAdvice after() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setSuccessChannelName("success.input");
        advice.setOnSuccessExpressionString("payload + ' was successful'");
        advice.setFailureChannelName("failure.input");
        advice.setOnFailureExpressionString("payload + ' was bad, with reason: ' + #exception.cause.message");
        advice.setTrapException(true);
        return advice;
    }
    @Bean
    public IntegrationFlow success() {
        return f -> f.handle(System.out::println);
    }

    @Bean
    public IntegrationFlow failure() {
        return f -> f.handle(System.out::println);
    }

更新的代码

共有1个答案

董权
2023-03-14

对于复杂的场景(列表、移动、获取、删除等),应该改用SFTP远程文件网关

SFTP出站网关提供了一组有限的命令,允许您与远程SFTP服务器交互:

ls(列表文件)

nlst(列出文件名)

获取(检索文件)

mget(检索多个文件)

rm(删除文件)

mv(移动和重命名文件)

put(发送文件)

mput(发送多个文件)

或者直接从代码中使用SftpRemoteFileTemplate。

编辑

回应您的评论;你需要这样的东西

  • 入站通道适配器(带轮询器)-返回目录名
  • LS网关
  • 筛选(删除所有已提取的文件)
  • Transformer(对列表排序)
  • 分离器
  • 获取网关(流选项)
  • 变压器(csv到POJO)
  • 服务(过程POJO)

如果你加上

  • RM网关

服务完成后(删除远程文件),您不需要过滤步骤。

您可能会发现Java DSL更容易组装此流。。。

java prettyprint-override">@Bean
public IntegrationFlow flow() {
    return IntegrationFlows.from(() -> "some/dir", e -> e.poller(...))
        .handle(...) // LS Gateway
        .filter(...)
        .transform(sorter())
        .split
        .handle(...) // GET Gateway
        .transform(csvToPojoTransformer())
        .handle(myService())
        .get()
}
 类似资料:
  • 我正在使用Spring集成流入站通道适配器,从远程SFTP获取流并解析内容进程的每一行。 我使用: 它现在可以工作了。但是我只能从目录中获取文件,但是我需要递归地从这个目录和子目录中获取文件,并解析每一行。 我注意到入站通道适配器是Sftp。内置适配器(sftpSessionFactory)。扫描仪(…) 。它可以扫描子目录。但我没有看到任何关于流式入站通道适配器的内容。 那么,如何在流入站通道适

  • 问题内容: 我只能通过FTP访问远程服务器上的目录,并且希望在目录中出现新文件后立即获取它们。 是否有FAM for Python之类的东西可以让我通过FTP监视新文件? 问题答案: 如果轮询服务器是一种选择:

  • 问题内容: 如何在Java中通过SFTP传输文件? 我想要SFTP客户端的示例代码。我想将SFTP服务器嵌入到我的应用程序中,并且客户端应该能够将文件发送到我的应用程序。 PS:这是要求SFTP客户端。这个问题不是其他两个问题的重复。 找到以下链接以实现SFTP。 https://codetransient.wordpress.com/2019/06/22/sftp-secured-file-tr

  • 问题内容: 我想流式传输文件中包含的行,但是一旦处理完每个文件就将其移动到另一个文件夹。 当前的过程是这样的: 说明: 我创建的小号 我为他们每个人创建一个 我到线的 我打印每一行。 代码(为简单起见,省略了例外): 一旦每个文件都已被完全读取 ,是否可以 移动它 并继续处理流中的其他文件? 问题答案: 您可以将关闭动作链接到流,如果发生以下情况,该动作将自动执行: 状态文档很重要: 当在流上调用

  • 我有一个在远程服务器(debian linux)上运行的Java应用程序。该应用程序将运行时信息记录到文件中。 偶尔,支持人员需要使用客户端工具(不是在服务器上运行,而是在支持人员的桌面上运行)分析日志。然后,该工具需要远程访问 < li >服务器上的旧日志文件 < li >当前日志文件(实时增长) 我正在搜索一种良好的(=标准,灵活,安全,调试良好等)方式将这些过去和现在的日志消息流式传输到我的

  • 问题内容: 我希望用户能够将视频文件上传到我的网站,并且希望它们排列在适当的文件夹中,再加上数据库条目,以便以后我可以知道上传每个特定文件的人。 我的HTML表单在这里: 我的PHP在这里: 我不了解PHP,所以不知道出了什么问题。我还想添加接受名称及其电子邮件地址的功能。 问题答案: “您能否建议一个简单的代码主要是上传文件,数据库条目是次要的” 图片和视频上传代码(已通过PHP 5.4.17版