当前位置: 首页 > 知识库问答 >
问题:

基于文件名的Spring集成流路由

太叔逸春
2023-03-14

我有一个解压缩和文件的要求,并处理它的内容。在zip文件中,可以有两种类型的文件个人或公司。可以通过文件名区分的。在处理完所有文件后,它应该调用另一个程序模块,并将处理后的文件存档在不同的位置。希望使用Spring集成相同。我试图通过下面的代码来实现这一点,但它在基于文件名的路由时产生了问题。我使用的是JDK 8,Spring 5

.<File, Boolean>route(new Function<File, Boolean>() {

                    @Override
                    public Boolean apply(File f) {
                        return f.getName().contains("individual");
                    }
                }, m -> m
                        .subFlowMapping(true, sf -> sf.gateway(individualProcessor()))
                        .subFlowMapping(false, sf -> sf.gateway(firmProcessor()))
                )

例外

Caused by: java.lang.IllegalArgumentException: Found ambiguous parameter type [interface java.util.function.Function] for method match: [public default <V> java.util.function.Function<V, R> java.util.function.Function.compose(java.util.function.Function<? super V, ? extends T>), public static <T> java.util.function.Function<T, T> java.util.function.Function.identity(), public java.lang.Boolean com.xxx.thirdpatysystem.config.IntegrationConfig$1.apply(java.io.File)]
    at org.springframework.util.Assert.isNull(Assert.java:155) ~[spring-core-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.integration.util.MessagingMethodInvokerHelper.findHandlerMethodsForTarget(MessagingMethodInvokerHelper.java:843) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.util.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:362) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.util.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:231) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.util.MessagingMethodInvokerHelper.<init>(MessagingMethodInvokerHelper.java:225) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.handler.MethodInvokingMessageProcessor.<init>(MethodInvokingMessageProcessor.java:60) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.router.MethodInvokingRouter.<init>(MethodInvokingRouter.java:46) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.dsl.IntegrationFlowDefinition.route(IntegrationFlowDefinition.java:1922) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.dsl.IntegrationFlowDefinition.route(IntegrationFlowDefinition.java:1895) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
.<File, Boolean>route(f -> f.getName().contains("individual"), m -> m
                        .subFlowMapping(true, sf -> sf.gateway(individualProcessor()))
                        .subFlowMapping(false, sf -> sf.gateway(firmProcessor()))
                )
Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.integration.dsl.IntegrationFlow]: Factory method 'thirdpatysystemFlow' threw exception; nested exception is java.lang.UnsupportedOperationException
    at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:185) ~[spring-beans-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:579) ~[spring-beans-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    ... 17 common frames omitted
Caused by: java.lang.UnsupportedOperationException: null
    at org.springframework.integration.dsl.StandardIntegrationFlow.configure(StandardIntegrationFlow.java:89) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.dsl.IntegrationFlowDefinition.gateway(IntegrationFlowDefinition.java:2172) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.dsl.IntegrationFlowDefinition.gateway(IntegrationFlowDefinition.java:2151) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]

下面是整个代码段

import java.io.File;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.integration.file.FileReadingMessageSource;
import org.springframework.integration.file.FileWritingMessageHandler;
import org.springframework.integration.file.filters.AcceptOnceFileListFilter;
import org.springframework.integration.file.filters.ChainFileListFilter;
import org.springframework.integration.file.filters.RegexPatternFileListFilter;
import org.springframework.integration.zip.splitter.UnZipResultSplitter;
import org.springframework.integration.zip.transformer.UnZipTransformer;
import org.springframework.integration.zip.transformer.ZipResultType;
import org.springframework.messaging.MessageHandler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

/**
 * @author dpoddar
 *
 */
@Configuration
@EnableIntegration
public class IntegrationConfig {

    @Value("${input.directory}")
    private String inputDir;

    @Value("${outputDir.directory}")
    private String outputDir;

    @Value("${input.scan.frequency: 100}")
    private long scanFrequency;

    @Bean
    public MessageSource<File> inputFileSource() {
        FileReadingMessageSource src = new FileReadingMessageSource();

        src.setDirectory(new File(inputDir));
        src.setAutoCreateDirectory(true);

        ChainFileListFilter<File> chainFileListFilter = new ChainFileListFilter<>();
        chainFileListFilter.addFilter(new AcceptOnceFileListFilter<>() );
        chainFileListFilter.addFilter(new RegexPatternFileListFilter("(?i)^.+\\.zip$"));
        src.setFilter(chainFileListFilter);
        return src;
    }

    @Bean
    public UnZipTransformer unZipTransformer() {
        UnZipTransformer unZipTransformer = new UnZipTransformer();
        unZipTransformer.setExpectSingleResult(false);
        unZipTransformer.setZipResultType(ZipResultType.FILE);
        //unZipTransformer.setWorkDirectory(new File("/usr/tmp/uncompress"));
        unZipTransformer.setDeleteFiles(true);
        return unZipTransformer;
    }

    @Bean
    public UnZipResultSplitter splitter() {
        UnZipResultSplitter splitter = new UnZipResultSplitter();

        return splitter;
    }

