当前位置: 首页 > 知识库问答 >
问题:

我怎么能通过Spring DSL在java 1.7在Sftp入站适配器动态传输消息

淳于坚壁
2023-03-14

我有一个Sftp入站流,我从DefaultSftpSessionFactory获得了会话信息。但我需要动态实现从数据库表中获取的多个会话信息。这意味着我需要在集成流程中实现多个Sftp服务器细节。现在我已经完成了从单个源到单个目标的文件传输,但我需要实现多个源到多个目标。因此,任何人都可以提供一些关于这方面的建议。

这是我的会话工厂。。。这里我有一个单一的Sftp服务器信息,但如何配置多个服务器的详细信息。

    @Autowired
    private DefaultSftpSessionFactory sftpSessionFactory;

    @Bean
    public DefaultSftpSessionFactory sftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(
                true);
        factory.setHost("111.11.12.143");
        factory.setPort(22);
        factory.setUser("sftp");
        factory.setPassword("*******");         
        return factory;
    }

这是我的Sftp入站流程。。

    @Bean
    public IntegrationFlow sftpInboundFlow() {
    System.out.println("enter sftpInboundFlow....."
            + sftpSessionFactory.getSession());     
    return IntegrationFlows
            .from(Sftp.inboundAdapter(this.sftpSessionFactory)
                    .preserveTimestamp(true).remoteDirectory(remDir)
                    .regexFilter(".*\\.txt$")
                    .localFilenameExpression("#this.toUpperCase()")
                    .localDirectory(new File(localDir))
                    .remoteFileSeparator("/"),
                    new Consumer<SourcePollingChannelAdapterSpec>() {
                        @Override
                        public void accept(SourcePollingChannelAdapterSpec e) {
                            e.id("sftpInboundAdapter")
                                    .autoStartup(true)
                                    .poller(Pollers.fixedRate(1000)
                                            .maxMessagesPerPoll(1));
                        }
                    })
            //.channel(MessageChannels.queue("sftpInboundResultChannel"))
                    .channel(sftpInboundResultChannel())
            .get();
}

按照加里的建议,我正在编辑我的帖子。。。。

你好,加里,我正在参考Github动态FTP的例子。

通过ChannelResolver类,我需要调用我上面的DSL类。并在不使用XML的情况下在context属性中设置动态值。

在我的ChannelResolver课程中,我想要这样的东西

StandardEnvironment env = new StandardEnvironment();
Properties props = new Properties();
props.setProperty("inbound.host", host);    //I am getting the value of 'host' from a DB table.
PropertiesPropertySource pps = new PropertiesPropertySource("sftpprop", props);
env.getPropertySources().addLast(pps);
context.setEnvironment(env); 

And my DSL class I need to use like this.
@Value("${inbound.host}")
private String host;

So in this way can I set dynamic value for String 'host' ? 

我正在编辑我的原始帖子............

In my Outbound dynamic resolver class I am doing like this


    StandardEnvironment env = new StandardEnvironment();
    Properties props = new Properties();        
    props.setProperty("outbound.host", host);
    props.setProperty("outbound.port", String.valueOf(port));
    props.setProperty("outbound.user", user);
    props.setProperty("outbound.password", password);
    props.setProperty("outbound.remote.directory", remoteDir);
    props.setProperty("outbound.local.directory", localDir);        

    PropertiesPropertySource pps = new PropertiesPropertySource("ftpprops", props);
    env.getPropertySources().addLast(pps);
    ctx.setEnvironment(env);


And this is my dsl class....

@Autowired
private DefaultSftpSessionFactory sftpSessionFactory;

@Bean
public DefaultSftpSessionFactory sftpSessionFactory(@Value("${outbound.host}") String host, @Value("${outbound.port}") int port,
        @Value("${outbound.user}") String user, @Value("${outbound.password}") String password
        ) {
    DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
    factory.setHost(host);
    factory.setPort(port);
    factory.setUser(user);
    factory.setPassword(password);      
    return factory;
}


@Bean
public IntegrationFlow fileInboundFlow(@Value("${outbound.local.directory}") String localDir)
{
    return IntegrationFlows
            .from(Files.inboundAdapter(new File(localDir)),
                    new Consumer<SourcePollingChannelAdapterSpec>() {

                        @Override
                        public void accept(SourcePollingChannelAdapterSpec e) {
                            e.autoStartup(true).poller(
                                    Pollers.fixedDelay(5000)
                                            .maxMessagesPerPoll(1));
                        }
                    })
                    .channel(sftpSendChannel())
                    .get();
}

