当前位置: 首页 > 知识库问答 >
问题:

使用spring DSL构建spring集成发布策略

戚研
2023-03-14

我对Spring Integration是新手。我尝试使用文件拆分器将消息从文件中拆分出来,然后使用.aggregate()构建单个消息并发送到输出通道。我有标记为true,因此apply-sequence现在默认为false。我已经使用EnricHeaders将correlationId设置为常量“1”。我在设置realease策略时遇到了困难,因为我没有在序列结束上保持。下面是我的代码的外观。

    IntegrationFlows
                .from(s -> s.file(new File(fileDir))
                                .filter(getFileFilter(fileName)),
                        e -> e.poller(poller))

                .split(Files.splitter(true, true)
                                .charset(StandardCharsets.US_ASCII),
                        e -> e.id(beanName)).enrichHeaders(h -> h.header("correlationId", "1"));

 IntegrationFlow integrationFlow = integrationFlowBuilder
            .<Object, Class<?>>route(Object::getClass, m -> m
                    .channelMapping(FileSplitter.FileMarker.class, "markers.input")
                    .channelMapping(String.class, "lines.input"))
            .get();

@Bean
    public IntegrationFlow itemExcludes() {
        return flow -> flow.transform(new ItemExcludeRowMapper(itemExcludeRowUnmarshaller)) //This maps each line to ItemExclude object
                .aggregate(aggregator -> aggregator
                        .outputProcessor(group -> group.getMessages()
                        .stream()
                        .map(message -> ((ItemExclude) message.getPayload()).getPartNumber())
                        .collect(Collectors.joining(","))))
                .transform(Transformers.toJson())
                .channel(customSource.itemExclude());
    }

    @Bean
    public IntegrationFlow itemExcludeMarkers() {
        return flow -> flow
                .log(LoggingHandler.Level.INFO)
                .<FileSplitter.FileMarker>filter(m -> m.getMark().equals(FileSplitter.FileMarker.Mark.END))
                .<FileHandler>handle(new FileHandler(configProps))
                .channel(NULL_CHANNEL);
    }

任何帮助都很感激。

共有1个答案

融泓
2023-03-14

我将在拆分器之前为correlationid移动头部enricher并使其如下所示:

 .enrichHeaders(h -> h
        .headerFunction(IntegrationMessageHeaderAccessor.CORRELATION_ID, 
                   m -> m.getHeaders().getId())) 

常量CorrelationID在多线程环境中绝对不好:不同的线程拆分不同的文件并将不同的行发送到同一个聚合器。因此,使用“1”作为关联键,您将始终拥有一个要聚合和释放的组。默认的序列行为是将原始消息ID填充到CorrelationID。由于您将不依赖filesplitter中的applysequence,所以我建议使用简单的解决方案来模拟这种行为。

正如Gary在回答中指出的,您需要考虑自定义releaseStrategy,并将filesplitter.fileMarker发送到聚合器。filesplitter.fileMarker.end具有lineCount属性,可以将该属性与messageGroup.size进行比较,以决定是否可以释放组。在生成输出结果的过程中,MessageGroupProcessor确实必须过滤FilespLitter.FileMarker消息。

 类似资料:
  • 我想用Spring集成创建一个简单的IntegrationFlow,但我遇到了一些困难。 我想创建一个集成流,从Rabbit Mq中的队列中获取消息并将消息发布到endpointRest。 我要处理的问题是,当一个请求失败时,它会继续无休止地重试,如何在这段代码中实现重试策略?例如,我想要3次重试,第一次重试在1秒后,第二次重试在5秒后,第三次重试在1分钟后。

  • 问题内容: 如何建立持续集成标签而不是分支的实践? 我具有标签存储库目录的以下结构: 我想配置我的持续集成工具(可以是CruiseControl,Hudson和Jenkins到TeamCity的任何工具),以在两个文件夹中的任何一个中创建了最新标记。 例如,如果结构已更改并且标签已出现在目录中,我想触发标签下的源代码构建: 是否可以使用任何现有的持续集成工具在标签下构建源代码,或者为此目的我应该编

  • 构建 当项目开发完毕,只需要运行一行命令就可以打包你的应用: # 打包正式环境 npm run build:prod # 打包预发布环境 npm run build:stage 构建打包成功之后,会在根目录生成 dist 文件夹,里面就是构建打包好的文件,通常是 ***.js 、***.css、index.html 等静态文件。 如果需要自定义构建,比如指定 dist 目录等,则需要通过 co

  • 在项目正常开发,预览效果达到要求的情况下,可以构建项目来进行更多的测试。 导读 熟悉构建发布面板 了解通用构建参数 命令行发布项目 定制项目的构建模版 扩展构建流程 构建流程简介与常见错误处理 发布到 web 平台 发布到原生平台 安装配置原生环境 原生平台 JavaScript 调试 发布到 iOS App Clip (轻 App) 发布到支付宝小游戏 发布到字节小游戏 发布到 Cocos Pl

  • 我们有一个包含多个子项目的多项目gradle存储库。其中一个子项目应该生成单独的testJar并将其发布到本地maven存储库: 这有效,我可以看到是在中生成的。 然后,在根项目gradle中。kts我们设立了一个发布扩展: 这也正常工作,我可以看到模块发布在目录中。但是,没有发布。我找不到如何将工件附加到根项目中定义的发布。

  • 现在我们来试一试从代码托管平台上拉取代码,并进行镜像构建和持续集成。 创建新项目并构建镜像 在控制台点击「代码构建」,然后在「代码构建」的界面中点击「创建新项目」。 第一步:填写项目名称 在「项目名称」上填写该项目的 Docker 镜像名称。 第二步:选择代码库 从第三方代码托管平台中选择我们想要构建的代码库,如果找不到需要的代码库可以点击右上角刷新按钮同步列表。 第三步:开启持续集成 持续集成是