    @Bean
    public DirectChannel outputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageHandler fileOutboundChannelAdapter() {
        FileWritingMessageHandler adapter = new FileWritingMessageHandler(new File(outputDir));
        adapter.setDeleteSourceFiles(true);
        adapter.setAutoCreateDirectory(true);
        adapter.setExpectReply(false);
        return adapter;
    }

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(25);
        return executor;
    }

    @Autowired
    DirectChannel outputChannel;

    @Autowired
    MessageHandler fileOutboundChannelAdapter;

    @Bean
    public IntegrationFlow individualProcessor() {
        return flow -> flow.handle("thirdpatysystemprocessor","processfile").channel(outputChannel).handle(fileOutboundChannelAdapter);
    }

    @Bean
    public IntegrationFlow firmProcessor() {
        return flow -> flow.handle("thirdpatysystemprocessor","processfile").channel(outputChannel).handle(fileOutboundChannelAdapter);
    }

    @Bean
    public IntegrationFlow thirdpatysystemAgentDemographicFlow() {
        return IntegrationFlows
                .from(inputFileSource(), spec -> spec.poller(Pollers.fixedDelay(scanFrequency,TimeUnit.SECONDS)))
                .transform(unZipTransformer())
                .split(splitter())
                .channel(MessageChannels.executor(taskExecutor()))
                .<File, Boolean>route(new Function<File, Boolean>() {

                    @Override
                    public Boolean apply(File f) {
                        return f.getName().contains("individual");
                    }
                }, m -> m
                        .subFlowMapping(true, sf -> sf.gateway(individualProcessor()))
                        .subFlowMapping(false, sf -> sf.gateway(firmProcessor()))
                )
                .aggregate()
                /*.handle("thirdpatysystemprocessor","processfile")
                .channel(outputChannel())
                .handle(fileOutboundChannelAdapter())*/
                .get()
                ;
    }
}

共有1个答案

经俊茂
2023-03-14

在Spring Integration5.0.5:https://jira.Spring.io/browse/int-4456中修复了java.lang.IllegalArgumentException:Found ambutious参数类型[interface java.util.function.function]。因此,现在我们使用显式的函数impl来执行以下操作:

MethodInvokingRouter methodInvokingRouter = isLambda(router)
            ? new MethodInvokingRouter(new LambdaMessageProcessor(router, payloadType))
            : new MethodInvokingRouter(router, ClassUtils.FUNCTION_APPLY_METHOD);

我们显式地指向apply()方法

在子流(gateway())中重用现有IntegrationFlowbeans的问题已经在版本5.0.4:https://jira.spring.io/browse/int-4434中得到了修正

所以,您需要的只是将项目升级到最新的依赖项。特别是Spring Integration5.0.7:https://Spring.io/projects/Spring-Integration#learn

 类似资料:
  • 我正在尝试在最新版本的 Spring Cloud 流中使用基于内容的路由。根据这份文件 - 这是我用StreamListener编写的代码 通过使用该条件,可以将消息路由到两个不同的函数。 我正试图用如下的功能接口方法来消费消息。 如何在函数中实现类似的基于内容的路由?蒂亚。 其他细节- Spring引导版本 - 2.3.12.发布 Spring云版 - Hoxton.SR11

  • 我们正在使用Spring Cloud Stream v2。2带有Kafka和Avro(本机编码器/解码器)。我们正在尝试根据负载的条件使用基于内容的路由。据我所知,根据Spring Cloud Stream文档,基于内容的路由只能在标头上实现,因为负载到达条件时没有经过类型转换过程。因此,除非条件基于字节格式,否则它将无法按预期工作。但是,我知道,当在本机模式下使用Avro时,会跳过消息头,并且不

  • 我有两个简介,dev和Prod。我的文件如下所示 我的理解是: 属性告诉Spring,如果没有指定配置文件,则使用prod配置文件 Spring将视为“文件分隔符”,如果参数的计算结果为true,则重新计算每组属性并覆盖以前的值 鉴于这种理解,我希望Spring首先解析“默认”属性,了解默认激活的配置文件应该是prod。然后它将解析prod属性。由于“prod”是一个活动配置文件(唯一的活动配置文

  • 在docker容器中部署应用程序时,我无法使用Spring Boot实现liquibase迁移。我有一个胖罐子,它是在docker图像创建时提取的。 我有一个单独的模块,保存迁移文件。我能装上主机。xml: 然后它继续并加载其余的资源。问题在于然后从加载的资源中提取仍然正常的实际路径: 但是接下来做一些路径操作,结果是: 这反过来又不是有效的类路径: 我需要(类路径)或以能够读取文件。但它总是以上

  • 我有一个多模块的弹簧引导项目。我想知道如何设置集成测试来测试Spring数据JPA存储库?以下方法失败,但有此异常:HV000183:无法加载'javax.el.ExpressionFactory'。检查是否在类路径上有EL依赖项。 由于该模块不依赖于web模块,因此没有可以启动的web应用程序。

  • 如何使用java dsl Integrationflows从spring集成触发spring批处理作业。 我有下面的代码,它轮询目录中的文件,当新文件添加到目录中时,会生成一条消息,我想在该实例中触发一个Spring批处理作业。请建议。