当前位置: 首页 > 知识库问答 >
问题:

跨越文件入站适配器和队列通道的Spring集成事务策略

王楚青
2023-03-14

我有一个由入站文件适配器读取的目录,该适配器被管道输送到优先级通道,该通道按文件名称对文件进行排序。我创建了一个事务同步工厂,用于在处理完成后移动文件,它对入站适配器和其他文件编写器流中发生的所有转换/聚合都很好。一旦我添加了PriorityChannel,事务似乎就完成了,它没有被传递给转换/聚合逻辑。

return IntegrationFlows
                .from(fileReadingMessageSource,
                        c -> c.poller(Pollers.fixedDelay(period)
                                             .taskExecutor(taskExecutor)
                                             .maxMessagesPerPoll(maxMessagesPerPoll)))
                                             .transactionSynchronizationFactory(transactionSynchronizationFactory())
                                             .transactional(transactionManager())))
                .channel("alphabetically")
                .bridge(s -> s.poller(Pollers.fixedDelay(100)))
                .channel(ApplicationConfiguration.INBOUND_CHANNEL)
                .get();
@Bean
    TransactionSynchronizationFactory transactionSynchronizationFactory() {
        ExpressionParser parser = new SpelExpressionParser();
        ExpressionEvaluatingTransactionSynchronizationProcessor syncProcessor = new ExpressionEvaluatingTransactionSynchronizationProcessor();
        syncProcessor.setBeanFactory(applicationContext.getAutowireCapableBeanFactory());
        syncProcessor.setAfterCommitExpression(parser.parseExpression(
                "payload.renameTo(new java.io.File(@inboundProcessedDirectory.path " + " + T(java.io.File).separator + payload.name))"));
        syncProcessor.setAfterRollbackExpression(parser.parseExpression(
                "payload.renameTo(new java.io.File(@inboundFailedDirectory.path " + " + T(java.io.File).separator + payload.name))"));
        return new DefaultTransactionSynchronizationFactory(syncProcessor);
    }

根据Gary的说法,这应该是可行的(按照要求提供整个示例):

@Configuration
class FilePollingIntegrationFlow {

    @Autowired
    public File inboundReadDirectory;

    @Autowired
    private ApplicationContext applicationContext;

    @Bean
    public IntegrationFlow inboundFileIntegration(@Value("${inbound.file.poller.fixed.delay}") long period,
                                                  @Value("${inbound.file.poller.max.messages.per.poll}") int maxMessagesPerPoll, TaskExecutor taskExecutor,
                                                  MessageSource<File> fileReadingMessageSource) {
        return IntegrationFlows
                .from(fileReadingMessageSource,
                        c -> c.poller(Pollers.fixedDelay(period)
                                             .taskExecutor(taskExecutor)
                                             .maxMessagesPerPoll(maxMessagesPerPoll)
                                             .transactionSynchronizationFactory(transactionSynchronizationFactory())
                                             .transactional(transactionManager())))
                .channel(ApplicationConfiguration.INBOUND_CHANNEL)
                .get();
    }

    @Bean
    TaskExecutor taskExecutor(@Value("${inbound.file.poller.thread.pool.size}") int poolSize) {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(poolSize);
        return taskExecutor;
    }

    @Bean
    PseudoTransactionManager transactionManager() {
        return new PseudoTransactionManager();
    }

    @Bean
    TransactionSynchronizationFactory transactionSynchronizationFactory() {
        ExpressionParser parser = new SpelExpressionParser();
        ExpressionEvaluatingTransactionSynchronizationProcessor syncProcessor = new ExpressionEvaluatingTransactionSynchronizationProcessor();
        syncProcessor.setBeanFactory(applicationContext.getAutowireCapableBeanFactory());
        syncProcessor.setAfterCommitExpression(parser.parseExpression(
                "payload.renameTo(new java.io.File(@inboundProcessedDirectory.path " + " + T(java.io.File).separator + payload.name))"));
        syncProcessor.setAfterRollbackExpression(parser.parseExpression(
                "payload.renameTo(new java.io.File(@inboundFailedDirectory.path " + " + T(java.io.File).separator + payload.name))"));
        return new DefaultTransactionSynchronizationFactory(syncProcessor);
    }

    @Bean
    public FileReadingMessageSource fileReadingMessageSource(DirectoryScanner directoryScanner) {
        FileReadingMessageSource source = new FileReadingMessageSource();
        source.setDirectory(this.inboundReadDirectory);
        source.setScanner(directoryScanner);
        source.setAutoCreateDirectory(true);
        return source;
    }

    @Bean
    public DirectoryScanner directoryScanner(@Value("${inbound.filename.regex}") String regex) {
        DirectoryScanner scanner = new RecursiveDirectoryScanner();
        CompositeFileListFilter<File> filter = new CompositeFileListFilter<>(
                Arrays.asList(new AcceptOnceFileListFilter<>(), new RegexPatternFileListFilter(regex), new AlphabeticalFileListFilter()));
        scanner.setFilter(filter);
        return scanner;
    }

    private class AlphabeticalFileListFilter implements FileListFilter<File> {
        @Override
        public List<File> filterFiles(File[] files) {
            List<File> list = Arrays.asList(files);
            list.sort(Comparator.comparing(File::getName));
            return list;
        }
    }
}

