@Bean
@InboundChannelAdapter(value = "stream", poller = @Poller(fixedRate = "1000"))
public MessageSource<InputStream> ftpMessageSource() {
FtpStreamingMessageSource messageSource = new FtpStreamingMessageSource(template(), null);
messageSource.setRemoteDirectory(remotedirectory);
messageSource.setFilter(filter());
return messageSource;
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setTrigger(new PeriodicTrigger(5000));
return pollerMetadata;
}
public static void main(String[] args) {
SpringApplication.run(FtpinboundApp.class, args);
}
@Bean
public SessionFactory<FTPFile> ftpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost(remotehost);
sf.setPort(remoteport);
sf.setUsername(remoteuser);
sf.setPassword(remotepassword);
return new CachingSessionFactory<FTPFile>(sf);
}
@Bean
@ServiceActivator(inputChannel = "data", adviceChain = "after")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
try {
httpposthgfiles.getHGFilesfromRestful(message.getPayload().toString());
httppost990.get990fromRestful(message.getPayload().toString());
} catch (IOException e) {
logger.error(e);
} catch (Exception e) {
logger.error(e);
}
}
};
}
@Bean
public ExpressionEvaluatingRequestHandlerAdvice after() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setOnSuccessExpression("@template.remove(headers['file_remoteDirectory'] + headers['file_remoteFile'])");
advice.setPropagateEvaluationFailures(true);
return advice;
}
@Bean
@InboundChannelAdapter(value = "stream", poller = @Poller(fixedRate = "1000"))
public MessageSource<InputStream> ftpMessageSource() {
FtpStreamingMessageSource messageSource = new FtpStreamingMessageSource(template(), null);
messageSource.setRemoteDirectory(remotedirectory);
messageSource.setFilter(filter());
return messageSource;
}
public FileListFilter<FTPFile> filter() {
CompositeFileListFilter<FTPFile> filter = new CompositeFileListFilter<>();
filter.addFilter(new FtpSimplePatternFileListFilter("xxxx_aaa204*"));
filter.addFilter(acceptOnceFilter());
return filter;
}
@Bean
public FtpPersistentAcceptOnceFileListFilter acceptOnceFilter() {
FtpPersistentAcceptOnceFileListFilter filter = new FtpPersistentAcceptOnceFileListFilter(meta(), "xxxx_aaa204");
filter.setFlushOnUpdate(true);
return filter;
}
@Bean
public ConcurrentMetadataStore meta() {
PropertiesPersistingMetadataStore meta = new PropertiesPersistingMetadataStore();
meta.setBaseDirectory("/tmp/foo");
meta.setFileName("ftpStream.properties");
return meta;
}
@Bean
@Transformer(inputChannel = "stream", outputChannel = "data")
public org.springframework.integration.transformer.Transformer transformer() {
return new StreamTransformer("UTF-8");
}
@Bean
public FtpRemoteFileTemplate template() {
return new FtpRemoteFileTemplate(ftpSessionFactory());
}
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setTrigger(new PeriodicTrigger(5000));
return pollerMetadata;
}
只有在使用CachingSessionFactory
时,它才会保持登录状态。
最好不要像那样Hibernate和绑定线程,而是使用任务调度器(这是轮询器所做的)。
New PerioadTrigger(600_000)
将调度一个任务,每10分钟登录并检查一次文件。
问题内容: Spring Integration FTP中的入站通道适配器和出站通道适配器之间有什么区别?我应该使用哪一个?何时使用? 我从文档中了解到,出站可以发送任何类型的文件(例如byte [],String,java.io.File),但入站仅限于文件类型。那仅仅是区别还是其他? 问题答案: 我建议您首先阅读理论 。 任何Inbound适配器都旨在从外部系统获取数据。Outbound-放置
如果我创建一个SFTP入站通道适配器,并使用在SFTP中配置为channel属性的通道发送一些文件。文件将传输到SFTP远程目录本地目录,还是直接从通道流到本地目录
问题内容: 入站和出站通道适配器之间的根本区别是什么? 任何示例都将非常有帮助。 我已经查看过Spring文档,这种“方向性”的区别对我来说还不清楚。我支持配置了outbound-channel-adapter的应用程序,但是我发现使用 出站 标签可以直观地了解行为计数器。该适配器获取一个外部文件,然后 将其 引入应用程序中, 在 该应用程序中我们解析文件并保留数据。 这类似于这个问题,但是我想更
如何通过注释而不是常规配置文件配置入站通道适配器?我可以为会话工厂定义bean,如下所示: 如何配置通过注释下给出的入站通道适配器? 我正在寻找的是在应用程序启动时连接所有bean,然后公开一些方法来开始轮询服务器,处理它们,然后从本地删除它们,类似于 其中getPollableChannel()为我提供了用于轮询的bean。