@Bean
public IntegrationFlow sftpOutboundFlow(@Value("${outbound.remote.directory}") String remDir) {    
    return IntegrationFlows
            .from(sftpSendChannel())
            .handle(Sftp.outboundAdapter(this.sftpSessionFactory)                       
                    .useTemporaryFileName(false)
                    .remoteDirectory(remDir))
                    .get();
}

@Bean
public MessageChannel sftpSendChannel() {
    return new DirectChannel();
}

@Bean
public static PropertySourcesPlaceholderConfigurer configurer1() {
    return new PropertySourcesPlaceholderConfigurer();      
}


And this the error log from console...

共有1个答案

禄烨然
2023-03-14

它目前不受支持。

我们有一个开放的JIRA来添加对动态服务器选择的支持,但它不太可能在即将发布的4.2版本中及时完成。

您可以通过编写自己的自定义委托会话工厂来解决这个问题,该会话工厂使用一些标准(例如Threadlocal)来确定要使用哪个委托工厂。

编辑:

与XML一样,您需要一个propertySourcesplaceConfigurerbean。

您还应该使用工厂方法注入,因为@Configuration类创建得太早,无法注入@Value。。。

@Configuration
public class FooConfig {

    @Bean
    public DefaultSftpSessionFactory factory(
            @Value("${inbound.host}") String host, 
            @Value("${inbound.port}") int port) {
        DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
        sf.setHost(host);
        sf.setPort(port);
        return sf;
    }

    @Bean
    public PropertySourcesPlaceholderConfigurer configurer() {
        return new PropertySourcesPlaceholderConfigurer();
    }

}

.

public class Testing {

    @Test
    public void test() {
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
        context.register(FooConfig.class);
        StandardEnvironment env = new StandardEnvironment();
        Properties props = new Properties();
        props.setProperty("inbound.host", "bar");
        props.setProperty("inbound.port", "23");
        PropertiesPropertySource pps = new PropertiesPropertySource("sftpprop", props);
        env.getPropertySources().addLast(pps);
        context.setEnvironment(env);
        context.refresh();
        DefaultSftpSessionFactory sessionFactory = context.getBean(DefaultSftpSessionFactory.class);
        assertEquals("bar", TestUtils.getPropertyValue(sessionFactory, "host"));
        context.close();
    }

}

顺便说一句,授权会话工厂终究会出现在4.2版本中。

编辑2:

您可以避免配置类的早期实例化,并使用全局@Value注入,只要您使PSPC bean静态...

@Configuration
public class FooConfig {

    @Value("${foo}")
    public String foo;

    @Bean
    public String earlyFoo() {
        return this.foo;
    }

    @Bean
    public String foo(@Value("${foo}") String foo) {
        return foo;
    }

    @Bean
    public static PropertySourcesPlaceholderConfigurer configurer() {
        return new PropertySourcesPlaceholderConfigurer();
    }

}

在这种情况下,earlyFoo按预期填充。

 类似资料:
  • 如果我创建一个SFTP入站通道适配器,并使用在SFTP中配置为channel属性的通道发送一些文件。文件将传输到SFTP远程目录本地目录,还是直接从通道流到本地目录

  • 这就是我的配置 这个想法是每3秒轮询一个目录,并根据通道向调度程序发送3条消息,以允许异步执行。然后根据消息数量聚合消息,然后发送到下一个服务激活器。第一个服务激活器将文件放在源目录中,第二个服务激活器获取聚合列表以将这些文件移动到暂存目录。 似乎发生的情况是,源文件夹跳过了一些文件,但临时文件夹确实获取了所有文件。我的猜测是,轮询器将消息发送到dispatcher通道,但当其线程池变满时,它会忽

  • 流是Gateway-->RequsetChannel-->Transform-->OutboundMailChannl-->MailSender(MailAdapter)

  • 我有一个,我使用它向kafka发送消息,然后使用接收消息。通信似乎工作正常,我能够发送和接收消息,但格式有点奇怪。我单独向我的出站适配器发送单个消息,但当我收到消息时,我会收到一条消息,所有消息都聚合到该消息的有效负载中。 这就是我收到消息时消息负载的样子 [有效负载={mytopic={0=[字符串消息1,字符串消息2,字符串消息3,字符串消息4,字符串消息5,…]}},标头={id=3934d