@Configuration
public class FilePollingConfiguration {

    @Bean(name="inboundReadDirectory")
    public File inboundReadDirectory(@Value("${inbound.read.path}") String path) {
        return makeDirectory(path);
    }

    @Bean(name="inboundProcessedDirectory")
    public File inboundProcessedDirectory(@Value("${inbound.processed.path}") String path) {
        return makeDirectory(path);
    }

    @Bean(name="inboundFailedDirectory")
    public File inboundFailedDirectory(@Value("${inbound.failed.path}") String path) {
        return makeDirectory(path);
    }

    @Bean(name="inboundOutDirectory")
    public File inboundOutDirectory(@Value("${inbound.out.path}") String path) {
        return makeDirectory(path);
    }

    private File makeDirectory(String path) {
        File file = new File(path);
        file.mkdirs();
        return file;
    }

}

通过这样做并移除PriorityChannel,事务似乎仍然没有像我想象的那样工作。使用此流,该文件在Http出站网关中不可用。知道为什么吗?

@Component
public class MessageProcessingIntegrationFlow {

    public static final String OUTBOUND_FILENAME_GENERATOR = "outboundFilenameGenerator.handler";
    public static final String FILE_WRITING_MESSAGE_HANDLER = "fileWritingMessageHandler";
    @Autowired
    public File inboundOutDirectory;

    @Bean
    public IntegrationFlow writeToFile(@Value("${api.base.uri}") URI uri,
                                       @Value("${out.filename.dateFormat}") String dateFormat, @Value("${out.filename.suffix}") String filenameSuffix) {
        return IntegrationFlows.from(ApplicationConfiguration.INBOUND_CHANNEL)
                               .enrichHeaders(h -> h.headerFunction(IntegrationMessageHeaderAccessor.CORRELATION_ID, m -> ((String) m
                                       .getHeaders()
                                       .get(FileHeaders.FILENAME)).substring(0, 17)))
                               .aggregate(a -> a.groupTimeout(2000)
                                                .sendPartialResultOnExpiry(true))
                               .transform(m -> {
                                   MultiValueMap<String, Object> body = new LinkedMultiValueMap<>();
                                   //noinspection unchecked
                                   ((List<File>) m).forEach(f -> body.add("documents", new FileSystemResource((File) f)));
                                   return body;
                               })
                               .handle(Http.outboundGateway(uri)
                                           .httpMethod(HttpMethod.POST)
                                           .expectedResponseType(byte[].class))
                               .handle(Files.outboundGateway(inboundOutDirectory)
                                            .autoCreateDirectory(true)
                                            .fileNameGenerator(
                                                    m -> m.getHeaders()
                                                          .get(FileHeaders.FILENAME) + "_" + DateTimeFormatter.ofPattern(dateFormat)
                                                                                                              .format(LocalDateTime
                                                                                                                      .now()) + filenameSuffix))
                               .log(LoggingHandler.Level.INFO)
                               .get();
    }
}

共有1个答案

荆乐
2023-03-14

不能用Spring事务切换线程;事务绑定到线程。

您可以在消息源中使用自定义的FileListFilter,并在那里对文件进行排序。

 类似资料:
  • 我试图将从Quickfix读取消息(读取修复消息)配置到spring集成中。我知道我可以使用入站通道适配器从外部源(如QuickFix)读取数据。您能提供如何编写事件驱动入站通道适配器的示例吗?我有以下配置不起作用

  • 问题内容: 入站和出站通道适配器之间的根本区别是什么? 任何示例都将非常有帮助。 我已经查看过Spring文档,这种“方向性”的区别对我来说还不清楚。我支持配置了outbound-channel-adapter的应用程序,但是我发现使用 出站 标签可以直观地了解行为计数器。该适配器获取一个外部文件,然后 将其 引入应用程序中, 在 该应用程序中我们解析文件并保留数据。 这类似于这个问题,但是我想更

  • 我们正在使用spring integration sftp入站通道适配器,它每隔几秒钟轮询一次,并将zip文件下载到本地目录进行进一步处理。当有一个大文件,客户端仍在上载该文件,而这个入站适配器拾取了那个不完整的文件时,问题就开始了。我们使用AcceptAllFileFilter进行远程筛选,对于本地筛选,我们有自定义筛选。 有没有更好的方法来忽略或检查文件是否完全上传,然后拿起进行处理?

  • 如何通过注释而不是常规配置文件配置入站通道适配器?我可以为会话工厂定义bean,如下所示: 如何配置通过注释下给出的入站通道适配器? 我正在寻找的是在应用程序启动时连接所有bean,然后公开一些方法来开始轮询服务器,处理它们,然后从本地删除它们,类似于 其中getPollableChannel()为我提供了用于轮询的bean。

  • 问题内容: Spring Integration FTP中的入站通道适配器和出站通道适配器之间有什么区别?我应该使用哪一个?何时使用? 我从文档中了解到,出站可以发送任何类型的文件(例如byte [],String,java.io.File),但入站仅限于文件类型。那仅仅是区别还是其他? 问题答案: 我建议您首先阅读理论 。 任何Inbound适配器都旨在从外部系统获取数据。Outbound-放置