我正在使用Spring集成流入站通道适配器,从远程SFTP获取流并解析内容进程的每一行。
我使用:
IntegrationFlows.from(Sftp.inboundStreamingAdapter(template)
.filter(remoteFileFilter)
.remoteDirectory("test_dir"),
e -> e.id("sftpInboundAdapter")
.autoStartup(true)
.poller(Pollers.fixedDelay(fetchInt)))
.handle(Files.splitter(true, true))
....
它现在可以工作了。但是我只能从test_dir
目录中获取文件,但是我需要递归地从这个目录和子目录中获取文件,并解析每一行。
我注意到入站通道适配器是Sftp。内置适配器(sftpSessionFactory)。扫描仪(…) 。它可以扫描子目录。但我没有看到任何关于流式入站通道适配器的内容。
那么,如何在流入站通道适配器中实现递归地从目录获取文件?
谢谢。
就像dsillman2000评论的那样,我也发现这个答案可能需要更多的解释。
在根据这里的示例弄清楚这一点后,这是我的扩展示例,它对我有用,提取的变量和方法(希望)清楚地说明各个部分是什么或做什么。
依赖关系:主要是组织。springframework。集成:spring集成sftp:2.6.6
package com.example.sftp.incoming;
import com.jcraft.jsch.ChannelSftp;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.dsl.HeaderEnricherSpec;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.file.filters.AbstractFileListFilter;
import org.springframework.integration.file.remote.gateway.AbstractRemoteFileOutboundGateway;
import org.springframework.integration.file.remote.session.CachingSessionFactory;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.file.splitter.FileSplitter;
import org.springframework.integration.metadata.SimpleMetadataStore;
import org.springframework.integration.sftp.dsl.Sftp;
import org.springframework.integration.sftp.dsl.SftpOutboundGatewaySpec;
import org.springframework.integration.sftp.filters.SftpPersistentAcceptOnceFileListFilter;
import org.springframework.integration.sftp.session.DefaultSftpSessionFactory;
import org.springframework.messaging.Message;
import java.util.function.Consumer;
import java.util.function.Function;
@Configuration
public class SftpIncomingRecursiveConfiguration {
private static final String BASE_REMOTE_FOLDER_TO_GET_FILES_FROM = "/tmp/sftptest/";
private static final String REMOTE_FOLDER_PATH_AS_EXPRESSION = "'" + BASE_REMOTE_FOLDER_TO_GET_FILES_FROM + "'";
private static final String INBOUND_CHANNEL_NAME = "sftpGetInputChannel";
@Value("${demo.sftp.host}")
private String sftpHost;
@Value("${demo.sftp.user}")
private String sftpUser;
@Value("${demo.sftp.password}")
private String sftpPassword;
@Value("${demo.sftp.port}")
private String sftpPort;
@Bean
public SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost(sftpHost);
factory.setPort(Integer.parseInt(sftpPort));
factory.setUser(sftpUser);
factory.setPassword(sftpPassword);
factory.setAllowUnknownKeys(true);
return new CachingSessionFactory<>(factory);
}
// poll for new files every 500ms
@InboundChannelAdapter(value = INBOUND_CHANNEL_NAME, poller = @Poller(fixedDelay = "500"))
public String filesForSftpGet() {
return BASE_REMOTE_FOLDER_TO_GET_FILES_FROM;
}
@Bean
public IntegrationFlow sftpGetFlow(SessionFactory<ChannelSftp.LsEntry> sessionFactory) {
return IntegrationFlows
.from(INBOUND_CHANNEL_NAME)
.handle(listRemoteFiles(sessionFactory))
.split()
.log(logTheFilePath())
.enrichHeaders(addAMessageHeader())
.log(logTheMessageHeaders())
.handle(getTheFile(sessionFactory))
.split(splitContentIntoLines())
.log(logTheFileContent())
.get();
}
private SftpOutboundGatewaySpec listRemoteFiles(SessionFactory<ChannelSftp.LsEntry> sessionFactory) {
return Sftp.outboundGateway(sessionFactory,
AbstractRemoteFileOutboundGateway.Command.LS, REMOTE_FOLDER_PATH_AS_EXPRESSION)
.options(AbstractRemoteFileOutboundGateway.Option.RECURSIVE, AbstractRemoteFileOutboundGateway.Option.NAME_ONLY)
.filter(onlyFilesWeHaveNotSeenYet())
.filter(onlyTxtFiles());
}
/* Persistent file list filter using the server's file timestamp to detect if we've already 'seen' this file.
Without it, the program would report the same file over and over again. */
private SftpPersistentAcceptOnceFileListFilter onlyFilesWeHaveNotSeenYet() {
return new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "keyPrefix");
}
private AbstractFileListFilter<ChannelSftp.LsEntry> onlyTxtFiles() {
return new AbstractFileListFilter<>() {
@Override
public boolean accept(ChannelSftp.LsEntry file) {
return file.getFilename().endsWith(".txt");
}
};
}
private Function<Message<Object>, Object> logTheFilePath() {
return message -> "### File path: " + message.getPayload();
}
private Consumer<HeaderEnricherSpec> addAMessageHeader() {
return headers -> headers.headerExpression("fileToRemove", REMOTE_FOLDER_PATH_AS_EXPRESSION + " + payload");
}
private Function<Message<Object>, Object> logTheMessageHeaders() {
return message -> "### Header file info: " + message.getHeaders();
}
private SftpOutboundGatewaySpec getTheFile(SessionFactory<ChannelSftp.LsEntry> sessionFactory) {
return Sftp.outboundGateway(sessionFactory, AbstractRemoteFileOutboundGateway.Command.GET,
REMOTE_FOLDER_PATH_AS_EXPRESSION + " + payload")
.options(AbstractRemoteFileOutboundGateway.Option.STREAM);
}
private FileSplitter splitContentIntoLines() {
return new FileSplitter();
}
private Function<Message<Object>, Object> logTheFileContent() {
return message -> "### File content line: '" + message.getPayload() + "'";
}
}
编辑:请注意,这里有一个区别。另一个示例使用轮询器一次又一次地从“filesForMGET”生成带有远程文件路径的消息,并且该消息有效负载(文件路径)被用作“ls”的参数。我在这里硬编码它,忽略轮询器中的消息内容。
它对我有效,这是我的完整代码
@Configuration
public class SftpIFConfig {
@InboundChannelAdapter(value = "sftpMgetInputChannel",
poller = @Poller(fixedDelay = "5000"))
public String filesForMGET(){
return "/upload/done";
}
@Bean
public IntegrationFlow sftpMGetFlow(SessionFactory<ChannelSftp.LsEntry> csf) {
return IntegrationFlows.from("sftpMgetInputChannel")
.handle(Sftp.outboundGateway(csf,
AbstractRemoteFileOutboundGateway.Command.LS, "payload")
.options(AbstractRemoteFileOutboundGateway.Option.RECURSIVE, AbstractRemoteFileOutboundGateway.Option.NAME_ONLY)
//Persistent file list filter using the server's file timestamp to detect if we've already 'seen' this file.
.filter(new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "test")))
.split()
.log(message -> "file path -> "+message.getPayload())
.enrichHeaders(headers -> headers.headerExpression("fileToRemove", "'/upload/done/' + payload"))
.log(message -> "Heder file info -> "+message.getHeaders())
.handle(Sftp.outboundGateway(csf, AbstractRemoteFileOutboundGateway.Command.GET, "'/upload/done/' + payload")
.options(AbstractRemoteFileOutboundGateway.Option.STREAM))
.split(new FileSplitter())
.log(message -> "File content -> "+message.getPayload())
.get();
}
@Bean
CachingSessionFactory<ChannelSftp.LsEntry> csf(DefaultSftpSessionFactory sf) {
return new CachingSessionFactory<>(sf);
}
@Bean
DefaultSftpSessionFactory sf() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory();
factory.setHost("0.0.0.0");
factory.setPort(2222);
factory.setAllowUnknownKeys(true);
factory.setUser("xxxx");
factory.setPassword("xxx");
return factory;
}
您可以使用两个出站网关-第一个执行ls-R
(递归列表);拆分结果并使用配置有mget-stream
的网关来获取每个文件。
编辑
@SpringBootApplication
public class So60987851Application {
public static void main(String[] args) {
SpringApplication.run(So60987851Application.class, args);
}
@Bean
IntegrationFlow flow(SessionFactory<LsEntry> csf) {
return IntegrationFlows.from(() -> "foo", e -> e.poller(Pollers.fixedDelay(5_000)))
.handle(Sftp.outboundGateway(csf, Command.LS, "payload")
.options(Option.RECURSIVE, Option.NAME_ONLY)
// need a more robust metadata store for persistence, unless the files are removed
.filter(new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "test")))
.split()
.log()
.enrichHeaders(headers -> headers.headerExpression("fileToRemove", "'foo/' + payload"))
.handle(Sftp.outboundGateway(csf, Command.GET, "'foo/' + payload")
.options(Option.STREAM))
.split(new FileSplitter())
.log()
// instead of a filter, we can remove the remote file.
// but needs some logic to wait until all lines read
// .handle(Sftp.outboundGateway(csf, Command.RM, "headers['fileToRemove']"))
// .log()
.get();
}
@Bean
CachingSessionFactory<LsEntry> csf(DefaultSftpSessionFactory sf) {
return new CachingSessionFactory<>(sf);
}
@Bean
DefaultSftpSessionFactory sf() {
DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
sf.setHost("10.0.0.8");
sf.setUser("gpr");
sf.setPrivateKey(new FileSystemResource(new File("/Users/grussell/.ssh/id_rsa")));
sf.setAllowUnknownKeys(true);
return sf;
}
}
我正在使用spring集成从远程sftp服务器中提取文件。一个要求是,在将文件从远程服务器目录拉入本地目录后,需要将拉入文件的副本归档到同一个sftp服务器上的不同目录中。 我在这里检查了不同的选项。本地有文件同步或文件复制选项。但没有找到远程的选项。 我应该尝试Spring集成以外的其他东西吗?我使用Spring集成是因为它已经是应用程序的一部分。这是一个新要求。
我在Spring Boot应用程序中使用Spring集成版本5.4.4。我需要从“server_sftp”目录下的子目录中获取所有的XML文件。为此,我使用带有mget命令的SFTP流入站通道适配器和SFTP出站网关。不幸的是,应用程序只从根目录(server_sftp)下载文件,而不从子目录下载文件。 我哪里出错了? sftp服务器上的目录结构
我正在尝试实现一个Spring Integration类,它获取一个. xml文件并对其进行解析,如果有效,就将其移动到一个“存档”目录,如果无效,就将其移动到一个错误目录。 然而,每当调用< code>calback.execute()时,我都会得到这个错误,我不太明白为什么。 虽然我有一个消息处理程序,但我怀疑这个问题的原因是我没有重写handle方法。但我不知道该怎么做。
问题内容: 我想在子目录/超级目录中执行脚本(我需要先在此子目录/超级目录中)。我无法进入子目录: Python抛出OSError,我不知道为什么。无论是尝试进入现有的子目录还是进入一个目录(如上所述)都没有关系-我总是会遇到相同的错误。 问题答案: 您的代码尝试执行的操作是调用名为的程序。您想要的是称为的命令。 但是是外壳内部的。所以你只能称它为 但这是没有意义的。 由于没有进程可以更改另一个进
在阅读的API时,我错过了很多函数。首先,它建议使用for循环从stream转到。而且我忽略了一个事实,即不是。 如何在Java8中从生成?
我想把文件从远程服务器拉到我的本地服务器。我正在使用Spring Integration SFTP来拉文件。我有以下配置: 我不想在成功拉取后从remotedir中删除文件。这里发生的事情是每30秒,从remotedir的文件就会被拉到我的本地文件。我希望spring batch程序在第一次之后停止,不要再次拉出相同的文件。我怎样才能做到这一点?